diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index 5328a25f9dcf..291f3871fece 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -73,8 +73,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -123,7 +126,7 @@ public PipeConsensus(ConsensusConfig config, IStateMachine.Registry registry) { @Override public synchronized void start() throws IOException { - initAndRecover(); + Future recoverFuture = initAndRecover(); rpcService.initSyncedServiceImpl(new PipeConsensusRPCServiceProcessor(this, config.getPipe())); try { @@ -132,19 +135,31 @@ public synchronized void start() throws IOException { throw new IOException(e); } + try { + recoverFuture.get(); + } catch (CancellationException ce) { + LOGGER.info("IoTV2 Recover Task is cancelled", ce); + } catch (ExecutionException ee) { + LOGGER.error("Exception while waiting for recover future completion", ee); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.warn("IoTV2 Recover Task is interrupted", ie); + } + // only when we recover all consensus group can we launch async backend checker thread consensusPipeGuardian.start( CONSENSUS_PIPE_GUARDIAN_TASK_ID, this::checkAllConsensusPipe, config.getPipe().getConsensusPipeGuardJobIntervalInSeconds()); } - private void initAndRecover() throws IOException { + private Future initAndRecover() throws IOException { if (!storageDir.exists()) { // init if (!storageDir.mkdirs()) { LOGGER.warn("Unable to create consensus dir at {}", storageDir); throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); } + return CompletableFuture.completedFuture(null); } else { // asynchronously recover, retry logic is implemented at PipeConsensusImpl CompletableFuture future = @@ -155,20 +170,31 @@ private void initAndRecover() throws IOException { for (Path path : stream) { ConsensusGroupId consensusGroupId = parsePeerFileName(path.getFileName().toString()); - PipeConsensusServerImpl consensus = - new PipeConsensusServerImpl( - new Peer(consensusGroupId, thisNodeId, thisNode), - registry.apply(consensusGroupId), - path.toString(), - new ArrayList<>(), - config, - consensusPipeManager, - syncClientManager); - stateMachineMap.put(consensusGroupId, consensus); - checkPeerListAndStartIfEligible(consensusGroupId, consensus); + try { + PipeConsensusServerImpl consensus = + new PipeConsensusServerImpl( + new Peer(consensusGroupId, thisNodeId, thisNode), + registry.apply(consensusGroupId), + path.toString(), + new ArrayList<>(), + config, + consensusPipeManager, + syncClientManager); + stateMachineMap.put(consensusGroupId, consensus); + checkPeerListAndStartIfEligible(consensusGroupId, consensus); + } catch (Exception e) { + LOGGER.error( + "Failed to recover consensus from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related pipe side effects for this failed consensus group.", + storageDir, + consensusGroupId, + e); + } } - } catch (Exception e) { - LOGGER.error("Failed to recover consensus from {}", storageDir, e); + } catch (IOException e) { + LOGGER.error( + "Failed to recover consensus from {} because read dir failed", + storageDir, + e); } }) .exceptionally( @@ -176,6 +202,7 @@ private void initAndRecover() throws IOException { LOGGER.error("Failed to recover consensus from {}", storageDir, e); return null; }); + return future; } }