Skip to content

Commit

Permalink
[Flink] merge
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Jan 23, 2025
1 parent 10c480c commit cb9c6b2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,20 @@ public Boundedness getBoundedness() {
@Override
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
return createEnumerator(enumContext, false);
}

public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
createEnumerator(
SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)
throws Exception {

if (boundedness == Boundedness.BOUNDED) {
return new LazyFlinkSourceSplitEnumerator<>(
enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);
} else {
return new FlinkSourceSplitEnumerator<>(
enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);
}
}

Expand All @@ -121,7 +128,7 @@ public Boundedness getBoundedness() {
Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
throws Exception {
SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =
createEnumerator(enumContext);
createEnumerator(enumContext, true);
checkpoint.forEach(
(subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
return enumerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,30 @@ public class LazyFlinkSourceSplitEnumerator<T>
private final PipelineOptions pipelineOptions;
private final int numSplits;
private final List<FlinkSourceSplit<T>> pendingSplits;
private boolean splitsInitialized;

public LazyFlinkSourceSplitEnumerator(
SplitEnumeratorContext<FlinkSourceSplit<T>> context,
Source<T> beamSource,
PipelineOptions pipelineOptions,
int numSplits) {
int numSplits,
boolean splitInitialized) {
this.context = context;
this.beamSource = beamSource;
this.pipelineOptions = pipelineOptions;
this.numSplits = numSplits;
this.pendingSplits = new ArrayList<>(numSplits);
this.splitsInitialized = splitInitialized;
}

@Override
public void start() {
if (!splitsInitialized) {
initializeSplits();
}
}

public void initializeSplits() {
context.callAsync(
() -> {
try {
Expand Down

0 comments on commit cb9c6b2

Please sign in to comment.