Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
luoluoyuyu committed Oct 24, 2024
1 parent 0b7e819 commit 5e54f31
Showing 1 changed file with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_SNAPSHOT_KEY;

public class PipeDataNodeTaskAgent extends PipeTaskAgent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
Expand Down Expand Up @@ -426,23 +435,12 @@ protected void collectPipeMetaListInternal(
|| pipeTaskMap.entrySet().stream()
.filter(entry -> dataRegionIds.contains(entry.getKey()))
.allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted());
final String extractorModeValue =
pipeMeta
.getStaticMeta()
.getExtractorParameters()
.getStringOrDefault(
Arrays.asList(
PipeExtractorConstant.EXTRACTOR_MODE_KEY,
PipeExtractorConstant.SOURCE_MODE_KEY),
PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE);

final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
pipeMeta.getStaticMeta().getExtractorParameters())
.getLeft()
&& (extractorModeValue.equalsIgnoreCase(
PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE)
|| extractorModeValue.equalsIgnoreCase(
PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
&& isSnapshotMode(pipeMeta.getStaticMeta().getExtractorParameters());

final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
Expand Down Expand Up @@ -641,6 +639,24 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) {
}
}

private boolean isSnapshotMode(final PipeParameters parameters) {
final boolean isSnapshotMode;
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
isSnapshotMode =
parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
} else {
final String extractorModeValue =
parameters.getStringOrDefault(
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
isSnapshotMode =
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
|| extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
}
return isSnapshotMode;
}

///////////////////////// Pipe Consensus /////////////////////////

public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {
Expand Down

0 comments on commit 5e54f31

Please sign in to comment.