diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 47de571672..09a0705e0e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -587,7 +587,7 @@ public Builder withTimeReceived(final Instant timeReceived) { * @return returns the builder * @since 2.10 */ - protected Builder withEventHandle(final EventHandle eventHandle) { + public Builder withEventHandle(final EventHandle eventHandle) { this.eventHandle = eventHandle; return this; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 551aed3d01..784758fa95 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -33,6 +33,15 @@ public interface Processor, OutputRecord extends R */ void prepareForShutdown(); + /** + * @since 2.11 + * Indicates if the processor holds the events or not + * Holding events indicates that the events are not ready to be released. + */ + default boolean holdsEvents() { + return false; + } + /** * @since 1.2 * Returns true if the Processor's internal state is safe to be shutdown. diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index e313430b49..8fb314fd83 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -137,7 +137,8 @@ private void doRun() { try { records = processor.execute(records); - if (inputEvents != null) { + // acknowledge missing events only if the processor is not holding events + if (!processor.holdsEvents() && inputEvents != null) { processAcknowledgements(inputEvents, records); } } catch (final Exception e) { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java index ae798af032..568a19d564 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java @@ -29,6 +29,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc return AggregateActionResponse.fromEvent(event); } + /** + * indicates if the action holds the events or not + * + */ + default boolean holdsEvents() { + return false; + } + /** * Concludes a group of Events * diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java index 0bec0b2350..cd7b47d66e 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.EventHandle; + import java.util.Map; import java.util.function.Function; import java.time.Duration; @@ -28,6 +30,12 @@ public interface AggregateActionInput { */ Map getIdentificationKeys(); + /** + * @return returns eventHandle held by the instance + * @since 2.11 + */ + EventHandle getEventHandle(); + /** * Sets custom shouldConclude function * diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java index 09e0e97223..7ad7f691ae 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java @@ -5,6 +5,12 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.Event; + import java.time.Duration; import java.time.Instant; import java.util.function.Function; @@ -19,6 +25,7 @@ class AggregateGroup implements AggregateActionInput { private final Lock handleEventForGroupLock; private final Map identificationKeys; private Function customShouldConclude; + private EventHandle eventHandle; AggregateGroup(final Map identificationKeys) { this.groupState = new DefaultGroupState(); @@ -26,6 +33,19 @@ class AggregateGroup implements AggregateActionInput { this.groupStart = Instant.now(); this.concludeGroupLock = new ReentrantLock(); this.handleEventForGroupLock = new ReentrantLock(); + this.eventHandle = new AggregateEventHandle(Instant.now()); + } + + @Override + public EventHandle getEventHandle() { + return eventHandle; + } + + public void attachToEventAcknowledgementSet(Event event) { + InternalEventHandle internalEventHandle; + EventHandle handle = event.getEventHandle(); + internalEventHandle = (InternalEventHandle)(handle); + internalEventHandle.addEventHandle(eventHandle); } public GroupState getGroupState() { @@ -63,5 +83,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) { void resetGroup() { groupStart = Instant.now(); groupState.clear(); + this.eventHandle = new AggregateEventHandle(groupStart); } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 2b19e98516..ae1dcd37c7 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -125,6 +125,7 @@ public Collection> doExecute(Collection> records) { } final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap); + aggregateGroupForEvent.attachToEventAcknowledgementSet(event); final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); @@ -149,6 +150,11 @@ public Collection> doExecute(Collection> records) { return recordsOut; } + @Override + public boolean holdsEvents() { + return aggregateAction.holdsEvents(); + } + public static long getTimeNanos(final Instant time) { final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000; long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano(); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java index ece5212ac4..0d930b94ba 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java @@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA final Event event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(aggregateActionInput.getGroupState()) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); return new AggregateActionOutput(List.of(event)); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index 8b67ca64cd..16bbf39c31 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(groupState) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); } else { Integer countValue = (Integer)groupState.get(countKey); @@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withValue((double)countValue) .withExemplars(List.of(exemplar)) .withAttributes(attr) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(false); event = (Event)sum; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index 22cfa7efb7..ac1a59a712 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(groupState) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); } else { List explicitBoundsList = new ArrayList(); @@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withExplicitBoundsList(explicitBoundsList) .withExemplars(exemplarList) .withAttributes(attr) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(false); event = (Event)histogram; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java index 78debabb35..54e0e2c72c 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java @@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA final Event event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(aggregateActionInput.getGroupState()) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); return new AggregateActionOutput(List.of(event)); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java index 3ea0d0b8af..5f69bd5abc 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java @@ -35,6 +35,7 @@ public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelim public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { if (rateLimiterMode == RateLimiterMode.DROP) { if (!rateLimiter.tryAcquire()) { + event.getEventHandle().release(true); return AggregateActionResponse.nullEventResponse(); } } else { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java index 26b245da73..f621f52d0a 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java @@ -70,6 +70,11 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return AggregateActionResponse.nullEventResponse(); } + @Override + public boolean holdsEvents() { + return true; + } + @Override public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { GroupState groupState = aggregateActionInput.getGroupState(); @@ -77,6 +82,10 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) { return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of())); } + List events = (List)groupState.getOrDefault(EVENTS_KEY, List.of()); + for (final Event event : events) { + event.getEventHandle().release(true); + } return new AggregateActionOutput(List.of()); } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java index 21e49e05be..b46d2bdaab 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java @@ -5,6 +5,11 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.event.EventHandle; + +import java.time.Duration; +import java.time.Instant; import java.util.Map; import java.util.HashMap; import java.time.Duration; @@ -15,10 +20,12 @@ public static class TestAggregateActionInput implements AggregateActionInput { private final GroupState groupState; private final Map identificationKeys; private Function customShouldConclude; + private EventHandle eventHandle; public TestAggregateActionInput(Map identificationKeys) { this.groupState = new AggregateActionTestUtils.TestGroupState(); this.identificationKeys = identificationKeys; + this.eventHandle = new AggregateEventHandle(Instant.now()); } @Override @@ -31,6 +38,11 @@ public GroupState getGroupState() { return groupState; } + @Override + public EventHandle getEventHandle() { + return eventHandle; + } + @Override public Map getIdentificationKeys() { return identificationKeys;