From cb9c6b26f55be45120064cba408e17aaf9878f7e Mon Sep 17 00:00:00 2001 From: jto Date: Thu, 23 Jan 2025 14:39:40 +0100 Subject: [PATCH] [Flink] merge --- .../wrappers/streaming/io/source/FlinkSource.java | 13 ++++++++++--- .../io/source/LazyFlinkSourceSplitEnumerator.java | 11 ++++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java index fb52021c3cf5..0c1ba73b2a2d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java @@ -104,13 +104,20 @@ public Boundedness getBoundedness() { @Override public SplitEnumerator, Map>>> createEnumerator(SplitEnumeratorContext> enumContext) throws Exception { + return createEnumerator(enumContext, false); + } + + public SplitEnumerator, Map>>> + createEnumerator( + SplitEnumeratorContext> enumContext, boolean splitInitialized) + throws Exception { if (boundedness == Boundedness.BOUNDED) { return new LazyFlinkSourceSplitEnumerator<>( - enumContext, beamSource, serializablePipelineOptions.get(), numSplits); + enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized); } else { return new FlinkSourceSplitEnumerator<>( - enumContext, beamSource, serializablePipelineOptions.get(), numSplits); + enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized); } } @@ -121,7 +128,7 @@ public Boundedness getBoundedness() { Map>> checkpoint) throws Exception { SplitEnumerator, Map>>> enumerator = - createEnumerator(enumContext); + createEnumerator(enumContext, true); checkpoint.forEach( (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId)); return enumerator; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java index 0f4fb0a0c75d..f5cd53c42ffa 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java @@ -53,21 +53,30 @@ public class LazyFlinkSourceSplitEnumerator private final PipelineOptions pipelineOptions; private final int numSplits; private final List> pendingSplits; + private boolean splitsInitialized; public LazyFlinkSourceSplitEnumerator( SplitEnumeratorContext> context, Source beamSource, PipelineOptions pipelineOptions, - int numSplits) { + int numSplits, + boolean splitInitialized) { this.context = context; this.beamSource = beamSource; this.pipelineOptions = pipelineOptions; this.numSplits = numSplits; this.pendingSplits = new ArrayList<>(numSplits); + this.splitsInitialized = splitInitialized; } @Override public void start() { + if (!splitsInitialized) { + initializeSplits(); + } + } + + public void initializeSplits() { context.callAsync( () -> { try {