diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index f6ff52cf..9c0618ab 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -80,6 +80,7 @@ import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED; import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.UNASSIGNED; /** * A worker to schedule all connectors and tasks in a process. @@ -564,7 +565,8 @@ public Set getCleanedStoppedTasks() { } public void maintainConnectorState() { - + // STEP 1: redress running connectors status + redressRunningConnectors(); } /** @@ -935,6 +937,18 @@ private void redressRunningStatus(WorkerTask workerTask) { } } + private void redressRunningConnectors() { + for (WorkerConnector connector : connectors.values()) { + ConnectorStatus connectorStatus = stateManagementService.get(connector.getConnectorName()); + if (connectorStatus != null && connectorStatus.getState() == UNASSIGNED && connector.getKeyValue().getTargetState() == TargetState.STARTED && + connector.getState() == WorkerConnector.State.STARTED) { + ConnectorStatus redressStatus = new ConnectorStatus(connector.getConnectorName(), RUNNING, workerConfig.getWorkerId(), System.currentTimeMillis()); + log.warn("Connector {}, Old connector status is {}, new connector status {}", connector.getConnectorName(), connectorStatus, redressStatus); + stateManagementService.put(redressStatus); + } + } + } + private Map> newTasks(Map> taskConfigs) { Map> newTasks = new HashMap<>(); for (String connectorName : taskConfigs.keySet()) { diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java index 0c146555..b266cdbd 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java @@ -369,10 +369,18 @@ public String toString() { return sb; } - private enum State { + public enum State { INIT, STOPPED, STARTED, FAILED, } + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } }