Skip to content

Commit

Permalink
GEODE-10421: Improve start gw sender with clean-queue (#7856)
Browse files Browse the repository at this point in the history
* GEODE-10421: added check gw status

* GEODE-10421: added TC

* GEODE-10421: add document impacts

* GEODE-10421: update after comments
  • Loading branch information
mivanac authored Sep 16, 2022
1 parent 0b0c6f8 commit c4e5a03
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,13 @@ public class CliStrings {
public static final String GATEWAY_RECEIVER_IS_NOT_AVAILABLE_ON_MEMBER_0 =
"GatewayReceiver is not available on member {0}";

public static final String START_GATEWAYSENDER_REJECTED = "Command rejected. Reasons:";

public static final String REJECT_START_GATEWAYSENDER_REASON = "Reasons command is rejected";

public static final String EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS =
"Command must be executed on all members on which gateway sender is created";

public static final String GATEWAY_SENDER_IS_NOT_AVAILABLE = "GatewaySender is not available";
public static final String GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1 =
"GatewaySender {0} is already started on member {1}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ start gateway-sender --id=value [--groups=value(,value)*] [--members=value(,valu
| ‑‑id | *Required.* ID of the GatewaySender. | |
| ‑‑groups | Group(s) of members on which to start the Gateway Sender. | |
| ‑‑members | Member(s) on which to start the Gateway Sender. | |
| ‑‑clean-queues | Option to clean existing queue at start of the Gateway Sender. This option is only applicable for Gateway Senders with enabled persistence. | false |
| ‑‑clean-queues | Option to clean existing queue at start of the Gateway Sender. This option can be executed only on all members on which Gateway Sender is created. | false |
**Example Commands:**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,44 @@ public ResultModel startGatewaySender(@CliOption(key = CliStrings.START_GATEWAYS
return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
}

if (cleanQueues) {

GatewaySenderMXBean bean;
boolean commandRejected = false;

ResultModel rejectResultModel =
ResultModel.createError(CliStrings.START_GATEWAYSENDER_REJECTED);
TabularResultModel rejectResultData =
rejectResultModel.addTable(CliStrings.REJECT_START_GATEWAYSENDER_REASON);

Set<DistributedMember> allServers = findMembers(null, null);

for (DistributedMember member : allServers) {
if (cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId())) {
bean = service.getLocalGatewaySenderMXBean(senderId);
} else {
ObjectName objectName = service.getGatewaySenderMBeanName(member, senderId);
bean = service.getMBeanProxy(objectName, GatewaySenderMXBean.class);
}
if (bean != null) {
if (!dsMembers.contains(member)) {
return ResultModel.createError(CliStrings.EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS);
}

if (bean.isRunning()) {
commandRejected = true;
rejectResultData.addMemberStatusResultRow(member.getId(), CliStrings.GATEWAY_ERROR,
CliStrings.format(
CliStrings.GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1, id,
member.getId()));
}
}
}
if (commandRejected) {
return rejectResultModel;
}
}

ExecutorService execService =
LoggingExecutors.newCachedThreadPool("Start Sender Command Thread ", true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.createSender;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.startSender;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -431,6 +432,96 @@ public void testStartGatewaySender_clean_queues_false() throws Exception {
server3.invoke(() -> verifySenderState("ln", true, false));
}

@Test
public void testStartGatewaySender_clean_queues_sender_on_one_server_allready_started()
throws Exception {
Integer locator1Port = locatorSite1.getPort();

// setup servers in Site #1
server1 = clusterStartupRule.startServerVM(3, locator1Port);
server2 = clusterStartupRule.startServerVM(4, locator1Port);
server3 = clusterStartupRule.startServerVM(5, locator1Port);

server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));

server1.invoke(() -> startSender("ln"));

server1.invoke(() -> verifySenderState("ln", true, false));
server2.invoke(() -> verifySenderState("ln", false, false));
server3.invoke(() -> verifySenderState("ln", false, false));

locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", true, false));
locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false));
locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false));

String command =
CliStrings.START_GATEWAYSENDER + " --" + CliStrings.START_GATEWAYSENDER__ID + "=ln --"
+ CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + "=true";
CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
assertThat(cmdResult).isNotNull();
assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR);

TabularResultModel resultData = cmdResult.getResultData()
.getTableSection(CliStrings.REJECT_START_GATEWAYSENDER_REASON);
List<String> status = resultData.getValuesInColumn("Result");
assertThat(status).containsExactlyInAnyOrder("Error");

}


@Test
public void testStartGatewaySender_clean_queues_on_one_member() throws Exception {
Integer locator1Port = locatorSite1.getPort();

// setup servers in Site #1
server1 = clusterStartupRule.startServerVM(3, locator1Port);
server2 = clusterStartupRule.startServerVM(4, locator1Port);
server3 = clusterStartupRule.startServerVM(5, locator1Port);

DistributedMember vm1Member = getMember(server1.getVM());

server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));

server1.invoke(() -> verifySenderState("ln", false, false));
server2.invoke(() -> verifySenderState("ln", false, false));
server3.invoke(() -> verifySenderState("ln", false, false));

locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", false, false));
locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false));
locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false));

String command =
CliStrings.START_GATEWAYSENDER + " --" + CliStrings.START_GATEWAYSENDER__ID + "=ln --"
+ CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + CliStrings.MEMBER + "="
+ vm1Member.getId();
CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
assertThat(cmdResult).isNotNull();
assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR);


locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", false, false));
locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false));
locatorSite1.invoke(
() -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false));

server1.invoke(() -> verifySenderState("ln", false, false));
server2.invoke(() -> verifySenderState("ln", false, false));
server3.invoke(() -> verifySenderState("ln", false, false));
}



private CommandResult executeCommandWithIgnoredExceptions(String command) throws Exception {
try (IgnoredException ie = IgnoredException.addIgnoredException("Could not connect")) {
Expand Down

0 comments on commit c4e5a03

Please sign in to comment.