Merge pull request #33989: Fix never trigger
reuvenlax authored Feb 15, 2025
1 parent eea972f commit 23ba9fc
Showing 7 changed files with 162 additions and 18 deletions.
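
The commit title is terse, so here is a minimal sketch of the behavior being addressed, based only on what the diff below shows: `Never.ever()` reports end-of-time as the watermark that guarantees firing (see the NeverTest change in the last file), and the direct-runner watermark callback and the streaming side-input existence deadline previously used that value directly as their deadline. The class and method names in the sketch are invented for illustration.

```java
// Illustrative sketch only (not part of the commit): the value the changed code
// used to wait on when the configured trigger is Never.ever().
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.joda.time.Instant;

public class NeverTriggerDeadlineSketch {
  public static void main(String[] args) {
    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
    Instant firingAfter = Never.ever().getWatermarkThatGuaranteesFiring(window);
    // Prints true: the deadline is BoundedWindow.TIMESTAMP_MAX_VALUE, i.e. it never expires.
    System.out.println(firingAfter.equals(BoundedWindow.TIMESTAMP_MAX_VALUE));
  }
}
```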
Changed file 1 of 7
@@ -27,6 +27,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -154,8 +155,11 @@ public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark)
private static class WatermarkCallback {
public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
@SuppressWarnings("unchecked")
Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window);
Instant firingAfter =
Ordering.natural()
.min(
LateDataUtils.garbageCollectionTime(window, strategy),
strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window));
return new WatermarkCallback(firingAfter, callback);
}

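The hunk above replaces the trigger-only deadline with the minimum of that deadline and the window's garbage-collection time. Below is a sketch of the effect, assuming (an approximation, not the commit's code) that `LateDataUtils.garbageCollectionTime` is roughly the window's maximum timestamp plus the strategy's allowed lateness; the class name and `main` method are invented for the example, and running it needs the Beam SDK and runners-core artifacts already imported by the changed files.

```java
// Sketch of the new capping behavior: even a trigger that never guarantees firing
// now yields a finite deadline, namely the window's garbage-collection time.
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class CappedFiringSketch {
  public static void main(String[] args) {
    WindowingStrategy<Object, IntervalWindow> strategy =
        WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)))
            .withTrigger(Never.ever())
            .withAllowedLateness(Duration.ZERO);
    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10_000));

    // For Never.ever() this is BoundedWindow.TIMESTAMP_MAX_VALUE (end of time).
    Instant triggerWatermark = strategy.getTrigger().getWatermarkThatGuaranteesFiring(window);
    // Roughly window.maxTimestamp() plus the allowed lateness (zero here).
    Instant gcTime = LateDataUtils.garbageCollectionTime(window, strategy);

    // The commit's pattern: the callback deadline is capped at window expiration.
    Instant firingAfter = Ordering.natural().min(gcTime, triggerWatermark);
    System.out.println(firingAfter); // ~1970-01-01T00:00:09.999Z, not end-of-time
  }
}
```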
Changed file 2 of 7
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
@@ -54,6 +55,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.joda.time.Instant;

/** A class that handles streaming side inputs in a {@link DoFnRunner}. */
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
@@ -312,15 +315,19 @@ private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest buildGlobal
throw new RuntimeException(e);
}

Instant firingAfter =
Ordering.natural()
.min(
LateDataUtils.garbageCollectionTime(sideInputWindow, sideWindowStrategy),
sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow));

return Windmill.GlobalDataRequest.newBuilder()
.setDataId(
Windmill.GlobalDataId.newBuilder()
.setTag(view.getTagInternal().getId())
.setVersion(windowStream.toByteString())
.build())
.setExistenceWatermarkDeadline(
WindmillTimeUtils.harnessToWindmillTimestamp(
sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow)))
.setExistenceWatermarkDeadline(WindmillTimeUtils.harnessToWindmillTimestamp(firingAfter))
.build();
}

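The streaming side-input path gets the same cap: the Windmill `GlobalDataRequest` existence watermark deadline is now bounded by the side-input window's garbage-collection time rather than the trigger's guaranteed-firing watermark alone. The sketch below shows the finite deadline involved, under the same assumption as above that garbage-collection time is approximately the window's maximum timestamp plus the allowed lateness; `approximateGcTime` is a hypothetical helper written for this note, not a Beam API.

```java
// Hypothetical helper mirroring the assumed behavior of LateDataUtils.garbageCollectionTime;
// shows the finite deadline a 1-second fixed window with zero allowed lateness would get.
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class GcTimeSketch {
  static Instant approximateGcTime(IntervalWindow window, Duration allowedLateness) {
    // Assumption: GC time is the window's max timestamp shifted by the allowed lateness.
    return window.maxTimestamp().plus(allowedLateness);
  }

  public static void main(String[] args) {
    IntervalWindow window =
        new IntervalWindow(Instant.EPOCH, Instant.EPOCH.plus(Duration.standardSeconds(1)));
    // Prints 1970-01-01T00:00:00.999Z: the existence deadline now falls at window
    // expiration rather than at BoundedWindow.TIMESTAMP_MAX_VALUE.
    System.out.println(approximateGcTime(window, Duration.ZERO));
  }
}
```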
Changed file 3 of 7
@@ -42,7 +42,7 @@ public void add(Iterable<String> values) {
}

@Override
public void add(String ... values) {
public void add(String... values) {
add(Arrays.asList(values));
}
}
Changed file 4 of 7
@@ -89,8 +89,11 @@ public BoundedTrie getBoundedTrie(MetricName metricName) {

@SuppressWarnings("FutureReturnValueIgnored")
public void flush(boolean async) {
if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty() &&
stringSets.isEmpty() && boundedTries.isEmpty()) {
if (counters.isEmpty()
&& distributions.isEmpty()
&& gauges.isEmpty()
&& stringSets.isEmpty()
&& boundedTries.isEmpty()) {
return;
}

@@ -102,7 +105,8 @@ public void flush(boolean async) {
extractUpdates(this.stringSets);
ImmutableList<MetricUpdates.MetricUpdate<BoundedTrieData>> boundedTries =
extractUpdates(this.boundedTries);
MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries);
MetricUpdates updates =
new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries);

if (async) {
accumulator.setAsync(metricsKey, updates);
Changed file 5 of 7
@@ -1309,12 +1309,7 @@ public StackManipulation.Size apply(MethodVisitor mv, Context context) {
if (returnVarIndex != null) {
// Drop the return type from the locals
mv.visitLocalVariable(
"res",
returnType.getDescriptor(),
null,
wrapStart,
wrapEnd,
returnVarIndex);
"res", returnType.getDescriptor(), null, wrapStart, wrapEnd, returnVarIndex);
}

return size;
Changed file 6 of 7
@@ -22,12 +22,21 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -38,9 +47,15 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -210,6 +225,126 @@ public void testWaitWithSomeSignalWindowsEmpty() {
FixedWindows.of(Duration.standardSeconds(1)));
}

private static final Set<Long> PROCESSED_LONGS = Sets.newConcurrentHashSet();
private static final Set<Long> VERIFIED_LONGS = Sets.newConcurrentHashSet();

@DefaultSchema(JavaFieldSchema.class)
static class WindowExpirationValue {
public final @Nullable Instant watermarkAdvance;
public final long value;

@SchemaCreate
public WindowExpirationValue(@Nullable Instant watermarkAdvance, long value) {
this.watermarkAdvance = watermarkAdvance;
this.value = value;
}
}

@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testWindowExpiration() throws NoSuchSchemaException {
PROCESSED_LONGS.clear();
VERIFIED_LONGS.clear();

SchemaCoder<WindowExpirationValue> schemaCoder =
p.getSchemaRegistry().getSchemaCoder(WindowExpirationValue.class);
List<Long> allLongs = LongStream.range(0, 200).boxed().collect(Collectors.toList());
TestStream.Builder<WindowExpirationValue> streamBuilder =
TestStream.create(schemaCoder).advanceWatermarkTo(Instant.EPOCH);
for (long i : allLongs) {
if (i > 0 && (i % 2) == 0) {
Instant watermarkValue = Instant.ofEpochMilli(i * 1000);
streamBuilder = streamBuilder.advanceWatermarkTo(watermarkValue);
streamBuilder =
streamBuilder.addElements(
TimestampedValue.of(
new WindowExpirationValue(watermarkValue, -1), Instant.ofEpochSecond(i)));
}
streamBuilder =
streamBuilder.addElements(
TimestampedValue.of(new WindowExpirationValue(null, i), Instant.ofEpochSecond(i)));
}
Instant watermarkValue = Instant.ofEpochMilli(200 * 1000);
streamBuilder = streamBuilder.advanceWatermarkTo(watermarkValue);
streamBuilder =
streamBuilder.addElements(
TimestampedValue.of(
new WindowExpirationValue(watermarkValue, -1), Instant.ofEpochSecond(200)));

PCollection<WindowExpirationValue> longs = p.apply(streamBuilder.advanceWatermarkToInfinity());

TupleTag<Long> signalOutputTag = new TupleTag<>();
TupleTag<Long> verifiedOutputTag = new TupleTag<>();
// Keeps track of values processed.
PCollectionTuple pCollectionTuple =
longs.apply(
ParDo.of(
new DoFn<WindowExpirationValue, Long>() {
@ProcessElement
public void process(
@Element WindowExpirationValue element, MultiOutputReceiver o) {
if (element.watermarkAdvance != null) {
// A non-null watermarkAdvance means the watermark has advanced and the previous
// window is expected to have expired. Since TestStream is synchronous, we can assume
// that the Wait has released that window. Each window contains two elements, so check
// that both have already been verified by the ParDo that follows the Wait.
long elementUpperBound = element.watermarkAdvance.getMillis() / 1000;
OutputReceiver<Long> verified = o.get(verifiedOutputTag);
if (VERIFIED_LONGS.contains(elementUpperBound - 1)) {
verified.output(elementUpperBound - 1);
}
if (VERIFIED_LONGS.contains(elementUpperBound - 2)) {
verified.output(elementUpperBound - 2);
}
}
PROCESSED_LONGS.add(element.value);
o.get(signalOutputTag).output(element.value);
}
})
.withOutputTags(signalOutputTag, TupleTagList.of(verifiedOutputTag)));
pCollectionTuple.get(verifiedOutputTag).setCoder(VarLongCoder.of());

FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(2));
PCollection<Long> verifiedInts =
longs
.apply(
"flatmap",
FlatMapElements.into(TypeDescriptors.longs())
.via(
value ->
value.watermarkAdvance == null
? Collections.singletonList(value.value)
: Collections.emptyList()))
.apply("w1", Window.<Long>into(fixedWindows).withAllowedLateness(Duration.ZERO))
.apply(
Wait.on(
pCollectionTuple
.get(signalOutputTag)
.apply(
"w2",
Window.<Long>into(fixedWindows).withAllowedLateness(Duration.ZERO))))
.apply(
"verify",
ParDo.of(
new DoFn<Long, Long>() {
@ProcessElement
public void process(@Element Long element, OutputReceiver<Long> o) {
if (PROCESSED_LONGS.contains(element)) {
VERIFIED_LONGS.add(element);
o.output(element);
}
}
}));
PAssert.that(verifiedInts).containsInAnyOrder(Iterables.toArray(allLongs, Long.class));

PAssert.that(pCollectionTuple.get(verifiedOutputTag))
.containsInAnyOrder(Iterables.toArray(allLongs, Long.class));
p.run();
}

/**
* Tests the {@link Wait} transform with a given configuration of the main input and the signal
* input. Specifically, generates random streams with bounded lateness for main and signal inputs
Changed file 7 of 7
@@ -29,10 +29,9 @@
public class NeverTest {
@Test
public void testFireDeadline() throws Exception {
IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
assertEquals(
BoundedWindow.TIMESTAMP_MAX_VALUE,
Never.ever()
.getWatermarkThatGuaranteesFiring(new IntervalWindow(new Instant(0), new Instant(10))));
BoundedWindow.TIMESTAMP_MAX_VALUE, Never.ever().getWatermarkThatGuaranteesFiring(window));
}

@Test