Skip to content

Commit

Permalink
GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)
Browse files Browse the repository at this point in the history
* GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage to signal duplicate events on secondary buckets
  • Loading branch information
mivanac authored Jun 29, 2022
1 parent ae4a99f commit ef7dc45
Show file tree
Hide file tree
Showing 28 changed files with 1,108 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,10 @@ org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage,2
fromData,15
toData,15

org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage,2
fromData,26
toData,26

org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation$DestroyMessage,2
fromData,46
toData,41
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ private void start(boolean cleanQueues) {
}
}

@Override
public void prepareForStop() {}

@Override
public void stop() {
getLifeCycleLock().writeLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ protected AbstractGatewaySenderEventProcessor createEventProcessor(boolean clean
return eventProcessor;
}

@Override
public void prepareForStop() {}

@Override
public void stop() {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationOperation;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
Expand Down Expand Up @@ -985,6 +986,8 @@ private void registerDSFIDTypes(DSFIDSerializer serializer) {
serializer.register(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
serializer.register(HOST_AND_PORT, HostAndPort.class);
serializer.register(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
serializer.register(PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE,
ParallelQueueSetPossibleDuplicateMessage.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
*/
package org.apache.geode.internal.cache;

import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.LOAD_FROM_TEMP_QUEUE;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -31,14 +34,20 @@
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
import org.apache.geode.internal.offheap.OffHeapClearRequired;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
Expand Down Expand Up @@ -218,11 +227,12 @@ protected void loadEventsFromTempQueue() {
if (queues != null) {
ConcurrentParallelGatewaySenderQueue prq =
(ConcurrentParallelGatewaySenderQueue) queues.toArray()[0];
// synchronized (prq.getBucketToTempQueueMap()) {

BlockingQueue<GatewaySenderEventImpl> tempQueue = prq.getBucketTmpQueue(getId());
// .getBucketToTempQueueMap().get(getId());
if (tempQueue != null && !tempQueue.isEmpty()) {
synchronized (tempQueue) {
Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap =
new HashMap<>();
try {
// ParallelQueueRemovalMessage checks for the key in BucketRegionQueue
// and if not found there, it removes it from tempQueue. When tempQueue
Expand All @@ -235,6 +245,9 @@ protected void loadEventsFromTempQueue() {
try {
event.setPossibleDuplicate(true);
if (addToQueue(event.getShadowKey(), event)) {
if (notifyDuplicateSupported()) {
addDuplicateEvent(regionToDuplicateEventsMap, event);
}
event = null;
}
} catch (ForceReattemptException e) {
Expand All @@ -257,13 +270,57 @@ protected void loadEventsFromTempQueue() {
}
getInitializationLock().writeLock().unlock();
}
notifyDuplicateEvents(regionToDuplicateEventsMap);
}
}
}
}

private boolean notifyDuplicateSupported() {
return !(this.getPartitionedRegion().getParallelGatewaySender().getEventProcessor()
.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher);
}

private void notifyDuplicateEvents(
Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap) {
if (regionToDuplicateEventsMap.isEmpty()) {
return;
}
if (getPartitionedRegion().getRegionAdvisor() == null) {
return;
}

Set<InternalDistributedMember> recipients =
getPartitionedRegion().getRegionAdvisor().adviseDataStore();

if (recipients.isEmpty()) {
return;
}

// }
InternalDistributedSystem ids = getCache().getInternalDistributedSystem();
DistributionManager dm = ids.getDistributionManager();
dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);

if (!recipients.isEmpty()) {
ParallelQueueSetPossibleDuplicateMessage possibleDuplicateMessage =
new ParallelQueueSetPossibleDuplicateMessage(LOAD_FROM_TEMP_QUEUE,
regionToDuplicateEventsMap);
possibleDuplicateMessage.setRecipients(recipients);
dm.putOutgoing(possibleDuplicateMessage);
}
}

private void addDuplicateEvent(Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap,
GatewaySenderEventImpl event) {
Map<Integer, List<Object>> bucketIdToDispatchedKeys = regionToDuplicateEventsMap
.computeIfAbsent(getPartitionedRegion().getFullPath(), k -> new HashMap<>());

List<Object> dispatchedKeys =
bucketIdToDispatchedKeys.computeIfAbsent(getId(), k -> new ArrayList<>());

dispatchedKeys.add(event.getShadowKey());
}

@Override
public void forceSerialized(EntryEventImpl event) {
// NOOP since we want the value in the region queue to stay in object form.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor {

private BucketAdvisor startingBucketAdvisor;

private volatile boolean hasBecomePrimary = false;

private final PartitionedRegion pRegion;

final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -498,6 +500,13 @@ public DistributedRegion getRegionForDeltaGII() {
}


BucketAdvisor getParentAdvisor() {
return parentAdvisor;
}

boolean getHasBecomePrimary() {
return hasBecomePrimary;
}

/**
* Called by the RegionAdvisor.profileRemoved, this method tests to see if the missing member is
Expand Down Expand Up @@ -1153,6 +1162,7 @@ private boolean acquiredPrimaryLock() {
try {
synchronized (this) {
if (isHosting() && (isVolunteering() || isBecomingPrimary())) {
hasBecomePrimary = isBecomingPrimary();
Bucket br = regionAdvisor.getBucket(getBucket().getId());
if (br instanceof BucketRegion) {
((BucketRegion) br).beforeAcquiringPrimaryState();
Expand All @@ -1167,6 +1177,8 @@ private boolean acquiredPrimaryLock() {
if (hasPrimary() && isPrimary()) {
shouldInvokeListeners = true;
}
} else {
hasBecomePrimary = false;
}
}
}
Expand Down Expand Up @@ -2159,6 +2171,7 @@ private boolean requestPrimaryState(byte requestedState) {
private void changeFromPrimaryTo(byte requestedState) {
try {
primaryState = requestedState;
hasBecomePrimary = false;
} finally {
getPartitionedRegionStats().incPrimaryBucketCount(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public Object getDeserialized(boolean copyOnRead) {
}
}

private boolean receivedGatewaySenderStoppedMessage = false;

private final int redundancy;

/** the partitioned region to which this bucket belongs */
Expand Down Expand Up @@ -2535,4 +2537,12 @@ Set<InternalDistributedMember> getDestroyRegionRecipients() {
return getSystem().getDistributionManager().getOtherDistributionManagerIds();
}

public boolean isReceivedGatewaySenderStoppedMessage() {
return receivedGatewaySenderStoppedMessage;
}

public void setReceivedGatewaySenderStoppedMessage(boolean notified) {
receivedGatewaySenderStoppedMessage = notified;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
Expand Down Expand Up @@ -202,6 +203,23 @@ private void destroyFailedBatchRemovalMessageKeys() {

@Override
public void beforeAcquiringPrimaryState() {
PartitionedRegion region = getPartitionedRegion();

if (region != null && region.getParallelGatewaySender() != null) {
AbstractGatewaySenderEventProcessor ep =
region.getParallelGatewaySender().getEventProcessor();

if (ep != null && !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
if (isReceivedGatewaySenderStoppedMessage()) {
setReceivedGatewaySenderStoppedMessage(false);
return;
}
BucketAdvisor parent = getParentAdvisor(getBucketAdvisor());
if (parent.getHasBecomePrimary()) {
return;
}
}
}
markAsDuplicate.addAll(eventSeqNumDeque);
}

Expand Down Expand Up @@ -660,4 +678,24 @@ public List<Object> getHelperQueueList() {
}
}

public void setAsPossibleDuplicate(Object key) {
Object object = optimalGet(key);
if (object != null) {
((GatewaySenderEventImpl) object).setPossibleDuplicate(true);
}
}

public boolean checkIfQueueContainsKey(Object key) {
return eventSeqNumDeque.contains(key);
}

BucketAdvisor getParentAdvisor(BucketAdvisor advisor) {
BucketAdvisor parent = advisor.getParentAdvisor();
while (parent != null) {
advisor = parent;
parent = advisor.getParentAdvisor();
}
return advisor;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.WANServiceProvider;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.xmlcache.CacheServerCreation;
Expand Down Expand Up @@ -2182,9 +2183,20 @@ boolean doClose(String reason, Throwable systemFailureCause, boolean keepAlive,
return false;
}

boolean isDebugEnabled = logger.isDebugEnabled();

for (GatewaySender sender : allGatewaySenders) {
try {
((InternalGatewaySender) sender).prepareForStop();
} catch (Exception exception) {
if (isDebugEnabled) {
logger.debug("When calling Prepare for stop gw sender, ignore exception " + exception);
}
}
}

CLOSING_THREAD.set(Thread.currentThread());
try {
boolean isDebugEnabled = logger.isDebugEnabled();

// First close the ManagementService
system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,8 @@ public synchronized String dump() {

private boolean regionCreationNotified;

private boolean sentGatewaySenderStoppedMessage = false;

public interface RegionAdvisorFactory {
RegionAdvisor create(PartitionedRegion region);
}
Expand Down Expand Up @@ -10170,4 +10172,12 @@ void notifyRegionCreated() {
public boolean areRecoveriesInProgress() {
return prStats.getRecoveriesInProgress() > 0;
}

public boolean isSentGatewaySenderStoppedMessage() {
return sentGatewaySenderStoppedMessage;
}

public void setSentGatewaySenderStoppedMessage(boolean notified) {
sentGatewaySenderStoppedMessage = notified;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ public void cmdExecute(final @NotNull Message clientMessage,
}
boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;

if (possibleDuplicate) {
stats.incPossibleDuplicateEventsReceived();
}

// Retrieve the region name from the message parts
Part regionNamePart = clientMessage.getPart(partNumber + 2);
String regionName = regionNamePart.getCachedString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,9 @@ public boolean isForInternalUse() {
@Override
public abstract void startWithCleanQueue();

@Override
public abstract void prepareForStop();

@Override
public abstract void stop();

Expand Down Expand Up @@ -1298,6 +1301,27 @@ public boolean removeFromTempQueueEvents(Object tailKey) {
}
}

public boolean markAsDuplicateInTempQueueEvents(Object tailKey) {
synchronized (queuedEventsSync) {
final boolean isDebugEnabled = logger.isDebugEnabled();

for (TmpQueueEvent event : tmpQueuedEvents) {
if (tailKey.equals(event.getEvent().getTailKey())) {
if (isDebugEnabled) {
logger.debug(
"shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Marking it..",
tailKey);
}
event.getEvent().setPossibleDuplicate(true);
return true;
}
}

return false;
}
}


/**
* During sender is getting stopped, if there are any cache operation on queue then that event
* will be stored in temp queue. Once sender is started, these event from tmp queue will be
Expand Down
Loading

0 comments on commit ef7dc45

Please sign in to comment.