diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index f55da4be67bc..5e5143e5f06d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -86,6 +86,15 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_SNAPSHOT_KEY; + public class PipeDataNodeTaskAgent extends PipeTaskAgent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class); @@ -426,23 +435,12 @@ protected void collectPipeMetaListInternal( || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - final String extractorModeValue = - pipeMeta - .getStaticMeta() - .getExtractorParameters() - .getStringOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_MODE_KEY, - PipeExtractorConstant.SOURCE_MODE_KEY), - PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE); + final boolean includeDataAndNeedDrop = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( pipeMeta.getStaticMeta().getExtractorParameters()) .getLeft() - && (extractorModeValue.equalsIgnoreCase( - PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE) - || extractorModeValue.equalsIgnoreCase( - PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE)); + && isSnapshotMode(pipeMeta.getStaticMeta().getExtractorParameters()); final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; final Pair remainingEventAndTime = @@ -641,6 +639,24 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) { } } + private boolean isSnapshotMode(final PipeParameters parameters) { + final boolean isSnapshotMode; + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { + isSnapshotMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY), + EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE); + } else { + final String extractorModeValue = + parameters.getStringOrDefault( + Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); + isSnapshotMode = + extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) + || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + } + return isSnapshotMode; + } + ///////////////////////// Pipe Consensus ///////////////////////// public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {