diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index efd36e123d..6093ce9c6b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -21,6 +21,14 @@ * dropped, etc) */ public interface AcknowledgementSet { + /** + * Adds an event handle to the acknowledgement set. Assigns initial reference + * count of 1. + * + * @param eventHandle event handle to be added + * @since 2.11 + */ + void add(EventHandle eventHandle); /** * Adds an event to the acknowledgement set. Assigns initial reference @@ -29,7 +37,9 @@ public interface AcknowledgementSet { * @param event event to be added * @since 2.2 */ - public void add(Event event); + default void add(Event event) { + add(event.getEventHandle()); + } /** * Aquires a reference to the event by incrementing the reference diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java index 921d689a3c..511490ed5b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java @@ -68,6 +68,18 @@ public boolean release(boolean result) { return returnValue; } + @Override + public void addEventHandle(EventHandle eventHandle) { + synchronized (this) { + for (WeakReference acknowledgementSetRef : acknowledgementSetRefList) { + AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); + if (acknowledgementSet != null) { + acknowledgementSet.add(eventHandle); + } + } + } + } + // For testing List> getAcknowledgementSetRefs() { return acknowledgementSetRefList; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index 340c104a14..2782fdfa7c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() { return acknowledgementSet != null; } + @Override + public void addEventHandle(EventHandle eventHandle) { + AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); + if (acknowledgementSet != null) { + acknowledgementSet.add(eventHandle); + } + } + @Override public void acquireReference() { synchronized (this) { diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java index 3ee88f698b..a842033b01 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -31,5 +31,12 @@ public interface InternalEventHandle { */ void acquireReference(); + /** + * Adds new event handle to the acknowledgement sets associated + * with this event handle + * @param eventHandle event handle to add + * @since 2.11 + */ + void addEventHandle(EventHandle eventHandle); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetTests.java new file mode 100644 index 0000000000..d74ae9ceef --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetTests.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.acknowledgements; + +import org.junit.jupiter.api.Test; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.junit.jupiter.api.BeforeEach; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; + +public class AcknowledgementSetTests { + + Event event; + EventHandle eventHandle; + AcknowledgementSet acknowledgementSet; + + @BeforeEach + public void setup() { + event = mock(Event.class); + eventHandle = mock(EventHandle.class); + when(event.getEventHandle()).thenReturn(eventHandle); + acknowledgementSet = spy(AcknowledgementSet.class); + } + + @Test + public void testAcknowledgementSetAdd() { + acknowledgementSet.add(event); + verify(acknowledgementSet).add(eventHandle); + } +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java index 9998d6eb6d..c05739c940 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() { } + @Test + void testAddEventHandle() { + Instant now = Instant.now(); + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + acknowledgementSet1 = mock(AcknowledgementSet.class); + acknowledgementSet2 = mock(AcknowledgementSet.class); + eventHandle.addAcknowledgementSet(acknowledgementSet1); + eventHandle.addAcknowledgementSet(acknowledgementSet2); + AggregateEventHandle eventHandle2 = new AggregateEventHandle(now); + eventHandle.addEventHandle(eventHandle2); + verify(acknowledgementSet1).add(eventHandle2); + verify(acknowledgementSet2).add(eventHandle2); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java index f351febd11..df1d8d814d 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -13,7 +13,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import org.mockito.Mock; import java.time.Instant; @@ -39,7 +38,6 @@ void testBasic() { void testWithAcknowledgementSet() { acknowledgementSet = mock(AcknowledgementSet.class); when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); - doNothing().when(acknowledgementSet).acquire(any(EventHandle.class)); Instant now = Instant.now(); DefaultEventHandle eventHandle = new DefaultEventHandle(now); assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); @@ -75,5 +73,16 @@ void testWithOnReleaseHandler() { assertThat(count, equalTo(1)); } + + @Test + void testAddEventHandle() { + Instant now = Instant.now(); + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + acknowledgementSet = mock(AcknowledgementSet.class); + eventHandle.addAcknowledgementSet(acknowledgementSet); + DefaultEventHandle eventHandle2 = new DefaultEventHandle(now); + eventHandle.addEventHandle(eventHandle2); + verify(acknowledgementSet).add(eventHandle2); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java index b5481add14..b52399646f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java @@ -7,11 +7,8 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; -import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.InternalEventHandle; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,18 +72,13 @@ public void checkProgress() { } @Override - public void add(Event event) { + public void add(EventHandle eventHandle) { lock.lock(); try { - if (event instanceof JacksonEvent) { - EventHandle eventHandle = event.getEventHandle(); - if (eventHandle instanceof DefaultEventHandle) { - InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; - internalEventHandle.addAcknowledgementSet(this); - pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); - totalEventsAdded.incrementAndGet(); - } - } + InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle; + internalEventHandle.addAcknowledgementSet(this); + pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + totalEventsAdded.incrementAndGet(); } finally { lock.unlock(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java index 932c42c614..589ec7eda3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java @@ -9,7 +9,6 @@ import org.opensearch.dataprepper.core.parser.DataFlowComponent; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; @@ -95,13 +94,13 @@ public Record getRecord(final Record record) { final Event recordEvent = (Event) record.getData(); JacksonEvent newRecordEvent; Record newRecord; - DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { + InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle(); + if (internalHandle != null && internalHandle.hasAcknowledgementSet()) { final EventMetadata eventMetadata = recordEvent.getMetadata(); final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap()); newRecordEvent = (JacksonEvent) eventBuilder.build(); - eventHandle.getAcknowledgementSet().add(newRecordEvent); + internalHandle.addEventHandle(newRecordEvent.getEventHandle()); newRecord = new Record<>(newRecordEvent); acquireEventReference(newRecord); } else { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java index 06521b9d38..dbf2bab1a6 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java @@ -129,7 +129,7 @@ void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception { @Test void testDefaultAcknowledgementInvalidAcquire() { - defaultAcknowledgementSet.add(event); + defaultAcknowledgementSet.add(event.getEventHandle()); defaultAcknowledgementSet.complete(); DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest(); defaultAcknowledgementSet.acquire(handle2); @@ -247,7 +247,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception { Duration.ofSeconds(1) ); defaultAcknowledgementSet.add(event); - defaultAcknowledgementSet.add(event2); + defaultAcknowledgementSet.add(event2.getEventHandle()); defaultAcknowledgementSet.complete(); lenient().doAnswer(a -> { AcknowledgementSet acknowledgementSet = a.getArgument(0); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java index 178460cbe9..aedf1f60a5 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -241,10 +242,10 @@ void test_one_record_with_acknowledgements_and_multi_components() { attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { - JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - ((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1); + EventHandle handle = (EventHandle) i.getArgument(0); + ((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1); return null; - }).when(acknowledgementSet1).add(any(JacksonEvent.class)); + }).when(acknowledgementSet1).add(any(EventHandle.class)); } catch (Exception e){} eventBuilder = mock(EventBuilder.class); @@ -279,10 +280,10 @@ void test_multiple_records_with_acknowledgements_and_multi_components() { attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { - JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - ((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1); + EventHandle handle = (EventHandle) i.getArgument(0); + ((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1); return null; - }).when(acknowledgementSet1).add(any(JacksonEvent.class)); + }).when(acknowledgementSet1).add(any(EventHandle.class)); } catch (Exception e){} eventBuilder = mock(EventBuilder.class); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index e13c403676..e044676799 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -294,7 +294,7 @@ void testProcessRecordsWithAcknowledgementsEnabled() doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index b5d0d4ff28..a162e352c5 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index dac0db3fa9..e715d54563 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -259,7 +259,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index ba9b32b7df..09349579e4 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java index eadc9a7b8b..0b9fcfdaee 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java @@ -140,7 +140,7 @@ void setUp() throws Exception { lenient().doAnswer(a -> { numEventsAdded++; return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); bucketName = UUID.randomUUID().toString(); key = UUID.randomUUID().toString(); when(s3ObjectReference.getBucketName()).thenReturn(bucketName); @@ -196,6 +196,8 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg numEventsAdded = 0; doAnswer(a -> { Record record = mock(Record.class); + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); Consumer c = (Consumer)a.getArgument(2); c.accept(record); return null; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java index a911964ab6..603bc5d77e 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java @@ -127,7 +127,7 @@ void setup() { lenient().doAnswer(a -> { numEventsAdded++; return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); final String bucketName = UUID.randomUUID().toString(); final String objectKey = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java index cadd463ae9..0f0b24fbdf 100644 --- a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java +++ b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java @@ -106,8 +106,8 @@ protected Record createNewRecordFromEvent(final Event recordEvent, String splitV protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) { DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { - eventHandle.getAcknowledgementSet().add(recordEvent); + if (eventHandle != null) { + eventHandle.addEventHandle(recordEvent.getEventHandle()); } } diff --git a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java index 4e8944ab91..5c67e123ba 100644 --- a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java +++ b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java @@ -376,7 +376,7 @@ public void testAddToAcknowledgementSetFromOriginEvent() { DefaultEventHandle spyEventHandle = (DefaultEventHandle) spyEvent.getEventHandle(); // Verify that the add method is called on the acknowledgement set - verify(spyEventHandle).getAcknowledgementSet(); + verify(spyEventHandle).addEventHandle(recordEvent.getEventHandle()); AcknowledgementSet spyAckSet = spyEventHandle.getAcknowledgementSet(); DefaultEventHandle eventHandle = (DefaultEventHandle) recordEvent.getEventHandle();