diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java index f533c7796076..3734eedce469 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java @@ -3007,6 +3007,13 @@ public class CliStrings { public static final String GATEWAY_RECEIVER_IS_NOT_AVAILABLE_ON_MEMBER_0 = "GatewayReceiver is not available on member {0}"; + public static final String START_GATEWAYSENDER_REJECTED = "Command rejected. Reasons:"; + + public static final String REJECT_START_GATEWAYSENDER_REASON = "Reasons command is rejected"; + + public static final String EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS = + "Command must be executed on all members on which gateway sender is created"; + public static final String GATEWAY_SENDER_IS_NOT_AVAILABLE = "GatewaySender is not available"; public static final String GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1 = "GatewaySender {0} is already started on member {1}"; diff --git a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb index f25b6b5be036..b41bfb02bb22 100644 --- a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb +++ b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb @@ -121,7 +121,7 @@ start gateway-sender --id=value [--groups=value(,value)*] [--members=value(,valu | ‑‑id | *Required.* ID of the GatewaySender. | | | ‑‑groups | Group(s) of members on which to start the Gateway Sender. | | | ‑‑members | Member(s) on which to start the Gateway Sender. | | -| ‑‑clean-queues | Option to clean existing queue at start of the Gateway Sender. This option is only applicable for Gateway Senders with enabled persistence. | false | +| ‑‑clean-queues | Option to clean existing queue at start of the Gateway Sender. This option can be executed only on all members on which Gateway Sender is created. | false | **Example Commands:** diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java index 996e85be8e24..d02277d4ac28 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java @@ -77,6 +77,44 @@ public ResultModel startGatewaySender(@CliOption(key = CliStrings.START_GATEWAYS return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE); } + if (cleanQueues) { + + GatewaySenderMXBean bean; + boolean commandRejected = false; + + ResultModel rejectResultModel = + ResultModel.createError(CliStrings.START_GATEWAYSENDER_REJECTED); + TabularResultModel rejectResultData = + rejectResultModel.addTable(CliStrings.REJECT_START_GATEWAYSENDER_REASON); + + Set allServers = findMembers(null, null); + + for (DistributedMember member : allServers) { + if (cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId())) { + bean = service.getLocalGatewaySenderMXBean(senderId); + } else { + ObjectName objectName = service.getGatewaySenderMBeanName(member, senderId); + bean = service.getMBeanProxy(objectName, GatewaySenderMXBean.class); + } + if (bean != null) { + if (!dsMembers.contains(member)) { + return ResultModel.createError(CliStrings.EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS); + } + + if (bean.isRunning()) { + commandRejected = true; + rejectResultData.addMemberStatusResultRow(member.getId(), CliStrings.GATEWAY_ERROR, + CliStrings.format( + CliStrings.GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1, id, + member.getId())); + } + } + } + if (commandRejected) { + return rejectResultModel; + } + } + ExecutorService execService = LoggingExecutors.newCachedThreadPool("Start Sender Command Thread ", true); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java index edff65d58212..b41ac9f91676 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java @@ -19,6 +19,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.createSender; import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.startSender; import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy; import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState; import static org.assertj.core.api.Assertions.assertThat; @@ -431,6 +432,96 @@ public void testStartGatewaySender_clean_queues_false() throws Exception { server3.invoke(() -> verifySenderState("ln", true, false)); } + @Test + public void testStartGatewaySender_clean_queues_sender_on_one_server_allready_started() + throws Exception { + Integer locator1Port = locatorSite1.getPort(); + + // setup servers in Site #1 + server1 = clusterStartupRule.startServerVM(3, locator1Port); + server2 = clusterStartupRule.startServerVM(4, locator1Port); + server3 = clusterStartupRule.startServerVM(5, locator1Port); + + server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true)); + server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true)); + server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true)); + + server1.invoke(() -> startSender("ln")); + + server1.invoke(() -> verifySenderState("ln", true, false)); + server2.invoke(() -> verifySenderState("ln", false, false)); + server3.invoke(() -> verifySenderState("ln", false, false)); + + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", true, false)); + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false)); + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false)); + + String command = + CliStrings.START_GATEWAYSENDER + " --" + CliStrings.START_GATEWAYSENDER__ID + "=ln --" + + CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + "=true"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + assertThat(cmdResult).isNotNull(); + assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR); + + TabularResultModel resultData = cmdResult.getResultData() + .getTableSection(CliStrings.REJECT_START_GATEWAYSENDER_REASON); + List status = resultData.getValuesInColumn("Result"); + assertThat(status).containsExactlyInAnyOrder("Error"); + + } + + + @Test + public void testStartGatewaySender_clean_queues_on_one_member() throws Exception { + Integer locator1Port = locatorSite1.getPort(); + + // setup servers in Site #1 + server1 = clusterStartupRule.startServerVM(3, locator1Port); + server2 = clusterStartupRule.startServerVM(4, locator1Port); + server3 = clusterStartupRule.startServerVM(5, locator1Port); + + DistributedMember vm1Member = getMember(server1.getVM()); + + server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true)); + server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true)); + server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true)); + + server1.invoke(() -> verifySenderState("ln", false, false)); + server2.invoke(() -> verifySenderState("ln", false, false)); + server3.invoke(() -> verifySenderState("ln", false, false)); + + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", false, false)); + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false)); + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false)); + + String command = + CliStrings.START_GATEWAYSENDER + " --" + CliStrings.START_GATEWAYSENDER__ID + "=ln --" + + CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + CliStrings.MEMBER + "=" + + vm1Member.getId(); + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + assertThat(cmdResult).isNotNull(); + assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR); + + + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", false, false)); + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false)); + locatorSite1.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false)); + + server1.invoke(() -> verifySenderState("ln", false, false)); + server2.invoke(() -> verifySenderState("ln", false, false)); + server3.invoke(() -> verifySenderState("ln", false, false)); + } + + private CommandResult executeCommandWithIgnoredExceptions(String command) throws Exception { try (IgnoredException ie = IgnoredException.addIgnoredException("Could not connect")) {