Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Dec 13, 2024
1 parent 81c969c commit 3f2d59f
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Builder<T> withTimeReceived(final Instant timeReceived) {
* @return returns the builder
* @since 2.10
*/
protected Builder<T> withEventHandle(final EventHandle eventHandle) {
public Builder<T> withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
void prepareForShutdown();

/**
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
*/
default boolean holdsEvents() {
return false;
}

/**
* @since 1.2
* Returns true if the Processor's internal state is safe to be shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
return AggregateActionResponse.fromEvent(event);
}

/**
* indicates if the action holds the events or not
*
*/
default boolean holdsEvents() {
return false;
}

/**
* Concludes a group of Events
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Map;
import java.util.function.Function;
import java.time.Duration;
Expand All @@ -28,6 +30,12 @@ public interface AggregateActionInput {
*/
Map<Object, Object> getIdentificationKeys();

/**
* @return returns eventHandle held by the instance
* @since 2.11
*/
EventHandle getEventHandle();

/**
* Sets custom shouldConclude function
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.AggregateEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
Expand All @@ -19,13 +25,27 @@ class AggregateGroup implements AggregateActionInput {
private final Lock handleEventForGroupLock;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;
private EventHandle eventHandle;

AggregateGroup(final Map<Object, Object> identificationKeys) {
this.groupState = new DefaultGroupState();
this.identificationKeys = identificationKeys;
this.groupStart = Instant.now();
this.concludeGroupLock = new ReentrantLock();
this.handleEventForGroupLock = new ReentrantLock();
this.eventHandle = new AggregateEventHandle(Instant.now());
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
}

public void attachToEventAcknowledgementSet(Event event) {
InternalEventHandle internalEventHandle;
EventHandle handle = event.getEventHandle();
internalEventHandle = (InternalEventHandle)(handle);
internalEventHandle.addEventHandle(eventHandle);
}

public GroupState getGroupState() {
Expand Down Expand Up @@ -63,5 +83,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) {
void resetGroup() {
groupStart = Instant.now();
groupState.clear();
this.eventHandle = new AggregateEventHandle(groupStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
}
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
aggregateGroupForEvent.attachToEventAcknowledgementSet(event);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);

Expand All @@ -149,6 +150,11 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return recordsOut;
}

@Override
public boolean holdsEvents() {
return aggregateAction.holdsEvents();
}

public static long getTimeNanos(final Instant time) {
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
return new AggregateActionOutput(List.of(event));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
Integer countValue = (Integer)groupState.get(countKey);
Expand All @@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withValue((double)countValue)
.withExemplars(List.of(exemplar))
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)sum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
List<Double> explicitBoundsList = new ArrayList<Double>();
Expand Down Expand Up @@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withExplicitBoundsList(explicitBoundsList)
.withExemplars(exemplarList)
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)histogram;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();

return new AggregateActionOutput(List.of(event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelim
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
if (rateLimiterMode == RateLimiterMode.DROP) {
if (!rateLimiter.tryAcquire()) {
event.getEventHandle().release(true);
return AggregateActionResponse.nullEventResponse();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,22 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
return AggregateActionResponse.nullEventResponse();
}

@Override
public boolean holdsEvents() {
return true;
}

@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
GroupState groupState = aggregateActionInput.getGroupState();
int randomInt = random.nextInt(100);
if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) {
return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of()));
}
List<Event> events = (List)groupState.getOrDefault(EVENTS_KEY, List.of());
for (final Event event : events) {
event.getEventHandle().release(true);
}
return new AggregateActionOutput(List.of());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.AggregateEventHandle;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.HashMap;
import java.time.Duration;
Expand All @@ -15,10 +20,12 @@ public static class TestAggregateActionInput implements AggregateActionInput {
private final GroupState groupState;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;
private EventHandle eventHandle;

public TestAggregateActionInput(Map<Object, Object> identificationKeys) {
this.groupState = new AggregateActionTestUtils.TestGroupState();
this.identificationKeys = identificationKeys;
this.eventHandle = new AggregateEventHandle(Instant.now());
}

@Override
Expand All @@ -31,6 +38,11 @@ public GroupState getGroupState() {
return groupState;
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
}

@Override
public Map<Object, Object> getIdentificationKeys() {
return identificationKeys;
Expand Down

0 comments on commit 3f2d59f

Please sign in to comment.