From 2af67221b539da1af059dc84fe0c4b06316cd990 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 19 Dec 2024 19:49:31 +0000 Subject: [PATCH] Addressed review comments Signed-off-by: Krishna Kondaka --- .../processor/aggregate/AggregateAction.java | 7 +++++++ .../aggregate/AggregateGroupManager.java | 7 +++++++ .../processor/aggregate/AggregateProcessor.java | 9 +++++++-- .../actions/PercentSamplerAggregateAction.java | 14 -------------- .../actions/RateLimiterAggregateAction.java | 14 -------------- .../actions/RemoveDuplicatesAggregateAction.java | 15 --------------- 6 files changed, 21 insertions(+), 45 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java index 568a19d564..541cd15d3d 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import java.util.Collections; @@ -46,6 +47,12 @@ default boolean holdsEvents() { * @since 1.3 */ default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { + if (aggregateActionInput != null) { + EventHandle eventHandle = aggregateActionInput.getEventHandle(); + if (eventHandle != null) { + eventHandle.release(true); + } + } return new AggregateActionOutput(Collections.emptyList()); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java index 9d271aa40b..d5dd324762 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java @@ -7,6 +7,7 @@ import com.google.common.collect.Maps; import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; +import org.opensearch.dataprepper.model.event.Event; import java.time.Duration; import java.util.ArrayList; @@ -26,6 +27,12 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap())); } + AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) { + AggregateGroup aggregateGroup = getAggregateGroup(identificationKeysMap); + aggregateGroup.attachToEventAcknowledgementSet(event); + return aggregateGroup; + } + List> getGroupsToConclude(final boolean forceConclude) { final List> groupsToConclude = new ArrayList<>(); for (final Map.Entry groupEntry : allGroups.entrySet()) { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index ae1dcd37c7..616c3c5ea8 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) { return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting); } + AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) { + AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap); + aggregateGroup.attachToEventAcknowledgementSet(event); + return aggregateGroup; + } + @Override public Collection> doExecute(Collection> records) { final List> recordsOut = new LinkedList<>(); @@ -124,8 +130,7 @@ public Collection> doExecute(Collection> records) { continue; } final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); - final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap); - aggregateGroupForEvent.attachToEventAcknowledgementSet(event); + final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event); final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java index 8a11cf9b85..9b27a49dee 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java @@ -8,15 +8,11 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput; -import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse; import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState; -import java.util.Collections; - /** * An AggregateAction that combines multiple Events into a single Event. This action * @@ -52,14 +48,4 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return AggregateActionResponse.nullEventResponse(); } - @Override - public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { - if (aggregateActionInput != null) { - EventHandle eventHandle = aggregateActionInput.getEventHandle(); - if (eventHandle != null) { - eventHandle.release(true); - } - } - return new AggregateActionOutput(Collections.emptyList()); - } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java index 0c233a69da..ce8131b95c 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java @@ -8,16 +8,12 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput; -import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse; import com.google.common.util.concurrent.RateLimiter; -import java.util.Collections; - /** * An AggregateAction that combines multiple Events into a single Event. This action * @@ -47,14 +43,4 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return new AggregateActionResponse(event); } - @Override - public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { - if (aggregateActionInput != null) { - EventHandle eventHandle = aggregateActionInput.getEventHandle(); - if (eventHandle != null) { - eventHandle.release(true); - } - } - return new AggregateActionOutput(Collections.emptyList()); - } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java index a9bddd83db..7fc98d0bf8 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java @@ -9,15 +9,11 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput; -import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse; import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState; -import java.util.Collections; - /** * An AggregateAction that will pass down the first Event of a groupState immediately for processing, and then ignore Events * that have a non-empty groupState associated with them @@ -39,17 +35,6 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return AggregateActionResponse.nullEventResponse(); } - @Override - public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { - if (aggregateActionInput != null) { - EventHandle eventHandle = aggregateActionInput.getEventHandle(); - if (eventHandle != null) { - eventHandle.release(true); - } - } - return new AggregateActionOutput(Collections.emptyList()); - } - @JsonPropertyOrder @JsonClassDescription("The remove_duplicates action processes the first event for a group immediately and drops any events that duplicate the first event from the source.") static class RemoveDuplicatesAggregateActionConfig {