Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Dec 19, 2024
1 parent 7cb44d2 commit 2af6722
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Collections;

Expand Down Expand Up @@ -46,6 +47,12 @@ default boolean holdsEvents() {
* @since 1.3
*/
default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.common.collect.Maps;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -26,6 +27,12 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe
return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap()));
}

AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) {
AggregateGroup aggregateGroup = getAggregateGroup(identificationKeysMap);
aggregateGroup.attachToEventAcknowledgementSet(event);
return aggregateGroup;
}

List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting);
}

AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) {
AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
aggregateGroup.attachToEventAcknowledgementSet(event);
return aggregateGroup;
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Record<Event>> recordsOut = new LinkedList<>();
Expand Down Expand Up @@ -124,8 +130,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
continue;
}
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
aggregateGroupForEvent.attachToEventAcknowledgementSet(event);
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;

import java.util.Collections;

/**
* An AggregateAction that combines multiple Events into a single Event. This action
*
Expand Down Expand Up @@ -52,14 +48,4 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
return AggregateActionResponse.nullEventResponse();
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;

import com.google.common.util.concurrent.RateLimiter;

import java.util.Collections;

/**
* An AggregateAction that combines multiple Events into a single Event. This action
*
Expand Down Expand Up @@ -47,14 +43,4 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
return new AggregateActionResponse(event);
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,11 @@
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;

import java.util.Collections;

/**
* An AggregateAction that will pass down the first Event of a groupState immediately for processing, and then ignore Events
* that have a non-empty groupState associated with them
Expand All @@ -39,17 +35,6 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
return AggregateActionResponse.nullEventResponse();
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}

@JsonPropertyOrder
@JsonClassDescription("The <code>remove_duplicates</code> action processes the first event for a group immediately and drops any events that duplicate the first event from the source.")
static class RemoveDuplicatesAggregateActionConfig {
Expand Down

0 comments on commit 2af6722

Please sign in to comment.