From 6ea382827ebfdff307f22990dc5fcbee4544b320 Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Wed, 29 Jan 2025 10:42:20 -0800 Subject: [PATCH] Improve global sequence processing in Java SDK's "ordered" extension (#33629) * Resolved several issues related to global sequence processing - pipeline doesn't drain, errors related to View.asSingleton() in global window, bug fix in the global sequence accumulator. * Fix watermark progression issues related to the per key tickers. --- .../ordered/ContiguousSequenceRange.java | 38 ++++- .../ordered/GlobalSequenceTracker.java | 23 +-- .../ordered/GlobalSequencesProcessorDoFn.java | 49 +++++-- .../ordered/OrderedEventProcessor.java | 6 +- .../ordered/OrderedEventProcessorResult.java | 12 +- .../ordered/OrderedProcessingHandler.java | 12 +- .../ordered/OrderedProcessingStatus.java | 9 +- .../ordered/PerKeyTickerGenerator.java | 40 +++++- .../extensions/ordered/ProcessingState.java | 3 +- .../sdk/extensions/ordered/ProcessorDoFn.java | 3 +- .../combiner/DefaultSequenceCombiner.java | 18 +-- .../combiner/SequenceRangeAccumulator.java | 3 +- .../ordered/ContiguousSequenceRangeTest.java | 71 +++++++++ ...deredEventProcessorPerKeySequenceTest.java | 135 +++++++++++++----- .../OrderedEventProcessorTestBase.java | 8 +- .../SequenceRangeAccumulatorTest.java | 62 ++++++++ 16 files changed, 399 insertions(+), 93 deletions(-) create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRangeTest.java diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRange.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRange.java index c16cf9328dcd..ccdf87298e3f 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRange.java +++ 
b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRange.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Serializable; +import java.util.Iterator; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.InstantCoder; @@ -32,7 +34,9 @@ /** A range of contiguous event sequences and the latest timestamp of the events in the range. */ @AutoValue -public abstract class ContiguousSequenceRange { +public abstract class ContiguousSequenceRange + implements Serializable, Comparable { + public static final ContiguousSequenceRange EMPTY = ContiguousSequenceRange.of( Long.MIN_VALUE, Long.MIN_VALUE, Instant.ofEpochMilli(Long.MIN_VALUE)); @@ -46,19 +50,43 @@ public abstract class ContiguousSequenceRange { /** @return latest timestamp of all events in the range */ public abstract Instant getTimestamp(); + @Override + public int compareTo(ContiguousSequenceRange other) { + if (other == null) { + throw new IllegalArgumentException("Can't compare " + this + " with NULL"); + } + + int startCompare = Long.compare(this.getStart(), other.getStart()); + return startCompare == 0 ? 
Long.compare(this.getEnd(), other.getEnd()) : startCompare; + } + + public static ContiguousSequenceRange largestRange( + Iterable rangeIterable) { + ContiguousSequenceRange result = EMPTY; + + Iterator iterator = rangeIterable.iterator(); + while (iterator.hasNext()) { + ContiguousSequenceRange next = iterator.next(); + if (next.compareTo(result) > 0) { + result = next; + } + } + return result; + } + public static ContiguousSequenceRange of(long start, long end, Instant timestamp) { return new AutoValue_ContiguousSequenceRange(start, end, timestamp); } - static class CompletedSequenceRangeCoder extends CustomCoder { + static class ContiguousSequenceRangeCoder extends CustomCoder { - private static final CompletedSequenceRangeCoder INSTANCE = new CompletedSequenceRangeCoder(); + private static final ContiguousSequenceRangeCoder INSTANCE = new ContiguousSequenceRangeCoder(); - static CompletedSequenceRangeCoder of() { + static ContiguousSequenceRangeCoder of() { return INSTANCE; } - private CompletedSequenceRangeCoder() {} + private ContiguousSequenceRangeCoder() {} @Override public void encode( diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequenceTracker.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequenceTracker.java index aa12c30a5317..fbf3be168dd1 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequenceTracker.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequenceTracker.java @@ -17,9 +17,10 @@ */ package org.apache.beam.sdk.extensions.ordered; -import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.CompletedSequenceRangeCoder; +import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.ContiguousSequenceRangeCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; +import 
org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; @@ -45,9 +46,9 @@ class GlobalSequenceTracker< EventKeyT, EventT, ResultT, StateT extends MutableState> extends PTransform< PCollection>>>, - PCollectionView> { + PCollectionView>> { - private final Combine.GloballyAsSingletonView< + private final Combine.Globally< TimestampedValue>>, ContiguousSequenceRange> sideInputProducer; private final @Nullable Duration frequencyOfGeneration; @@ -59,8 +60,7 @@ class GlobalSequenceTracker< * @param sideInputProducer */ public GlobalSequenceTracker( - Combine.GloballyAsSingletonView< - TimestampedValue>>, ContiguousSequenceRange> + Combine.Globally>>, ContiguousSequenceRange> sideInputProducer) { this.sideInputProducer = sideInputProducer; this.frequencyOfGeneration = null; @@ -68,8 +68,7 @@ public GlobalSequenceTracker( } public GlobalSequenceTracker( - Combine.GloballyAsSingletonView< - TimestampedValue>>, ContiguousSequenceRange> + Combine.Globally>>, ContiguousSequenceRange> sideInputProducer, Duration globalSequenceGenerationFrequency, int maxElementsBeforeReevaluatingGlobalSequence) { @@ -79,12 +78,12 @@ public GlobalSequenceTracker( } @Override - public PCollectionView expand( + public PCollectionView> expand( PCollection>>> input) { input .getPipeline() .getCoderRegistry() - .registerCoderForClass(ContiguousSequenceRange.class, CompletedSequenceRangeCoder.of()); + .registerCoderForClass(ContiguousSequenceRange.class, ContiguousSequenceRangeCoder.of()); if (frequencyOfGeneration != null) { // This branch will only be executed in case of streaming pipelines. 
@@ -107,6 +106,10 @@ public PCollectionView expand( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(frequencyOfGeneration))))); } - return input.apply("Create Side Input", sideInputProducer); + return input + .apply("Combine Sequences", sideInputProducer) + // Have to use asIterable instead of asSingleton due to + // https://github.com/apache/beam/issues/26465 + .apply("Create Side Input", View.asIterable()); } } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequencesProcessorDoFn.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequencesProcessorDoFn.java index 64c2d119c97d..4c4989966076 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequencesProcessorDoFn.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequencesProcessorDoFn.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +80,7 @@ class GlobalSequencesProcessorDoFn< @SuppressWarnings("unused") private final TimerSpec statusEmissionTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - private final PCollectionView latestContiguousRangeSideInput; + private final PCollectionView> latestContiguousRangeSideInput; private final Duration maxLateness; @@ -94,7 +95,7 @@ class GlobalSequencesProcessorDoFn< TupleTag>>> unprocessedEventTupleTag, boolean produceStatusUpdateOnEveryEvent, long maxNumberOfResultsToProduce, - PCollectionView latestContiguousRangeSideInput, + PCollectionView> latestContiguousRangeSideInput, Duration maxLateness) { super( eventExaminer, @@ -127,6 +128,7 @@ boolean checkForSequenceGapInBufferedEvents() { public void processElement( ProcessContext context, @Element KV> 
eventAndSequence, + @Timestamp Instant elementTimestamp, @StateId(BUFFERED_EVENTS) OrderedListState bufferedEventsProxy, @AlwaysFetched @StateId(PROCESSING_STATE) ValueState> processingStateProxy, @@ -136,7 +138,8 @@ public void processElement( MultiOutputReceiver outputReceiver, BoundedWindow window) { - ContiguousSequenceRange lastContiguousRange = context.sideInput(latestContiguousRangeSideInput); + ContiguousSequenceRange lastContiguousRange = + ContiguousSequenceRange.largestRange(context.sideInput(latestContiguousRangeSideInput)); EventT event = eventAndSequence.getValue().getValue(); EventKeyT key = eventAndSequence.getKey(); @@ -164,7 +167,7 @@ public void processElement( // sequence. processingStateProxy.write(processingState); - setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState); + setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState, elementTimestamp); return; } @@ -193,15 +196,33 @@ public void processElement( outputReceiver, window.maxTimestamp()); - setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState); + setBatchEmissionTimerIfNeeded(batchEmissionTimer, processingState, elementTimestamp); } private void setBatchEmissionTimerIfNeeded( - Timer batchEmissionTimer, ProcessingState processingState) { + Timer batchEmissionTimer, + ProcessingState processingState, + Instant elementTimestamp) { ContiguousSequenceRange lastCompleteGlobalSequence = processingState.getLastContiguousRange(); if (lastCompleteGlobalSequence != null && processingState.thereAreGloballySequencedEventsToBeProcessed()) { - batchEmissionTimer.set(lastCompleteGlobalSequence.getTimestamp().plus(maxLateness)); + Instant maxTimeToWait = lastCompleteGlobalSequence.getTimestamp().plus(maxLateness); + Instant timerTime = + maxTimeToWait.isAfter(elementTimestamp) + ? 
maxTimeToWait + : elementTimestamp.plus(Duration.millis(1)); + + if (LOG.isTraceEnabled()) { + LOG.trace( + "Setting batch emission timer to: " + + timerTime + + ", max time of the range: " + + lastCompleteGlobalSequence.getTimestamp() + + ", element time: " + + elementTimestamp); + } + + batchEmissionTimer.set(timerTime); } } @@ -213,8 +234,13 @@ public void onBatchEmission( ValueState> processingStatusState, @AlwaysFetched @StateId(MUTABLE_STATE) ValueState mutableStateState, @TimerId(BATCH_EMISSION_TIMER) Timer batchEmissionTimer, + @Key EventKeyT key, MultiOutputReceiver outputReceiver) { + if (LOG.isTraceEnabled()) { + LOG.trace("Running batch processing for: " + key); + } + // At this point everything in the buffered state is ready to be processed up to the latest // global sequence. @Nullable ProcessingState processingState = processingStatusState.read(); @@ -237,10 +263,6 @@ public void onBatchEmission( return; } - if (LOG.isTraceEnabled()) { - LOG.trace("Emission timer: " + processingState); - } - this.numberOfResultsBeforeBundleStart = processingState.getResultCount(); state = @@ -273,4 +295,9 @@ public void onStatusEmission( processStatusTimerEvent( outputReceiver, statusEmissionTimer, windowClosedState, processingStateState); } + + @OnWindowExpiration + public void onWindowExpiration(@StateId(WINDOW_CLOSED) ValueState windowClosedState) { + windowClosedState.write(true); + } } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java index fb23a7c8667a..2eba63740497 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -54,6 +54,10 @@ *

There are two sequencing modes - a sequence per key and a global sequence. See {@link * OrderedProcessingHandler} for details on how to configure this transform. * + *

Notice: the global sequence processing on the Dataflow Runner requires running under Runner + * V2. Refer to Dataflow + * documentation for details. + * * @param type of event * @param type of event key * @param type of the state @@ -235,7 +239,7 @@ private OrderedEventProcessorResult expandGlobalSequ PCollectionTuple processingResult; boolean streamingProcessing = input.isBounded() == IsBounded.UNBOUNDED; - final PCollectionView latestContiguousRange = + final PCollectionView> latestContiguousRange = input .apply("Convert to SequenceAndTimestamp", ParDo.of(new ToTimestampedEventConverter<>())) .apply( diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java index 48b9fafc99af..35191fdf272f 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java @@ -55,7 +55,7 @@ public class OrderedEventProcessorResult implements POutp unprocessedEventPCollection; private final TupleTag>>> unprocessedEventTupleTag; - private final @Nullable PCollectionView latestContiguousRange; + private final @Nullable PCollectionView> latestContiguousRange; OrderedEventProcessorResult( Pipeline pipeline, @@ -85,7 +85,7 @@ public class OrderedEventProcessorResult implements POutp TupleTag> eventProcessingStatusTupleTag, PCollection>>> unprocessedEventPCollection, TupleTag>>> unprocessedEventTupleTag, - @Nullable PCollectionView latestContiguousRange) { + @Nullable PCollectionView> latestContiguousRange) { this.pipeline = pipeline; this.outputPCollection = outputPCollection; @@ -132,11 +132,17 @@ public PCollection> output() { return outputPCollection; } + /** @return events which failed to process, including the reasons 
for failure. */ public PCollection>>> unprocessedEvents() { return unprocessedEventPCollection; } - public @Nullable PCollectionView latestContiguousRange() { + /** + * @return a view to a calculated side input with the last contiguous range. Note: an iterable is + * returned instead of a single value. Use {@link + * ContiguousSequenceRange#largestRange(Iterable)} to get the largest range. + */ + public @Nullable PCollectionView> latestContiguousRange() { return latestContiguousRange; } } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java index d8ad13330a1a..764d353544ce 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java @@ -24,11 +24,13 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.ordered.combiner.DefaultSequenceCombiner; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView; +import org.apache.beam.sdk.transforms.Combine.Globally; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; +import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.Duration; /** @@ -253,11 +255,13 @@ public OrderedProcessingGlobalSequenceHandler( * * @return combiner */ - public GloballyAsSingletonView< + public @UnknownKeyFor @NonNull @Initialized Globally< TimestampedValue>>, ContiguousSequenceRange> getGlobalSequenceCombiner() { - return Combine.globally(new 
DefaultSequenceCombiner(getEventExaminer())) - .asSingletonView(); + Globally>>, ContiguousSequenceRange> result = + Combine.globally(new DefaultSequenceCombiner<>(getEventExaminer())); + result = result.withoutDefaults(); + return result; } /** diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java index 7a556de1017b..67af75e00f9d 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java @@ -37,7 +37,8 @@ public static OrderedProcessingStatus create( long numberOfReceivedEvents, long resultCount, long duplicateCount, - boolean lastEventReceived) { + boolean lastEventReceived, + @Nullable ContiguousSequenceRange lastContiguousRange) { return new AutoValue_OrderedProcessingStatus.Builder() .setLastProcessedSequence(lastProcessedSequence) .setNumberOfBufferedEvents(numberOfBufferedEvents) @@ -48,6 +49,7 @@ public static OrderedProcessingStatus create( .setDuplicateCount(duplicateCount) .setResultCount(resultCount) .setStatusDate(Instant.now()) + .setLastContiguousSequenceRange(lastContiguousRange) .build(); } @@ -93,6 +95,9 @@ public static OrderedProcessingStatus create( */ public abstract Instant getStatusDate(); + @Nullable + public abstract ContiguousSequenceRange getLastContiguousSequenceRange(); + @Override public final boolean equals(@Nullable Object obj) { if (obj == null) { @@ -146,6 +151,8 @@ public abstract static class Builder { public abstract Builder setStatusDate(Instant value); + public abstract Builder setLastContiguousSequenceRange(@Nullable ContiguousSequenceRange value); + public abstract OrderedProcessingStatus build(); } } diff --git 
a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/PerKeyTickerGenerator.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/PerKeyTickerGenerator.java index a18ba53f5266..1e725440538f 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/PerKeyTickerGenerator.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/PerKeyTickerGenerator.java @@ -38,6 +38,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,13 +102,23 @@ static class PerKeyTickerGeneratorDoFn public void process( @Element KV> element, @AlwaysFetched @StateId(STATE) ValueState state, + @Timestamp Instant currentTimestamp, @TimerId(TIMER) Timer tickerTimer) { @Nullable EventKeyT keyValue = state.read(); if (keyValue != null) { return; } - tickerTimer.offset(tickerFrequency).setRelative(); + // The first event received becomes the starting point for watermark tracking in this + // transform. setTimer() method's withOutputTimestamp() determines what that watermark will + // be. + // + // This can be an issue if the first event is really far back because the flattened + // PCollection of events and tickers will have the lowest watermark of the two. + // It might be possible to improve the tracking by inspecting other elements and progress + // the watermark further based on the timestamp of a new event. Or use the latest contiguous + // range side input and its timestamp. 
+ setTimer(tickerTimer, currentTimestamp); state.write(element.getKey()); } @@ -116,17 +127,40 @@ public void process( public void onTimer( @StateId(STATE) ValueState state, @TimerId(TIMER) Timer tickerTimer, + @Timestamp Instant currentTimestamp, OutputReceiver>> outputReceiver) { @Nullable EventKeyT key = state.read(); if (key == null) { - LOG.error("Expected to get the key from the state, but got null"); + LOG.warn( + "Expected to get the key from the state, but got null. " + + "It is expected during pipeline draining."); return; } // Null value will be an indicator to the main transform that the element is a ticker outputReceiver.output(KV.of(key, KV.of(0L, null))); - tickerTimer.offset(tickerFrequency).setRelative(); + setTimer(tickerTimer, currentTimestamp); + } + + private void setTimer(Timer tickerTimer, Instant currentTime) { + tickerTimer + .offset(tickerFrequency) + .withOutputTimestamp(currentTime.plus(tickerFrequency)) + .setRelative(); + } + + /** + * This call will be received when the input is windowed or in case the input is in Global + * Window and the pipeline is drained. In either case we need to stop the timers. We can't + * access the timer from this method, so we clear the state to indicate that the processing needs + * to stop. 
+ * + * @param state + */ + @OnWindowExpiration + public void onWindowExpiration(@StateId(STATE) ValueState state) { + state.clear(); } } } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java index 425eb4444a63..2980c2d614f0 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.ContiguousSequenceRangeCoder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.initialization.qual.Initialized; @@ -343,7 +344,7 @@ static class ProcessingStateCoder extends Coder> { private static final BooleanCoder BOOLEAN_CODER = BooleanCoder.of(); private static final NullableCoder SEQUENCE_AND_TIMESTAMP_CODER = - NullableCoder.of(ContiguousSequenceRange.CompletedSequenceRangeCoder.of()); + NullableCoder.of(ContiguousSequenceRangeCoder.of()); private Coder keyCoder; diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessorDoFn.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessorDoFn.java index a05b0829074a..60f5dcdde4f7 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessorDoFn.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessorDoFn.java @@ -260,7 +260,8 @@ protected void emitProcessingStatus( processingState.getEventsReceived(), 
processingState.getResultCount(), processingState.getDuplicates(), - processingState.isLastEventReceived())), + processingState.isLastEventReceived(), + processingState.getLastContiguousRange())), statusTimestamp); } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java index 32e5cbc36e4e..8e3a517c7e86 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.extensions.ordered.combiner; import java.util.Iterator; -import java.util.function.BiFunction; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -31,9 +30,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,15 +54,6 @@ public class DefaultSequenceCombiner - OLDEST_TIMESTAMP_SELECTOR = - (instant1, instant2) -> { - if (instant2 == null) { - return instant1; - } - @NonNull Instant nonNullableSecondValue = instant2; - return instant1.isAfter(nonNullableSecondValue) ? 
instant1 : nonNullableSecondValue; - }; private final EventExaminer eventExaminer; public DefaultSequenceCombiner(EventExaminer eventExaminer) { @@ -93,20 +81,22 @@ public SequenceRangeAccumulator addInput( @Override public SequenceRangeAccumulator mergeAccumulators( Iterable accumulators) { + // There should be at least one accumulator. Iterator iterator = accumulators.iterator(); SequenceRangeAccumulator result = iterator.next(); while (iterator.hasNext()) { result.merge(iterator.next()); } + return result; } @Override public ContiguousSequenceRange extractOutput(SequenceRangeAccumulator accum) { ContiguousSequenceRange result = accum.largestContinuousRange(); - if (LOG.isTraceEnabled()) { - LOG.trace("Returning completed sequence range: " + result); + if (LOG.isDebugEnabled()) { + LOG.debug("Returning completed sequence range: " + result); } return result; } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java index 89dc912afc90..70c22f54056c 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java @@ -39,7 +39,6 @@ /** Default accumulator used to combine sequence ranges. */ public class SequenceRangeAccumulator { - private static Instant max(Instant a, Instant b) { return a.isAfter(b) ? 
a : b; } @@ -217,7 +216,7 @@ public void merge(SequenceRangeAccumulator another) { private Instant removeAllRanges(long lowerBound, long upperBound, Instant currentTimestamp) { Instant result = currentTimestamp; - SortedMap> rangesToRemove = data.subMap(lowerBound, upperBound); + SortedMap> rangesToRemove = data.subMap(lowerBound, upperBound + 1); for (Pair value : rangesToRemove.values()) { result = result.isAfter(value.getRight()) ? result : value.getRight(); } diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRangeTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRangeTest.java new file mode 100644 index 000000000000..cb4648cc8b65 --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRangeTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import java.util.Arrays; +import java.util.Collections; +import junit.framework.TestCase; +import org.joda.time.Instant; + +public class ContiguousSequenceRangeTest extends TestCase { + + public void testCompareTo() { + ContiguousSequenceRange refToEmpty = ContiguousSequenceRange.EMPTY; + assertEquals("Empty ranges are equal", 0, ContiguousSequenceRange.EMPTY.compareTo(refToEmpty)); + + assertEquals( + "Empty range is smaller than another", + -1, + ContiguousSequenceRange.EMPTY.compareTo(ContiguousSequenceRange.of(0, 5, new Instant()))); + + assertEquals( + "First range is smaller than the second", + -1, + ContiguousSequenceRange.of(0, 2, new Instant()) + .compareTo(ContiguousSequenceRange.of(0, 5, new Instant()))); + + assertEquals( + "First range is larger than the second", + 1, + ContiguousSequenceRange.of(0, 10, new Instant()) + .compareTo(ContiguousSequenceRange.of(0, 5, new Instant()))); + + assertEquals( + "Ranges are equal", + 0, + ContiguousSequenceRange.of(0, 10, new Instant()) + .compareTo(ContiguousSequenceRange.of(0, 10, new Instant()))); + } + + public void testLargestRange() { + assertEquals( + "Empty if no elements", + ContiguousSequenceRange.EMPTY, + ContiguousSequenceRange.largestRange(Collections.EMPTY_LIST)); + + ContiguousSequenceRange one = ContiguousSequenceRange.EMPTY; + ContiguousSequenceRange two = ContiguousSequenceRange.of(0, 5, new Instant()); + ContiguousSequenceRange three = ContiguousSequenceRange.of(0, 22, new Instant()); + ContiguousSequenceRange four = ContiguousSequenceRange.of(0, 10, new Instant()); + assertEquals( + "third range", + three, + ContiguousSequenceRange.largestRange( + Arrays.asList(new ContiguousSequenceRange[] {one, two, three, four}))); + } +} diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorPerKeySequenceTest.java 
b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorPerKeySequenceTest.java index 6909a3bb992c..5dad7ac1852b 100644 --- a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorPerKeySequenceTest.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorPerKeySequenceTest.java @@ -38,6 +38,7 @@ import org.junit.Test; public class OrderedEventProcessorPerKeySequenceTest extends OrderedEventProcessorTestBase { + private static final ContiguousSequenceRange NOT_USED_FOR_TESTING = null; @Test public void testPerfectOrderingProcessing() throws CannotProvideCoderException { @@ -62,7 +63,8 @@ public void testPerfectOrderingProcessing() throws CannotProvideCoderException { 4, Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count(), 0, - false))); + false, + NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( "id-2", @@ -74,7 +76,8 @@ public void testPerfectOrderingProcessing() throws CannotProvideCoderException { 2, Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count(), 0, - false))); + false, + NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -120,7 +123,8 @@ public void testOutOfSequenceProcessing() throws CannotProvideCoderException { 4, Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count(), 0, - false))); + false, + NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( "id-2", @@ -132,7 +136,8 @@ public void testOutOfSequenceProcessing() throws CannotProvideCoderException { 5, Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count(), 0, - false))); + false, + NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -168,9 +173,14 @@ public void testUnfinishedProcessing() throws CannotProvideCoderException { Collection> expectedStatuses 
= new ArrayList<>(); expectedStatuses.add( - KV.of("id-1", OrderedProcessingStatus.create(0L, 2, 2L, 3L, 3, 1L, 0, false))); + KV.of( + "id-1", + OrderedProcessingStatus.create(0L, 2, 2L, 3L, 3, 1L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( - KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2L, 0, false))); + KV.of( + "id-2", + OrderedProcessingStatus.create( + 1L, 0, null, null, 2, 2L, 0, false, NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -203,7 +213,15 @@ public void testHandlingOfDuplicateSequences() throws CannotProvideCoderExceptio KV.of( "id-1", OrderedProcessingStatus.create( - 3L, 0, null, null, events.length, resultCount, duplicateCount, false))); + 3L, + 0, + null, + null, + events.length, + resultCount, + duplicateCount, + false, + NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -239,7 +257,10 @@ public void testHandlingOfCheckedExceptions() throws CannotProvideCoderException Collection> expectedStatuses = new ArrayList<>(); expectedStatuses.add( - KV.of("id-1", OrderedProcessingStatus.create(1L, 1, 3L, 3L, events.length, 2, 0, false))); + KV.of( + "id-1", + OrderedProcessingStatus.create( + 1L, 1, 3L, 3L, events.length, 2, 0, false, NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -277,9 +298,15 @@ public void testProcessingWithEveryOtherResultEmission() throws CannotProvideCod Collection> expectedStatuses = new ArrayList<>(); expectedStatuses.add( - KV.of("id-1", OrderedProcessingStatus.create(3L, 0, null, null, 4, 2L, 0, false))); + KV.of( + "id-1", + OrderedProcessingStatus.create( + 3L, 0, null, null, 4, 2L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( - KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 1L, 0, false))); + KV.of( + "id-2", + OrderedProcessingStatus.create( + 1L, 0, 
null, null, 2, 1L, 0, false, NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -335,7 +362,15 @@ public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException KV.of( key, OrderedProcessingStatus.create( - null, bufferedEventCount, 2L, sequence, bufferedEventCount, 0L, 0, false))); + null, + bufferedEventCount, + 2L, + sequence, + bufferedEventCount, + 0L, + 0, + false, + NOT_USED_FOR_TESTING))); } } @@ -353,7 +388,8 @@ public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException sequences.length, lastOutputSequence, 0, - false))); + false, + NOT_USED_FOR_TESTING))); } // -- Final status - indicates that everything has been fully processed @@ -368,7 +404,8 @@ public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException sequences.length, sequences.length, 0, - false))); + false, + NOT_USED_FOR_TESTING))); testPerKeySequenceProcessing( events.toArray(new Event[events.size()]), @@ -408,54 +445,54 @@ public void testSequenceGapProcessingInBufferedOutput() throws CannotProvideCode KV.of( key, OrderedProcessingStatus.create( - null, 1, 2L, 2L, ++numberOfReceivedEvents, 0L, 0, false))); + null, 1, 2L, 2L, ++numberOfReceivedEvents, 0L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - null, 2, 2L, 3L, ++numberOfReceivedEvents, 0L, 0, false))); + null, 2, 2L, 3L, ++numberOfReceivedEvents, 0L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - null, 3, 2L, 7L, ++numberOfReceivedEvents, 0L, 0, false))); + null, 3, 2L, 7L, ++numberOfReceivedEvents, 0L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - null, 4, 2L, 8L, ++numberOfReceivedEvents, 0L, 0, false))); + null, 4, 2L, 8L, ++numberOfReceivedEvents, 0L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, 
OrderedProcessingStatus.create( - null, 5, 2L, 9L, ++numberOfReceivedEvents, 0L, 0, false))); + null, 5, 2L, 9L, ++numberOfReceivedEvents, 0L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - null, 6, 2L, 10L, ++numberOfReceivedEvents, 0L, 0, false))); + null, 6, 2L, 10L, ++numberOfReceivedEvents, 0L, 0, false, NOT_USED_FOR_TESTING))); // --- 1 has appeared and caused the batch to be sent out. expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - 3L, 4, 7L, 10L, ++numberOfReceivedEvents, 3L, 0, false))); + 3L, 4, 7L, 10L, ++numberOfReceivedEvents, 3L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - 4L, 4, 7L, 10L, ++numberOfReceivedEvents, 4L, 0, false))); + 4L, 4, 7L, 10L, ++numberOfReceivedEvents, 4L, 0, false, NOT_USED_FOR_TESTING))); expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - 5L, 4, 7L, 10L, ++numberOfReceivedEvents, 5L, 0, false))); + 5L, 4, 7L, 10L, ++numberOfReceivedEvents, 5L, 0, false, NOT_USED_FOR_TESTING))); // --- 6 came and 6, 7, and 8 got output expectedStatuses.add( KV.of( key, OrderedProcessingStatus.create( - 8L, 2, 9L, 10L, ++numberOfReceivedEvents, 8L, 0, false))); + 8L, 2, 9L, 10L, ++numberOfReceivedEvents, 8L, 0, false, NOT_USED_FOR_TESTING))); // Last timer run produces the final status. 
Number of received events doesn't // increase, // this is the result of a timer processing @@ -463,7 +500,7 @@ public void testSequenceGapProcessingInBufferedOutput() throws CannotProvideCode KV.of( key, OrderedProcessingStatus.create( - 10L, 0, null, null, numberOfReceivedEvents, 10L, 0, false))); + 10L, 0, null, null, numberOfReceivedEvents, 10L, 0, false, NOT_USED_FOR_TESTING))); testPerKeySequenceProcessing( events.toArray(new Event[events.size()]), @@ -485,7 +522,10 @@ public void testHandlingOfMaxSequenceNumber() throws CannotProvideCoderException Collection> expectedStatuses = new ArrayList<>(); expectedStatuses.add( - KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 3, 2, 0, false))); + KV.of( + "id-1", + OrderedProcessingStatus.create( + 1L, 0, null, null, 3, 2, 0, false, NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -524,7 +564,15 @@ public void testProcessingOfTheLastInput() throws CannotProvideCoderException { KV.of( "id-1", OrderedProcessingStatus.create( - 2L, 0, null, null, events.length, events.length, 0, LAST_EVENT_RECEIVED))); + 2L, + 0, + null, + null, + events.length, + events.length, + 0, + LAST_EVENT_RECEIVED, + NOT_USED_FOR_TESTING))); Collection> expectedOutput = new ArrayList<>(); expectedOutput.add(KV.of("id-1", "a")); @@ -668,17 +716,38 @@ public void testWindowedProcessing() throws CannotProvideCoderException { PAssert.that("Statuses match in window 1", processingResult.processingStatuses()) .inWindow(window1) .containsInAnyOrder( - KV.of("id-1", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), - KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false)), - KV.of("id-2", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), - KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false)), - KV.of("id-2", OrderedProcessingStatus.create(2L, 0, null, null, 3, 3, 0, false))); + KV.of( + 
"id-1", + OrderedProcessingStatus.create( + 0L, 0, null, null, 1, 1, 0, false, NOT_USED_FOR_TESTING)), + KV.of( + "id-1", + OrderedProcessingStatus.create( + 1L, 0, null, null, 2, 2, 0, false, NOT_USED_FOR_TESTING)), + KV.of( + "id-2", + OrderedProcessingStatus.create( + 0L, 0, null, null, 1, 1, 0, false, NOT_USED_FOR_TESTING)), + KV.of( + "id-2", + OrderedProcessingStatus.create( + 1L, 0, null, null, 2, 2, 0, false, NOT_USED_FOR_TESTING)), + KV.of( + "id-2", + OrderedProcessingStatus.create( + 2L, 0, null, null, 3, 3, 0, false, NOT_USED_FOR_TESTING))); PAssert.that("Statuses match in window 2", processingResult.processingStatuses()) .inWindow(window2) .containsInAnyOrder( - KV.of("id-1", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), - KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false))); + KV.of( + "id-1", + OrderedProcessingStatus.create( + 0L, 0, null, null, 1, 1, 0, false, NOT_USED_FOR_TESTING)), + KV.of( + "id-1", + OrderedProcessingStatus.create( + 1L, 0, null, null, 2, 2, 0, false, NOT_USED_FOR_TESTING))); PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) .containsInAnyOrder(NO_EXPECTED_DLQ_EVENTS); diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTestBase.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTestBase.java index fd651b919df1..cdf222fadf56 100644 --- a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTestBase.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTestBase.java @@ -340,12 +340,12 @@ static String normalizeExplanation(String value) { static class GlobalSequenceRangePublisher extends PTransform, PCollection> { - private final PCollectionView lastCompletedSequenceRangeView; + private final PCollectionView> 
lastCompletedSequenceRangeView; private final Coder keyCoder; private final Coder eventCoder; public GlobalSequenceRangePublisher( - PCollectionView latestCompletedSequenceRange, + PCollectionView> latestCompletedSequenceRange, Coder keyCoder, Coder eventCoder) { this.lastCompletedSequenceRangeView = latestCompletedSequenceRange; @@ -386,9 +386,9 @@ static class SideInputEmitter @ProcessElement public void produceCompletedRange( - @SideInput("lastCompletedSequence") ContiguousSequenceRange sideInput, + @SideInput("lastCompletedSequence") Iterable sideInput, OutputReceiver outputReceiver) { - outputReceiver.output(sideInput); + outputReceiver.output(ContiguousSequenceRange.largestRange(sideInput)); } } } diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulatorTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulatorTest.java index 4082ce6de758..1f7780169332 100644 --- a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulatorTest.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulatorTest.java @@ -17,7 +17,11 @@ */ package org.apache.beam.sdk.extensions.ordered.combiner; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange; import org.joda.time.Instant; @@ -276,6 +280,64 @@ public void testMergingAdjacentRanges() { doTestMerging(set1, set2, expectedResult, expectedNumberOfRanges); } + @Test + public void testMergingAdjacentRangesWithSingleValue() { + Event[] set1 = + new Event[] { + new Event(4, nextTimestamp()), + }; + Event[] set2 = + new Event[] { + new Event(1, nextTimestamp(), true), + new Event(3, 
nextTimestamp()), + new Event(2, nextTimestamp()) + }; + + ContiguousSequenceRange expectedResult = + ContiguousSequenceRange.of(1, 5, eventTimestamp(set2, 2L)); + int expectedNumberOfRanges = 1; + + doTestMerging(set1, set2, expectedResult, expectedNumberOfRanges); + } + + @Test + public void testLargeVolumeMerging() { + int numberOfSets = 3000; + ArrayList> eventSets = new ArrayList<>(numberOfSets); + + for (int i = 0; i < numberOfSets; i++) { + eventSets.add(new HashSet<>()); + } + + int eventCount = 100000; + + Random random = new Random(); + Instant lastTimestamp = null; + for (int i = 0; i < eventCount; i++) { + int setNumber = random.nextInt(numberOfSets); + Set currentSet = eventSets.get(setNumber); + boolean initialEvent = i == 0; + lastTimestamp = nextTimestamp(); + currentSet.add(new Event(i, lastTimestamp, initialEvent)); + } + + final SequenceRangeAccumulator initialAccumulator = new SequenceRangeAccumulator(); + eventSets.get(0).forEach(e -> initialAccumulator.add(e.sequence, e.timestamp, e.initialEvent)); + + for (int i = 1; i < numberOfSets; i++) { + SequenceRangeAccumulator nextAccumulator = new SequenceRangeAccumulator(); + eventSets.get(i).forEach(e -> nextAccumulator.add(e.sequence, e.timestamp, e.initialEvent)); + initialAccumulator.merge(nextAccumulator); + } + + ContiguousSequenceRange expectedResult = + ContiguousSequenceRange.of(0, eventCount, lastTimestamp); + Assert.assertEquals( + "Accumulated results", expectedResult, initialAccumulator.largestContinuousRange()); + + Assert.assertEquals("Number of ranges", 1, initialAccumulator.numberOfRanges()); + } + @Test public void testPruningSequencesBelowInitial() { Event[] set1 =