diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java index b2c69266baae..1fd3442b5366 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java @@ -153,8 +153,6 @@ public interface GatewaySender { int CONNECTION_RETRY_INTERVAL = Integer .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000); - String DEFAULT_TYPE = "SerialGatewaySender"; - /** * The order policy. This enum is applicable only when concurrency-level is > 1. * diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java index bdfeb4de5e76..87addac4e0df 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java @@ -81,7 +81,7 @@ public class GatewaySenderAttributesImpl implements MutableGatewaySenderAttribut private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS; - private String type = GatewaySender.DEFAULT_TYPE; + private String type; private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE; @@ -179,7 +179,7 @@ public void setGroupTransactionEvents(boolean groupTransEvents) { public void setType(String type) { this.type = type; - isParallel = type.equals("Parallel") ? true : false; + isParallel = type.contains("Parallel") ? true : false; } public void setForInternalUse(boolean forInternalUse) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java index 60b9eac839cf..b4195ee670ab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java @@ -14,7 +14,7 @@ */ package org.apache.geode.internal.lang; -import java.util.Optional; +import static org.apache.geode.internal.lang.SystemProperty.getProductBooleanProperty; import org.apache.geode.internal.cache.eviction.LRUListWithAsyncSorting; @@ -26,9 +26,6 @@ */ public class SystemPropertyHelper { - public static final String GEODE_PREFIX = "geode."; - public static final String GEMFIRE_PREFIX = "gemfire."; - /** * When set to "true" enables asynchronous eviction algorithm (defaults to true). For more details * see {@link LRUListWithAsyncSorting}. @@ -109,98 +106,6 @@ public class SystemPropertyHelper { */ public static final String RE_AUTHENTICATE_WAIT_TIME = "reauthenticate.wait.time"; - /** - * This method will try to look up "geode." and "gemfire." versions of the system property. It - * will check and prefer "geode." setting first, then try to check "gemfire." setting. - * - * @param name system property name set in Geode - * @return an Optional containing the Boolean value of the system property - */ - public static Optional getProductBooleanProperty(String name) { - String property = getProperty(name); - return property != null ? Optional.of(Boolean.parseBoolean(property)) : Optional.empty(); - } - - /** - * This method will try to look up "geode." and "gemfire." versions of the system property. It - * will check and prefer "geode." setting first, then try to check "gemfire." setting. - * - * @param name system property name set in Geode - * @return an Optional containing the Integer value of the system property - */ - public static Optional getProductIntegerProperty(String name) { - Integer propertyValue = Integer.getInteger(GEODE_PREFIX + name); - if (propertyValue == null) { - propertyValue = Integer.getInteger(GEMFIRE_PREFIX + name); - } - - if (propertyValue != null) { - return Optional.of(propertyValue); - } else { - return Optional.empty(); - } - } - - /** - * This method will try to look up "geode." and "gemfire." versions of the system property. It - * will check and prefer "geode." setting first, then try to check "gemfire." setting. - * - * @param name system property name set in Geode - * @return an Optional containing the Long value of the system property - */ - public static Optional getProductLongProperty(String name) { - Long propertyValue = Long.getLong(GEODE_PREFIX + name); - if (propertyValue == null) { - propertyValue = Long.getLong(GEMFIRE_PREFIX + name); - } - - if (propertyValue != null) { - return Optional.of(propertyValue); - } else { - return Optional.empty(); - } - } - - /** - * This method will try to look up "geode." and "gemfire." versions of the system property. It - * will check and prefer "geode." setting first, then try to check "gemfire." setting. - * - * @param name system property name set in Geode - * @return the integer value of the system property if exits or the default value - */ - public static Integer getProductIntegerProperty(String name, int defaultValue) { - return getProductIntegerProperty(name).orElse(defaultValue); - } - - public static Long getProductLongProperty(String name, long defaultValue) { - return getProductLongProperty(name).orElse(defaultValue); - } - - /** - * This method will try to look up "geode." and "gemfire." versions of the system property. It - * will check and prefer "geode." setting first, then try to check "gemfire." setting. - * - * @param name system property name set in Geode - * @return an Optional containing the String value of the system property - */ - public static Optional getProductStringProperty(String name) { - String property = getProperty(name); - return property != null ? Optional.of(property) : Optional.empty(); - } - - public static String getProperty(String name) { - String property = getGeodeProperty(name); - return property != null ? property : getGemfireProperty(name); - } - - private static String getGeodeProperty(String name) { - return System.getProperty(GEODE_PREFIX + name); - } - - private static String getGemfireProperty(String name) { - return System.getProperty(GEMFIRE_PREFIX + name); - } - /** * As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first * operation in the transaction. diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java index 7251cc2b41ef..d65e9f1617ec 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java @@ -84,8 +84,8 @@ private GatewaySender createGatewaySender(Cache cache, String type = gatewaySenderCreateArgs.getType(); if (type != null) { - gateway.setType(gatewaySenderCreateArgs.getType()); - gateway.setParallel(gatewaySenderCreateArgs.getType().contains("Parallel")); + gateway.setType(type); + gateway.setParallel(type.contains("Parallel")); } else { Boolean isParallel = gatewaySenderCreateArgs.isParallel(); if (isParallel != null) { diff --git a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java index 93cc11c5d440..5a48897bc576 100644 --- a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java +++ b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java @@ -40,7 +40,7 @@ @RunWith(GeodeParamsRunner.class) public class TxGroupingPartitionedRegionDUnitTest extends TxGroupingBaseDUnitTest { @Test - @Parameters({"TxGroupParallelGatewaySender", "TxGroupSerialGatewaySender"}) + @Parameters({"TxGroupingParallelGatewaySender", "TxGroupingSerialGatewaySender"}) public void testPartitionedRegionPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions( String type) throws Exception { diff --git a/geode-wan-txgrouping/src/integrationTest/java/org/apache/geode/internal/cache/wan/txgrouping/WanTxGroupingConfigurationJUnitTest.java b/geode-wan-txgrouping/src/integrationTest/java/org/apache/geode/internal/cache/wan/txgrouping/WanTxGroupingConfigurationJUnitTest.java index fcf94ef3bd35..f6f67b414bb2 100644 --- a/geode-wan-txgrouping/src/integrationTest/java/org/apache/geode/internal/cache/wan/txgrouping/WanTxGroupingConfigurationJUnitTest.java +++ b/geode-wan-txgrouping/src/integrationTest/java/org/apache/geode/internal/cache/wan/txgrouping/WanTxGroupingConfigurationJUnitTest.java @@ -30,6 +30,7 @@ import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.cache.wan.GatewaySenderFactory; +import org.apache.geode.cache.wan.internal.txgrouping.serial.TxGroupingSerialGatewaySenderImpl; import org.apache.geode.internal.cache.wan.GatewaySenderException; public class WanTxGroupingConfigurationJUnitTest { @@ -63,9 +64,8 @@ public void test_ValidateSerialGatewaySenderGroupTransactionEventsAttributeSetTo public void test_create_SerialGatewaySender_ThrowsException_when_GroupTransactionEvents_isTrue_and_DispatcherThreads_is_greaterThanOne() { cache = new CacheFactory().set(MCAST_PORT, "0").create(); GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(false); fact.setDispatcherThreads(2); - fact.setGroupTransactionEvents(true); + fact.setType(TxGroupingSerialGatewaySenderImpl.TYPE); assertThatThrownBy(() -> fact.create("NYSender", 2)) .isInstanceOf(GatewaySenderException.class) .hasMessageContaining( @@ -77,7 +77,7 @@ public void test_create_GatewaySender_ThrowsException_when_GroupTransactionEvent cache = new CacheFactory().set(MCAST_PORT, "0").create(); GatewaySenderFactory fact = cache.createGatewaySenderFactory(); fact.setBatchConflationEnabled(true); - fact.setGroupTransactionEvents(true); + fact.setType(TxGroupingSerialGatewaySenderImpl.TYPE); assertThatThrownBy(() -> fact.create("NYSender", 2)) .isInstanceOf(GatewaySenderException.class) .hasMessageContaining( diff --git a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/TxGroupingGatewaySenderProperties.java b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/TxGroupingGatewaySenderProperties.java index a3235f4f5f5d..155650cb306c 100644 --- a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/TxGroupingGatewaySenderProperties.java +++ b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/TxGroupingGatewaySenderProperties.java @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger; -import org.apache.geode.internal.lang.SystemPropertyHelper; +import org.apache.geode.internal.lang.SystemProperty; import org.apache.geode.util.internal.GeodeGlossary; public class TxGroupingGatewaySenderProperties implements TxGroupingGatewaySender { @@ -52,7 +52,7 @@ public class TxGroupingGatewaySenderProperties implements TxGroupingGatewaySende * gateway sender queue when group-transaction-events is true. */ public static final int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS = - SystemPropertyHelper.getProductIntegerProperty( + SystemProperty.getProductIntegerProperty( GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS_PROPERTY).orElse(1); private AtomicInteger retriesToGetTransactionEventsFromQueue = diff --git a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingRemoteSerialGatewaySenderEventProcessor.java b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingRemoteSerialGatewaySenderEventProcessor.java index a1a903353b18..20334208569f 100644 --- a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingRemoteSerialGatewaySenderEventProcessor.java +++ b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingRemoteSerialGatewaySenderEventProcessor.java @@ -18,7 +18,10 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.apache.geode.cache.CacheListener; +import org.apache.geode.cache.asyncqueue.AsyncEvent; import org.apache.geode.cache.wan.internal.serial.RemoteSerialGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.monitoring.ThreadsMonitoring; @@ -31,11 +34,10 @@ public TxGroupingRemoteSerialGatewaySenderEventProcessor( super(sender, id, threadsMonitoring, cleanQueues); } - // @Override - // protected @NotNull RegionQueue createRegionQueue( - // final @NotNull AbstractGatewaySender sender, final @NotNull String regionName, - // final @NotNull CacheListener listener, final boolean cleanQueues) { - // return new TxGroupingSerialGatewaySenderQueue((TxGroupingSerialGatewaySenderImpl) sender, - // regionName, listener, cleanQueues); - // } + @Override + protected @NotNull RegionQueue createRegionQueue( + final @NotNull AbstractGatewaySender sender, final @NotNull String regionName, + final @NotNull CacheListener> listener, final boolean cleanQueues) { + return new TxGroupingSerialGatewaySenderQueue(sender, regionName, listener, cleanQueues); + } } diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java index cd67e846dfce..cbe54e441c09 100644 --- a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java +++ b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java @@ -41,6 +41,7 @@ import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.cache.wan.GatewaySenderFactory; import org.apache.geode.cache.wan.GatewayTransportFilter; +import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl; import org.apache.geode.cache30.MyGatewayEventFilter1; import org.apache.geode.cache30.MyGatewayTransportFilter1; import org.apache.geode.cache30.MyGatewayTransportFilter2; @@ -69,7 +70,7 @@ public void test_GatewaySender_without_Locator() { cache = new CacheFactory().set(MCAST_PORT, "0").create(); GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); + fact.setType(ParallelGatewaySenderImpl.TYPE); GatewaySender sender1 = fact.create("NYSender", 2); sender1.start(); fail("Expected IllegalStateException but not thrown"); diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java index 2cafaa1bfc65..8fff910e715d 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java @@ -31,6 +31,8 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.cache.wan.GatewaySenderFactory; import org.apache.geode.cache.wan.GatewayTransportFilter; +import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl; +import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; @@ -220,6 +222,13 @@ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver( public @NotNull GatewaySender create(final @NotNull String id, final int remoteDSId) { attrs.setId(id); attrs.setRemoteDs(remoteDSId); + if (attrs.getType() == null) { + if (attrs.isParallel()) { + attrs.setType(ParallelGatewaySenderImpl.TYPE); + } else { + attrs.setType(SerialGatewaySenderImpl.TYPE); + } + } validate(cache, attrs);