Skip to content

Commit

Permalink
GEODE-10329: Handle RejectedExecutionException (#7721)
Browse files Browse the repository at this point in the history
Do not throw RejectedExecutionException if the GMSHealthMonitor is
stopping

Authored-by: Donal Evans <[email protected]>
  • Loading branch information
DonalEvans authored Jun 10, 2022
1 parent dcee9ba commit b3716d9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ private void checkMember(final ID mbr) {
setNextNeighbor(cv, mbr);

// we need to check this member
checkExecutor.execute(() -> {
doAndIgnoreRejectedExecutionExceptionIfStopping(() -> checkExecutor.execute(() -> {
boolean pinged;
try {
pinged = doCheckMember(mbr, true);
Expand All @@ -475,8 +475,7 @@ private void checkMember(final ID mbr) {
// back to previous one
setNextNeighbor(currentView, null);
}
});

}));
}

private void initiateSuspicion(ID mbr, String reason) {
Expand Down Expand Up @@ -1237,16 +1236,17 @@ private void checkIfAvailable(final ID initiator,

final String reason = sr.getReason();
logger.debug("Scheduling availability check for member {}; reason={}", mbr, reason);

// its a coordinator
checkExecutor.execute(() -> {
doAndIgnoreRejectedExecutionExceptionIfStopping(() -> checkExecutor.execute(() -> {
try {
inlineCheckIfAvailable(initiator, cv, true, mbr, reason);
} catch (MembershipClosedException e) {
// shutting down
} catch (Exception e) {
logger.info("Unexpected exception while verifying member", e);
}
});
}));
}
}

Expand Down Expand Up @@ -1425,6 +1425,16 @@ private void sendSuspectRequest(final List<SuspectRequest<ID>> requests) {
processMessage(smm);
}

void doAndIgnoreRejectedExecutionExceptionIfStopping(final Runnable runnable) {
try {
runnable.run();
} catch (RejectedExecutionException e) {
if (!isStopping) {
throw e;
}
}
}

private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher {

final Timer scheduler;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms.fd;

import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.RejectedExecutionException;

import org.junit.jupiter.api.Test;

import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreatorImpl;

public class GMSHealthMonitorTest {

@Test
public void throwRejectedExecutionExceptionIfMonitorIsNotStopping() {
final GMSHealthMonitor<MemberIdentifier> monitor =
new GMSHealthMonitor<>(new TcpSocketCreatorImpl());
assertThatThrownBy(() -> {
monitor.doAndIgnoreRejectedExecutionExceptionIfStopping(() -> {
throw new RejectedExecutionException();
});
}).isInstanceOf(RejectedExecutionException.class);
}

@Test
public void doNotThrowRejectedExecutionExceptionIfMonitorIsStopping() {
final GMSHealthMonitor<MemberIdentifier> monitor =
new GMSHealthMonitor<>(new TcpSocketCreatorImpl());
monitor.stop();
assertThatNoException().isThrownBy(() -> {
monitor.doAndIgnoreRejectedExecutionExceptionIfStopping(() -> {
throw new RejectedExecutionException();
});
});
}

@Test
public void throwOtherExceptionIfMonitorIsStopping() {
final GMSHealthMonitor<MemberIdentifier> monitor =
new GMSHealthMonitor<>(new TcpSocketCreatorImpl());
monitor.stop();
assertThatThrownBy(() -> {
monitor.doAndIgnoreRejectedExecutionExceptionIfStopping(() -> {
throw new RuntimeException();
});
}).isInstanceOf(RuntimeException.class);
}
}

0 comments on commit b3716d9

Please sign in to comment.