diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 84309aac4f83..c75044e3814d 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1925,6 +1925,10 @@ org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage,2 fromData,15 toData,15 +org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage,2 +fromData,26 +toData,26 + org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation$DestroyMessage,2 fromData,46 toData,41 diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java index 4afb51d8724e..d5395df9e905 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java @@ -114,6 +114,9 @@ private void start(boolean cleanQueues) { } } + @Override + public void prepareForStop() {} + @Override public void stop() { getLifeCycleLock().writeLock().lock(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java index 1713feff76aa..06e6e594e2d5 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java @@ -135,6 +135,9 @@ protected AbstractGatewaySenderEventProcessor createEventProcessor(boolean clean return eventProcessor; } + @Override + public void prepareForStop() {} + @Override public void stop() { if (logger.isDebugEnabled()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index d11a9435b9f1..e6e3cb96cfe9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -400,6 +400,7 @@ import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationOperation; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage; +import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage; import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation; import org.apache.geode.internal.serialization.DSFIDSerializer; import org.apache.geode.internal.serialization.DataSerializableFixedID; @@ -985,6 +986,8 @@ private void registerDSFIDTypes(DSFIDSerializer serializer) { serializer.register(ABORT_BACKUP_REQUEST, AbortBackupRequest.class); serializer.register(HOST_AND_PORT, HostAndPort.class); serializer.register(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class); + serializer.register(PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE, + ParallelQueueSetPossibleDuplicateMessage.class); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index 03339a366762..3d9272b71d1e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -14,8 +14,11 @@ */ package org.apache.geode.internal.cache; +import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.LOAD_FROM_TEMP_QUEUE; + import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,14 +34,20 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.TimeoutException; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage; import org.apache.geode.internal.offheap.OffHeapClearRequired; +import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.util.internal.GeodeGlossary; @@ -218,11 +227,12 @@ protected void loadEventsFromTempQueue() { if (queues != null) { ConcurrentParallelGatewaySenderQueue prq = (ConcurrentParallelGatewaySenderQueue) queues.toArray()[0]; - // synchronized (prq.getBucketToTempQueueMap()) { + BlockingQueue tempQueue = prq.getBucketTmpQueue(getId()); - // .getBucketToTempQueueMap().get(getId()); if (tempQueue != null && !tempQueue.isEmpty()) { synchronized (tempQueue) { + Map>> regionToDuplicateEventsMap = + new HashMap<>(); try { // ParallelQueueRemovalMessage checks for the key in BucketRegionQueue // and if not found there, it removes it from tempQueue. When tempQueue @@ -235,6 +245,9 @@ protected void loadEventsFromTempQueue() { try { event.setPossibleDuplicate(true); if (addToQueue(event.getShadowKey(), event)) { + if (notifyDuplicateSupported()) { + addDuplicateEvent(regionToDuplicateEventsMap, event); + } event = null; } } catch (ForceReattemptException e) { @@ -257,13 +270,57 @@ protected void loadEventsFromTempQueue() { } getInitializationLock().writeLock().unlock(); } + notifyDuplicateEvents(regionToDuplicateEventsMap); } } + } + } + + private boolean notifyDuplicateSupported() { + return !(this.getPartitionedRegion().getParallelGatewaySender().getEventProcessor() + .getDispatcher() instanceof GatewaySenderEventCallbackDispatcher); + } + + private void notifyDuplicateEvents( + Map>> regionToDuplicateEventsMap) { + if (regionToDuplicateEventsMap.isEmpty()) { + return; + } + if (getPartitionedRegion().getRegionAdvisor() == null) { + return; + } + + Set recipients = + getPartitionedRegion().getRegionAdvisor().adviseDataStore(); + + if (recipients.isEmpty()) { + return; + } - // } + InternalDistributedSystem ids = getCache().getInternalDistributedSystem(); + DistributionManager dm = ids.getDistributionManager(); + dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0); + + if (!recipients.isEmpty()) { + ParallelQueueSetPossibleDuplicateMessage possibleDuplicateMessage = + new ParallelQueueSetPossibleDuplicateMessage(LOAD_FROM_TEMP_QUEUE, + regionToDuplicateEventsMap); + possibleDuplicateMessage.setRecipients(recipients); + dm.putOutgoing(possibleDuplicateMessage); } } + private void addDuplicateEvent(Map>> regionToDuplicateEventsMap, + GatewaySenderEventImpl event) { + Map> bucketIdToDispatchedKeys = regionToDuplicateEventsMap + .computeIfAbsent(getPartitionedRegion().getFullPath(), k -> new HashMap<>()); + + List dispatchedKeys = + bucketIdToDispatchedKeys.computeIfAbsent(getId(), k -> new ArrayList<>()); + + dispatchedKeys.add(event.getShadowKey()); + } + @Override public void forceSerialized(EntryEventImpl event) { // NOOP since we want the value in the region queue to stay in object form. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index e6dcd3fb8fcd..5c3e98dfbf3d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -164,6 +164,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor { private BucketAdvisor startingBucketAdvisor; + private volatile boolean hasBecomePrimary = false; + private final PartitionedRegion pRegion; final ConcurrentMap destroyedShadowBuckets = new ConcurrentHashMap<>(); @@ -498,6 +500,13 @@ public DistributedRegion getRegionForDeltaGII() { } + BucketAdvisor getParentAdvisor() { + return parentAdvisor; + } + + boolean getHasBecomePrimary() { + return hasBecomePrimary; + } /** * Called by the RegionAdvisor.profileRemoved, this method tests to see if the missing member is @@ -1153,6 +1162,7 @@ private boolean acquiredPrimaryLock() { try { synchronized (this) { if (isHosting() && (isVolunteering() || isBecomingPrimary())) { + hasBecomePrimary = isBecomingPrimary(); Bucket br = regionAdvisor.getBucket(getBucket().getId()); if (br instanceof BucketRegion) { ((BucketRegion) br).beforeAcquiringPrimaryState(); @@ -1167,6 +1177,8 @@ private boolean acquiredPrimaryLock() { if (hasPrimary() && isPrimary()) { shouldInvokeListeners = true; } + } else { + hasBecomePrimary = false; } } } @@ -2159,6 +2171,7 @@ private boolean requestPrimaryState(byte requestedState) { private void changeFromPrimaryTo(byte requestedState) { try { primaryState = requestedState; + hasBecomePrimary = false; } finally { getPartitionedRegionStats().incPrimaryBucketCount(-1); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index ffb110911570..8991441a5aa5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -217,6 +217,8 @@ public Object getDeserialized(boolean copyOnRead) { } } + private boolean receivedGatewaySenderStoppedMessage = false; + private final int redundancy; /** the partitioned region to which this bucket belongs */ @@ -2535,4 +2537,12 @@ Set getDestroyRegionRecipients() { return getSystem().getDistributionManager().getOtherDistributionManagerIds(); } + public boolean isReceivedGatewaySenderStoppedMessage() { + return receivedGatewaySenderStoppedMessage; + } + + public void setReceivedGatewaySenderStoppedMessage(boolean notified) { + receivedGatewaySenderStoppedMessage = notified; + } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index aba62d4d2ac3..8fb572c23618 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -43,6 +43,7 @@ import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; @@ -202,6 +203,23 @@ private void destroyFailedBatchRemovalMessageKeys() { @Override public void beforeAcquiringPrimaryState() { + PartitionedRegion region = getPartitionedRegion(); + + if (region != null && region.getParallelGatewaySender() != null) { + AbstractGatewaySenderEventProcessor ep = + region.getParallelGatewaySender().getEventProcessor(); + + if (ep != null && !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) { + if (isReceivedGatewaySenderStoppedMessage()) { + setReceivedGatewaySenderStoppedMessage(false); + return; + } + BucketAdvisor parent = getParentAdvisor(getBucketAdvisor()); + if (parent.getHasBecomePrimary()) { + return; + } + } + } markAsDuplicate.addAll(eventSeqNumDeque); } @@ -660,4 +678,24 @@ public List getHelperQueueList() { } } + public void setAsPossibleDuplicate(Object key) { + Object object = optimalGet(key); + if (object != null) { + ((GatewaySenderEventImpl) object).setPossibleDuplicate(true); + } + } + + public boolean checkIfQueueContainsKey(Object key) { + return eventSeqNumDeque.contains(key); + } + + BucketAdvisor getParentAdvisor(BucketAdvisor advisor) { + BucketAdvisor parent = advisor.getParentAdvisor(); + while (parent != null) { + advisor = parent; + parent = advisor.getParentAdvisor(); + } + return advisor; + + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index e385581fbf1c..71d30adbf617 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -233,6 +233,7 @@ import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor; import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener; +import org.apache.geode.internal.cache.wan.InternalGatewaySender; import org.apache.geode.internal.cache.wan.WANServiceProvider; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.internal.cache.xmlcache.CacheServerCreation; @@ -2182,9 +2183,20 @@ boolean doClose(String reason, Throwable systemFailureCause, boolean keepAlive, return false; } + boolean isDebugEnabled = logger.isDebugEnabled(); + + for (GatewaySender sender : allGatewaySenders) { + try { + ((InternalGatewaySender) sender).prepareForStop(); + } catch (Exception exception) { + if (isDebugEnabled) { + logger.debug("When calling Prepare for stop gw sender, ignore exception " + exception); + } + } + } + CLOSING_THREAD.set(Thread.currentThread()); try { - boolean isDebugEnabled = logger.isDebugEnabled(); // First close the ManagementService system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 03dd3de0271f..896af325764d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -771,6 +771,8 @@ public synchronized String dump() { private boolean regionCreationNotified; + private boolean sentGatewaySenderStoppedMessage = false; + public interface RegionAdvisorFactory { RegionAdvisor create(PartitionedRegion region); } @@ -10170,4 +10172,12 @@ void notifyRegionCreated() { public boolean areRecoveriesInProgress() { return prStats.getRecoveriesInProgress() > 0; } + + public boolean isSentGatewaySenderStoppedMessage() { + return sentGatewaySenderStoppedMessage; + } + + public void setSentGatewaySenderStoppedMessage(boolean notified) { + sentGatewaySenderStoppedMessage = notified; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index 14e3ccf0616e..f133bc51c9fb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -186,6 +186,10 @@ public void cmdExecute(final @NotNull Message clientMessage, } boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01; + if (possibleDuplicate) { + stats.incPossibleDuplicateEventsReceived(); + } + // Retrieve the region name from the message parts Part regionNamePart = clientMessage.getPart(partNumber + 2); String regionName = regionNamePart.getCachedString(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index e5fa44a83e27..bd1a898da4d8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -585,6 +585,9 @@ public boolean isForInternalUse() { @Override public abstract void startWithCleanQueue(); + @Override + public abstract void prepareForStop(); + @Override public abstract void stop(); @@ -1298,6 +1301,27 @@ public boolean removeFromTempQueueEvents(Object tailKey) { } } + public boolean markAsDuplicateInTempQueueEvents(Object tailKey) { + synchronized (queuedEventsSync) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + + for (TmpQueueEvent event : tmpQueuedEvents) { + if (tailKey.equals(event.getEvent().getTailKey())) { + if (isDebugEnabled) { + logger.debug( + "shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Marking it..", + tailKey); + } + event.getEvent().setPossibleDuplicate(true); + return true; + } + } + + return false; + } + } + + /** * During sender is getting stopped, if there are any cache operation on queue then that event * will be stored in temp queue. Once sender is started, these event from tmp queue will be diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index fbdc1d9c2cb3..c7413aa91fca 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -18,6 +18,9 @@ import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE; import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE; import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT; +import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.RESET_BATCH; +import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.STOPPED_GATEWAY_SENDER; +import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.UNSUCCESSFULLY_DISPATCHED; import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import java.io.IOException; @@ -29,10 +32,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -46,7 +51,11 @@ import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.ColocationHelper; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; @@ -58,8 +67,10 @@ import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; +import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage; import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue; import org.apache.geode.internal.monitoring.ThreadsMonitoring; +import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.pdx.internal.PeerTypeRegistration; @@ -469,7 +480,7 @@ protected void processQueue() { boolean interrupted = Thread.interrupted(); try { if (resetLastPeekedEvents) { - pendingEventsInBatchesMarkAsPossibleDuplicate(); + notifyPossibleDuplicate(RESET_BATCH, pendingEventsInBatches()); resetLastPeekedEvents(); resetLastPeekedEvents = false; } @@ -967,16 +978,8 @@ private void handleSuccessfulBatchDispatch(List filteredList, private void handleUnSuccessfulBatchDispatch(List events) { final GatewaySenderStats statistics = sender.getStatistics(); statistics.incBatchesRedistributed(); - // Set posDup flag on each event in the batch - Iterator it = events.iterator(); - while (it.hasNext() && !isStopped) { - Object o = it.next(); - if (o instanceof GatewaySenderEventImpl) { - GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o; - ge.setPossibleDuplicate(true); - } - } + notifyPossibleDuplicate(UNSUCCESSFULLY_DISPATCHED, events); } /** @@ -1217,7 +1220,7 @@ public void closeProcessor() { logger.warn("Destroying GatewayEventDispatcher with actively queued data."); } if (resetLastPeekedEvents) { - pendingEventsInBatchesMarkAsPossibleDuplicate(); + notifyPossibleDuplicate(STOPPED_GATEWAY_SENDER, pendingEventsInBatches()); resetLastPeekedEvents(); resetLastPeekedEvents = false; } @@ -1322,17 +1325,181 @@ protected void afterExecute() { protected abstract void enqueueEvent(GatewayQueueEvent event); - private void pendingEventsInBatchesMarkAsPossibleDuplicate() { + private void notifyPossibleDuplicate(int reason, List events) { + Map>> regionToDispatchedKeysMap = new HashMap<>(); + boolean pgwsender = (getSender().isParallel() + && !(getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)); + + for (Object o : events) { + if (o instanceof GatewaySenderEventImpl) { + GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o; + if (!ge.getPossibleDuplicate()) { + if (pgwsender) { + addDuplicateEvent(regionToDispatchedKeysMap, ge); + } + ge.setPossibleDuplicate(true); + } + } + } + + if (!pgwsender) { + return; + } + + PartitionedRegion queueRegion; + if (queue instanceof ConcurrentParallelGatewaySenderQueue) { + queueRegion = + (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) queue).getRegion(); + } else { + queueRegion = + (PartitionedRegion) ((ParallelGatewaySenderQueue) queue).getRegion(); + } + + if (queueRegion == null || queueRegion.getRegionAdvisor() == null + || queueRegion.getDataStore() == null) { + return; + } + + if (reason == STOPPED_GATEWAY_SENDER) { + final Set buckets = queueRegion.getDataStore().getAllLocalPrimaryBucketIds(); + if (regionToDispatchedKeysMap.isEmpty()) { + if (queueRegion.isSentGatewaySenderStoppedMessage()) { + return; + } + Map> bucketIdToDispatchedKeys = new HashMap<>(); + for (Integer bId : buckets) { + bucketIdToDispatchedKeys.put(bId, Collections.emptyList()); + } + regionToDispatchedKeysMap.put(queueRegion.getFullPath(), bucketIdToDispatchedKeys); + + } else { + Map> bucketIdToDispatchedKeys = + regionToDispatchedKeysMap.get(queueRegion.getFullPath()); + if (bucketIdToDispatchedKeys == null) { + return; + } + for (Integer bId : buckets) { + bucketIdToDispatchedKeys.putIfAbsent(bId, Collections.emptyList()); + } + } + } + + if (regionToDispatchedKeysMap.size() > 0) { + Set recipients = + getAllRecipients(sender.getCache(), regionToDispatchedKeysMap); + + if (recipients.isEmpty()) { + return; + } + + if (reason == STOPPED_GATEWAY_SENDER) { + if (!queueRegion.isSentGatewaySenderStoppedMessage()) { + queueRegion.setSentGatewaySenderStoppedMessage(true); + } + } + + InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem(); + DistributionManager dm = ids.getDistributionManager(); + dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0); + + if (!recipients.isEmpty()) { + if (logger.isDebugEnabled()) { + logger.debug( + "notifyPossibleDuplicate send ParallelQueueSetPossibleDuplicateMessage recipients {}.", + recipients); + } + + ParallelQueueSetPossibleDuplicateMessage pqspdm = + new ParallelQueueSetPossibleDuplicateMessage(reason, regionToDispatchedKeysMap); + pqspdm.setRecipients(recipients); + dm.putOutgoing(pqspdm); + } + } + + } + + protected void addDuplicateEvent( + Map>> regionToDispatchedKeysMap, + GatewaySenderEventImpl event) { + PartitionedRegion prQ = null; + int bucketId = -1; + Object key = null; + InternalCache cache = sender.getCache(); + String regionPath = event.getRegionPath(); + + if (event.getRegion() != null) { + if (cache.getRegion(regionPath) instanceof DistributedRegion) { + prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(event.getRegion().getFullPath()); + bucketId = event.getEventId().getBucketID(); + key = event.getEventId(); + } else { + prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(ColocationHelper + .getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath()); + bucketId = event.getBucketId(); + key = event.getShadowKey(); + } + } else { + Region region = (PartitionedRegion) cache.getRegion(regionPath); + if (region != null && !region.isDestroyed()) { + if (region instanceof DistributedRegion) { + prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(region.getFullPath()); + bucketId = event.getBucketId(); + key = event.getEventId(); + } else { + prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion( + ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath()); + bucketId = event.getBucketId(); + key = event.getShadowKey(); + } + } + } + + if (prQ == null) { + return; + } + + Map> bucketIdToDispatchedKeys = + regionToDispatchedKeysMap.get(prQ.getFullPath()); + if (bucketIdToDispatchedKeys == null) { + bucketIdToDispatchedKeys = new ConcurrentHashMap<>(); + regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys); + } + + List dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId); + if (dispatchedKeys == null) { + dispatchedKeys = new ArrayList<>(); + bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys); + } + dispatchedKeys.add(key); + + } + + public void prepareForStopProcessing() { + notifyPossibleDuplicate(STOPPED_GATEWAY_SENDER, pendingEventsInBatches()); + } + + private Set getAllRecipients(InternalCache cache, + Map>> map) { + Set recipients = new ObjectOpenHashSet<>(); + for (Object pr : map.keySet()) { + PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion((String) pr); + if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() != null) { + recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore()); + } + } + return recipients; + } + + + private List pendingEventsInBatches() { + List pendingEvents = new ArrayList<>(); if (!batchIdToEventsMap.isEmpty()) { for (Map.Entry[]> entry : batchIdToEventsMap .entrySet()) { - for (GatewaySenderEventImpl event : entry.getValue()[0]) { - if (!event.getPossibleDuplicate()) { - event.setPossibleDuplicate(true); - } - } + pendingEvents.addAll(entry.getValue()[0]); } } + return pendingEvents; } protected static class SenderStopperCallable implements Callable { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java index 73af7b0b7b4e..e5764e41518f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java @@ -32,6 +32,12 @@ public class GatewayReceiverStats extends CacheServerStats { */ private static final String DUPLICATE_BATCHES_RECEIVED = "duplicateBatchesReceived"; + /** + * Name of the number of events with possible duplicate indication received statistic + */ + private static final String POSSIBLE_DUPLICATE_EVENTS_RECEIVED = + "possibleduplicateEventsReceived"; + /** * Name of the event queue time statistic */ @@ -86,6 +92,11 @@ public class GatewayReceiverStats extends CacheServerStats { */ private final int duplicateBatchesReceivedId; + /** + * Id of the number of events with possible duplicate indication received statistic + */ + private final int possibleduplicateEventsReceivedId; + /** * Id of the event queue time statistic */ @@ -159,7 +170,11 @@ public static GatewayReceiverStats createGatewayReceiverStats(StatisticsFactory f.createLongCounter(EXCEPTIONS_OCCURRED, "number of exceptions occurred while porcessing the batches", "operations"), f.createLongCounter(EVENTS_RETRIED, - "total number events retried by this GatewayReceiver due to exceptions", "operations")}; + "total number events retried by this GatewayReceiver due to exceptions", "operations"), + f.createLongCounter(POSSIBLE_DUPLICATE_EVENTS_RECEIVED, + "total number of possible duplicate events received by this GatewayReceiver", + "operations") + }; return new GatewayReceiverStats(f, ownerName, typeName, descriptors, meterRegistry); } @@ -178,6 +193,7 @@ public GatewayReceiverStats(StatisticsFactory f, String ownerName, String typeNa unknowsOperationsReceivedId = statType.nameToId(UNKNOWN_OPERATIONS_RECEIVED); exceptionsOccurredId = statType.nameToId(EXCEPTIONS_OCCURRED); eventsRetriedId = statType.nameToId(EVENTS_RETRIED); + possibleduplicateEventsReceivedId = statType.nameToId(POSSIBLE_DUPLICATE_EVENTS_RECEIVED); this.meterRegistry = meterRegistry; eventsReceivedCounter = LegacyStatCounter.builder(EVENTS_RECEIVED_COUNTER_NAME) @@ -198,6 +214,19 @@ public long getDuplicateBatchesReceived() { return stats.getLong(duplicateBatchesReceivedId); } + + /** + * Increments the number of duplicate events received by 1. + */ + public void incPossibleDuplicateEventsReceived() { + stats.incLong(possibleduplicateEventsReceivedId, 1); + } + + public long getPossibleDuplicateEventsReceived() { + return stats.getLong(possibleduplicateEventsReceivedId); + } + + /** * Increments the number of out of order batches received by 1. */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java index 13e36e7f9bf6..c4f48b7dd6af 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java @@ -49,4 +49,8 @@ public interface InternalGatewaySender extends GatewaySender { void setStartEventProcessorInPausedState(); int getEventQueueSize(); + + + void prepareForStop(); + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 678286c92b3f..c37b4aefc8de 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -593,6 +593,13 @@ public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR, } finally { if (prQ != null) { userRegionNameToShadowPRMap.put(userPR.getFullPath(), prQ); + prQ.setSentGatewaySenderStoppedMessage(false); + if (prQ.getDataStore() != null) { + final Set buckets = prQ.getDataStore().getAllLocalBucketRegions(); + for (BucketRegion br : buckets) { + br.setReceivedGatewaySenderStoppedMessage(false); + } + } } /* * Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java new file mode 100644 index 000000000000..d1c2de3166d5 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.PooledDistributionMessage; +import org.apache.geode.internal.cache.BucketRegionQueue; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +/** + * Sets events in the remote secondary queues to possible duplicate + * + * @since Geode 1.15 + */ +public class ParallelQueueSetPossibleDuplicateMessage extends PooledDistributionMessage { + + private static final Logger logger = LogService.getLogger(); + + private int reason; + private Map>> regionToDuplicateEventsMap; + + public static final int UNSUCCESSFULLY_DISPATCHED = 0; + public static final int RESET_BATCH = 1; + public static final int LOAD_FROM_TEMP_QUEUE = 2; + public static final int STOPPED_GATEWAY_SENDER = 3; + + + public ParallelQueueSetPossibleDuplicateMessage() {} + + public ParallelQueueSetPossibleDuplicateMessage(int reason, + Map>> regionToDuplicateEventsMap) { + this.reason = reason; + this.regionToDuplicateEventsMap = regionToDuplicateEventsMap; + } + + @Override + public int getDSFID() { + return PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE; + } + + @Override + public String toString() { + String cname = getShortClassName(); + final StringBuilder sb = new StringBuilder(cname); + sb.append("reason=").append(reason); + sb.append(" regionToDispatchedKeysMap=").append(regionToDuplicateEventsMap); + sb.append(" sender=").append(getSender()); + return sb.toString(); + } + + @Override + protected void process(ClusterDistributionManager dm) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + final InternalCache cache = dm.getCache(); + + if (cache == null) { + return; + } + final InitializationLevel oldLevel = + LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); + try { + for (String name : regionToDuplicateEventsMap.keySet()) { + final PartitionedRegion region = (PartitionedRegion) cache.getRegion(name); + if (region == null) { + continue; + } + + AbstractGatewaySender abstractSender = region.getParallelGatewaySender(); + // Find the map: bucketId to dispatchedKeys + // Find the bucket + // Destroy the keys + Map> bucketIdToDispatchedKeys = + this.regionToDuplicateEventsMap.get(name); + for (Integer bId : bucketIdToDispatchedKeys.keySet()) { + final String bucketFullPath = + SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR + + region.getBucketName(bId); + BucketRegionQueue brq = + (BucketRegionQueue) cache.getInternalRegionByPath(bucketFullPath); + + if (brq != null && reason == STOPPED_GATEWAY_SENDER) { + brq.setReceivedGatewaySenderStoppedMessage(true); + } + + if (isDebugEnabled) { + logger.debug( + "ParallelQueueSetPossibleDuplicateMessage : The bucket in the cache is bucketRegionName : {} bucket: {}", + bucketFullPath, brq); + } + + List dispatchedKeys = bucketIdToDispatchedKeys.get(bId); + if (dispatchedKeys != null && !dispatchedKeys.isEmpty()) { + for (Object key : dispatchedKeys) { + // First, clear the Event from tempQueueEvents at AbstractGatewaySender level, if + // exists + // synchronize on AbstractGatewaySender.queuedEventsSync while doing so + abstractSender.markAsDuplicateInTempQueueEvents(key); + + if (brq != null) { + if (isDebugEnabled) { + logger.debug( + "ParallelQueueSetPossibleDuplicateMessage : The bucket {} key {}.", + brq, key); + } + + if (brq.checkIfQueueContainsKey(key)) { + brq.setAsPossibleDuplicate(key); + } + } + } + } + } + } // + } finally { + LocalRegion.setThreadInitLevelRequirement(oldLevel); + } + } + + @Override + public void toData(DataOutput out, + SerializationContext context) throws IOException { + super.toData(out, context); + DataSerializer.writeInteger(this.reason, out); + DataSerializer.writeHashMap(this.regionToDuplicateEventsMap, out); + } + + @Override + public void fromData(DataInput in, + DeserializationContext context) throws IOException, ClassNotFoundException { + super.fromData(in, context); + this.reason = DataSerializer.readInteger(in); + this.regionToDuplicateEventsMap = DataSerializer.readHashMap(in); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java index b299925fbdb6..b9433dcbc65a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java @@ -48,6 +48,9 @@ public void start() {} @Override public void startWithCleanQueue() {} + @Override + public void prepareForStop() {} + @Override public void stop() {} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java index 82adbee5dd6d..561f0b46349c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java @@ -48,6 +48,9 @@ public void start() {} @Override public void startWithCleanQueue() {} + @Override + public void prepareForStop() {} + @Override public void stop() {} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java index 4b44799e0690..834f6ad474a2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java @@ -47,6 +47,9 @@ public void start() {} @Override public void startWithCleanQueue() {} + @Override + public void prepareForStop() {} + @Override public void stop() {} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java index b0f02ceb91a8..a239fd997259 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java @@ -48,6 +48,9 @@ public void start() {} @Override public void startWithCleanQueue() {} + @Override + public void prepareForStop() {} + @Override public void stop() {} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java index 92d1601d8bb6..7e8000f0d1af 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java @@ -556,10 +556,13 @@ public void testCleanQueueExecuteShadowPRWaitForBucketRecovery() { targetRs.add(region); PartitionedRegion shadowRegion = mock(PartitionedRegion.class); + PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class); when(regionFactory.create(any())).thenReturn(shadowRegion); when(shadowRegion.getFullPath()).thenReturn("_PARALLEL_GATEWAY_SENDER_QUEUE"); + when(shadowRegion.getDataStore()).thenReturn(dataStore); + when(dataStore.getAllLocalBucketRegions()).thenReturn(Collections.emptySet()); mockGatewaySenderStats(); @@ -624,10 +627,13 @@ public void testCleanQueueExecuteShadowPRWaitForRegionDestroyingToFinish() { targetRs.add(region); PartitionedRegion shadowRegion = mock(PartitionedRegion.class); + PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class); when(regionFactory.create(any())).thenReturn(shadowRegion); when(shadowRegion.getFullPath()).thenReturn("_PARALLEL_GATEWAY_SENDER_QUEUE"); + when(shadowRegion.getDataStore()).thenReturn(dataStore); + when(dataStore.getAllLocalBucketRegions()).thenReturn(Collections.emptySet()); mockGatewaySenderStats(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java new file mode 100644 index 000000000000..0b822ba36101 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.UNSUCCESSFULLY_DISPATCHED; +import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.ToDoubleFunction; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.Statistics; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DSClock; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.BucketRegionQueue; +import org.apache.geode.internal.cache.BucketRegionQueueHelper; +import org.apache.geode.internal.cache.CachePerfStats; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.internal.cache.wan.GatewaySenderStats; +import org.apache.geode.internal.statistics.DummyStatisticsFactory; +import org.apache.geode.internal.statistics.StatisticsManager; + +public class ParallelQueueSetPossibleDuplicateMessageJUnitTest { + + private static final String GATEWAY_SENDER_ID = "ny"; + private static final int BUCKET_ID = 85; + private static final long KEY = 198; + + private GemFireCacheImpl cache; + private PartitionedRegion queueRegion; + private AbstractGatewaySender sender; + private PartitionedRegion rootRegion; + private BucketRegionQueue bucketRegionQueue; + private BucketRegionQueueHelper bucketRegionQueueHelper; + private GatewaySenderStats stats; + + @Before + public void setUpGemFire() { + createCache(); + createQueueRegion(); + createGatewaySender(); + createRootRegion(); + createBucketRegionQueue(); + } + + private void createCache() { + // Mock cache + cache = mock(GemFireCacheImpl.class); + DistributedSystem ds = mock(DistributedSystem.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + + MeterRegistry mr = mock(MeterRegistry.class); + StatisticsManager sm = mock(StatisticsManager.class); + ClusterDistributionManager dm = mock(ClusterDistributionManager.class); + + when(cache.getDistributedSystem()).thenReturn(ds); + when(cache.getInternalDistributedSystem()).thenReturn(ids); + when(cache.getDistributionManager()).thenReturn(dm); + + when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class)); + when(cache.getMeterRegistry()).thenReturn(mr); + when(cache.getCancelCriterion()).thenReturn(mock(CancelCriterion.class)); + + + cache.getCancelCriterion().checkCancelInProgress(null); + + when(ds.createAtomicStatistics(any(), anyString())).thenReturn(mock(Statistics.class)); + when(ids.getStatisticsManager()).thenReturn(sm); + when(ids.getClock()).thenReturn(mock(DSClock.class)); + when(ids.getDistributionManager()).thenReturn(dm); + when(ids.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class)); + + when(mr.timer(any(), any(), any())).thenReturn(mock(Timer.class)); + when(mr.gauge(anyString(), any(), any(ToDoubleFunction.class))).thenReturn(mock(Gauge.class)); + when(mr.config()).thenReturn(mock(MeterRegistry.Config.class)); + + when(sm.createAtomicStatistics(any(), anyString())).thenReturn(mock(Statistics.class)); + + when(dm.getConfig()).thenReturn(mock(DistributionConfig.class)); + when(dm.getCache()).thenReturn(cache); + + } + + private void createQueueRegion() { + // Mock queue region + queueRegion = + ParallelGatewaySenderHelper.createMockQueueRegion(cache, + ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID)); + } + + private void createGatewaySender() { + // Mock gateway sender + sender = ParallelGatewaySenderHelper.createGatewaySender(cache); + when(queueRegion.getParallelGatewaySender()).thenReturn(sender); + when(sender.getQueues()).thenReturn(null); + when(sender.getDispatcherThreads()).thenReturn(1); + stats = new GatewaySenderStats(new DummyStatisticsFactory(), "gatewaySenderStats-", "ln", + disabledClock()); + when(sender.getStatistics()).thenReturn(stats); + } + + private void createRootRegion() { + // Mock root region + rootRegion = mock(PartitionedRegion.class); + when(rootRegion.getFullPath()) + .thenReturn(SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME); + when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)) + .thenReturn(rootRegion); + when(cache.getRegion(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID))) + .thenReturn(queueRegion); + } + + private void createBucketRegionQueue() { + // Create BucketRegionQueue + BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper + .createBucketRegionQueue(cache, rootRegion, queueRegion, BUCKET_ID); + bucketRegionQueue = spy(realBucketRegionQueue); + + bucketRegionQueueHelper = + new BucketRegionQueueHelper(cache, queueRegion, bucketRegionQueue); + } + + @Test + public void validateSetPossibleDuplicateKeyInUninitializedBucketRegionQueue() throws Exception { + + assertThat(bucketRegionQueue.isInitialized()).isFalse(); + + // Create a real ConcurrentParallelGatewaySenderQueue + ParallelGatewaySenderEventProcessor processor = + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(sender); + GatewaySenderEventImpl gsEvent = mock(GatewaySenderEventImpl.class); + + // Add a mock GatewaySenderEventImpl to the temp queue + BlockingQueue tempQueue = + createTempQueueAndAddEvent(processor, gsEvent); + assertThat(tempQueue.size()).isEqualTo(1); + + createAndProcessParallelQueueSetPossibleDuplicateMessage(); + + verify(sender, times(1)).markAsDuplicateInTempQueueEvents(KEY); + verify(bucketRegionQueue, times(0)).setAsPossibleDuplicate(KEY); + + } + + @Test + public void validateSetPossibleDuplicateKeyInInitializedBucketRegionQueue() throws Exception { + + assertThat(bucketRegionQueue.isInitialized()).isFalse(); + + // Create a real ConcurrentParallelGatewaySenderQueue + ParallelGatewaySenderEventProcessor processor = + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(sender); + GatewaySenderEventImpl gsEvent = mock(GatewaySenderEventImpl.class); + + // Add a mock GatewaySenderEventImpl to the temp queue + BlockingQueue tempQueue = + createTempQueueAndAddEvent(processor, gsEvent); + assertThat(tempQueue.size()).isEqualTo(1); + + // Clean up destroyed tokens + bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); + + assertThat(bucketRegionQueue.isInitialized()).isTrue(); + + bucketRegionQueue.pushKeyIntoQueue(KEY); + + createAndProcessParallelQueueSetPossibleDuplicateMessage(); + + verify(sender, times(1)).markAsDuplicateInTempQueueEvents(KEY); + verify(bucketRegionQueue, times(1)).setAsPossibleDuplicate(KEY); + + } + + private void createAndProcessParallelQueueSetPossibleDuplicateMessage() { + ParallelQueueSetPossibleDuplicateMessage message = + new ParallelQueueSetPossibleDuplicateMessage(UNSUCCESSFULLY_DISPATCHED, + createRegionToDispatchedKeysMap()); + message.process((ClusterDistributionManager) cache.getDistributionManager()); + } + + private HashMap>> createRegionToDispatchedKeysMap() { + HashMap>> regionToDispatchedKeys = new HashMap<>(); + Map> bucketIdToDispatchedKeys = new HashMap<>(); + List dispatchedKeys = new ArrayList<>(); + dispatchedKeys.add(KEY); + bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys); + regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID), + bucketIdToDispatchedKeys); + return regionToDispatchedKeys; + } + + private BlockingQueue createTempQueueAndAddEvent( + ParallelGatewaySenderEventProcessor processor, GatewaySenderEventImpl event) { + ParallelGatewaySenderQueue queue = (ParallelGatewaySenderQueue) processor.getQueue(); + Map> tempQueueMap = + queue.getBucketToTempQueueMap(); + BlockingQueue tempQueue = new LinkedBlockingQueue<>(); + when(event.getShadowKey()).thenReturn(KEY); + tempQueue.add(event); + tempQueueMap.put(BUCKET_ID, tempQueue); + return tempQueue; + } +} diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java index bf7ff093803e..a7c9e45b11d7 100644 --- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java +++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java @@ -683,6 +683,7 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer short ABORT_BACKUP_REQUEST = 2183; short MEMBER_IDENTIFIER = 2184; short HOST_AND_PORT = 2185; + short PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE = 2186; // NOTE, codes > 65535 will take 4 bytes to serialize diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 7461bffe3d64..2969a872961c 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -1281,6 +1281,7 @@ public static List getReceiverStats() { statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived()); statsList.add(gatewayReceiverStats.getEarlyAcks()); statsList.add(gatewayReceiverStats.getExceptionsOccurred()); + statsList.add(gatewayReceiverStats.getPossibleDuplicateEventsReceived()); return statsList; } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java new file mode 100644 index 000000000000..f9248ef7d758 --- /dev/null +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.junit.categories.WanTest; + +@Category({WanTest.class}) +public class ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest + extends WANTestBase { + + private static final long serialVersionUID = 2L; + private static final Logger logger = LogService.getLogger(); + + public ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest() { + super(); + } + + + private final int localId = 1; + private final int remoteId = 2; + + @Override + protected final void postSetUpWANTestBase() throws Exception { + // The restart tests log this string + IgnoredException.addIgnoredException("failed accepting client connection"); + } + + /** + * When gateway senders starts to unqueue, and check that received events are + * not marked as possible duplicate. + */ + @Test + public void testPersistentPartitionedRegionWithGatewaySenderCheckReceiverNoPossibleDuplicate() + throws InterruptedException { + int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId)); + int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5); + vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5)); + vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5)); + + vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false)); + + vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + + + vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, + 100, isOffHeap())); + vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, + 100, isOffHeap())); + + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000)); + + long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7)); + long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7)); + + assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(0); + } + + + /** + * When gateway senders starts to unqueue, stop gateway sender, and check that some evnts are + * dispatched to receiving side, + * but events are not removed on sending side. + */ + @Test + public void testPersistentPartitionedRegionWithGatewaySenderCheckReceiverPossibleDuplicate() + throws InterruptedException { + int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId)); + int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5); + vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5)); + vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5)); + + vm4.invoke( + () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false)); + vm5.invoke( + () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false)); + + vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + + + vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, + 100, isOffHeap())); + vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, + 100, isOffHeap())); + + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000)); + + createReceiverInVMs(vm2, vm3); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000)); + + long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7)); + long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7)); + + assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(100); + } + + @Test + public void testpersistentWanGateway_CheckReceiverPossibleDuplicate_afterSenderRestarted() { + int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId)); + int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort)); + createCacheInVMs(nyPort, vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // keep a larger batch to minimize number of exception occurrences in the log + vm4.invoke( + () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false)); + vm5.invoke( + () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false)); + vm6.invoke( + () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false)); + vm7.invoke( + () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false)); + + vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + + + vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, + 100, isOffHeap())); + vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, + 100, isOffHeap())); + + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000)); + + // Just making sure that though the remote site is started later, + // remote site is still able to get the data. Since the receivers are + // started before creating partition region it is quite possible that the + // region may loose some of the events. This needs to be handled by the code + + vm5.invoke(() -> WANTestBase.killSender()); + + createReceiverInVMs(vm2, vm3); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000)); + + long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7)); + long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7)); + + assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(40); + } + + @Test + public void testpersistentWanGateway_checkPossibleDuplicateEvents_afterServerDown() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // keep a larger batch to minimize number of exception occurrences in the log + vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true)); + vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true)); + vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true)); + + vm4.invoke(createPartitionedRegionRunnable()); + vm5.invoke(createPartitionedRegionRunnable()); + vm6.invoke(createPartitionedRegionRunnable()); + vm7.invoke(createPartitionedRegionRunnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + // make sure all the senders are running before doing any puts + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000)); + + // Just making sure that though the remote site is started later, + // remote site is still able to get the data. Since the receivers are + // started before creating partition region it is quite possible that the + // region may loose some of the events. This needs to be handled by the code + + vm4.invoke(() -> stopSender("ln")); + vm5.invoke(() -> stopSender("ln")); + vm6.invoke(() -> stopSender("ln")); + vm7.invoke(() -> stopSender("ln")); + + Integer vm4NumDupplicate = vm4.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + Integer vm5NumDupplicate = vm5.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + Integer vm6NumDupplicate = vm6.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + Integer vm7NumDupplicate = vm7.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + + assertThat(vm4NumDupplicate + vm5NumDupplicate + vm6NumDupplicate + vm7NumDupplicate) + .isEqualTo(800); + + vm5.invoke(() -> WANTestBase.killSender()); + + vm4NumDupplicate = vm4.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + vm6NumDupplicate = vm6.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + vm7NumDupplicate = vm7.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln")); + + assertThat(vm4NumDupplicate + vm6NumDupplicate + vm7NumDupplicate).isEqualTo(800); + } + + protected SerializableRunnableIF createPartitionedRegionRunnable() { + return () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1, 100, + isOffHeap()); + } + + protected SerializableRunnableIF waitForSenderRunnable() { + return () -> WANTestBase.waitForSenderRunningState("ln"); + } + +} diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java index a0edf12f68b8..7079f485aa85 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java @@ -107,6 +107,17 @@ private void start(boolean cleanQueues) { } } + @Override + public void prepareForStop() { + if (!isRunning()) { + return; + } + pause(); + if (eventProcessor != null && !eventProcessor.isStopped()) { + eventProcessor.prepareForStopProcessing(); + } + } + @Override public void stop() { getLifeCycleLock().writeLock().lock(); diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java index a639d3366d4a..f32d5b9a0507 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java @@ -129,6 +129,9 @@ protected AbstractGatewaySenderEventProcessor createEventProcessor(boolean clean return eventProcessor; } + @Override + public void prepareForStop() {} + @Override public void stop() { if (logger.isDebugEnabled()) {