Skip to content

Commit

Permalink
Addressed review comments. Rebased to latest
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Oct 17, 2024
1 parent 8216fdc commit 95d0d7d
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
* dropped, etc)
*/
public interface AcknowledgementSet {
/**
* Adds an event handle to the acknowledgement set. Assigns initial reference
* count of 1.
*
* @param event event to be added
* @since 2.11
*/
void add(EventHandle eventHandle);

/**
* Adds an event to the acknowledgement set. Assigns initial reference
Expand All @@ -29,7 +37,9 @@ public interface AcknowledgementSet {
* @param event event to be added
* @since 2.2
*/
public void add(Event event);
default void add(Event event) {
add(event.getEventHandle());
}

/**
* Aquires a reference to the event by incrementing the reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public boolean release(boolean result) {
return returnValue;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef : acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}
}
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() {
return acknowledgementSet != null;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}

@Override
public void acquireReference() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,12 @@ public interface InternalEventHandle {
*/
void acquireReference();

/**
* Adds new event handle to the acknowledgement sets associated
* with this event handle
* @param eventHandle event handle to add
* @since 2.11
*/
void addEventHandle(EventHandle eventHandle);
}

Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() {

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
acknowledgementSet1 = mock(AcknowledgementSet.class);
acknowledgementSet2 = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet1);
eventHandle.addAcknowledgementSet(acknowledgementSet2);
AggregateEventHandle eventHandle2 = new AggregateEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet1).add(eventHandle2);
verify(acknowledgementSet2).add(eventHandle2);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,17 @@ void testWithOnReleaseHandler() {
assertThat(count, equalTo(1));

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
acknowledgementSet = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet);
DefaultEventHandle eventHandle2 = new DefaultEventHandle(now);
doNothing().when(acknowledgementSet).add(any(EventHandle.class));
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet).add(eventHandle2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,13 @@ public void checkProgress() {
}

@Override
public void add(Event event) {
public void add(EventHandle eventHandle) {
lock.lock();
try {
if (event instanceof JacksonEvent) {
EventHandle eventHandle = event.getEventHandle();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
}
}
InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager;

Expand Down Expand Up @@ -97,13 +97,13 @@ public Record getRecord(final Record record) {
final Event recordEvent = (Event) record.getData();
JacksonEvent newRecordEvent;
Record newRecord;
DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle();
if (internalHandle != null && internalHandle.hasAcknowledgementSet()) {
final EventMetadata eventMetadata = recordEvent.getMetadata();
final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap());
newRecordEvent = (JacksonEvent) eventBuilder.build();

eventHandle.getAcknowledgementSet().add(newRecordEvent);
internalHandle.addEventHandle(newRecordEvent.getEventHandle());
newRecord = new Record<>(newRecordEvent);
acquireEventReference(newRecord);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ protected Record createNewRecordFromEvent(final Event recordEvent, String splitV

protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) {
DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
eventHandle.getAcknowledgementSet().add(recordEvent);
if (eventHandle != null) {
eventHandle.addEventHandle(recordEvent.getEventHandle());
}
}

Expand Down

0 comments on commit 95d0d7d

Please sign in to comment.