From b3716d9498dd315ab1509389a92fadd69d8600e5 Mon Sep 17 00:00:00 2001 From: Donal Evans Date: Fri, 10 Jun 2022 09:27:15 -0700 Subject: [PATCH] GEODE-10329: Handle RejectedExecutionException (#7721) Do not throw RejectedExecutionException if the GMSHealthMonitor is stopping Authored-by: Donal Evans --- .../membership/gms/fd/GMSHealthMonitor.java | 20 ++++-- .../gms/fd/GMSHealthMonitorTest.java | 63 +++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 geode-membership/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorTest.java diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 016e78caee68..e91178435b30 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -456,7 +456,7 @@ private void checkMember(final ID mbr) { setNextNeighbor(cv, mbr); // we need to check this member - checkExecutor.execute(() -> { + doAndIgnoreRejectedExecutionExceptionIfStopping(() -> checkExecutor.execute(() -> { boolean pinged; try { pinged = doCheckMember(mbr, true); @@ -475,8 +475,7 @@ private void checkMember(final ID mbr) { // back to previous one setNextNeighbor(currentView, null); } - }); - + })); } private void initiateSuspicion(ID mbr, String reason) { @@ -1237,8 +1236,9 @@ private void checkIfAvailable(final ID initiator, final String reason = sr.getReason(); logger.debug("Scheduling availability check for member {}; reason={}", mbr, reason); + // its a coordinator - checkExecutor.execute(() -> { + doAndIgnoreRejectedExecutionExceptionIfStopping(() -> checkExecutor.execute(() -> { try { inlineCheckIfAvailable(initiator, cv, true, mbr, reason); } catch (MembershipClosedException e) { @@ -1246,7 +1246,7 @@ private void checkIfAvailable(final ID initiator, } catch (Exception e) { logger.info("Unexpected exception while verifying member", e); } - }); + })); } } @@ -1425,6 +1425,16 @@ private void sendSuspectRequest(final List> requests) { processMessage(smm); } + void doAndIgnoreRejectedExecutionExceptionIfStopping(final Runnable runnable) { + try { + runnable.run(); + } catch (RejectedExecutionException e) { + if (!isStopping) { + throw e; + } + } + } + private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher { final Timer scheduler; diff --git a/geode-membership/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorTest.java b/geode-membership/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorTest.java new file mode 100644 index 000000000000..8e172e67a8b2 --- /dev/null +++ b/geode-membership/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal.membership.gms.fd; + +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.RejectedExecutionException; + +import org.junit.jupiter.api.Test; + +import org.apache.geode.distributed.internal.membership.api.MemberIdentifier; +import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreatorImpl; + +public class GMSHealthMonitorTest { + + @Test + public void throwRejectedExecutionExceptionIfMonitorIsNotStopping() { + final GMSHealthMonitor monitor = + new GMSHealthMonitor<>(new TcpSocketCreatorImpl()); + assertThatThrownBy(() -> { + monitor.doAndIgnoreRejectedExecutionExceptionIfStopping(() -> { + throw new RejectedExecutionException(); + }); + }).isInstanceOf(RejectedExecutionException.class); + } + + @Test + public void doNotThrowRejectedExecutionExceptionIfMonitorIsStopping() { + final GMSHealthMonitor monitor = + new GMSHealthMonitor<>(new TcpSocketCreatorImpl()); + monitor.stop(); + assertThatNoException().isThrownBy(() -> { + monitor.doAndIgnoreRejectedExecutionExceptionIfStopping(() -> { + throw new RejectedExecutionException(); + }); + }); + } + + @Test + public void throwOtherExceptionIfMonitorIsStopping() { + final GMSHealthMonitor monitor = + new GMSHealthMonitor<>(new TcpSocketCreatorImpl()); + monitor.stop(); + assertThatThrownBy(() -> { + monitor.doAndIgnoreRejectedExecutionExceptionIfStopping(() -> { + throw new RuntimeException(); + }); + }).isInstanceOf(RuntimeException.class); + } +}