Skip to content

Commit

Permalink
Pipe: Fix mode.snapshot configuration not taking effect
Browse files Browse the repository at this point in the history
  • Loading branch information
luoluoyuyu committed Oct 24, 2024
1 parent f61db48 commit 0b7e819
Showing 1 changed file with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.PersistentResource;
Expand Down Expand Up @@ -81,6 +80,12 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE;
Expand All @@ -93,6 +98,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_SNAPSHOT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY;
Expand Down Expand Up @@ -374,15 +381,19 @@ public void customize(
listeningOptionPair.getRight());
}

final String extractorModeValue =
parameters.getStringOrDefault(
Arrays.asList(
PipeExtractorConstant.EXTRACTOR_MODE_KEY, PipeExtractorConstant.SOURCE_MODE_KEY),
PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
shouldTerminatePipeOnAllHistoricalEventsConsumed =
extractorModeValue.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE)
|| extractorModeValue.equalsIgnoreCase(
PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE);
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
shouldTerminatePipeOnAllHistoricalEventsConsumed =
parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
} else {
final String extractorModeValue =
parameters.getStringOrDefault(
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
shouldTerminatePipeOnAllHistoricalEventsConsumed =
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
|| extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
}

if (LOGGER.isInfoEnabled()) {
LOGGER.info(
Expand Down

0 comments on commit 0b7e819

Please sign in to comment.