Skip to content

Commit

Permalink
Improve global sequence processing in Java SDK's "ordered" extension (#…
Browse files Browse the repository at this point in the history
…33629)

* Resolved several issues related to global sequence processing - pipeline doesn't drain, errors related to View.asSingleton() in global window, but fix in the global sequence accumulator.

* Fix watermark progression issues related to the per key tickers.
  • Loading branch information
slilichenko authored Jan 29, 2025
1 parent 3c563e8 commit 6ea3828
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
Expand All @@ -32,7 +34,9 @@

/** A range of contiguous event sequences and the latest timestamp of the events in the range. */
@AutoValue
public abstract class ContiguousSequenceRange {
public abstract class ContiguousSequenceRange
implements Serializable, Comparable<ContiguousSequenceRange> {

public static final ContiguousSequenceRange EMPTY =
ContiguousSequenceRange.of(
Long.MIN_VALUE, Long.MIN_VALUE, Instant.ofEpochMilli(Long.MIN_VALUE));
Expand All @@ -46,19 +50,43 @@ public abstract class ContiguousSequenceRange {
/** @return latest timestamp of all events in the range */
public abstract Instant getTimestamp();

@Override
public int compareTo(ContiguousSequenceRange other) {
if (other == null) {
throw new IllegalArgumentException("Can't compare " + this + " with NULL");
}

int startCompare = Long.compare(this.getStart(), other.getStart());
return startCompare == 0 ? Long.compare(this.getEnd(), other.getEnd()) : startCompare;
}

public static ContiguousSequenceRange largestRange(
Iterable<ContiguousSequenceRange> rangeIterable) {
ContiguousSequenceRange result = EMPTY;

Iterator<ContiguousSequenceRange> iterator = rangeIterable.iterator();
while (iterator.hasNext()) {
ContiguousSequenceRange next = iterator.next();
if (next.compareTo(result) > 0) {
result = next;
}
}
return result;
}

public static ContiguousSequenceRange of(long start, long end, Instant timestamp) {
return new AutoValue_ContiguousSequenceRange(start, end, timestamp);
}

static class CompletedSequenceRangeCoder extends CustomCoder<ContiguousSequenceRange> {
static class ContiguousSequenceRangeCoder extends CustomCoder<ContiguousSequenceRange> {

private static final CompletedSequenceRangeCoder INSTANCE = new CompletedSequenceRangeCoder();
private static final ContiguousSequenceRangeCoder INSTANCE = new ContiguousSequenceRangeCoder();

static CompletedSequenceRangeCoder of() {
static ContiguousSequenceRangeCoder of() {
return INSTANCE;
}

private CompletedSequenceRangeCoder() {}
private ContiguousSequenceRangeCoder() {}

@Override
public void encode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
*/
package org.apache.beam.sdk.extensions.ordered;

import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.CompletedSequenceRangeCoder;
import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.ContiguousSequenceRangeCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
Expand All @@ -45,9 +46,9 @@ class GlobalSequenceTracker<
EventKeyT, EventT, ResultT, StateT extends MutableState<EventT, ResultT>>
extends PTransform<
PCollection<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>>,
PCollectionView<ContiguousSequenceRange>> {
PCollectionView<Iterable<ContiguousSequenceRange>>> {

private final Combine.GloballyAsSingletonView<
private final Combine.Globally<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer;
private final @Nullable Duration frequencyOfGeneration;
Expand All @@ -59,17 +60,15 @@ class GlobalSequenceTracker<
* @param sideInputProducer
*/
public GlobalSequenceTracker(
Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
Combine.Globally<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer) {
this.sideInputProducer = sideInputProducer;
this.frequencyOfGeneration = null;
this.maxElementsBeforeReevaluatingGlobalSequence = 0;
}

public GlobalSequenceTracker(
Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
Combine.Globally<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer,
Duration globalSequenceGenerationFrequency,
int maxElementsBeforeReevaluatingGlobalSequence) {
Expand All @@ -79,12 +78,12 @@ public GlobalSequenceTracker(
}

@Override
public PCollectionView<ContiguousSequenceRange> expand(
public PCollectionView<Iterable<ContiguousSequenceRange>> expand(
PCollection<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>> input) {
input
.getPipeline()
.getCoderRegistry()
.registerCoderForClass(ContiguousSequenceRange.class, CompletedSequenceRangeCoder.of());
.registerCoderForClass(ContiguousSequenceRange.class, ContiguousSequenceRangeCoder.of());

if (frequencyOfGeneration != null) {
// This branch will only be executed in case of streaming pipelines.
Expand All @@ -107,6 +106,10 @@ public PCollectionView<ContiguousSequenceRange> expand(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(frequencyOfGeneration)))));
}
return input.apply("Create Side Input", sideInputProducer);
return input
.apply("Combine Sequences", sideInputProducer)
// Have to use asIterable instead of asSingleton due to
// https://github.com/apache/beam/issues/26465
.apply("Create Side Input", View.asIterable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,7 +80,7 @@ class GlobalSequencesProcessorDoFn<
@SuppressWarnings("unused")
private final TimerSpec statusEmissionTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

private final PCollectionView<ContiguousSequenceRange> latestContiguousRangeSideInput;
private final PCollectionView<Iterable<ContiguousSequenceRange>> latestContiguousRangeSideInput;

private final Duration maxLateness;

Expand All @@ -94,7 +95,7 @@ class GlobalSequencesProcessorDoFn<
TupleTag<KV<EventKeyT, KV<Long, UnprocessedEvent<EventT>>>> unprocessedEventTupleTag,
boolean produceStatusUpdateOnEveryEvent,
long maxNumberOfResultsToProduce,
PCollectionView<ContiguousSequenceRange> latestContiguousRangeSideInput,
PCollectionView<Iterable<ContiguousSequenceRange>> latestContiguousRangeSideInput,
Duration maxLateness) {
super(
eventExaminer,
Expand Down Expand Up @@ -127,6 +128,7 @@ boolean checkForSequenceGapInBufferedEvents() {
public void processElement(
ProcessContext context,
@Element KV<EventKeyT, KV<Long, EventT>> eventAndSequence,
@Timestamp Instant elementTimestamp,
@StateId(BUFFERED_EVENTS) OrderedListState<EventT> bufferedEventsProxy,
@AlwaysFetched @StateId(PROCESSING_STATE)
ValueState<ProcessingState<EventKeyT>> processingStateProxy,
Expand All @@ -136,7 +138,8 @@ public void processElement(
MultiOutputReceiver outputReceiver,
BoundedWindow window) {

ContiguousSequenceRange lastContiguousRange = context.sideInput(latestContiguousRangeSideInput);
ContiguousSequenceRange lastContiguousRange =
ContiguousSequenceRange.largestRange(context.sideInput(latestContiguousRangeSideInput));

EventT event = eventAndSequence.getValue().getValue();
EventKeyT key = eventAndSequence.getKey();
Expand Down Expand Up @@ -164,7 +167,7 @@ public void processElement(
// sequence.
processingStateProxy.write(processingState);

setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState);
setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState, elementTimestamp);

return;
}
Expand Down Expand Up @@ -193,15 +196,33 @@ public void processElement(
outputReceiver,
window.maxTimestamp());

setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState);
setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState, elementTimestamp);
}

private void setBatchEmissionTimerIfNeeded(
Timer batchEmissionTimer, ProcessingState<EventKeyT> processingState) {
Timer batchEmissionTimer,
ProcessingState<EventKeyT> processingState,
Instant elementTimestamp) {
ContiguousSequenceRange lastCompleteGlobalSequence = processingState.getLastContiguousRange();
if (lastCompleteGlobalSequence != null
&& processingState.thereAreGloballySequencedEventsToBeProcessed()) {
batchEmissionTimer.set(lastCompleteGlobalSequence.getTimestamp().plus(maxLateness));
Instant maxTimeToWait = lastCompleteGlobalSequence.getTimestamp().plus(maxLateness);
Instant timerTime =
maxTimeToWait.isAfter(elementTimestamp)
? maxTimeToWait
: elementTimestamp.plus(Duration.millis(1));

if (LOG.isTraceEnabled()) {
LOG.trace(
"Setting batch emission timer to: "
+ timerTime
+ ", max time of the range: "
+ lastCompleteGlobalSequence.getTimestamp()
+ ", element time: "
+ elementTimestamp);
}

batchEmissionTimer.set(timerTime);
}
}

Expand All @@ -213,8 +234,13 @@ public void onBatchEmission(
ValueState<ProcessingState<EventKeyT>> processingStatusState,
@AlwaysFetched @StateId(MUTABLE_STATE) ValueState<StateT> mutableStateState,
@TimerId(BATCH_EMISSION_TIMER) Timer batchEmissionTimer,
@Key EventKeyT key,
MultiOutputReceiver outputReceiver) {

if (LOG.isTraceEnabled()) {
LOG.trace("Running batch processing for: " + key);
}

// At this point everything in the buffered state is ready to be processed up to the latest
// global sequence.
@Nullable ProcessingState<EventKeyT> processingState = processingStatusState.read();
Expand All @@ -237,10 +263,6 @@ public void onBatchEmission(
return;
}

if (LOG.isTraceEnabled()) {
LOG.trace("Emission timer: " + processingState);
}

this.numberOfResultsBeforeBundleStart = processingState.getResultCount();

state =
Expand Down Expand Up @@ -273,4 +295,9 @@ public void onStatusEmission(
processStatusTimerEvent(
outputReceiver, statusEmissionTimer, windowClosedState, processingStateState);
}

@OnWindowExpiration
public void onWindowExpiration(@StateId(WINDOW_CLOSED) ValueState<Boolean> windowClosedState) {
windowClosedState.write(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
* <p>There are two sequencing modes - a sequence per key and a global sequence. See {@link
* OrderedProcessingHandler} for details on how to configure this transform.
*
* <p>Notice: the global sequence processing on the Dataflow Runner requires running under Runner
* V2. Refer to <a href="https://cloud.google.com/dataflow/docs/runner-v2">Dataflow
* documentation</a> for details.
*
* @param <EventT> type of event
* @param <EventKeyT> type of event key
* @param <StateT> type of the state
Expand Down Expand Up @@ -235,7 +239,7 @@ private OrderedEventProcessorResult<EventKeyT, ResultT, EventT> expandGlobalSequ
PCollectionTuple processingResult;
boolean streamingProcessing = input.isBounded() == IsBounded.UNBOUNDED;

final PCollectionView<ContiguousSequenceRange> latestContiguousRange =
final PCollectionView<Iterable<ContiguousSequenceRange>> latestContiguousRange =
input
.apply("Convert to SequenceAndTimestamp", ParDo.of(new ToTimestampedEventConverter<>()))
.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class OrderedEventProcessorResult<KeyT, ResultT, EventT> implements POutp
unprocessedEventPCollection;
private final TupleTag<KV<KeyT, KV<Long, UnprocessedEvent<EventT>>>> unprocessedEventTupleTag;

private final @Nullable PCollectionView<ContiguousSequenceRange> latestContiguousRange;
private final @Nullable PCollectionView<Iterable<ContiguousSequenceRange>> latestContiguousRange;

OrderedEventProcessorResult(
Pipeline pipeline,
Expand Down Expand Up @@ -85,7 +85,7 @@ public class OrderedEventProcessorResult<KeyT, ResultT, EventT> implements POutp
TupleTag<KV<KeyT, OrderedProcessingStatus>> eventProcessingStatusTupleTag,
PCollection<KV<KeyT, KV<Long, UnprocessedEvent<EventT>>>> unprocessedEventPCollection,
TupleTag<KV<KeyT, KV<Long, UnprocessedEvent<EventT>>>> unprocessedEventTupleTag,
@Nullable PCollectionView<ContiguousSequenceRange> latestContiguousRange) {
@Nullable PCollectionView<Iterable<ContiguousSequenceRange>> latestContiguousRange) {

this.pipeline = pipeline;
this.outputPCollection = outputPCollection;
Expand Down Expand Up @@ -132,11 +132,17 @@ public PCollection<KV<KeyT, ResultT>> output() {
return outputPCollection;
}

/** @return events which failed to process, including the reasons for failure. */
public PCollection<KV<KeyT, KV<Long, UnprocessedEvent<EventT>>>> unprocessedEvents() {
return unprocessedEventPCollection;
}

public @Nullable PCollectionView<ContiguousSequenceRange> latestContiguousRange() {
/**
* @return a view to a calculated side input with the last contiguous range. Note: an iterator is
* returned instead of a single value. Use {@link
* ContiguousSequenceRange#largestRange(Iterable)} to get the largest range.
*/
public @Nullable PCollectionView<Iterable<ContiguousSequenceRange>> latestContiguousRange() {
return latestContiguousRange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.ordered.combiner.DefaultSequenceCombiner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
import org.apache.beam.sdk.transforms.Combine.Globally;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;

/**
Expand Down Expand Up @@ -253,11 +255,13 @@ public OrderedProcessingGlobalSequenceHandler(
*
* @return combiner
*/
public GloballyAsSingletonView<
public @UnknownKeyFor @NonNull @Initialized Globally<
TimestampedValue<KV<KeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
getGlobalSequenceCombiner() {
return Combine.globally(new DefaultSequenceCombiner<KeyT, EventT, StateT>(getEventExaminer()))
.asSingletonView();
Globally<TimestampedValue<KV<KeyT, KV<Long, EventT>>>, ContiguousSequenceRange> result =
Combine.globally(new DefaultSequenceCombiner<>(getEventExaminer()));
result = result.withoutDefaults();
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static OrderedProcessingStatus create(
long numberOfReceivedEvents,
long resultCount,
long duplicateCount,
boolean lastEventReceived) {
boolean lastEventReceived,
@Nullable ContiguousSequenceRange lastContiguousRange) {
return new AutoValue_OrderedProcessingStatus.Builder()
.setLastProcessedSequence(lastProcessedSequence)
.setNumberOfBufferedEvents(numberOfBufferedEvents)
Expand All @@ -48,6 +49,7 @@ public static OrderedProcessingStatus create(
.setDuplicateCount(duplicateCount)
.setResultCount(resultCount)
.setStatusDate(Instant.now())
.setLastContiguousSequenceRange(lastContiguousRange)
.build();
}

Expand Down Expand Up @@ -93,6 +95,9 @@ public static OrderedProcessingStatus create(
*/
public abstract Instant getStatusDate();

@Nullable
public abstract ContiguousSequenceRange getLastContiguousSequenceRange();

@Override
public final boolean equals(@Nullable Object obj) {
if (obj == null) {
Expand Down Expand Up @@ -146,6 +151,8 @@ public abstract static class Builder {

public abstract Builder setStatusDate(Instant value);

public abstract Builder setLastContiguousSequenceRange(@Nullable ContiguousSequenceRange value);

public abstract OrderedProcessingStatus build();
}
}
Loading

0 comments on commit 6ea3828

Please sign in to comment.