Skip to content

Commit

Permalink
[Flink] fix GBK streaming with side input
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Nov 6, 2024
1 parent 687113a commit cfa9acf
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,9 @@ public final void processElement2(StreamRecord<RawUnionValue> streamRecord) thro
WindowedValue<InputT> element = it.next();
// we need to set the correct key in case the operator is
// a (keyed) window operator
setKeyContextElement1(new StreamRecord<>(element));
if (keySelector != null) {
setCurrentKey(keySelector.getKey(element));
}

Iterable<WindowedValue<InputT>> justPushedBack =
pushbackDoFnRunner.processElementInReadyWindows(element);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,7 @@ public void testAfterProcessingTimeContinuationTriggerUsingState() throws Except
final long waitMillis = 500;

PCollection<Integer> triggeredSums =
p.apply(
GenerateSequence.from(0)
.to(1)) // forces unbounded so delay cannot be fast-forwarded
p.apply(Create.of(0L))
.apply(WithKeys.of("dummy key"))
.apply(
"output then delay",
Expand Down

0 comments on commit cfa9acf

Please sign in to comment.