From a85e05e60f9fd2d2a269aaf2c7d5291c59946b87 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Mon, 17 Jun 2024 12:34:33 -0700 Subject: [PATCH] Add Aggregate event handle (#4625) Aggregate event handle Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../AcknowledgementSetManager.java | 31 ------ .../model/event/AbstractEventHandle.java | 52 +++++++++ .../model/event/AggregateEventHandle.java | 77 +++++++++++++ .../model/event/DefaultEventHandle.java | 49 +++------ .../dataprepper/model/event/EventHandle.java | 3 +- .../model/event/InternalEventHandle.java | 21 ++-- .../event/AggregateEventHandleTests.java | 102 ++++++++++++++++++ .../model/event/DefaultEventHandleTests.java | 9 +- .../JacksonEvent_JavaSerializationTest.java | 10 +- .../dataprepper/plugins/InMemorySink.java | 2 +- .../AcknowledgementSetMonitor.java | 63 +---------- .../DefaultAcknowledgementSet.java | 2 +- .../DefaultAcknowledgementSetManager.java | 14 --- .../InactiveAcknowledgementSetManager.java | 13 --- .../dataprepper/pipeline/ProcessWorker.java | 2 +- .../router/RouterCopyRecordStrategy.java | 5 +- .../AcknowledgementSetMonitorTests.java | 54 ---------- ...DefaultAcknowledgementSetManagerTests.java | 86 ++++++++++++--- .../DefaultAcknowledgementSetTests.java | 10 +- ...nactiveAcknowledgementSetManagerTests.java | 24 ----- .../codec/JavaPeerForwarderCodecTest.java | 4 +- .../dataprepper/pipeline/PipelineTests.java | 2 +- .../pipeline/ProcessWorkerTest.java | 11 +- .../router/RouterCopyRecordStrategyTests.java | 9 +- .../plugins/sink/s3/S3SinkServiceTest.java | 4 +- .../splitevent/SplitEventProcessorTest.java | 2 +- 26 files changed, 375 insertions(+), 286 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AbstractEventHandle.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java index 69c07c4aa5..6afebeaa91 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java @@ -5,9 +5,6 @@ package org.opensearch.dataprepper.model.acknowledgements; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; - import java.time.Duration; import java.util.function.Consumer; @@ -29,32 +26,4 @@ public interface AcknowledgementSetManager { * @since 2.2 */ AcknowledgementSet create(final Consumer callback, final Duration timeout); - - /** - * Releases an event's reference - * - * @param eventHandle event handle - * @param success indicates negative or positive acknowledgement - * - * @since 2.2 - */ - void releaseEventReference(final EventHandle eventHandle, boolean success); - - /** - * Acquires an event's reference - * - * @param eventHandle event handle - * - * @since 2.2 - */ - void acquireEventReference(final EventHandle eventHandle); - - /** - * Acquires an event's reference - * - * @param event event - * - * @since 2.2 - */ - void acquireEventReference(final Event event); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AbstractEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AbstractEventHandle.java new file mode 100644 index 0000000000..2ca40fbe59 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AbstractEventHandle.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import java.util.ArrayList; +import java.util.List; +import java.time.Instant; +import java.util.function.BiConsumer; + +abstract class AbstractEventHandle implements EventHandle, InternalEventHandle { + private Instant externalOriginationTime; + private final Instant internalOriginationTime; + private List> releaseConsumers; + + AbstractEventHandle(final Instant internalOriginationTime) { + this.externalOriginationTime = null; + this.internalOriginationTime = internalOriginationTime; + this.releaseConsumers = new ArrayList<>(); + } + @Override + public void setExternalOriginationTime(final Instant externalOriginationTime) { + this.externalOriginationTime = externalOriginationTime; + } + + @Override + public Instant getInternalOriginationTime() { + return this.internalOriginationTime; + } + + @Override + public Instant getExternalOriginationTime() { + return this.externalOriginationTime; + } + + @Override + public void onRelease(BiConsumer releaseConsumer) { + synchronized (releaseConsumers) { + releaseConsumers.add(releaseConsumer); + } + } + + public void notifyReleaseConsumers(boolean result) { + synchronized (releaseConsumers) { + for (final BiConsumer consumer: releaseConsumers) { + consumer.accept(this, result); + } + } + } +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java new file mode 100644 index 0000000000..921d689a3c --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import java.lang.ref.WeakReference; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.time.Instant; +import java.io.Serializable; + +public class AggregateEventHandle extends AbstractEventHandle implements Serializable { + private List> acknowledgementSetRefList; + private Set acknowledgementSetHashes; + + public AggregateEventHandle(final Instant internalOriginationTime) { + super(internalOriginationTime); + this.acknowledgementSetRefList = new ArrayList<>(); + this.acknowledgementSetHashes = new HashSet<>(); + } + + @Override + public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) { + int hashCode = acknowledgementSet.hashCode(); + if (!acknowledgementSetHashes.contains(hashCode)) { + this.acknowledgementSetRefList.add(new WeakReference<>(acknowledgementSet)); + acknowledgementSetHashes.add(hashCode); + } + } + + @Override + public boolean hasAcknowledgementSet() { + return acknowledgementSetRefList.size() != 0; + } + + @Override + public void acquireReference() { + synchronized (this) { + for (WeakReference acknowledgementSetRef: acknowledgementSetRefList) {; + AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); + if (acknowledgementSet != null) { + acknowledgementSet.acquire(this); + } + } + } + } + + @Override + public boolean release(boolean result) { + notifyReleaseConsumers(result); + boolean returnValue = true; + synchronized (this) { + for (WeakReference acknowledgementSetRef: acknowledgementSetRefList) { + AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); + if (acknowledgementSet != null) { + acknowledgementSet.release(this, result); + } else { + returnValue = false; + } + } + } + return returnValue; + } + + // For testing + List> getAcknowledgementSetRefs() { + return acknowledgementSetRefList; + } + +} + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index 743309bf75..340c104a14 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -8,35 +8,22 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.List; -import java.util.function.BiConsumer; import java.time.Instant; import java.io.Serializable; -public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable { - private Instant externalOriginationTime; - private final Instant internalOriginationTime; +public class DefaultEventHandle extends AbstractEventHandle implements Serializable { private WeakReference acknowledgementSetRef; - private List> releaseConsumers; public DefaultEventHandle(final Instant internalOriginationTime) { + super(internalOriginationTime); this.acknowledgementSetRef = null; - this.externalOriginationTime = null; - this.internalOriginationTime = internalOriginationTime; - this.releaseConsumers = new ArrayList<>(); } @Override - public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) { + public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) { this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet); } - @Override - public void setExternalOriginationTime(final Instant externalOriginationTime) { - this.externalOriginationTime = externalOriginationTime; - } - public AcknowledgementSet getAcknowledgementSet() { if (acknowledgementSetRef == null) { return null; @@ -45,32 +32,30 @@ public AcknowledgementSet getAcknowledgementSet() { } @Override - public Instant getInternalOriginationTime() { - return this.internalOriginationTime; + public boolean hasAcknowledgementSet() { + AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); + return acknowledgementSet != null; } @Override - public Instant getExternalOriginationTime() { - return this.externalOriginationTime; + public void acquireReference() { + synchronized (this) { + AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); + if (acknowledgementSet != null) { + acknowledgementSet.acquire(this); + } + } } @Override - public void release(boolean result) { - synchronized (releaseConsumers) { - for (final BiConsumer consumer: releaseConsumers) { - consumer.accept(this, result); - } - } + public boolean release(boolean result) { + notifyReleaseConsumers(result); AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); if (acknowledgementSet != null) { acknowledgementSet.release(this, result); + return true; } + return false; } - @Override - public void onRelease(BiConsumer releaseConsumer) { - synchronized (releaseConsumers) { - releaseConsumers.add(releaseConsumer); - } - } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java index d05dd8e36c..898384c32e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java @@ -14,9 +14,10 @@ public interface EventHandle { * * @param result result to be used while releasing. This indicates if * the operation on the event handle is success or not + * @return returns true if the event handle is released successful, false otherwise * @since 2.2 */ - void release(boolean result); + boolean release(boolean result); /** * sets external origination time diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java index 3817365f17..3ee88f698b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -9,20 +9,27 @@ public interface InternalEventHandle { /** - * sets acknowledgement set + * adds acknowledgement set * * @param acknowledgementSet acknowledgementSet to be set in the event handle - * @since 2.6 + * @since 2.9 */ - void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); + void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet); /** - * gets acknowledgement set + * Indicates if the event handle has atleast one acknowledgement set * - * @return returns acknowledgementSet from the event handle - * @since 2.6 + * @return returns true if there is at least one acknowledgementSet in the event handle + * @since 2.9 */ - AcknowledgementSet getAcknowledgementSet(); + boolean hasAcknowledgementSet(); + + /** + * Acquires reference to acknowledgement set(s) in the event handle + * + * @since 2.9 + */ + void acquireReference(); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java new file mode 100644 index 0000000000..9998d6eb6d --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import org.mockito.Mock; + +import java.lang.ref.WeakReference; +import java.time.Instant; + +class AggregateEventHandleTests { + @Mock + private AcknowledgementSet acknowledgementSet1; + @Mock + private AcknowledgementSet acknowledgementSet2; + private int count; + + @Test + void testBasic() { + Instant now = Instant.now(); + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false)); + eventHandle.acquireReference(); + eventHandle.release(true); + } + + @Test + void testWithAcknowledgementSet() { + acknowledgementSet1 = mock(AcknowledgementSet.class); + acknowledgementSet2 = mock(AcknowledgementSet.class); + when(acknowledgementSet1.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); + when(acknowledgementSet2.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); + Instant now = Instant.now(); + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + eventHandle.addAcknowledgementSet(acknowledgementSet1); + // just do duplicate add + eventHandle.addAcknowledgementSet(acknowledgementSet1); + assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true)); + eventHandle.addAcknowledgementSet(acknowledgementSet2); + eventHandle.acquireReference(); + verify(acknowledgementSet1).acquire(eventHandle); + verify(acknowledgementSet2).acquire(eventHandle); + eventHandle.release(true); + verify(acknowledgementSet1).release(eventHandle, true); + verify(acknowledgementSet2).release(eventHandle, true); + } + + @Test + void testWithExternalOriginationTime() { + Instant now = Instant.now(); + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false)); + assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + eventHandle.setExternalOriginationTime(now.minusSeconds(60)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60))); + eventHandle.release(true); + } + + @Test + void testWithOnReleaseHandler() { + Instant now = Instant.now(); + count = 0; + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + acknowledgementSet1 = mock(AcknowledgementSet.class); + acknowledgementSet2 = mock(AcknowledgementSet.class); + eventHandle.onRelease((handle, result) -> {if (result) count++; }); + eventHandle.addAcknowledgementSet(acknowledgementSet1); + assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true)); + eventHandle.addAcknowledgementSet(acknowledgementSet2); + // Simulate weak reference object not available for + // verification tests to pass 100% + for (WeakReference acknowledgementSetRef: eventHandle.getAcknowledgementSetRefs()) { + if (acknowledgementSetRef.get() == acknowledgementSet2 ) { + acknowledgementSetRef.clear(); + break; + } + } + eventHandle.release(true); + assertThat(count, equalTo(1)); + verify(acknowledgementSet1, times(1)).release(eventHandle, true); + verify(acknowledgementSet2, times(0)).release(eventHandle, true); + + } + +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java index b2a66b2d1d..f351febd11 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; import org.mockito.Mock; import java.time.Instant; @@ -29,6 +30,8 @@ void testBasic() { assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + eventHandle.acquireReference(); + assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false)); eventHandle.release(true); } @@ -36,12 +39,16 @@ void testBasic() { void testWithAcknowledgementSet() { acknowledgementSet = mock(AcknowledgementSet.class); when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); + doNothing().when(acknowledgementSet).acquire(any(EventHandle.class)); Instant now = Instant.now(); DefaultEventHandle eventHandle = new DefaultEventHandle(now); assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); - eventHandle.setAcknowledgementSet(acknowledgementSet); + eventHandle.addAcknowledgementSet(acknowledgementSet); + assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true)); + eventHandle.acquireReference(); + verify(acknowledgementSet).acquire(eventHandle); eventHandle.release(true); verify(acknowledgementSet).release(eventHandle, true); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEvent_JavaSerializationTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEvent_JavaSerializationTest.java index b3ee46b55c..160f08d673 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEvent_JavaSerializationTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEvent_JavaSerializationTest.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import static org.junit.jupiter.api.Assertions.assertFalse; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -20,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; class JacksonEvent_JavaSerializationTest { @@ -54,7 +54,7 @@ void serialize_without_acknowledgementSet_includes_data() throws IOException, Cl assertThat(deserializedEvent.getMetadata(), equalTo(objectUnderTest.getMetadata())); assertThat(deserializedEvent.getEventHandle(), instanceOf(InternalEventHandle.class)); - assertThat(((InternalEventHandle) deserializedEvent.getEventHandle()).getAcknowledgementSet(), nullValue()); + assertFalse(((InternalEventHandle) deserializedEvent.getEventHandle()).hasAcknowledgementSet()); assertThat(deserializedEvent.getEventHandle().getInternalOriginationTime(), equalTo(objectUnderTest.getMetadata().getTimeReceived())); } @@ -63,7 +63,7 @@ void serialize_without_acknowledgementSet_includes_data() throws IOException, Cl void serialize_with_acknowledgementSet_does_not_include_old_acknowledgement_set() throws IOException, ClassNotFoundException { final JacksonEvent objectUnderTest = createObjectUnderTest(); final InternalEventHandle internalEventHandle = (InternalEventHandle) objectUnderTest.getEventHandle(); - internalEventHandle.setAcknowledgementSet(mock(AcknowledgementSet.class)); + internalEventHandle.addAcknowledgementSet(mock(AcknowledgementSet.class)); final Object deserializedObject = serializeAndDeserialize(objectUnderTest); @@ -74,7 +74,7 @@ void serialize_with_acknowledgementSet_does_not_include_old_acknowledgement_set( assertThat(deserializedEvent.getMetadata(), equalTo(objectUnderTest.getMetadata())); assertThat(deserializedEvent.getEventHandle(), instanceOf(InternalEventHandle.class)); - assertThat(((InternalEventHandle) deserializedEvent.getEventHandle()).getAcknowledgementSet(), nullValue()); + assertFalse(((InternalEventHandle) deserializedEvent.getEventHandle()).hasAcknowledgementSet()); assertThat(deserializedEvent.getEventHandle().getInternalOriginationTime(), equalTo(objectUnderTest.getMetadata().getTimeReceived())); } @@ -84,4 +84,4 @@ private Object serializeAndDeserialize(final JacksonEvent objectUnderTest) throw return objectInputStream.readObject(); } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java index dec7aa5c1f..360367a1e4 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java @@ -40,7 +40,7 @@ public void output(final Collection> records) { records.stream().forEach((record) -> { EventHandle eventHandle = ((Event)record.getData()).getEventHandle(); if (acknowledgements) { - acknowledgementSetManager.releaseEventReference(eventHandle, result); + eventHandle.release(result); } }); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java index af9860cc9a..8c911346db 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java @@ -5,9 +5,6 @@ package org.opensearch.dataprepper.acknowledgements; -import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; -import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.util.concurrent.locks.ReentrantLock; @@ -33,15 +30,6 @@ class AcknowledgementSetMonitor implements Runnable { private final AtomicInteger numInvalidReleases; private final AtomicInteger numNullHandles; - private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) { - if (eventHandle instanceof DefaultEventHandle) { - InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; - return (DefaultAcknowledgementSet)internalEventHandle.getAcknowledgementSet(); - } else { - throw new RuntimeException("Unsupported event handle"); - } - } - public AcknowledgementSetMonitor() { this.acknowledgementSets = new HashSet<>(); this.lock = new ReentrantLock(true); @@ -67,55 +55,6 @@ public void add(final AcknowledgementSet acknowledgementSet) { } } - public void acquire(final EventHandle eventHandle) { - if (eventHandle == null) { - numNullHandles.incrementAndGet(); - return; - } - - DefaultAcknowledgementSet acknowledgementSet = getAcknowledgementSet(eventHandle); - lock.lock(); - boolean exists = false; - try { - exists = acknowledgementSets.contains(acknowledgementSet); - } finally { - lock.unlock(); - } - // if acknowledgementSet doesn't exist then it means that the - // event still active even after the acknowledgement set is - // cleaned up. - if (exists) { - acknowledgementSet.acquire(eventHandle); - } else { - LOG.warn("Trying acquire an event in an AcknowledgementSet that does not exist"); - numInvalidAcquires.incrementAndGet(); - } - } - - public void release(final EventHandle eventHandle, final boolean success) { - if (eventHandle == null) { - numNullHandles.incrementAndGet(); - return; - } - DefaultAcknowledgementSet acknowledgementSet = getAcknowledgementSet(eventHandle); - lock.lock(); - boolean exists = false; - try { - exists = acknowledgementSets.contains(acknowledgementSet); - } finally { - lock.unlock(); - } - // if acknowledgementSet doesn't exist then it means some late - // arrival of event handle release after the acknowledgement set - // is cleaned up. - if (exists) { - boolean b = acknowledgementSet.release(eventHandle, success); - } else { - LOG.warn("Trying to release from an AcknowledgementSet that does not exist"); - numInvalidReleases.incrementAndGet(); - } - } - /** * for testing * @return the size @@ -131,6 +70,8 @@ public void run() { if (acknowledgementSets.size() > 0) { acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet) ackSet).isDone()); } + Thread.sleep(1000); + } catch (InterruptedException e) { } finally { lock.unlock(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index c2823203fe..fd26d10c72 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -82,7 +82,7 @@ public void add(Event event) { EventHandle eventHandle = event.getEventHandle(); if (eventHandle instanceof DefaultEventHandle) { InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; - internalEventHandle.setAcknowledgementSet(this); + internalEventHandle.addAcknowledgementSet(this); pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); totalEventsAdded.incrementAndGet(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java index 3f2e3761bd..b8f81dbfc1 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java @@ -7,8 +7,6 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.metrics.PluginMetrics; import javax.inject.Inject; @@ -49,18 +47,6 @@ public AcknowledgementSet create(final Consumer callback, final Duratio return acknowledgementSet; } - public void acquireEventReference(final Event event) { - acquireEventReference(event.getEventHandle()); - } - - public void acquireEventReference(final EventHandle eventHandle) { - acknowledgementSetMonitor.acquire(eventHandle); - } - - public void releaseEventReference(final EventHandle eventHandle, final boolean success) { - acknowledgementSetMonitor.release(eventHandle, success); - } - public void shutdown() { acknowledgementSetMonitorThread.stop(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java index 2e112b4560..52f0e1978f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java @@ -5,8 +5,6 @@ package org.opensearch.dataprepper.acknowledgements; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.util.function.Consumer; @@ -26,15 +24,4 @@ public AcknowledgementSet create(final Consumer callback, final Duratio throw new UnsupportedOperationException("create operation not supported"); } - public void acquireEventReference(final Event event) { - throw new UnsupportedOperationException("acquire operation not supported"); - } - - public void acquireEventReference(final EventHandle eventHandle) { - throw new UnsupportedOperationException("acquire operation not supported"); - } - - public void releaseEventReference(final EventHandle eventHandle, boolean success) { - throw new UnsupportedOperationException("release operation not supported"); - } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 2178fd6bcc..b5538dfe73 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -100,7 +100,7 @@ private void processAcknowledgements(List inputEvents, Collection {return null; }).when(acknowledgementSet1).acquire(eventHandle1); - } catch (Exception e){} - acknowledgementSetMonitor.add(acknowledgementSet1); - acknowledgementSetMonitor.acquire(eventHandle1); - acknowledgementSetMonitor.release(eventHandle1, true); - Thread shutdownThread = new Thread(() -> { - try { - Thread.sleep(DEFAULT_WAIT_TIME_MS); - } catch (Exception e){} - }); - shutdownThread.start(); - acknowledgementSetMonitor.run(); - assertThat(acknowledgementSetMonitor.getSize(), equalTo(0)); - } - - @Test - public void testAcknowledgementSetInvalidAcquire() { - acknowledgementSet2 = mock(DefaultAcknowledgementSet.class); - when(eventHandle1.getAcknowledgementSet()).thenReturn(acknowledgementSet2); - acknowledgementSetMonitor.add(acknowledgementSet1); - acknowledgementSetMonitor.acquire(eventHandle1); - Thread shutdownThread = new Thread(() -> { - try { - Thread.sleep(DEFAULT_WAIT_TIME_MS); - } catch (Exception e){} - }); - shutdownThread.start(); - acknowledgementSetMonitor.run(); - assertThat(acknowledgementSetMonitor.getSize(), equalTo(0)); - assertThat(acknowledgementSetMonitor.getNumInvalidAcquires(), equalTo(1)); - } - - @Test - public void testAcknowledgementSetInvalidRelease() { - acknowledgementSet2 = mock(DefaultAcknowledgementSet.class); - when(eventHandle1.getAcknowledgementSet()).thenReturn(acknowledgementSet2); - acknowledgementSetMonitor.add(acknowledgementSet1); - acknowledgementSetMonitor.release(eventHandle1, true); - Thread shutdownThread = new Thread(() -> { - try { - Thread.sleep(DEFAULT_WAIT_TIME_MS); - } catch (Exception e){} - }); - shutdownThread.start(); - acknowledgementSetMonitor.run(); - assertThat(acknowledgementSetMonitor.getSize(), equalTo(0)); - assertThat(acknowledgementSetMonitor.getNumInvalidReleases(), equalTo(1)); - } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index 1b87d6c849..a083f5ea85 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -14,6 +14,8 @@ import org.junit.jupiter.api.Test; import org.mockito.junit.jupiter.MockitoExtension; import org.junit.jupiter.api.extension.ExtendWith; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.ArgumentMatchers.any; import org.mockito.Mock; import static org.awaitility.Awaitility.await; @@ -53,17 +55,27 @@ class DefaultAcknowledgementSetManagerTests { void setup() { currentRatio = 0; callbackExecutor = Executors.newScheduledThreadPool(2); + acknowledgementSetManager = createObjectUnderTest(); + AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); event1 = mock(JacksonEvent.class); eventHandle1 = mock(DefaultEventHandle.class); + lenient().doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet1.release(eventHandle1, result); + return null; + }).when(eventHandle1).release(any(Boolean.class)); lenient().when(event1.getEventHandle()).thenReturn(eventHandle1); pluginMetrics = mock(PluginMetrics.class); event2 = mock(JacksonEvent.class); eventHandle2 = mock(DefaultEventHandle.class); + lenient().doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet1.release(eventHandle2, result); + return null; + }).when(eventHandle2).release(any(Boolean.class)); lenient().when(event2.getEventHandle()).thenReturn(eventHandle2); - acknowledgementSetManager = createObjectUnderTest(); - AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); acknowledgementSet1.add(event1); acknowledgementSet1.add(event2); lenient().when(eventHandle1.getAcknowledgementSet()).thenReturn(acknowledgementSet1); @@ -77,8 +89,8 @@ DefaultAcknowledgementSetManager createObjectUnderTest() { @Test void testBasic() { - acknowledgementSetManager.releaseEventReference(eventHandle2, true); - acknowledgementSetManager.releaseEventReference(eventHandle1, true); + eventHandle2.release(true); + eventHandle1.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); @@ -88,7 +100,7 @@ void testBasic() { @Test void testExpirations() throws InterruptedException { - acknowledgementSetManager.releaseEventReference(eventHandle2, true); + eventHandle2.release(true); Thread.sleep(TEST_TIMEOUT.multipliedBy(5).toMillis()); assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); await().atMost(TEST_TIMEOUT.multipliedBy(5)) @@ -99,17 +111,22 @@ void testExpirations() throws InterruptedException { @Test void testMultipleAcknowledgementSets() { + AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); event3 = mock(JacksonEvent.class); eventHandle3 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle3, result); + return null; + }).when(eventHandle3).release(any(Boolean.class)); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); - AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); acknowledgementSet2.add(event3); lenient().when(eventHandle3.getAcknowledgementSet()).thenReturn(acknowledgementSet2); acknowledgementSet2.complete(); - acknowledgementSetManager.releaseEventReference(eventHandle2, true); - acknowledgementSetManager.releaseEventReference(eventHandle3, true); + eventHandle2.release(true); + eventHandle3.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); @@ -119,22 +136,42 @@ void testMultipleAcknowledgementSets() { @Test void testWithProgressCheckCallbacks() { + AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofMillis(10000)); eventHandle3 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle3, result); + return null; + }).when(eventHandle3).release(any(Boolean.class)); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); eventHandle4 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle4, result); + return null; + }).when(eventHandle4).release(any(Boolean.class)); JacksonEvent event4 = mock(JacksonEvent.class); lenient().when(event4.getEventHandle()).thenReturn(eventHandle4); eventHandle5 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle5, result); + return null; + }).when(eventHandle5).release(any(Boolean.class)); JacksonEvent event5 = mock(JacksonEvent.class); lenient().when(event5.getEventHandle()).thenReturn(eventHandle5); eventHandle6 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle6, result); + return null; + }).when(eventHandle6).release(any(Boolean.class)); JacksonEvent event6 = mock(JacksonEvent.class); lenient().when(event6.getEventHandle()).thenReturn(eventHandle6); - AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofMillis(10000)); acknowledgementSet2.addProgressCheck((progressCheck) -> {currentRatio = progressCheck.getRatio();}, Duration.ofSeconds(1)); acknowledgementSet2.add(event3); acknowledgementSet2.add(event4); @@ -145,22 +182,22 @@ void testWithProgressCheckCallbacks() { lenient().when(eventHandle5.getAcknowledgementSet()).thenReturn(acknowledgementSet2); lenient().when(eventHandle6.getAcknowledgementSet()).thenReturn(acknowledgementSet2); acknowledgementSet2.complete(); - acknowledgementSetManager.releaseEventReference(eventHandle3, true); + eventHandle3.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(currentRatio, equalTo(0.75)); }); - acknowledgementSetManager.releaseEventReference(eventHandle4, true); + eventHandle4.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(currentRatio, equalTo(0.5)); }); - acknowledgementSetManager.releaseEventReference(eventHandle5, true); + eventHandle5.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(currentRatio, equalTo(0.25)); }); - acknowledgementSetManager.releaseEventReference(eventHandle6, true); + eventHandle6.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(result, equalTo(true)); @@ -170,14 +207,30 @@ void testWithProgressCheckCallbacks() { @Test void testWithProgressCheckCallbacks_AcksExpire() { + AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofSeconds(10)); eventHandle3 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle3, result); + return null; + }).when(eventHandle3).release(any(Boolean.class)); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); eventHandle4 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle4, result); + return null; + }).when(eventHandle4).release(any(Boolean.class)); JacksonEvent event4 = mock(JacksonEvent.class); lenient().when(event4.getEventHandle()).thenReturn(eventHandle4); eventHandle5 = mock(DefaultEventHandle.class); + doAnswer(a -> { + Boolean result = (Boolean)a.getArgument(0); + acknowledgementSet2.release(eventHandle5, result); + return null; + }).when(eventHandle5).release(any(Boolean.class)); JacksonEvent event5 = mock(JacksonEvent.class); lenient().when(event5.getEventHandle()).thenReturn(eventHandle5); @@ -185,7 +238,6 @@ void testWithProgressCheckCallbacks_AcksExpire() { JacksonEvent event6 = mock(JacksonEvent.class); lenient().when(event6.getEventHandle()).thenReturn(eventHandle6); - AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofSeconds(10)); acknowledgementSet2.addProgressCheck((progressCheck) -> {currentRatio = progressCheck.getRatio();}, Duration.ofSeconds(1)); acknowledgementSet2.add(event3); acknowledgementSet2.add(event4); @@ -196,17 +248,17 @@ void testWithProgressCheckCallbacks_AcksExpire() { lenient().when(eventHandle5.getAcknowledgementSet()).thenReturn(acknowledgementSet2); lenient().when(eventHandle6.getAcknowledgementSet()).thenReturn(acknowledgementSet2); acknowledgementSet2.complete(); - acknowledgementSetManager.releaseEventReference(eventHandle3, true); + eventHandle3.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(currentRatio, equalTo(0.75)); }); - acknowledgementSetManager.releaseEventReference(eventHandle4, true); + eventHandle4.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(currentRatio, equalTo(0.5)); }); - acknowledgementSetManager.releaseEventReference(eventHandle5, true); + eventHandle5.release(true); await().atMost(TEST_TIMEOUT.multipliedBy(5)) .untilAsserted(() -> { assertThat(currentRatio, equalTo(0.25)); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java index 28e17d77cc..a3ee665adf 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java @@ -91,14 +91,14 @@ void setupEvent() { AcknowledgementSet acknowledgementSet = a.getArgument(0); lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); return null; - }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); + }).when(handle).addAcknowledgementSet(any(AcknowledgementSet.class)); lenient().when(event.getEventHandle()).thenReturn(handle); event2 = mock(JacksonEvent.class); lenient().doAnswer(a -> { AcknowledgementSet acknowledgementSet = a.getArgument(0); lenient().when(handle2.getAcknowledgementSet()).thenReturn(acknowledgementSet); return null; - }).when(handle2).setAcknowledgementSet(any(AcknowledgementSet.class)); + }).when(handle2).addAcknowledgementSet(any(AcknowledgementSet.class)); handle2 = mock(DefaultEventHandle.class); lenient().when(event2.getEventHandle()).thenReturn(handle2); } @@ -186,7 +186,7 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception { AcknowledgementSet acknowledgementSet = a.getArgument(0); lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); return null; - }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); + }).when(handle).addAcknowledgementSet(any(AcknowledgementSet.class)); assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet)); defaultAcknowledgementSet.acquire(handle); assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(false)); @@ -219,7 +219,7 @@ void testDefaultAcknowledgementSetExpirations() throws Exception { AcknowledgementSet acknowledgementSet = a.getArgument(0); lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); return null; - }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); + }).when(handle).addAcknowledgementSet(any(AcknowledgementSet.class)); assertThat(handle, not(equalTo(null))); assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet)); assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true)); @@ -253,7 +253,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception { AcknowledgementSet acknowledgementSet = a.getArgument(0); lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); return null; - }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); + }).when(handle).addAcknowledgementSet(any(AcknowledgementSet.class)); assertThat(handle, not(equalTo(null))); assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet)); await().atMost(Duration.ofSeconds(5)) diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManagerTests.java index eb1303d487..8a0a4d2ffd 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManagerTests.java @@ -7,12 +7,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static org.mockito.Mockito.mock; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; -import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.event.Event; import java.time.Duration; @@ -30,25 +27,4 @@ void testCreateAPI() { assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.create((a)->{}, Duration.ofMillis(10))); } - @Test - void testEventAcquireAPI() { - assertThat(acknowledgementSetManager, notNullValue()); - Event event = mock(Event.class); - assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.acquireEventReference(event)); - } - - @Test - void testEventHandleAcquireAPI() { - assertThat(acknowledgementSetManager, notNullValue()); - EventHandle eventHandle = mock(EventHandle.class); - assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.acquireEventReference(eventHandle)); - } - - @Test - void testReleaseAPI() { - assertThat(acknowledgementSetManager, notNullValue()); - EventHandle eventHandle = mock(EventHandle.class); - assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.releaseEventReference(eventHandle, true)); - } - } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/codec/JavaPeerForwarderCodecTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/codec/JavaPeerForwarderCodecTest.java index 70a1e737d8..bd0b26e05f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/codec/JavaPeerForwarderCodecTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/codec/JavaPeerForwarderCodecTest.java @@ -78,7 +78,7 @@ void testCodec_with_acknowledgementSet() throws IOException, ClassNotFoundExcept inputEvents.getEvents().stream() .map(Event::getEventHandle) .map(handle -> (InternalEventHandle)handle) - .forEach(handle -> handle.setAcknowledgementSet(mock(AcknowledgementSet.class))); + .forEach(handle -> handle.addAcknowledgementSet(mock(AcknowledgementSet.class))); final byte[] bytes = createObjectUnderTest().serialize(inputEvents); final PeerForwardingEvents outputEvents = createObjectUnderTest().deserialize(bytes); assertThat(outputEvents.getDestinationPipelineName(), equalTo(inputEvents.getDestinationPipelineName())); @@ -119,4 +119,4 @@ private PeerForwardingEvents generatePeerForwardingEvents(final int numEvents) { } return new PeerForwardingEvents(events, pluginId, pipelineName); } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index 5c0a9a974e..c2e0ad769f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -441,7 +441,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM Pipeline pipeline = createObjectUnderTest(); when(mockSource.areAcknowledgementsEnabled()).thenReturn(true); pipeline.publishToSinks(records); - verify(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class)); + verify(eventHandle).acquireReference(); verify(router) .route(anyCollection(), eq(dataFlowComponents), any(RouterGetRecordStrategy.class), any(BiConsumer.class)); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java index 3d13c0d49f..455da07a93 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; @@ -27,7 +28,6 @@ import java.util.concurrent.Future; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -104,7 +104,6 @@ void testProcessWorkerHappyPathWithAcknowledgments() { final Record mockRecord = mock(Record.class); final Event mockEvent = mock(Event.class); final EventHandle eventHandle = mock(DefaultEventHandle.class); - when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); when(mockRecord.getData()).thenReturn(mockEvent); when(mockEvent.getEventHandle()).thenReturn(eventHandle); @@ -174,8 +173,8 @@ void testProcessWorkerWithProcessorThrowingExceptionAndAcknowledgmentsEnabledIsH final Record mockRecord = mock(Record.class); final Event mockEvent = mock(Event.class); final EventHandle eventHandle = mock(DefaultEventHandle.class); - when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); - doNothing().when(eventHandle).release(true); + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + ((InternalEventHandle)eventHandle).addAcknowledgementSet(acknowledgementSet); when(mockRecord.getData()).thenReturn(mockEvent); when(mockEvent.getEventHandle()).thenReturn(eventHandle); @@ -218,8 +217,8 @@ void testProcessWorkerWithProcessorDroppingAllRecordsAndAcknowledgmentsEnabledIs final Record mockRecord = mock(Record.class); final Event mockEvent = mock(Event.class); final EventHandle eventHandle = mock(DefaultEventHandle.class); - when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); - doNothing().when(eventHandle).release(true); + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + ((InternalEventHandle)eventHandle).addAcknowledgementSet(acknowledgementSet); when(mockRecord.getData()).thenReturn(mockEvent); when(mockEvent.getEventHandle()).thenReturn(eventHandle); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java index 4c56113323..c971cd5b8d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java @@ -83,7 +83,7 @@ void setUp() { int v = handleRefCount.getOrDefault(handle, 0); handleRefCount.put(handle, v+1); return null; - }).when(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class)); + }).when(acknowledgementSet1).acquire(any(DefaultEventHandle.class)); } catch (Exception e){} mockRecordsIn = IntStream.range(0, 10) .mapToObj(i -> mock(Record.class)) @@ -103,7 +103,7 @@ private void attachEventHandlesToRecordsIn(List eventHandles while (iter.hasNext()) { Record r = (Record) iter.next(); DefaultEventHandle handle = (DefaultEventHandle)((JacksonEvent)r.getData()).getEventHandle(); - handle.setAcknowledgementSet(acknowledgementSet1); + handle.addAcknowledgementSet(acknowledgementSet1); eventHandles.add(handle); } } @@ -195,6 +195,7 @@ void test_one_record_with_acknowledgements() { assertTrue(getRecordStrategy.getReferencedRecords().contains(firstRecord)); recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, sameInstance(firstRecord)); + firstHandle.addAcknowledgementSet(acknowledgementSet1); assertThat(handleRefCount.get(firstHandle), equalTo(1)); recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, sameInstance(firstRecord)); @@ -242,7 +243,7 @@ void test_one_record_with_acknowledgements_and_multi_components() { try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - ((DefaultEventHandle)e1.getEventHandle()).setAcknowledgementSet(acknowledgementSet1); + ((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} @@ -280,7 +281,7 @@ void test_multiple_records_with_acknowledgements_and_multi_components() { try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - ((DefaultEventHandle)e1.getEventHandle()).setAcknowledgementSet(acknowledgementSet1); + ((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 88c4df5202..c1f84d2bd4 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -565,8 +565,8 @@ void output_will_skip_and_drop_failed_records() throws IOException { DefaultEventHandle eventHandle1 = (DefaultEventHandle)event1.getEventHandle(); DefaultEventHandle eventHandle2 = (DefaultEventHandle)event2.getEventHandle(); - eventHandle1.setAcknowledgementSet(acknowledgementSet); - eventHandle2.setAcknowledgementSet(acknowledgementSet); + eventHandle1.addAcknowledgementSet(acknowledgementSet); + eventHandle2.addAcknowledgementSet(acknowledgementSet); doThrow(RuntimeException.class).when(codec).writeEvent(event1, outputStream); diff --git a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java index 7fc126fdf5..4e8944ab91 100644 --- a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java +++ b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java @@ -69,7 +69,7 @@ private Record createTestRecord(final Map data) { DefaultEventHandle eventHandle = (DefaultEventHandle) event.getEventHandle(); - eventHandle.setAcknowledgementSet(mockAcknowledgementSet); + eventHandle.addAcknowledgementSet(mockAcknowledgementSet); return new Record<>(event); }