From 6bd18b2314efd439388891b235eec1c062bd89b5 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Wed, 8 Jan 2025 15:43:12 +0800 Subject: [PATCH 1/3] refactor(controller): consider brokers that has recently `CONTROLLED_SHUTDOWN` as `SHUTTING_DOWN` Signed-off-by: Ning Yu --- .../controller/BrokerHeartbeatManager.java | 41 +++++++++++++++++++ .../controller/ClusterControlManager.java | 4 ++ .../stream/DefaultNodeRuntimeInfoGetter.java | 19 ++++----- .../kafka/controller/stream/NodeState.java | 27 +++++++++++- 4 files changed, 80 insertions(+), 11 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index e54615cc77..5d3bc72935 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.stream.NodeState; import org.apache.kafka.controller.stream.OverloadCircuitBreaker; import org.apache.kafka.metadata.placement.UsableBroker; @@ -83,6 +84,16 @@ static class BrokerHeartbeatState { */ private long controlledShutdownOffset; + // AutoMQ inject start + /** + * The last time the broker requested a controlled shutdown, in monotonic nanoseconds, or 0 + * if the broker has not requested one since its most recent start. + * It will be updated on receiving a broker heartbeat with controlled shutdown request. + * It will be reset to 0 when the broker is active again. + */ + private long lastControlledShutdownNs; + // AutoMQ inject end + /** * The previous entry in the unfenced list, or null if the broker is not in that list.
*/ @@ -100,6 +111,9 @@ static class BrokerHeartbeatState { this.next = null; this.metadataOffset = -1; this.controlledShutdownOffset = -1; + // AutoMQ inject start + this.lastControlledShutdownNs = 0; + // AutoMQ inject end } /** @@ -122,6 +136,12 @@ boolean fenced() { boolean shuttingDown() { return controlledShutdownOffset >= 0; } + + // AutoMQ inject start + long lastControlledShutdownNs() { + return lastControlledShutdownNs; + } + // AutoMQ inject end } static class MetadataOffsetComparator implements Comparator { @@ -441,6 +461,9 @@ void maybeUpdateControlledShutdownOffset(int brokerId, long controlledShutDownOf throw new RuntimeException("Fenced brokers cannot enter controlled shutdown."); } active.remove(broker); + // AutoMQ inject start + broker.lastControlledShutdownNs = time.nanoseconds(); + // AutoMQ inject end if (broker.controlledShutdownOffset < 0) { broker.controlledShutdownOffset = controlledShutDownOffset; log.debug("Updated the controlled shutdown offset for broker {} to {}.", @@ -489,6 +512,24 @@ Iterator usableBrokers( } // AutoMQ inject start + public NodeState brokerState(int brokerId, long shutdownTimeoutNs) { + BrokerHeartbeatState broker = brokers.get(brokerId); + if (broker == null) { + return NodeState.UNKNOWN; + } + if (broker.shuttingDown()) { + return NodeState.SHUTTING_DOWN; + } + if (broker.fenced()) { + if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) { + // The broker is still in controlled shutdown. 
+ return NodeState.SHUTTING_DOWN; + } + return NodeState.SHUTDOWN; + } + return NodeState.ACTIVE; + } + long nextCheckTimeNs() { if (overloadCircuitBreaker.isOverload()) { return Long.MAX_VALUE; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index bc95534f22..dbaf5c5045 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -967,5 +967,9 @@ public List getActiveBrokers() { .filter(b -> isActive(b.id())) .collect(Collectors.toList()); } + + public BrokerHeartbeatManager getHeartbeatManager() { + return heartbeatManager; + } // AutoMQ inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java index e71d391afa..51a84026c7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java @@ -11,10 +11,14 @@ package org.apache.kafka.controller.stream; +import org.apache.kafka.controller.BrokerHeartbeatManager; import org.apache.kafka.controller.ClusterControlManager; -import org.apache.kafka.metadata.BrokerRegistration; + +import java.util.concurrent.TimeUnit; public class DefaultNodeRuntimeInfoGetter implements NodeRuntimeInfoGetter { + private static final long SHUTDOWN_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(60); + private final ClusterControlManager clusterControlManager; private final StreamControlManager streamControlManager; @@ -25,17 +29,12 @@ public DefaultNodeRuntimeInfoGetter(ClusterControlManager clusterControlManager, @Override public NodeState state(int nodeId) { - BrokerRegistration brokerRegistration = 
clusterControlManager.registration(nodeId); - if (brokerRegistration == null) { + BrokerHeartbeatManager brokerHeartbeatManager = clusterControlManager.getHeartbeatManager(); + if (null == brokerHeartbeatManager) { + // This controller is not the active controller, so we don't have the heartbeat manager. return NodeState.UNKNOWN; } - if (brokerRegistration.fenced()) { - return NodeState.FENCED; - } - if (brokerRegistration.inControlledShutdown()) { - return NodeState.CONTROLLED_SHUTDOWN; - } - return NodeState.ACTIVE; + return brokerHeartbeatManager.brokerState(nodeId, SHUTDOWN_TIMEOUT_NS); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java index 108b08d28e..97eb8fc497 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java @@ -12,5 +12,30 @@ package org.apache.kafka.controller.stream; public enum NodeState { - ACTIVE, FENCED, CONTROLLED_SHUTDOWN, UNKNOWN + /** + * The node is active and can handle requests. + */ + ACTIVE, + /** + * The node is shutting down in a controlled manner. + */ + SHUTTING_DOWN, + /** + * The node is shut down and cannot handle requests. + */ + SHUTDOWN, + /** + * Use @{@link #SHUTTING_DOWN} instead. + */ + @Deprecated + CONTROLLED_SHUTDOWN, + /** + * Use @{@link #SHUTDOWN} instead. + */ + @Deprecated + FENCED, + /** + * The state of the node is unknown, possibly because it has not yet registered. 
+ */ + UNKNOWN } From fa9c6c389c20536189fd76dafd51209224b588ed Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Wed, 8 Jan 2025 16:28:13 +0800 Subject: [PATCH 2/3] test: test `BrokerHeartbeatManager#brokerState` Signed-off-by: Ning Yu --- .../BrokerHeartbeatManagerTest.java | 40 +++++++++++++++++++ .../stream/NodeControlManagerTest.java | 3 +- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 9a8776f721..294909475d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator; import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList; import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator; +import org.apache.kafka.controller.stream.NodeState; import org.apache.kafka.metadata.placement.UsableBroker; import org.junit.jupiter.api.Test; @@ -362,4 +363,43 @@ public void testTouchThrowsExceptionUnlessRegistered() { assertThrows(IllegalStateException.class, () -> manager.touch(4, false, 0)).getMessage()); } + + // AutoMQ inject start + @Test + public void testBrokerState() { + final long shutdownTimeoutNs = 10_000_000; // 10ms + // init + BrokerHeartbeatManager manager = newBrokerHeartbeatManager(); + manager.time().sleep(1000); + manager.register(0, true); + + // FENCED Broker + assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + + // UNFENCED Broker + manager.touch(0, false, 100); + assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs)); + + // CONTROLLED_SHUTDOWN Broker + manager.maybeUpdateControlledShutdownOffset(0, 100); + assertEquals(NodeState.SHUTTING_DOWN, 
manager.brokerState(0, shutdownTimeoutNs)); + + // SHUTDOWN_NOW Broker within shutdownTimeoutNs + manager.touch(0, true, 100); + manager.time().sleep(5); + assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + + // SHUTDOWN_NOW Broker after shutdownTimeoutNs + manager.time().sleep(6); + assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + + // UNFENCED Broker after SHUTDOWN + manager.touch(0, false, 100); + assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs)); + + // UNREGISTERED Broker + manager.remove(0); + assertEquals(NodeState.UNKNOWN, manager.brokerState(0, shutdownTimeoutNs)); + } + // AutoMQ inject end } diff --git a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java index 7aadb9403a..1b0266837e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java @@ -95,7 +95,7 @@ public void testRegister() { assertTrue(nodeControlManager.nodeMetadataMap.containsKey(0)); when(nodeRuntimeInfoGetter.hasOpeningStreams(eq(0))).thenReturn(true); - when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.FENCED); + when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.SHUTDOWN); ControllerResult getRst = nodeControlManager.getMetadata( new AutomqGetNodesRequest(new AutomqGetNodesRequestData().setNodeIds(List.of(0, 1)), @@ -107,6 +107,7 @@ public void testRegister() { assertEquals(0, nodes.get(0).nodeId()); assertEquals(2L, nodes.get(0).nodeEpoch()); assertEquals("wal2", nodes.get(0).walConfig()); + assertEquals("SHUTDOWN", nodes.get(0).state()); } AutomqRegisterNodeRequestData.TagCollection tags(Map tags) { From 100a7459225f8638cf9ef6f47ed9b82423a096d6 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Wed, 8 Jan 2025 18:00:57 +0800 
Subject: [PATCH 3/3] revert(NodeState): revert `SHUTDOWN` and `SHUTTING_DOWN` to `FENCED` and `CONTROLLED_SHUTDOWN` Signed-off-by: Ning Yu --- .../controller/BrokerHeartbeatManager.java | 6 +++--- .../kafka/controller/stream/NodeState.java | 18 ++++++------------ .../controller/BrokerHeartbeatManagerTest.java | 8 ++++---- .../stream/NodeControlManagerTest.java | 4 ++-- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index 5d3bc72935..8be03d2280 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -518,14 +518,14 @@ public NodeState brokerState(int brokerId, long shutdownTimeoutNs) { return NodeState.UNKNOWN; } if (broker.shuttingDown()) { - return NodeState.SHUTTING_DOWN; + return NodeState.CONTROLLED_SHUTDOWN; } if (broker.fenced()) { if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) { // The broker is still in controlled shutdown. - return NodeState.SHUTTING_DOWN; + return NodeState.CONTROLLED_SHUTDOWN; } - return NodeState.SHUTDOWN; + return NodeState.FENCED; } return NodeState.ACTIVE; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java index 97eb8fc497..c21fcd8114 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java @@ -11,29 +11,23 @@ package org.apache.kafka.controller.stream; +import org.apache.kafka.controller.BrokerControlState; + public enum NodeState { /** * The node is active and can handle requests. */ ACTIVE, - /** - * The node is shutting down in a controlled manner. 
- */ - SHUTTING_DOWN, /** * The node is shut down and cannot handle requests. */ - SHUTDOWN, + FENCED, /** - * Use @{@link #SHUTTING_DOWN} instead. + * The node is shutting down in a controlled manner. + * Note: In AutoMQ, this state is different from {@link BrokerControlState#CONTROLLED_SHUTDOWN}. In some cases, + * a node in {@link BrokerControlState#FENCED} state may still be shutting down in a controlled manner. */ - @Deprecated CONTROLLED_SHUTDOWN, - /** - * Use @{@link #SHUTDOWN} instead. - */ - @Deprecated - FENCED, /** * The state of the node is unknown, possibly because it has not yet registered. */ diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 294909475d..9c96de0b8b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -374,7 +374,7 @@ public void testBrokerState() { manager.register(0, true); // FENCED Broker - assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); // UNFENCED Broker manager.touch(0, false, 100); @@ -382,16 +382,16 @@ public void testBrokerState() { // CONTROLLED_SHUTDOWN Broker manager.maybeUpdateControlledShutdownOffset(0, 100); - assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); // SHUTDOWN_NOW Broker within shutdownTimeoutNs manager.touch(0, true, 100); manager.time().sleep(5); - assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); // SHUTDOWN_NOW Broker after shutdownTimeoutNs manager.time().sleep(6); - 
assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); // UNFENCED Broker after SHUTDOWN manager.touch(0, false, 100); diff --git a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java index 1b0266837e..b725d9e69a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java @@ -95,7 +95,7 @@ public void testRegister() { assertTrue(nodeControlManager.nodeMetadataMap.containsKey(0)); when(nodeRuntimeInfoGetter.hasOpeningStreams(eq(0))).thenReturn(true); - when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.SHUTDOWN); + when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.FENCED); ControllerResult getRst = nodeControlManager.getMetadata( new AutomqGetNodesRequest(new AutomqGetNodesRequestData().setNodeIds(List.of(0, 1)), @@ -107,7 +107,7 @@ public void testRegister() { assertEquals(0, nodes.get(0).nodeId()); assertEquals(2L, nodes.get(0).nodeEpoch()); assertEquals("wal2", nodes.get(0).walConfig()); - assertEquals("SHUTDOWN", nodes.get(0).state()); + assertEquals(NodeState.FENCED.name(), nodes.get(0).state()); } AutomqRegisterNodeRequestData.TagCollection tags(Map tags) {