From a9c3ebd9c7a3b963cb5240b648b231c10a38cd50 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 31 Oct 2023 21:25:09 +0000 Subject: [PATCH] Addressed review comments by adding InternalEventHandle Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventHandle.java | 3 +- .../model/event/DefaultEventMetadata.java | 14 +++++++++ .../dataprepper/model/event/EventHandle.java | 16 ---------- .../model/event/EventMetadata.java | 14 +++++++++ .../model/event/InternalEventHandle.java | 29 ++++++++++++++++++ .../dataprepper/model/event/JacksonEvent.java | 3 +- .../model/event/DefaultEventMetadataTest.java | 11 +++++++ .../AcknowledgementSetMonitor.java | 9 +++++- .../DefaultAcknowledgementSet.java | 9 ++++-- .../dataprepper/pipeline/ProcessWorker.java | 15 +++++++--- .../router/RouterCopyRecordStrategy.java | 2 +- ...DefaultAcknowledgementSetManagerTests.java | 14 ++++----- .../dataprepper/pipeline/PipelineTests.java | 10 +++---- .../router/RouterCopyRecordStrategyTests.java | 30 +++++++++---------- .../sink/opensearch/OpenSearchSinkIT.java | 5 ---- .../plugins/sink/s3/S3SinkServiceTest.java | 4 --- 16 files changed, 124 insertions(+), 64 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index f6fd46b0c1..86278bea74 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -11,7 +11,7 @@ import java.time.Instant; import java.io.Serializable; -public class DefaultEventHandle implements EventHandle, Serializable { +public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable { private Instant externalOriginationTime; private final Instant internalOriginationTime; private WeakReference acknowledgementSetRef; @@ -51,7 +51,6 @@ public Instant getExternalOriginationTime() { @Override public void release(boolean result) { - System.out.println("======release called==="+result); AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); if (acknowledgementSet != null) { acknowledgementSet.release(this, result); diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index e2ce55caa2..883297d567 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -27,6 +27,8 @@ public class DefaultEventMetadata implements EventMetadata { private final Instant timeReceived; + private Instant externalOriginationTime; + private Map attributes; private Set tags; @@ -43,6 +45,7 @@ private DefaultEventMetadata(final Builder builder) { this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes); this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags); + this.externalOriginationTime = null; } private DefaultEventMetadata(final EventMetadata eventMetadata) { @@ -50,6 +53,7 @@ private DefaultEventMetadata(final EventMetadata eventMetadata) { this.timeReceived = eventMetadata.getTimeReceived(); this.attributes = new HashMap<>(eventMetadata.getAttributes()); this.tags = new HashSet<>(eventMetadata.getTags()); + this.externalOriginationTime = null; } @Override @@ -62,6 +66,16 @@ public Instant getTimeReceived() { return timeReceived; } + @Override + public Instant getExternalOriginationTime() { + return externalOriginationTime; + } + + @Override + public void setExternalOriginationTime(Instant externalOriginationTime) { + this.externalOriginationTime = externalOriginationTime; + } + @Override public Map getAttributes() { return attributes; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java index 92416dee67..9bb0ede315 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java @@ -18,22 +18,6 @@ public interface EventHandle { */ void release(boolean result); - /** - * sets acknowledgement set - * - * @param acknowledgementSet acknowledgementSet to be set in the event handle - * @since 2.6 - */ - void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); - - /** - * gets acknowledgement set - * - * @return returns acknowledgementSet from the event handle - * @since 2.6 - */ - AcknowledgementSet getAcknowledgementSet(); - /** * sets external origination time * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java index 511a87c1fa..5db56ba85c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java @@ -31,6 +31,20 @@ public interface EventMetadata extends Serializable { */ Instant getTimeReceived(); + /** + * Returns the external origination time of the event + * @return the external origination time + * @since 2.6 + */ + Instant getExternalOriginationTime(); + + /** + * Sets the external origination time of the event + * @param externalOriginationTime the external origination time + * @since 2.6 + */ + void setExternalOriginationTime(Instant externalOriginationTime); + /** * Returns the attributes * @return a map of attributes diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java new file mode 100644 index 0000000000..0975fe9823 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import java.time.Instant; + +public interface InternalEventHandle { + /** + * sets acknowledgement set + * + * @param acknowledgementSet acknowledgementSet to be set in the event handle + * @since 2.6 + */ + void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); + + /** + * gets acknowledgement set + * + * @return returns acknowledgementSet from the event handle + * @since 2.6 + */ + AcknowledgementSet getAcknowledgementSet(); + +} + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 3c6e4e7969..846c4c3011 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -101,11 +101,10 @@ protected JacksonEvent(final JacksonEvent otherEvent) { } public static Event fromMessage(String message) { - JacksonEvent event = JacksonEvent.builder() + return JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(Collections.singletonMap(MESSAGE_KEY, message)) .build(); - return event; } private JsonNode getInitialJsonNode(final Object data) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java index fa624a9e2b..479e7be0c2 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anEmptyMap; @@ -80,6 +81,16 @@ public void testGetTimeReceived() { assertThat(timeReceived, is(equalTo(testTimeReceived))); } + @Test + public void testExternalOriginationTime() { + Instant externalOriginationTime = eventMetadata.getExternalOriginationTime(); + assertThat(externalOriginationTime, is(nullValue())); + Instant now = Instant.now(); + eventMetadata.setExternalOriginationTime(now); + externalOriginationTime = eventMetadata.getExternalOriginationTime(); + assertThat(externalOriginationTime, is(equalTo(now))); + } + @Test public void testGetAttributes() { final Map attributes = eventMetadata.getAttributes(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java index bf9f021aa7..af9860cc9a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.acknowledgements; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.util.concurrent.locks.ReentrantLock; @@ -32,7 +34,12 @@ class AcknowledgementSetMonitor implements Runnable { private final AtomicInteger numNullHandles; private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) { - return (DefaultAcknowledgementSet)eventHandle.getAcknowledgementSet(); + if (eventHandle instanceof DefaultEventHandle) { + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; + return (DefaultAcknowledgementSet)internalEventHandle.getAcknowledgementSet(); + } else { + throw new RuntimeException("Unsupported event handle"); + } } public AcknowledgementSetMonitor() { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 3c145fe99c..741f08939d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -8,6 +8,8 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +55,11 @@ public void add(Event event) { try { if (event instanceof JacksonEvent) { EventHandle eventHandle = event.getEventHandle(); - eventHandle.setAcknowledgementSet(this); - pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + if (eventHandle instanceof DefaultEventHandle) { + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; + internalEventHandle.setAcknowledgementSet(this); + pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + } } } finally { lock.unlock(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 9c0ec69a8b..565c0b9075 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -13,6 +13,8 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.pipeline.common.FutureHelper; import org.opensearch.dataprepper.pipeline.common.FutureHelperResult; import org.slf4j.Logger; @@ -97,10 +99,15 @@ private void processAcknowledgements(List inputEvents, Collection outputR // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it inputEvents.forEach(event -> { EventHandle eventHandle = event.getEventHandle(); - if (Objects.nonNull(eventHandle) && eventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) { - eventHandle.release(true); - } else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) { - invalidEventHandlesCounter.increment(); + if (eventHandle != null && eventHandle instanceof DefaultEventHandle) { + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; + if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) { + eventHandle.release(true); + } else if (acknowledgementsEnabled) { + invalidEventHandlesCounter.increment(); + } + } else if (eventHandle != null) { + throw new RuntimeException("Unexpected EventHandle"); } }); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java index 9c090cbf58..1bd2944c2e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java @@ -65,7 +65,7 @@ private void acquireEventReference(final Record record) { } if (referencedRecords.contains(record) || ((routedRecords != null) && routedRecords.contains(record))) { EventHandle eventHandle = ((JacksonEvent)record.getData()).getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { + if (eventHandle != null && eventHandle instanceof DefaultEventHandle) { acknowledgementSetManager.acquireEventReference(eventHandle); } } else if (!referencedRecords.contains(record)) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index 012e823ec5..c9fd556214 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.acknowledgements; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.junit.jupiter.api.BeforeEach; @@ -38,20 +38,20 @@ class DefaultAcknowledgementSetManagerTests { @Mock JacksonEvent event3; - EventHandle eventHandle1; - EventHandle eventHandle2; - EventHandle eventHandle3; + DefaultEventHandle eventHandle1; + DefaultEventHandle eventHandle2; + DefaultEventHandle eventHandle3; Boolean result; @BeforeEach void setup() { callbackExecutor = Executors.newFixedThreadPool(2); event1 = mock(JacksonEvent.class); - eventHandle1 = mock(EventHandle.class); + eventHandle1 = mock(DefaultEventHandle.class); lenient().when(event1.getEventHandle()).thenReturn(eventHandle1); event2 = mock(JacksonEvent.class); - eventHandle2 = mock(EventHandle.class); + eventHandle2 = mock(DefaultEventHandle.class); lenient().when(event2.getEventHandle()).thenReturn(eventHandle2); acknowledgementSetManager = createObjectUnderTest(); @@ -91,7 +91,7 @@ void testExpirations() throws InterruptedException { @Test void testMultipleAcknowledgementSets() { event3 = mock(JacksonEvent.class); - eventHandle3 = mock(EventHandle.class); + eventHandle3 = mock(DefaultEventHandle.class); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index 0fef88268e..75c1154baa 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -14,7 +14,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.EventFactory; -import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -84,7 +84,7 @@ class PipelineTests { private Duration peerForwarderDrainTimeout; private EventFactory eventFactory; private JacksonEvent event; - private EventHandle eventHandle; + private DefaultEventHandle eventHandle; private AcknowledgementSetManager acknowledgementSetManager; @BeforeEach @@ -428,7 +428,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM RouterCopyRecordStrategy routerCopyRecordStrategy = (RouterCopyRecordStrategy)a.getArgument(2); Record rec = records.get(0); event = mock(JacksonEvent.class); - eventHandle = mock(EventHandle.class); + eventHandle = mock(DefaultEventHandle.class); acknowledgementSet = mock(AcknowledgementSet.class); when(event.getEventHandle()).thenReturn(eventHandle); when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); @@ -441,7 +441,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM Pipeline pipeline = createObjectUnderTest(); when(mockSource.areAcknowledgementsEnabled()).thenReturn(true); pipeline.publishToSinks(records); - verify(acknowledgementSetManager).acquireEventReference(any(EventHandle.class)); + verify(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class)); verify(router) .route(anyCollection(), eq(dataFlowComponents), any(RouterGetRecordStrategy.class), any(BiConsumer.class)); @@ -455,7 +455,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_InactiveAcknowledge RouterCopyRecordStrategy routerCopyRecordStrategy = (RouterCopyRecordStrategy)a.getArgument(2); Record rec = records.get(0); event = mock(JacksonEvent.class); - eventHandle = mock(EventHandle.class); + eventHandle = mock(DefaultEventHandle.class); when(event.getEventHandle()).thenReturn(null); when(rec.getData()).thenReturn(event); routerCopyRecordStrategy.getRecord(rec); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java index 6d7e3bb545..4c56113323 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java @@ -42,7 +42,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.event.EventFactory; -import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -66,7 +66,7 @@ public class RouterCopyRecordStrategyTests { private JacksonEvent event; - private Map handleRefCount; + private Map handleRefCount; private static class TestComponent { } @@ -79,11 +79,11 @@ void setUp() { acknowledgementSet1 = mock(AcknowledgementSet.class); try { lenient().doAnswer((i) -> { - EventHandle handle = (EventHandle) i.getArgument(0); + DefaultEventHandle handle = (DefaultEventHandle) i.getArgument(0); int v = handleRefCount.getOrDefault(handle, 0); handleRefCount.put(handle, v+1); return null; - }).when(acknowledgementSetManager).acquireEventReference(any(EventHandle.class)); + }).when(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class)); } catch (Exception e){} mockRecordsIn = IntStream.range(0, 10) .mapToObj(i -> mock(Record.class)) @@ -98,11 +98,11 @@ void setUp() { .collect(Collectors.toList()); } - private void attachEventHandlesToRecordsIn(List eventHandles) { + private void attachEventHandlesToRecordsIn(List eventHandles) { Iterator iter = recordsIn.iterator(); while (iter.hasNext()) { Record r = (Record) iter.next(); - EventHandle handle = ((JacksonEvent)r.getData()).getEventHandle(); + DefaultEventHandle handle = (DefaultEventHandle)((JacksonEvent)r.getData()).getEventHandle(); handle.setAcknowledgementSet(acknowledgementSet1); eventHandles.add(handle); } @@ -186,10 +186,10 @@ void test_one_record_with_acknowledgements() { = Collections.singletonList(dataFlowComponent); final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); Record firstRecord = recordsIn.iterator().next(); - EventHandle firstHandle = ((Event)firstRecord.getData()).getEventHandle(); + DefaultEventHandle firstHandle = (DefaultEventHandle)((Event)firstRecord.getData()).getEventHandle(); Record recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, sameInstance(firstRecord)); assertTrue(getRecordStrategy.getReferencedRecords().contains(firstRecord)); @@ -208,7 +208,7 @@ void test_multiple_records_with_acknowledgements() { = Collections.singletonList(dataFlowComponent); final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); Collection recordsOut = getRecordStrategy.getAllRecords(recordsIn); assertThat(recordsOut.size(), equalTo(recordsIn.size())); @@ -237,12 +237,12 @@ void test_one_record_with_acknowledgements_and_multi_components() { } final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - e1.getEventHandle().setAcknowledgementSet(acknowledgementSet1); + ((DefaultEventHandle)e1.getEventHandle()).setAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} @@ -253,14 +253,14 @@ void test_one_record_with_acknowledgements_and_multi_components() { when(eventBuilder.build()).thenReturn(JacksonEvent.fromEvent(event)); when(eventFactory.eventBuilder(EventBuilder.class)).thenReturn(eventBuilder); Record firstRecord = recordsIn.iterator().next(); - EventHandle firstHandle = ((Event)firstRecord.getData()).getEventHandle(); + DefaultEventHandle firstHandle = (DefaultEventHandle)((Event)firstRecord.getData()).getEventHandle(); Record recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, sameInstance(firstRecord)); assertTrue(getRecordStrategy.getReferencedRecords().contains(firstRecord)); recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, not(sameInstance(firstRecord))); assertFalse(handleRefCount.containsKey(firstHandle)); - EventHandle newHandle = ((JacksonEvent)recordOut.getData()).getEventHandle(); + DefaultEventHandle newHandle = (DefaultEventHandle)((JacksonEvent)recordOut.getData()).getEventHandle(); assertTrue(getRecordStrategy.getReferencedRecords().contains(recordOut)); assertThat(newHandle, not(equalTo(null))); assertFalse(handleRefCount.containsKey(newHandle)); @@ -275,12 +275,12 @@ void test_multiple_records_with_acknowledgements_and_multi_components() { } final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - e1.getEventHandle().setAcknowledgementSet(acknowledgementSet1); + ((DefaultEventHandle)e1.getEventHandle()).setAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index dcdf4200ac..755ccb5283 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -149,11 +149,6 @@ public void setup() { expressionEvaluator = mock(ExpressionEvaluator.class); when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); - /* - lenient().doAnswer(a -> { - return null; - }).when(eventHandle).release(any(Boolean.class)); - */ } @BeforeEach diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 9952fde732..8610bfbf85 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -386,8 +386,6 @@ void output_will_release_all_handles_since_a_flush() throws IOException { } s3SinkService.output(records); - //final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { verify(acknowledgementSet).release(eventHandle, true); } @@ -452,13 +450,11 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); for (EventHandle eventHandle : eventHandles) { - System.out.println("==1====EventHandle=="+eventHandle+"==="+acknowledgementSet); eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records); for (EventHandle eventHandle : eventHandles) { - System.out.println("==2====EventHandle=="+eventHandle+"==="+acknowledgementSet); verify(acknowledgementSet).release(eventHandle, false); } }