From 4d57aa3e16d35bc7766094a80f7b4be3836fc46b Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 2 Jan 2025 17:22:32 +0800 Subject: [PATCH 1/3] fix recover failed --- .../iotdb/consensus/pipe/PipeConsensus.java | 54 +++++++++++++------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index 5328a25f9dcf..a05cedad2cf3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -72,9 +72,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -123,7 +126,7 @@ public PipeConsensus(ConsensusConfig config, IStateMachine.Registry registry) { @Override public synchronized void start() throws IOException { - initAndRecover(); + Future recoverFuture = initAndRecover(); rpcService.initSyncedServiceImpl(new PipeConsensusRPCServiceProcessor(this, config.getPipe())); try { @@ -132,19 +135,28 @@ public synchronized void start() throws IOException { throw new IOException(e); } + if (Objects.nonNull(recoverFuture)) { + try { + recoverFuture.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Exception while waiting for recover future completion", e); + } + } + // only when we recover all consensus group can we launch async backend checker thread consensusPipeGuardian.start( CONSENSUS_PIPE_GUARDIAN_TASK_ID, this::checkAllConsensusPipe, config.getPipe().getConsensusPipeGuardJobIntervalInSeconds()); } - private void initAndRecover() throws IOException { + private Future initAndRecover() throws IOException { if (!storageDir.exists()) { // init if (!storageDir.mkdirs()) { LOGGER.warn("Unable to create consensus dir at {}", storageDir); throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); } + return null; } else { // asynchronously recover, retry logic is implemented at PipeConsensusImpl CompletableFuture future = @@ -155,20 +167,31 @@ private void initAndRecover() throws IOException { for (Path path : stream) { ConsensusGroupId consensusGroupId = parsePeerFileName(path.getFileName().toString()); - PipeConsensusServerImpl consensus = - new PipeConsensusServerImpl( - new Peer(consensusGroupId, thisNodeId, thisNode), - registry.apply(consensusGroupId), - path.toString(), - new ArrayList<>(), - config, - consensusPipeManager, - syncClientManager); - stateMachineMap.put(consensusGroupId, consensus); - checkPeerListAndStartIfEligible(consensusGroupId, consensus); + try { + PipeConsensusServerImpl consensus = + new PipeConsensusServerImpl( + new Peer(consensusGroupId, thisNodeId, thisNode), + registry.apply(consensusGroupId), + path.toString(), + new ArrayList<>(), + config, + consensusPipeManager, + syncClientManager); + stateMachineMap.put(consensusGroupId, consensus); + checkPeerListAndStartIfEligible(consensusGroupId, consensus); + } catch (Exception e) { + LOGGER.error( + "Failed to recover consensus from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related pipe side effects for this failed consensus group.", + storageDir, + consensusGroupId, + e); + } } - } catch (Exception e) { - LOGGER.error("Failed to recover consensus from {}", storageDir, e); + } catch (IOException e) { + LOGGER.error( + "Failed to recover consensus from {} because read dir failed", + storageDir, + e); } }) .exceptionally( @@ -176,6 +199,7 @@ private void initAndRecover() throws IOException { LOGGER.error("Failed to recover consensus from {}", storageDir, e); return null; }); + return future; } } From ddb995ab8fd015b13fd545c22207f24f47c36785 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Mon, 6 Jan 2025 11:17:01 +0800 Subject: [PATCH 2/3] fix review --- .../apache/iotdb/consensus/pipe/PipeConsensus.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index a05cedad2cf3..4039fe6e3c7b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -72,7 +72,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -135,12 +134,10 @@ public synchronized void start() throws IOException { throw new IOException(e); } - if (Objects.nonNull(recoverFuture)) { - try { - recoverFuture.get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Exception while waiting for recover future completion", e); - } + try { + recoverFuture.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Exception while waiting for recover future completion", e); } // only when we recover all consensus group can we launch async backend checker thread consensusPipeGuardian.start( @@ -156,7 +153,7 @@ private Future initAndRecover() throws IOException { LOGGER.warn("Unable to create consensus dir at {}", storageDir); throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); } - return null; + return CompletableFuture.completedFuture(null); } else { // asynchronously recover, retry logic is implemented at PipeConsensusImpl CompletableFuture future = From 2cbd171bcda5d70c4f1fb51789b573d430a04dd0 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 10 Jan 2025 17:39:41 +0800 Subject: [PATCH 3/3] fix review --- .../org/apache/iotdb/consensus/pipe/PipeConsensus.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index 4039fe6e3c7b..291f3871fece 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -73,6 +73,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -136,8 +137,13 @@ public synchronized void start() throws IOException { try { recoverFuture.get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Exception while waiting for recover future completion", e); + } catch (CancellationException ce) { + LOGGER.info("IoTV2 Recover Task is cancelled", ce); + } catch (ExecutionException ee) { + LOGGER.error("Exception while waiting for recover future completion", ee); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.warn("IoTV2 Recover Task is interrupted", ie); } // only when we recover all consensus group can we launch async backend checker thread consensusPipeGuardian.start(