Skip to content

Commit

Permalink
Addressed review comments by adding InternalEventHandle
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Oct 31, 2023
1 parent eeedfb8 commit a9c3ebd
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, Serializable {
public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private WeakReference<AcknowledgementSet> acknowledgementSetRef;
Expand Down Expand Up @@ -51,7 +51,6 @@ public Instant getExternalOriginationTime() {

@Override
public void release(boolean result) {
System.out.println("======release called==="+result);
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class DefaultEventMetadata implements EventMetadata {

private final Instant timeReceived;

private Instant externalOriginationTime;

private Map<String, Object> attributes;

private Set<String> tags;
Expand All @@ -43,13 +45,15 @@ private DefaultEventMetadata(final Builder builder) {
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
this.externalOriginationTime = null;
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
this.eventType = eventMetadata.getEventType();
this.timeReceived = eventMetadata.getTimeReceived();
this.attributes = new HashMap<>(eventMetadata.getAttributes());
this.tags = new HashSet<>(eventMetadata.getTags());
this.externalOriginationTime = null;
}

@Override
Expand All @@ -62,6 +66,16 @@ public Instant getTimeReceived() {
return timeReceived;
}

@Override
public Instant getExternalOriginationTime() {
return externalOriginationTime;
}

@Override
public void setExternalOriginationTime(Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

@Override
public Map<String, Object> getAttributes() {
return attributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,6 @@ public interface EventHandle {
*/
void release(boolean result);

/**
* sets acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
*/
AcknowledgementSet getAcknowledgementSet();

/**
* sets external origination time
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public interface EventMetadata extends Serializable {
*/
Instant getTimeReceived();

/**
* Returns the external origination time of the event
* @return the external origination time
* @since 2.6
*/
Instant getExternalOriginationTime();

/**
* Sets the external origination time of the event
* @param externalOriginationTime the external origination time
* @since 2.6
*/
void setExternalOriginationTime(Instant externalOriginationTime);

/**
* Returns the attributes
* @return a map of attributes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.time.Instant;

public interface InternalEventHandle {
/**
* sets acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
*/
AcknowledgementSet getAcknowledgementSet();

}

Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,10 @@ protected JacksonEvent(final JacksonEvent otherEvent) {
}

public static Event fromMessage(String message) {
JacksonEvent event = JacksonEvent.builder()
return JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(Collections.singletonMap(MESSAGE_KEY, message))
.build();
return event;
}

private JsonNode getInitialJsonNode(final Object data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
Expand Down Expand Up @@ -80,6 +81,16 @@ public void testGetTimeReceived() {
assertThat(timeReceived, is(equalTo(testTimeReceived)));
}

@Test
public void testExternalOriginationTime() {
Instant externalOriginationTime = eventMetadata.getExternalOriginationTime();
assertThat(externalOriginationTime, is(nullValue()));
Instant now = Instant.now();
eventMetadata.setExternalOriginationTime(now);
externalOriginationTime = eventMetadata.getExternalOriginationTime();
assertThat(externalOriginationTime, is(equalTo(now)));
}

@Test
public void testGetAttributes() {
final Map<String, Object> attributes = eventMetadata.getAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -32,7 +34,12 @@ class AcknowledgementSetMonitor implements Runnable {
private final AtomicInteger numNullHandles;

private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)eventHandle.getAcknowledgementSet();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
return (DefaultAcknowledgementSet)internalEventHandle.getAcknowledgementSet();
} else {
throw new RuntimeException("Unsupported event handle");
}
}

public AcknowledgementSetMonitor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,8 +55,11 @@ public void add(Event event) {
try {
if (event instanceof JacksonEvent) {
EventHandle eventHandle = event.getEventHandle();
eventHandle.setAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
internalEventHandle.setAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
}
}
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
import org.slf4j.Logger;
Expand Down Expand Up @@ -97,10 +99,15 @@ private void processAcknowledgements(List<Event> inputEvents, Collection outputR
// For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
inputEvents.forEach(event -> {
EventHandle eventHandle = event.getEventHandle();
if (Objects.nonNull(eventHandle) && eventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) {
eventHandle.release(true);
} else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) {
invalidEventHandlesCounter.increment();
if (eventHandle != null && eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) {
eventHandle.release(true);
} else if (acknowledgementsEnabled) {
invalidEventHandlesCounter.increment();
}
} else if (eventHandle != null) {
throw new RuntimeException("Unexpected EventHandle");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private void acquireEventReference(final Record record) {
}
if (referencedRecords.contains(record) || ((routedRecords != null) && routedRecords.contains(record))) {
EventHandle eventHandle = ((JacksonEvent)record.getData()).getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
if (eventHandle != null && eventHandle instanceof DefaultEventHandle) {
acknowledgementSetManager.acquireEventReference(eventHandle);
}
} else if (!referencedRecords.contains(record)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -38,20 +38,20 @@ class DefaultAcknowledgementSetManagerTests {
@Mock
JacksonEvent event3;

EventHandle eventHandle1;
EventHandle eventHandle2;
EventHandle eventHandle3;
DefaultEventHandle eventHandle1;
DefaultEventHandle eventHandle2;
DefaultEventHandle eventHandle3;
Boolean result;

@BeforeEach
void setup() {
callbackExecutor = Executors.newFixedThreadPool(2);
event1 = mock(JacksonEvent.class);
eventHandle1 = mock(EventHandle.class);
eventHandle1 = mock(DefaultEventHandle.class);
lenient().when(event1.getEventHandle()).thenReturn(eventHandle1);

event2 = mock(JacksonEvent.class);
eventHandle2 = mock(EventHandle.class);
eventHandle2 = mock(DefaultEventHandle.class);
lenient().when(event2.getEventHandle()).thenReturn(eventHandle2);

acknowledgementSetManager = createObjectUnderTest();
Expand Down Expand Up @@ -91,7 +91,7 @@ void testExpirations() throws InterruptedException {
@Test
void testMultipleAcknowledgementSets() {
event3 = mock(JacksonEvent.class);
eventHandle3 = mock(EventHandle.class);
eventHandle3 = mock(DefaultEventHandle.class);
lenient().when(event3.getEventHandle()).thenReturn(eventHandle3);

AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -84,7 +84,7 @@ class PipelineTests {
private Duration peerForwarderDrainTimeout;
private EventFactory eventFactory;
private JacksonEvent event;
private EventHandle eventHandle;
private DefaultEventHandle eventHandle;
private AcknowledgementSetManager acknowledgementSetManager;

@BeforeEach
Expand Down Expand Up @@ -428,7 +428,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM
RouterCopyRecordStrategy routerCopyRecordStrategy = (RouterCopyRecordStrategy)a.getArgument(2);
Record rec = records.get(0);
event = mock(JacksonEvent.class);
eventHandle = mock(EventHandle.class);
eventHandle = mock(DefaultEventHandle.class);
acknowledgementSet = mock(AcknowledgementSet.class);
when(event.getEventHandle()).thenReturn(eventHandle);
when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet);
Expand All @@ -441,7 +441,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM
Pipeline pipeline = createObjectUnderTest();
when(mockSource.areAcknowledgementsEnabled()).thenReturn(true);
pipeline.publishToSinks(records);
verify(acknowledgementSetManager).acquireEventReference(any(EventHandle.class));
verify(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class));

verify(router)
.route(anyCollection(), eq(dataFlowComponents), any(RouterGetRecordStrategy.class), any(BiConsumer.class));
Expand All @@ -455,7 +455,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_InactiveAcknowledge
RouterCopyRecordStrategy routerCopyRecordStrategy = (RouterCopyRecordStrategy)a.getArgument(2);
Record rec = records.get(0);
event = mock(JacksonEvent.class);
eventHandle = mock(EventHandle.class);
eventHandle = mock(DefaultEventHandle.class);
when(event.getEventHandle()).thenReturn(null);
when(rec.getData()).thenReturn(event);
routerCopyRecordStrategy.getRecord(rec);
Expand Down
Loading

0 comments on commit a9c3ebd

Please sign in to comment.