From 95d0d7dde169e2b8589b8eba838132e41084adba Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 17 Oct 2024 20:55:49 +0000 Subject: [PATCH] Addressed review comments. Rebased to latest Signed-off-by: Krishna Kondaka --- .../acknowledgements/AcknowledgementSet.java | 12 +++++++++++- .../model/event/AggregateEventHandle.java | 12 ++++++++++++ .../model/event/DefaultEventHandle.java | 8 ++++++++ .../model/event/InternalEventHandle.java | 7 +++++++ .../model/event/AggregateEventHandleTests.java | 13 +++++++++++++ .../model/event/DefaultEventHandleTests.java | 12 ++++++++++++ .../DefaultAcknowledgementSet.java | 15 +++++---------- .../pipeline/router/RouterCopyRecordStrategy.java | 8 ++++---- .../processor/splitevent/SplitEventProcessor.java | 4 ++-- 9 files changed, 74 insertions(+), 17 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index efd36e123d..eab5fdc834 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -21,6 +21,14 @@ * dropped, etc) */ public interface AcknowledgementSet { + /** + * Adds an event handle to the acknowledgement set. Assigns initial reference + * count of 1. + * + * @param event event to be added + * @since 2.11 + */ + void add(EventHandle eventHandle); /** * Adds an event to the acknowledgement set. Assigns initial reference @@ -29,7 +37,9 @@ public interface AcknowledgementSet { * @param event event to be added * @since 2.2 */ - public void add(Event event); + default void add(Event event) { + add(event.getEventHandle()); + } /** * Aquires a reference to the event by incrementing the reference diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java index 921d689a3c..511490ed5b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java @@ -68,6 +68,18 @@ public boolean release(boolean result) { return returnValue; } + @Override + public void addEventHandle(EventHandle eventHandle) { + synchronized (this) { + for (WeakReference acknowledgementSetRef : acknowledgementSetRefList) { + AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); + if (acknowledgementSet != null) { + acknowledgementSet.add(eventHandle); + } + } + } + } + // For testing List> getAcknowledgementSetRefs() { return acknowledgementSetRefList; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index 340c104a14..2782fdfa7c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() { return acknowledgementSet != null; } + @Override + public void addEventHandle(EventHandle eventHandle) { + AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); + if (acknowledgementSet != null) { + acknowledgementSet.add(eventHandle); + } + } + @Override public void acquireReference() { synchronized (this) { diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java index 3ee88f698b..a842033b01 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -31,5 +31,12 @@ public interface InternalEventHandle { */ void acquireReference(); + /** + * Adds new event handle to the acknowledgement sets associated + * with this event handle + * @param eventHandle event handle to add + * @since 2.11 + */ + void addEventHandle(EventHandle eventHandle); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java index 9998d6eb6d..c05739c940 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() { } + @Test + void testAddEventHandle() { + Instant now = Instant.now(); + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + acknowledgementSet1 = mock(AcknowledgementSet.class); + acknowledgementSet2 = mock(AcknowledgementSet.class); + eventHandle.addAcknowledgementSet(acknowledgementSet1); + eventHandle.addAcknowledgementSet(acknowledgementSet2); + AggregateEventHandle eventHandle2 = new AggregateEventHandle(now); + eventHandle.addEventHandle(eventHandle2); + verify(acknowledgementSet1).add(eventHandle2); + verify(acknowledgementSet2).add(eventHandle2); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java index f351febd11..49933922e2 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -75,5 +75,17 @@ void testWithOnReleaseHandler() { assertThat(count, equalTo(1)); } + + @Test + void testAddEventHandle() { + Instant now = Instant.now(); + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + acknowledgementSet = mock(AcknowledgementSet.class); + eventHandle.addAcknowledgementSet(acknowledgementSet); + DefaultEventHandle eventHandle2 = new DefaultEventHandle(now); + doNothing().when(acknowledgementSet).add(any(EventHandle.class)); + eventHandle.addEventHandle(eventHandle2); + verify(acknowledgementSet).add(eventHandle2); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index fd26d10c72..19c52ebd66 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -75,18 +75,13 @@ public void checkProgress() { } @Override - public void add(Event event) { + public void add(EventHandle eventHandle) { lock.lock(); try { - if (event instanceof JacksonEvent) { - EventHandle eventHandle = event.getEventHandle(); - if (eventHandle instanceof DefaultEventHandle) { - InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; - internalEventHandle.addAcknowledgementSet(this); - pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); - totalEventsAdded.incrementAndGet(); - } - } + InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle; + internalEventHandle.addAcknowledgementSet(this); + pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + totalEventsAdded.incrementAndGet(); } finally { lock.unlock(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java index b4982c5b07..5a4174ae9c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java @@ -19,7 +19,7 @@ import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager; @@ -97,13 +97,13 @@ public Record getRecord(final Record record) { final Event recordEvent = (Event) record.getData(); JacksonEvent newRecordEvent; Record newRecord; - DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { + InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle(); + if (internalHandle != null && internalHandle.hasAcknowledgementSet()) { final EventMetadata eventMetadata = recordEvent.getMetadata(); final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap()); newRecordEvent = (JacksonEvent) eventBuilder.build(); - eventHandle.getAcknowledgementSet().add(newRecordEvent); + internalHandle.addEventHandle(newRecordEvent.getEventHandle()); newRecord = new Record<>(newRecordEvent); acquireEventReference(newRecord); } else { diff --git a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java index cadd463ae9..0f0b24fbdf 100644 --- a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java +++ b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java @@ -106,8 +106,8 @@ protected Record createNewRecordFromEvent(final Event recordEvent, String splitV protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) { DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { - eventHandle.getAcknowledgementSet().add(recordEvent); + if (eventHandle != null) { + eventHandle.addEventHandle(recordEvent.getEventHandle()); } }