diff --git a/boms/geode-all-bom/src/test/resources/expected-pom.xml b/boms/geode-all-bom/src/test/resources/expected-pom.xml
index f4e03f7f2eed..e503da0905fe 100644
--- a/boms/geode-all-bom/src/test/resources/expected-pom.xml
+++ b/boms/geode-all-bom/src/test/resources/expected-pom.xml
@@ -912,6 +912,12 @@
${version}
compile
+
+ org.apache.geode
+ geode-wan-txgrouping
+ ${version}
+ compile
+
org.apache.geode
geode-web
diff --git a/buildSrc/src/main/resources/japicmp_exceptions.json b/buildSrc/src/main/resources/japicmp_exceptions.json
index d7ee51113eee..46a5f052bdd5 100755
--- a/buildSrc/src/main/resources/japicmp_exceptions.json
+++ b/buildSrc/src/main/resources/japicmp_exceptions.json
@@ -2,5 +2,11 @@
"Class org.apache.geode.management.builder.GeodeClusterManagementServiceBuilder": "Moved internal class to fix split packages between geode-core and geode-management",
"Class org.apache.geode.management.api.ClusterManagementOperation": "Fixed missing @Experimental annotation",
"Method org.apache.geode.management.api.ClusterManagementOperation.getEndpoint()": "Fixed missing @Experimental annotation",
- "Method org.apache.geode.management.api.ClusterManagementOperation.getOperator()": "Fixed missing @Experimental annotation"
+ "Method org.apache.geode.management.api.ClusterManagementOperation.getOperator()": "Fixed missing @Experimental annotation",
+ "Class org.apache.geode.cache.wan.GatewaySender":"Added to support new types of gatewaysenders",
+ "Method org.apache.geode.cache.wan.GatewaySender.getType()":"Added to support new types of gatewaysenders",
+ "Class org.apache.geode.cache.wan.GatewaySenderFactory":"Added to support new types of gatewaysenders",
+ "Method org.apache.geode.cache.wan.GatewaySenderFactory.setType(java.lang.String)":"Added to support new types of gatewaysenders",
+ "Class org.apache.geode.management.GatewaySenderMXBean":"Added to support new types of gatewaysenders",
+ "Method org.apache.geode.management.GatewaySenderMXBean.getType()":"Added to support new types of gatewaysenders"
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 1adae4f07485..43f208fda456 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -24,7 +24,7 @@
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
+import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.test.junit.categories.AEQTest;
@Category({AEQTest.class})
@@ -55,7 +55,7 @@ public void tearDown() {
@Test
public void testStopClearsStats() {
- GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+ GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
@@ -81,7 +81,7 @@ public void testStopClearsStats() {
@Test
public void testStopStart() {
- GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+ GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index 700cc4baeff9..f64557782fe9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -28,7 +28,7 @@
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
-import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
+import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.xmlcache.AsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
@@ -53,13 +53,14 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
*/
- private final GatewaySenderAttributes gatewaySenderAttributes;
+ private final GatewaySenderAttributesImpl gatewaySenderAttributes;
public AsyncEventQueueFactoryImpl(InternalCache cache) {
- this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
+ this(cache, new GatewaySenderAttributesImpl(), DEFAULT_BATCH_TIME_INTERVAL);
}
- AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes,
+ AsyncEventQueueFactoryImpl(InternalCache cache,
+ GatewaySenderAttributesImpl gatewaySenderAttributes,
int batchTimeInterval) {
this.cache = cache;
this.gatewaySenderAttributes = gatewaySenderAttributes;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 4afb51d8724e..662649f70cd7 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -210,6 +210,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}
+ @Override
+ public String getType() {
+ return "ParallelAsyncEventQueue";
+ }
+
private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
@@ -218,4 +223,5 @@ private ThreadsMonitoring getThreadMonitorObj() {
return null;
}
}
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 1713feff76aa..b335b1511779 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -264,6 +264,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}
+ @Override
+ public String getType() {
+ return "SerialAsyncEventQueue";
+ }
+
private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
index ac33509bfa16..049d4678a4f5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
@@ -2588,6 +2588,7 @@ public void setOverflowDirectory(String value) {
* <attribute name="id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" />
* <attribute name="remote-distributed-system-id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" />
* <attribute name="parallel" type="{http://www.w3.org/2001/XMLSchema}boolean" />
+ * <attribute name="type" type="{http://www.w3.org/2001/XMLSchema}string" />
* <attribute name="manual-start" type="{http://www.w3.org/2001/XMLSchema}boolean" />
* <attribute name="socket-buffer-size" type="{http://www.w3.org/2001/XMLSchema}string" />
* <attribute name="socket-read-timeout" type="{http://www.w3.org/2001/XMLSchema}string" />
@@ -2628,6 +2629,8 @@ public static class GatewaySender {
protected String remoteDistributedSystemId;
@XmlAttribute(name = "parallel")
protected Boolean parallel;
+ @XmlAttribute(name = "type")
+ protected String type;
@XmlAttribute(name = "manual-start")
protected Boolean manualStart;
@XmlAttribute(name = "socket-buffer-size")
@@ -2820,6 +2823,29 @@ public void setParallel(Boolean value) {
parallel = value;
}
+ /**
+ * Gets the value of the parallel property.
+ *
+ * possible object is
+ * {@link String }
+ *
+ */
+
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * Sets the value of the type property.
+ *
+ * allowed object is
+ * {@link String }
+ *
+ */
+ public void setType(String value) {
+ this.type = value;
+ }
+
/**
* Gets the value of the manualStart property.
*
diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
index 32dcec71b6d4..1fd3442b5366 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
@@ -17,8 +17,6 @@
import java.util.List;
import org.apache.geode.annotations.Immutable;
-import org.apache.geode.internal.lang.SystemProperty;
-import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.util.internal.GeodeGlossary;
/**
@@ -155,32 +153,6 @@ public interface GatewaySender {
int CONNECTION_RETRY_INTERVAL = Integer
.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000);
- /**
- * Number of times to retry to get events for a transaction from the gateway sender queue when
- * group-transaction-events is set to true.
- * When group-transaction-events is set to true and a batch ready to be sent does not contain
- * all the events for all the transactions to which the events belong, the gateway sender will try
- * to get the missing events of the transactions from the queue to add them to the batch
- * before sending it.
- * If the missing events are not in the queue when the gateway sender tries to get them
- * it will retry for a maximum of times equal to the value set in this parameter before
- * delivering the batch without the missing events and logging an error.
- * Setting this parameter to a very low value could cause that under heavy load and
- * group-transaction-events set to true, batches are sent with incomplete transactions. Setting it
- * to a high value could cause that under heavy load and group-transaction-events set to true,
- * batches are held for some time before being sent.
- */
- int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
- Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
- 10);
- /**
- * Milliseconds to wait before retrying to get events for a transaction from the
- * gateway sender queue when group-transaction-events is true.
- */
- int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
- SystemProperty.getProductIntegerProperty(
- SystemPropertyHelper.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS).orElse(1);
-
/**
* The order policy. This enum is applicable only when concurrency-level is > 1.
*
@@ -418,10 +390,13 @@ enum OrderPolicy {
*/
boolean isParallel();
+ String getType();
+
/**
* Returns groupTransactionEvents boolean property for this GatewaySender.
*
* @return groupTransactionEvents boolean property for this GatewaySender
+ * @deprecated use {@link #getType()}.
*
*/
boolean mustGroupTransactionEvents();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
index 6c9e92bbb491..2f3372ee5625 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
@@ -49,6 +49,8 @@ public interface GatewaySenderFactory {
*/
GatewaySenderFactory setGroupTransactionEvents(boolean groupTransactionEvents);
+ GatewaySenderFactory setType(String type);
+
/**
* Adds a GatewayEventFilter
*
@@ -188,7 +190,8 @@ public interface GatewaySenderFactory {
*
* @param filter The GatewayEventSubstitutionFilter
*/
- GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
+ GatewaySenderFactory setGatewayEventSubstitutionFilter(
+ GatewayEventSubstitutionFilter, ?> filter);
/**
* If true, receiver member id is checked by all dispatcher threads when the connection is
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index e5fa44a83e27..0408f02e93ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -15,6 +15,7 @@
package org.apache.geode.internal.cache.wan;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import java.io.IOException;
import java.util.ArrayList;
@@ -28,6 +29,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
@@ -101,8 +103,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
protected int remoteDSId;
- protected String locName;
-
protected int socketBufferSize;
protected int socketReadTimeout;
@@ -121,14 +121,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
protected volatile int alertThreshold;
+ @Deprecated
protected boolean manualStart;
protected boolean isParallel;
- protected boolean groupTransactionEvents;
-
- protected int retriesToGetTransactionEventsFromQueue;
-
protected boolean isForInternalUse;
protected boolean isDiskSynchronous;
@@ -143,7 +140,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
protected boolean forwardExpirationDestroy;
- protected GatewayEventSubstitutionFilter substitutionFilter;
+ protected GatewayEventSubstitutionFilter, ?> substitutionFilter;
protected LocatorDiscoveryCallback locatorDiscoveryCallback;
@@ -176,7 +173,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
private String expectedReceiverUniqueId = "";
- protected Object queuedEventsSync = new Object();
+ protected final Object queuedEventsSync = new Object();
protected volatile boolean enqueuedAllTempQueueEvents = false;
@@ -196,17 +193,16 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
Integer.getInteger("GatewaySender.QUEUE_SIZE_THRESHOLD", 5000);
@MutableForTesting
- public static int TOKEN_TIMEOUT =
- Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 120000);
+ public static int TOKEN_TIMEOUT = Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 120000);
/**
- * The name of the DistributedLockService used when accessing the GatewaySender's meta data
+ * The name of the DistributedLockService used when accessing the GatewaySender's metadata
* region.
*/
public static final String LOCK_SERVICE_NAME = "gatewayEventIdIndexMetaData_lockService";
/**
- * The name of the GatewaySender's meta data region.
+ * The name of the GatewaySender's metadata region.
*/
protected static final String META_DATA_REGION_NAME = "gatewayEventIdIndexMetaData";
@@ -255,10 +251,8 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
isConflation = attrs.isBatchConflationEnabled();
isPersistence = attrs.isPersistenceEnabled();
alertThreshold = attrs.getAlertThreshold();
- manualStart = attrs.isManualStart();
+ copyDeprecatedAttributes(attrs);
isParallel = attrs.isParallel();
- groupTransactionEvents = attrs.mustGroupTransactionEvents();
- retriesToGetTransactionEventsFromQueue = attrs.getRetriesToGetTransactionEventsFromQueue();
isForInternalUse = attrs.isForInternalUse();
diskStoreName = attrs.getDiskStoreName();
remoteDSId = attrs.getRemoteDSId();
@@ -266,7 +260,7 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
transFilters = Collections.unmodifiableList(attrs.getGatewayTransportFilters());
listeners = attrs.getAsyncEventListeners();
substitutionFilter = attrs.getGatewayEventSubstitutionFilter();
- locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback();
+ locatorDiscoveryCallback = attrs.getGatewayLocatorDiscoveryCallback();
isDiskSynchronous = attrs.isDiskSynchronous();
policy = attrs.getOrderPolicy();
dispatcherThreads = attrs.getDispatcherThreads();
@@ -293,6 +287,11 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
forwardExpirationDestroy = attrs.isForwardExpirationDestroy();
}
+ @SuppressWarnings("deprecation")
+ private void copyDeprecatedAttributes(final GatewaySenderAttributes attrs) {
+ manualStart = attrs.isManualStart();
+ }
+
public GatewaySenderAdvisor getSenderAdvisor() {
return senderAdvisor;
}
@@ -351,7 +350,7 @@ public List getGatewayEventFilters() {
}
@Override
- public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
+ public GatewayEventSubstitutionFilter, ?> getGatewayEventSubstitutionFilter() {
return substitutionFilter;
}
@@ -378,15 +377,12 @@ public List getAsyncEventListeners() {
return listeners;
}
- public boolean hasListeners() {
- return !listeners.isEmpty();
- }
-
@Override
public boolean isForwardExpirationDestroy() {
return forwardExpirationDestroy;
}
+ @Deprecated
@Override
public boolean isManualStart() {
return manualStart;
@@ -397,7 +393,7 @@ public int getMaximumQueueMemory() {
return queueMemory;
}
- public int getMaximumMemeoryPerDispatcherQueue() {
+ public int getMaximumMemoryPerDispatcherQueue() {
return maxMemoryPerDispatcherQueue;
}
@@ -560,19 +556,10 @@ public boolean isParallel() {
return isParallel;
}
+ @Deprecated
@Override
public boolean mustGroupTransactionEvents() {
- return groupTransactionEvents;
- }
-
- /**
- * Returns retriesToGetTransactionEventsFromQueue int property for this GatewaySender.
- *
- * @return retriesToGetTransactionEventsFromQueue int property for this GatewaySender
- *
- */
- public int getRetriesToGetTransactionEventsFromQueue() {
- return retriesToGetTransactionEventsFromQueue;
+ return false;
}
public boolean isForInternalUse() {
@@ -589,7 +576,7 @@ public boolean isForInternalUse() {
public abstract void stop();
/**
- * Destroys the GatewaySender. Before destroying the sender, caller needs to to ensure that the
+ * Destroys the GatewaySender. Before destroying the sender, caller needs to ensure that the
* sender is stopped so that all the resources (threads, connection pool etc.) will be released
* properly. Stopping the sender is not handled in the destroy. Destroy is carried out in
* following steps: 1. Take the lifeCycleLock. 2. If the sender is attached to any application
@@ -731,19 +718,6 @@ public void setBatchTimeInterval(int batchTimeInterval) {
}
}
- /**
- * Set GroupTransactionEvents for this GatewaySender.
- *
- * Care must be taken to set this consistently across all gateway senders in the cluster and only
- * when safe to do so.
- *
- * @since Geode 1.15
- *
- */
- public void setGroupTransactionEvents(boolean groupTransactionEvents) {
- this.groupTransactionEvents = groupTransactionEvents;
- }
-
/**
* Set GatewayEventFilters for this GatewaySender.
*
@@ -761,15 +735,13 @@ public void setGatewayEventFilters(List filters) {
}
}
- public boolean beforeEnqueue(GatewayQueueEvent gatewayEvent) {
- boolean enqueue = true;
- for (GatewayEventFilter filter : getGatewayEventFilters()) {
- enqueue = filter.beforeEnqueue(gatewayEvent);
- if (!enqueue) {
- return enqueue;
+ public boolean beforeEnqueue(final GatewayQueueEvent, ?> gatewayEvent) {
+ for (final GatewayEventFilter filter : getGatewayEventFilters()) {
+ if (!filter.beforeEnqueue(gatewayEvent)) {
+ return false;
}
}
- return enqueue;
+ return true;
}
protected void stopProcessing() {
@@ -847,23 +819,21 @@ public synchronized boolean setServerLocation(ServerLocation location) {
return true;
}
- private class Stopper extends CancelCriterion {
- final CancelCriterion stper;
+ private static class Stopper extends CancelCriterion {
+ final CancelCriterion stopper;
Stopper(CancelCriterion stopper) {
- stper = stopper;
+ this.stopper = stopper;
}
@Override
public String cancelInProgress() {
- // checkFailure(); // done by stopper
- return stper.cancelInProgress();
+ return stopper.cancelInProgress();
}
@Override
public RuntimeException generateCancelledException(Throwable e) {
- RuntimeException result = stper.generateCancelledException(e);
- return result;
+ return stopper.generateCancelledException(e);
}
}
@@ -1013,7 +983,7 @@ public AbstractGatewaySenderEventProcessor getEventProcessor() {
*
* @return boolean True if the event is allowed.
*/
- private boolean checkForDistribution(EntryEventImpl event, GatewaySenderStats stats) {
+ private boolean checkForDistribution(EntryEventImpl event) {
if (event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) {
return false;
}
@@ -1026,11 +996,6 @@ private boolean checkForDistribution(EntryEventImpl event, GatewaySenderStats st
return true;
}
- public void distribute(EnumListenerEvent operation, EntryEventImpl event,
- List allRemoteDSIds) {
- distribute(operation, event, allRemoteDSIds, false);
- }
-
public void distribute(EnumListenerEvent operation, EntryEventImpl event,
List allRemoteDSIds, boolean isLastEventInTransaction) {
@@ -1045,13 +1010,13 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
final GatewaySenderStats stats = getStatistics();
stats.incEventsReceived();
- if (!checkForDistribution(event, stats)) {
+ if (!checkForDistribution(event)) {
stats.incEventsNotQueued();
return;
}
// this filter is defined by Asif which exist in old wan too. new wan has
- // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
+ // other GatewayEventFilter. Do we need to get rid of this filter? Cheetah is
// not considering this filter
if (!filter.enqueueEvent(event)) {
stats.incEventsFiltered();
@@ -1071,49 +1036,9 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
}
if (callbackArg instanceof GatewaySenderEventCallbackArgument) {
- GatewaySenderEventCallbackArgument seca = (GatewaySenderEventCallbackArgument) callbackArg;
- if (isDebugEnabled) {
- logger.debug(
- "{}: Event originated in {}. My DS id is {}. The remote DS id is {}. The recipients are: {}",
- this, seca.getOriginatingDSId(), getMyDSId(), getRemoteDSId(),
- seca.getRecipientDSIds());
- }
- if (seca.getOriginatingDSId() == DEFAULT_DISTRIBUTED_SYSTEM_ID) {
- if (isDebugEnabled) {
- logger.debug(
- "{}: Event originated in {}. My DS id is {}. The remote DS id is {}. The recipients are: {}",
- this, seca.getOriginatingDSId(), getMyDSId(), getRemoteDSId(),
- seca.getRecipientDSIds());
- }
-
- seca.setOriginatingDSId(getMyDSId());
- seca.initializeReceipientDSIds(allRemoteDSIds);
-
- } else {
- // if the dispatcher is GatewaySenderEventCallbackDispatcher (which is the case of WBCL),
- // skip the below check of remoteDSId.
- // Fix for #46517
- AbstractGatewaySenderEventProcessor ep = getEventProcessor();
- // if manual-start is true, ep is null
- if (ep == null || !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
- if (seca.getOriginatingDSId() == getRemoteDSId()) {
- if (isDebugEnabled) {
- logger.debug(
- "{}: Event originated in {}. My DS id is {}. It is being dropped as remote is originator.",
- this, seca.getOriginatingDSId(), getMyDSId());
- }
- return;
- } else if (seca.getRecipientDSIds().contains(getRemoteDSId())) {
- if (isDebugEnabled) {
- logger.debug(
- "{}: Event originated in {}. My DS id is {}. The remote DS id is {}.. It is being dropped as remote ds is already a recipient. Recipients are: {}",
- this, seca.getOriginatingDSId(), getMyDSId(), getRemoteDSId(),
- seca.getRecipientDSIds());
- }
- return;
- }
- }
- seca.getRecipientDSIds().addAll(allRemoteDSIds);
+ if (handleGatewaySenderCallbackArgument(allRemoteDSIds, isDebugEnabled,
+ (GatewaySenderEventCallbackArgument) callbackArg)) {
+ return;
}
} else {
GatewaySenderEventCallbackArgument geCallbackArg =
@@ -1169,12 +1094,9 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
if (ev == null) {
getStopper().checkCancelInProgress(null);
getCache().getDistributedSystem().getCancelCriterion().checkCancelInProgress(null);
- // event processor will be null if there was an authorization
- // problem
- // connecting to the other site (bug #40681)
- if (ev == null) {
- throw new GatewayCancelledException("Event processor thread is gone");
- }
+ // event processor will be null if there was an authorization problem connecting to the
+ // other site
+ throw new GatewayCancelledException("Event processor thread is gone");
}
// Get substitution value to enqueue if necessary
@@ -1205,6 +1127,54 @@ this, getId(), operation, clonedEvent),
}
}
+ private boolean handleGatewaySenderCallbackArgument(final @NotNull List allRemoteDSIds,
+ final boolean isDebugEnabled,
+ final @NotNull GatewaySenderEventCallbackArgument argument) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "{}: Event originated in {}. My DS id is {}. The remote DS id is {}. The recipients are: {}",
+ this, argument.getOriginatingDSId(), getMyDSId(), getRemoteDSId(),
+ argument.getRecipientDSIds());
+ }
+ if (argument.getOriginatingDSId() == DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "{}: Event originated in {}. My DS id is {}. The remote DS id is {}. The recipients are: {}",
+ this, argument.getOriginatingDSId(), getMyDSId(), getRemoteDSId(),
+ argument.getRecipientDSIds());
+ }
+
+ argument.setOriginatingDSId(getMyDSId());
+ argument.initializeReceipientDSIds(allRemoteDSIds);
+
+ } else {
+ // if the dispatcher is GatewaySenderEventCallbackDispatcher (which is the case of WBCL),
+ // skip the below check of remoteDSId.
+ AbstractGatewaySenderEventProcessor ep = getEventProcessor();
+ // if manual-start is true, ep is null
+ if (ep == null || !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
+ if (argument.getOriginatingDSId() == getRemoteDSId()) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "{}: Event originated in {}. My DS id is {}. It is being dropped as remote is originator.",
+ this, argument.getOriginatingDSId(), getMyDSId());
+ }
+ return true;
+ } else if (argument.getRecipientDSIds().contains(getRemoteDSId())) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "{}: Event originated in {}. My DS id is {}. The remote DS id is {}.. It is being dropped as remote ds is already a recipient. Recipients are: {}",
+ this, argument.getOriginatingDSId(), getMyDSId(), getRemoteDSId(),
+ argument.getRecipientDSIds());
+ }
+ return true;
+ }
+ }
+ argument.getRecipientDSIds().addAll(allRemoteDSIds);
+ }
+ return false;
+ }
+
private void recordDroppedEvent(EntryEventImpl event) {
if (eventProcessor != null) {
eventProcessor.registerEventDroppedInPrimaryQueue(event);
@@ -1223,7 +1193,7 @@ int getTmpDroppedEventSize() {
/**
* During sender is getting started, if there are any cache operation on queue then that event
- * will be stored in temp queue. Once sender is started, these event from tmp queue will be added
+ * will be stored in temp queue. Once sender is started, these events from tmp queue will be added
* to sender queue.
*
* Apart from sender's start() method, this method also gets called from
@@ -1278,7 +1248,7 @@ this, getId(), nextEvent.getOperation(), nextEvent),
* tmpQueueEvents.
*
*/
- public boolean removeFromTempQueueEvents(Object tailKey) {
+ public void removeFromTempQueueEvents(Object tailKey) {
synchronized (queuedEventsSync) {
Iterator itr = tmpQueuedEvents.iterator();
while (itr.hasNext()) {
@@ -1291,20 +1261,19 @@ public boolean removeFromTempQueueEvents(Object tailKey) {
}
event.release();
itr.remove();
- return true;
+ return;
}
}
- return false;
}
}
/**
* During sender is getting stopped, if there are any cache operation on queue then that event
- * will be stored in temp queue. Once sender is started, these event from tmp queue will be
+ * will be stored in temp queue. Once sender is started, these events from tmp queue will be
* cleared.
*/
public void clearTempEventsAfterSenderStopped() {
- TmpQueueEvent nextEvent = null;
+ TmpQueueEvent nextEvent;
while ((nextEvent = tmpQueuedEvents.poll()) != null) {
nextEvent.release();
}
@@ -1326,7 +1295,7 @@ public Object getSubstituteValue(EntryEventImpl clonedEvent, EnumListenerEvent o
Object substituteValue = null;
if (substitutionFilter != null) {
try {
- substituteValue = substitutionFilter.getSubstituteValue(clonedEvent);
+ substituteValue = substitutionFilter.getSubstituteValue(uncheckedCast(clonedEvent));
// If null is returned from the filter, null is set in the value
if (substituteValue == null) {
substituteValue = GatewaySenderEventImpl.TOKEN_NULL;
@@ -1361,7 +1330,7 @@ protected void initializeEventIdIndex() {
Region region = getEventIdIndexMetaDataRegion();
// Get or create the index
- int index = 0;
+ final int index;
String messagePrefix = null;
if (region.containsKey(getId())) {
index = region.get(getId());
@@ -1414,7 +1383,7 @@ private static synchronized Region initializeEventIdIndexMetaDa
InternalRegionFactory factory =
cache.createInternalRegionFactory(RegionShortcut.REPLICATE);
- // Create a stats holder for the meta data stats
+ // Create a stats holder for the metadata stats
final HasCachePerfStats statsHolder = () -> new CachePerfStats(cache.getDistributedSystem(),
"RegionStats-" + META_DATA_REGION_NAME, sender.statisticsClock);
factory.setIsUsedForMetaRegion(true);
@@ -1489,7 +1458,7 @@ public ReentrantReadWriteLock getLifeCycleLock() {
@Override
public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException {
- boolean result = false;
+ final boolean result;
if (isParallel()) {
try {
WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
@@ -1547,7 +1516,7 @@ public EventWrapper(GatewaySenderEventImpl e) {
* of the off-heap work, the GatewaySenderEventImpl no longer has a EntryEventImpl. So this class
* allows us to defer creation of the GatewaySenderEventImpl until we are ready to actually
* enqueue it. The caller is responsible for giving us an EntryEventImpl that we own and that we
- * will release. This is done by making a copy/clone of the original event. This fixes bug 52029.
+ * will release. This is done by making a copy/clone of the original event.
*/
public static class TmpQueueEvent implements Releasable {
private final EnumListenerEvent operation;
@@ -1578,17 +1547,17 @@ public void release() {
}
}
- protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
- GatewayQueueEvent event = null;
- for (RegionQueue queue : getQueues()) {
- Region region = queue.getRegion();
+ protected GatewayQueueEvent, ?> getSynchronizationEvent(Object key, long timestamp) {
+ GatewayQueueEvent, ?> event = null;
+ for (final RegionQueue queue : getQueues()) {
+ final Region, GatewaySenderEventImpl> region = uncheckedCast(queue.getRegion());
if (region == null) {
continue;
}
- for (final Object o : region.values()) {
- GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) o;
- if (gsei.getKey().equals(key) && gsei.getVersionTimeStamp() == timestamp) {
- event = gsei;
+ for (final GatewaySenderEventImpl gatewaySenderEvent : region.values()) {
+ if (gatewaySenderEvent.getKey().equals(key)
+ && gatewaySenderEvent.getVersionTimeStamp() == timestamp) {
+ event = gatewaySenderEvent;
logger.info("{}: Providing synchronization event for key={}; timestamp={}: {}",
this, key, timestamp, event);
getStatistics().incSynchronizationEventsProvided();
@@ -1599,7 +1568,7 @@ protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp)
return event;
}
- protected void putSynchronizationEvent(GatewayQueueEvent event) {
+ protected void putSynchronizationEvent(GatewayQueueEvent, ?> event) {
if (eventProcessor != null) {
lifeCycleLock.readLock().lock();
try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
index 74f9c42685ba..3ea060f6b665 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
@@ -14,314 +14,81 @@
*/
package org.apache.geode.internal.cache.wan;
-import java.util.ArrayList;
import java.util.List;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewaySender;
-import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewayTransportFilter;
-public class GatewaySenderAttributes {
-
- public static final boolean DEFAULT_IS_BUCKETSORTED = true;
- public static final boolean DEFAULT_IS_META_QUEUE = false;
-
-
- private int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
-
- private int socketReadTimeout = GatewaySender.DEFAULT_SOCKET_READ_TIMEOUT;
-
- private int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY;
-
- private int batchSize = GatewaySender.DEFAULT_BATCH_SIZE;
-
- private int batchTimeInterval = GatewaySender.DEFAULT_BATCH_TIME_INTERVAL;
-
- private boolean isBatchConflationEnabled = GatewaySender.DEFAULT_BATCH_CONFLATION;
-
- private boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED;
-
- private int alertThreshold = GatewaySender.DEFAULT_ALERT_THRESHOLD;
-
- private boolean manualStart = GatewaySender.DEFAULT_MANUAL_START;
-
- private String diskStoreName;
-
- private final List eventFilters = new ArrayList<>();
-
- private final ArrayList transFilters =
- new ArrayList<>();
-
- private final List listeners = new ArrayList<>();
-
- private GatewayEventSubstitutionFilter eventSubstitutionFilter;
-
- private String id;
-
- private int remoteDs = GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
-
- private LocatorDiscoveryCallback locatorDiscoveryCallback;
-
- private boolean isDiskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS;
-
- private OrderPolicy policy;
-
- private int dispatcherThreads = GatewaySender.DEFAULT_DISPATCHER_THREADS;
-
- private int parallelism = GatewaySender.DEFAULT_PARALLELISM_REPLICATED_REGION;
-
- private boolean isParallel = GatewaySender.DEFAULT_IS_PARALLEL;
-
- private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS;
-
- private int retriesToGetTransactionEventsFromQueue =
- GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES;
-
- private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE;
-
- private boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
-
- private boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
-
- private boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
-
- private boolean enforceThreadsConnectSameReceiver =
- GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER;
-
- public void setSocketBufferSize(int bufferSize) {
- socketBufferSize = bufferSize;
- }
-
- public void setSocketReadTimeout(int readTimeout) {
- socketReadTimeout = readTimeout;
- }
-
- public void setMaximumQueueMemory(int maxQueueMemory) {
- maximumQueueMemory = maxQueueMemory;
- }
-
- public void setBatchSize(int batchsize) {
- batchSize = batchsize;
- }
-
- public void setBatchTimeInterval(int batchtimeinterval) {
- batchTimeInterval = batchtimeinterval;
- }
-
- public void setBatchConflationEnabled(boolean batchConfEnabled) {
- isBatchConflationEnabled = batchConfEnabled;
- }
-
- public void setPersistenceEnabled(boolean persistenceEnabled) {
- isPersistenceEnabled = persistenceEnabled;
- }
-
- public void setAlertThreshold(int alertThresh) {
- alertThreshold = alertThresh;
- }
-
- public void setManualStart(boolean manualstart) {
- manualStart = manualstart;
- }
-
- public void setDiskStoreName(String diskstorename) {
- diskStoreName = diskstorename;
- }
-
- public void setEventSubstitutionFilter(GatewayEventSubstitutionFilter eventsubstitutionfilter) {
- eventSubstitutionFilter = eventsubstitutionfilter;
- }
-
- public void setId(String idString) {
- id = idString;
- }
-
- public void setRemoteDs(int rDs) {
- remoteDs = rDs;
- }
-
- public void setLocatorDiscoveryCallback(LocatorDiscoveryCallback locatorDiscCall) {
- locatorDiscoveryCallback = locatorDiscCall;
- }
-
- public void setDiskSynchronous(boolean diskSynchronous) {
- isDiskSynchronous = diskSynchronous;
- }
-
- public void setOrderPolicy(OrderPolicy orderpolicy) {
- policy = orderpolicy;
- }
-
- public void setDispatcherThreads(int dispatchThreads) {
- dispatcherThreads = dispatchThreads;
- }
-
- public void setParallelism(int tempParallelism) {
- parallelism = tempParallelism;
- }
-
- public void setParallel(boolean parallel) {
- isParallel = parallel;
- }
-
- public void setGroupTransactionEvents(boolean groupTransEvents) {
- groupTransactionEvents = groupTransEvents;
- }
-
- public void setRetriesToGetTransactionEventsFromQueue(int retries) {
- retriesToGetTransactionEventsFromQueue = retries;
- }
-
- public void setForInternalUse(boolean forInternalUse) {
- isForInternalUse = forInternalUse;
- }
-
- public void setBucketSorted(boolean bucketSorted) {
- isBucketSorted = bucketSorted;
- }
-
- public void setMetaQueue(boolean metaQueue) {
- isMetaQueue = metaQueue;
- }
-
- public void setForwardExpirationDestroy(boolean forwardexpirationdestroy) {
- forwardExpirationDestroy = forwardexpirationdestroy;
- }
-
- public void setEnforceThreadsConnectSameReceiver(boolean enforcethreadsconnectsamereceiver) {
- enforceThreadsConnectSameReceiver = enforcethreadsconnectsamereceiver;
- }
-
- public int getSocketBufferSize() {
- return socketBufferSize;
- }
-
- public boolean isDiskSynchronous() {
- return isDiskSynchronous;
- }
-
- public int getSocketReadTimeout() {
- return socketReadTimeout;
- }
-
- public String getDiskStoreName() {
- return diskStoreName;
- }
-
- public int getMaximumQueueMemory() {
- return maximumQueueMemory;
- }
+public interface GatewaySenderAttributes {
+ int getSocketBufferSize();
- public int getBatchSize() {
- return batchSize;
- }
+ boolean isDiskSynchronous();
- public int getBatchTimeInterval() {
- return batchTimeInterval;
- }
+ int getSocketReadTimeout();
- public boolean isBatchConflationEnabled() {
- return isBatchConflationEnabled;
- }
+ String getDiskStoreName();
- public boolean isPersistenceEnabled() {
- return isPersistenceEnabled;
- }
+ int getMaximumQueueMemory();
- public int getAlertThreshold() {
- return alertThreshold;
- }
+ int getBatchSize();
- public List getGatewayEventFilters() {
- return eventFilters;
- }
+ int getBatchTimeInterval();
- public List getGatewayTransportFilters() {
- return transFilters;
- }
+ boolean isBatchConflationEnabled();
- public List getAsyncEventListeners() {
- return listeners;
- }
+ boolean isPersistenceEnabled();
- public LocatorDiscoveryCallback getGatewayLocatoDiscoveryCallback() {
- return locatorDiscoveryCallback;
- }
+ int getAlertThreshold();
- public boolean isManualStart() {
- return manualStart;
- }
+ @NotNull
+ List getGatewayEventFilters();
- public boolean isParallel() {
- return isParallel;
- }
+ @NotNull
+ List getGatewayTransportFilters();
- public boolean mustGroupTransactionEvents() {
- return groupTransactionEvents;
- }
+ @NotNull
+ List getAsyncEventListeners();
- public int getRetriesToGetTransactionEventsFromQueue() {
- return retriesToGetTransactionEventsFromQueue;
- }
+ @Nullable
+ LocatorDiscoveryCallback getGatewayLocatorDiscoveryCallback();
- public boolean isForInternalUse() {
- return isForInternalUse;
- }
+ @Deprecated
+ boolean isManualStart();
- public void addGatewayEventFilter(GatewayEventFilter filter) {
- eventFilters.add(filter);
- }
+ boolean isParallel();
- public void addGatewayTransportFilter(GatewayTransportFilter filter) {
- transFilters.add(filter);
- }
+ boolean mustGroupTransactionEvents();
- public void addAsyncEventListener(AsyncEventListener listener) {
- listeners.add(listener);
- }
+ boolean isForInternalUse();
- public String getId() {
- return id;
- }
+ String getId();
- public int getRemoteDSId() {
- return remoteDs;
- }
+ int getRemoteDSId();
- public int getDispatcherThreads() {
- return dispatcherThreads;
- }
+ int getDispatcherThreads();
- public int getParallelismForReplicatedRegion() {
- return parallelism;
- }
+ int getParallelismForReplicatedRegion();
- public OrderPolicy getOrderPolicy() {
- return policy;
- }
+ @Nullable
+ GatewaySender.OrderPolicy getOrderPolicy();
- public boolean isBucketSorted() {
- return isBucketSorted;
- }
+ boolean isBucketSorted();
- public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
- return eventSubstitutionFilter;
- }
+ @Nullable
+ GatewayEventSubstitutionFilter, ?> getGatewayEventSubstitutionFilter();
- public boolean isMetaQueue() {
- return isMetaQueue;
- }
+ boolean isMetaQueue();
- public boolean isForwardExpirationDestroy() {
- return forwardExpirationDestroy;
- }
+ boolean isForwardExpirationDestroy();
- public boolean getEnforceThreadsConnectSameReceiver() {
- return enforceThreadsConnectSameReceiver;
- }
+ boolean getEnforceThreadsConnectSameReceiver();
+ String getType();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java
new file mode 100644
index 000000000000..87addac4e0df
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+
+public class GatewaySenderAttributesImpl implements MutableGatewaySenderAttributes {
+
+ private static final boolean DEFAULT_IS_BUCKET_SORTED = true;
+
+ private static final boolean DEFAULT_IS_META_QUEUE = false;
+
+ private int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
+
+ private int socketReadTimeout = GatewaySender.DEFAULT_SOCKET_READ_TIMEOUT;
+
+ private int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY;
+
+ private int batchSize = GatewaySender.DEFAULT_BATCH_SIZE;
+
+ private int batchTimeInterval = GatewaySender.DEFAULT_BATCH_TIME_INTERVAL;
+
+ private boolean isBatchConflationEnabled = GatewaySender.DEFAULT_BATCH_CONFLATION;
+
+ private boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED;
+
+ private int alertThreshold = GatewaySender.DEFAULT_ALERT_THRESHOLD;
+
+ @Deprecated
+ private boolean manualStart = GatewaySender.DEFAULT_MANUAL_START;
+
+ private String diskStoreName;
+
+ private final List eventFilters = new ArrayList<>();
+
+ private final ArrayList transFilters = new ArrayList<>();
+
+ private final List listeners = new ArrayList<>();
+
+ private GatewayEventSubstitutionFilter, ?> eventSubstitutionFilter;
+
+ private String id;
+
+ private int remoteDs = GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
+
+ private LocatorDiscoveryCallback locatorDiscoveryCallback;
+
+ private boolean isDiskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS;
+
+ private OrderPolicy orderPolicy;
+
+ private int dispatcherThreads = GatewaySender.DEFAULT_DISPATCHER_THREADS;
+
+ private int parallelism = GatewaySender.DEFAULT_PARALLELISM_REPLICATED_REGION;
+
+ private boolean isParallel = GatewaySender.DEFAULT_IS_PARALLEL;
+
+ private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS;
+
+ private String type;
+
+ private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE;
+
+ private boolean isBucketSorted = GatewaySenderAttributesImpl.DEFAULT_IS_BUCKET_SORTED;
+
+ private boolean isMetaQueue = GatewaySenderAttributesImpl.DEFAULT_IS_META_QUEUE;
+
+ private boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
+
+ private boolean enforceThreadsConnectSameReceiver =
+ GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER;
+
+ public void setSocketBufferSize(int bufferSize) {
+ socketBufferSize = bufferSize;
+ }
+
+ public void setSocketReadTimeout(int readTimeout) {
+ socketReadTimeout = readTimeout;
+ }
+
+ public void setMaximumQueueMemory(int maxQueueMemory) {
+ maximumQueueMemory = maxQueueMemory;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void setBatchTimeInterval(int batchTimeInterval) {
+ this.batchTimeInterval = batchTimeInterval;
+ }
+
+ public void setBatchConflationEnabled(boolean batchConfEnabled) {
+ isBatchConflationEnabled = batchConfEnabled;
+ }
+
+ public void setPersistenceEnabled(boolean persistenceEnabled) {
+ isPersistenceEnabled = persistenceEnabled;
+ }
+
+ public void setAlertThreshold(int alertThresh) {
+ alertThreshold = alertThresh;
+ }
+
+ @Deprecated
+ public void setManualStart(boolean manualStart) {
+ this.manualStart = manualStart;
+ }
+
+ public void setDiskStoreName(String diskStoreName) {
+ this.diskStoreName = diskStoreName;
+ }
+
+ public void setEventSubstitutionFilter(
+ @Nullable GatewayEventSubstitutionFilter, ?> eventSubstitutionFilter) {
+ this.eventSubstitutionFilter = eventSubstitutionFilter;
+ }
+
+ public void setId(String idString) {
+ id = idString;
+ }
+
+ public void setRemoteDs(int rDs) {
+ remoteDs = rDs;
+ }
+
+ public void setLocatorDiscoveryCallback(@Nullable LocatorDiscoveryCallback locatorDiscCall) {
+ locatorDiscoveryCallback = locatorDiscCall;
+ }
+
+ public void setDiskSynchronous(boolean diskSynchronous) {
+ isDiskSynchronous = diskSynchronous;
+ }
+
+ @Override
+ public void setOrderPolicy(final @Nullable OrderPolicy orderPolicy) {
+ this.orderPolicy = orderPolicy;
+ }
+
+ public void setDispatcherThreads(int dispatchThreads) {
+ dispatcherThreads = dispatchThreads;
+ }
+
+ public void setParallelism(int tempParallelism) {
+ parallelism = tempParallelism;
+ }
+
+ public void setParallel(boolean parallel) {
+ isParallel = parallel;
+ }
+
+ public void setGroupTransactionEvents(boolean groupTransEvents) {
+ groupTransactionEvents = groupTransEvents;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ isParallel = type.contains("Parallel") ? true : false;
+ }
+
+ public void setForInternalUse(boolean forInternalUse) {
+ isForInternalUse = forInternalUse;
+ }
+
+ public void setBucketSorted(boolean bucketSorted) {
+ isBucketSorted = bucketSorted;
+ }
+
+ public void setMetaQueue(boolean metaQueue) {
+ isMetaQueue = metaQueue;
+ }
+
+ public void setForwardExpirationDestroy(boolean forwardExpirationDestroy) {
+ this.forwardExpirationDestroy = forwardExpirationDestroy;
+ }
+
+ public void setEnforceThreadsConnectSameReceiver(boolean enforceThreadsConnectSameReceiver) {
+ this.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver;
+ }
+
+ @Override
+ public int getSocketBufferSize() {
+ return socketBufferSize;
+ }
+
+ @Override
+ public boolean isDiskSynchronous() {
+ return isDiskSynchronous;
+ }
+
+ @Override
+ public int getSocketReadTimeout() {
+ return socketReadTimeout;
+ }
+
+ @Override
+ public String getDiskStoreName() {
+ return diskStoreName;
+ }
+
+ @Override
+ public int getMaximumQueueMemory() {
+ return maximumQueueMemory;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public int getBatchTimeInterval() {
+ return batchTimeInterval;
+ }
+
+ @Override
+ public boolean isBatchConflationEnabled() {
+ return isBatchConflationEnabled;
+ }
+
+ @Override
+ public boolean isPersistenceEnabled() {
+ return isPersistenceEnabled;
+ }
+
+ @Override
+ public int getAlertThreshold() {
+ return alertThreshold;
+ }
+
+ @Override
+ public @NotNull List getGatewayEventFilters() {
+ return eventFilters;
+ }
+
+ @Override
+ public @NotNull List getGatewayTransportFilters() {
+ return transFilters;
+ }
+
+ @Override
+ public @NotNull List getAsyncEventListeners() {
+ return listeners;
+ }
+
+ @Override
+ public @Nullable LocatorDiscoveryCallback getGatewayLocatorDiscoveryCallback() {
+ return locatorDiscoveryCallback;
+ }
+
+ @Override
+ @Deprecated
+ public boolean isManualStart() {
+ return manualStart;
+ }
+
+ @Override
+ public boolean isParallel() {
+ return isParallel;
+ }
+
+ @Override
+ public boolean mustGroupTransactionEvents() {
+ return groupTransactionEvents;
+ }
+
+ @Override
+ public boolean isForInternalUse() {
+ return isForInternalUse;
+ }
+
+ public void addGatewayEventFilter(GatewayEventFilter filter) {
+ eventFilters.add(filter);
+ }
+
+ public void addGatewayTransportFilter(GatewayTransportFilter filter) {
+ transFilters.add(filter);
+ }
+
+ public void addAsyncEventListener(AsyncEventListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public int getRemoteDSId() {
+ return remoteDs;
+ }
+
+ @Override
+ public int getDispatcherThreads() {
+ return dispatcherThreads;
+ }
+
+ @Override
+ public int getParallelismForReplicatedRegion() {
+ return parallelism;
+ }
+
+ @Override
+ public @Nullable OrderPolicy getOrderPolicy() {
+ return orderPolicy;
+ }
+
+ @Override
+ public boolean isBucketSorted() {
+ return isBucketSorted;
+ }
+
+ @Override
+ public @Nullable GatewayEventSubstitutionFilter, ?> getGatewayEventSubstitutionFilter() {
+ return eventSubstitutionFilter;
+ }
+
+ @Override
+ public boolean isMetaQueue() {
+ return isMetaQueue;
+ }
+
+ @Override
+ public boolean isForwardExpirationDestroy() {
+ return forwardExpirationDestroy;
+ }
+
+ @Override
+ public boolean getEnforceThreadsConnectSameReceiver() {
+ return enforceThreadsConnectSameReceiver;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySenderFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySenderFactory.java
index 6b941c846015..18641fa15390 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySenderFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySenderFactory.java
@@ -27,17 +27,7 @@ public interface InternalGatewaySenderFactory extends GatewaySenderFactory {
GatewaySenderFactory setBucketSorted(boolean bucketSorted);
- GatewaySender create(String senderIdFromAsyncEventQueueId);
-
void configureGatewaySender(GatewaySender senderCreation);
GatewaySenderFactory setLocatorDiscoveryCallback(LocatorDiscoveryCallback myLocatorCallback);
-
- /**
- * Sets the maximum number of retries to get events from the queue
- * to complete a transaction when groupTransactionEvents is true.
- *
- * @param retries the maximum number of retries.
- */
- GatewaySenderFactory setRetriesToGetTransactionEventsFromQueue(int retries);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java
new file mode 100644
index 000000000000..4ccc5a43d6df
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.wan;
+
+import org.jetbrains.annotations.Nullable;
+
+import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
+
+public interface MutableGatewaySenderAttributes extends GatewaySenderAttributes {
+
+ void setOrderPolicy(@Nullable OrderPolicy orderPolicy);
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index f01e227bced9..cccc3e3d4a16 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -20,6 +20,7 @@
import java.util.concurrent.BlockingQueue;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
@@ -77,8 +78,8 @@ protected void initializeMessageQueue(String id, boolean cleanQueues) {
logger.debug("The target Regions are(PGSEP) {}", targetRs);
}
- ParallelGatewaySenderQueue queue =
- new ParallelGatewaySenderQueue(sender, targetRs, index, nDispatcher, cleanQueues);
+ final ParallelGatewaySenderQueue queue =
+ createParallelGatewaySenderQueue(sender, targetRs, index, nDispatcher, cleanQueues);
queue.start();
this.queue = queue;
@@ -88,6 +89,14 @@ protected void initializeMessageQueue(String id, boolean cleanQueues) {
}
}
+ protected @NotNull ParallelGatewaySenderQueue createParallelGatewaySenderQueue(
+ final @NotNull AbstractGatewaySender sender,
+ final @NotNull Set> targetRegions, final int index, final int dispatcherThreads,
+ final boolean cleanQueues) {
+ return new ParallelGatewaySenderQueue(sender, targetRegions, index, dispatcherThreads,
+ cleanQueues);
+ }
+
@Override
public int eventQueueSize() {
ParallelGatewaySenderQueue queue = (ParallelGatewaySenderQueue) getQueue();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 54715b7ccd29..f8aadbeed889 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -16,14 +16,12 @@
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE;
-import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -34,10 +32,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
@@ -56,7 +54,6 @@
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -162,14 +159,14 @@ boolean getCleanQueues() {
* The peekedEventsProcessing queue is used when the batch size is reduced due to a
* MessageTooLargeException
*/
- private final BlockingQueue peekedEventsProcessing =
- new LinkedBlockingQueue<>();
+ protected BlockingQueue peekedEventsProcessing =
+ new LinkedBlockingQueue();
/**
* The peekedEventsProcessingInProgress boolean denotes that processing existing peeked events is
* in progress
*/
- private boolean peekedEventsProcessingInProgress = false;
+ protected boolean peekedEventsProcessingInProgress = false;
public final AbstractGatewaySender sender;
@@ -245,8 +242,7 @@ private Object deserialize(Object serializedBytes) {
private final MetaRegionFactory metaRegionFactory;
public ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set> userRegions,
- int idx,
- int nDispatcher, boolean cleanQueues) {
+ int idx, int nDispatcher, boolean cleanQueues) {
this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory(), cleanQueues);
}
@@ -1340,10 +1336,7 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
}
}
- if (batch.size() > 0) {
- peekEventsFromIncompleteTransactions(batch, prQ);
- }
-
+ postProcessBatch(prQ, batch);
if (isDebugEnabled) {
logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}",
@@ -1356,6 +1349,9 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
return batch;
}
+ protected void postProcessBatch(final @NotNull PartitionedRegion partitionedRegion,
+ final @NotNull List batch) {}
+
private boolean stopPeekingDueToTime(int timeToWait, long end) {
final boolean isDebugEnabled = logger.isDebugEnabled();
// If time to wait is -1 (don't wait) or time interval has elapsed
@@ -1372,78 +1368,6 @@ private boolean stopPeekingDueToTime(int timeToWait, long end) {
return false;
}
- protected boolean mustGroupTransactionEvents() {
- return sender.mustGroupTransactionEvents();
- }
-
- @VisibleForTesting
- void peekEventsFromIncompleteTransactions(List batch,
- PartitionedRegion prQ) {
- if (!mustGroupTransactionEvents()) {
- return;
- }
-
- Map incompleteTransactionIdsInBatch =
- getIncompleteTransactionsInBatch(batch);
- if (incompleteTransactionIdsInBatch.size() == 0) {
- return;
- }
-
- int retries = 0;
- while (true) {
- for (Iterator> iter =
- incompleteTransactionIdsInBatch.entrySet().iterator(); iter.hasNext();) {
- Map.Entry pendingTransaction = iter.next();
- TransactionId transactionId = pendingTransaction.getKey();
- int bucketId = pendingTransaction.getValue();
- List