Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acknowledgement support to aggregate processor #5139

Merged
merged 7 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Builder<T> withTimeReceived(final Instant timeReceived) {
* @return returns the builder
* @since 2.10
*/
protected Builder<T> withEventHandle(final EventHandle eventHandle) {
public Builder<T> withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
void prepareForShutdown();

/**
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
*/
default boolean holdsEvents() {
return false;
}

/**
* @since 1.2
* Returns true if the Processor's internal state is safe to be shutdown.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.processor;

import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProcessorTest {

@Test
public void testDefault() {
Processor processor = mock(Processor.class);
when(processor.holdsEvents()).thenCallRealMethod();
assertThat(processor.holdsEvents(), equalTo(false));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
* @param records records that needs to published to each sink
* @return List of Future, each future for each sink
*/
List<Future<Void>> publishToSinks(final Collection<Record> records) {
public List<Future<Void>> publishToSinks(final Collection<Record> records) {
final int sinksSize = sinks.size();
final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.atLeast;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.model.CheckpointState;
Expand Down Expand Up @@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() {
}
}

@Test
void testProcessWorkerWithProcessorsNotHoldingEvents() {
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
when(eventHandle.release(true)).thenReturn(true);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(false);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, atLeast(1)).release(true);
}


@Test
void testProcessWorkerWithProcessorsHoldingEvents() {
EventHandle eventHandle = mock(EventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(true);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);

processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, never()).release(true);
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() {

Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/aggregate-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':data-prepper-expression')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation project(':data-prepper-plugins:otel-metrics-raw-processor')
testImplementation project(':data-prepper-core')
implementation libs.guava.core
implementation libs.commons.lang3
implementation libs.opentelemetry.proto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
return AggregateActionResponse.fromEvent(event);
}

/**
* indicates if the action holds the events or not
*
*/
default boolean holdsEvents() {
return false;
}

/**
* Concludes a group of Events
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Map;
import java.util.function.Function;
import java.time.Duration;
Expand All @@ -28,6 +30,12 @@ public interface AggregateActionInput {
*/
Map<Object, Object> getIdentificationKeys();

/**
* @return returns eventHandle held by the instance
* @since 2.11
*/
EventHandle getEventHandle();

/**
* Sets custom shouldConclude function
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.AggregateEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
Expand All @@ -19,13 +24,27 @@ class AggregateGroup implements AggregateActionInput {
private final Lock handleEventForGroupLock;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;
private EventHandle eventHandle;

AggregateGroup(final Map<Object, Object> identificationKeys) {
this.groupState = new DefaultGroupState();
this.identificationKeys = identificationKeys;
this.groupStart = Instant.now();
this.concludeGroupLock = new ReentrantLock();
this.handleEventForGroupLock = new ReentrantLock();
this.eventHandle = new AggregateEventHandle(Instant.now());
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
}

public void attachToEventAcknowledgementSet(Event event) {
InternalEventHandle internalEventHandle;
EventHandle handle = event.getEventHandle();
internalEventHandle = (InternalEventHandle)(handle);
internalEventHandle.addEventHandle(eventHandle);
}

public GroupState getGroupState() {
Expand Down Expand Up @@ -63,5 +82,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) {
void resetGroup() {
groupStart = Instant.now();
groupState.clear();
this.eventHandle = new AggregateEventHandle(groupStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
}
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider automating the attachment of an event to the AggregateGroup during object creation so that it's encapsulated within the AggregateGroupManager

public AggregateGroup getAggregateGroup(Map<Object, Object> identificationKeysMap, Event event) {
        AggregateGroup aggregateGroup = getOrCreateAggregateGroup(identificationKeysMap);
        ...
        aggregateGroup.attachToEventAcknowledgementSet(event);
        return aggregateGroup;
    }

aggregateGroupForEvent.attachToEventAcknowledgementSet(event);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the solution is to create a new event handle to represent the aggregate group? Makes sense but what is happening exactly to the original event's event handle. Those don't have to be released?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are released by ProcessWorker. In the following code

 records = processor.execute(records);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, records);
                }

where every event that's not passed to the next processor is acknowledged


final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);

Expand All @@ -149,6 +150,11 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return recordsOut;
}

@Override
public boolean holdsEvents() {
return aggregateAction.holdsEvents();
}

public static long getTimeNanos(final Instant time) {
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
return new AggregateActionOutput(List.of(event));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
Integer countValue = (Integer)groupState.get(countKey);
Expand All @@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withValue((double)countValue)
.withExemplars(List.of(exemplar))
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)sum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
List<Double> explicitBoundsList = new ArrayList<Double>();
Expand Down Expand Up @@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withExplicitBoundsList(explicitBoundsList)
.withExemplars(exemplarList)
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)histogram;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;

import java.util.Collections;

/**
* An AggregateAction that combines multiple Events into a single Event. This action
*
Expand Down Expand Up @@ -47,4 +51,15 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
return AggregateActionResponse.nullEventResponse();
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();

return new AggregateActionOutput(List.of(event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;

import com.google.common.util.concurrent.RateLimiter;

import java.util.Collections;

/**
* An AggregateAction that combines multiple Events into a single Event. This action
*
Expand Down Expand Up @@ -42,4 +46,15 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
return new AggregateActionResponse(event);
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}
Copy link
Member

@dinujoh dinujoh Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be default implementation in the interface ? I see the same logic for multiple implementation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good suggestion.

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;

import java.util.Collections;

/**
* An AggregateAction that will pass down the first Event of a groupState immediately for processing, and then ignore Events
* that have a non-empty groupState associated with them
Expand All @@ -35,6 +39,17 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
return AggregateActionResponse.nullEventResponse();
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}

@JsonPropertyOrder
@JsonClassDescription("The <code>remove_duplicates</code> action processes the first event for a group immediately and drops any events that duplicate the first event from the source.")
static class RemoveDuplicatesAggregateActionConfig {
Expand Down
Loading
Loading