diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java
new file mode 100644
index 0000000000..21032873a4
--- /dev/null
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.acknowledgements;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+@Configuration
+class AcknowledgementAppConfig {
+ private static final int MAX_THREADS = 12;
+
+ @Bean
+ CallbackTheadFactory callbackTheadFactory() {
+ final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+ return new CallbackTheadFactory(defaultThreadFactory);
+ }
+
+ @Bean(name = "acknowledgementCallbackExecutor")
+ ExecutorService acknowledgementCallbackExecutor(final CallbackTheadFactory callbackTheadFactory) {
+ return Executors.newFixedThreadPool(MAX_THREADS, callbackTheadFactory);
+ }
+}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java
index 20cb38fe03..1057418876 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java
@@ -18,19 +18,19 @@
import org.slf4j.LoggerFactory;
/**
* AcknowledgementSetMonitor - monitors the acknowledgement sets for completion/expiration
- *
+ *
* Every acknowledgement set must complete (ie get acknowledgements from all the events in it)
* by a specified time. If it is not completed, then it is considered 'expired' and it is
* cleaned up. The 'run' method is invoked periodically to cleanup the acknowledgement sets
* that are either completed or expired.
*/
-public class AcknowledgementSetMonitor implements Runnable {
+class AcknowledgementSetMonitor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AcknowledgementSetMonitor.class);
private final Set acknowledgementSets;
private final ReentrantLock lock;
- private AtomicInteger numInvalidAcquires;
- private AtomicInteger numInvalidReleases;
- private AtomicInteger numNullHandles;
+ private final AtomicInteger numInvalidAcquires;
+ private final AtomicInteger numInvalidReleases;
+ private final AtomicInteger numNullHandles;
private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet();
@@ -110,7 +110,10 @@ public void release(final EventHandle eventHandle, final boolean success) {
}
}
- // For testing
+ /**
+ * for testing
+ * @return the size
+ */
int getSize() {
return acknowledgementSets.size();
}
@@ -120,7 +123,7 @@ public void run() {
lock.lock();
try {
if (acknowledgementSets.size() > 0) {
- acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet)ackSet).isDone());
+ acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet) ackSet).isDone());
}
} finally {
lock.unlock();
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorThread.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorThread.java
new file mode 100644
index 0000000000..98377df89a
--- /dev/null
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorThread.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.acknowledgements;
+
+import java.time.Duration;
+
+class AcknowledgementSetMonitorThread {
+ private final Thread monitorThread;
+ private final AcknowledgementSetMonitor acknowledgementSetMonitor;
+ private final Duration delayTime;
+ private volatile boolean isStopped = false;
+
+ public AcknowledgementSetMonitorThread(
+ final AcknowledgementSetMonitor acknowledgementSetMonitor,
+ final Duration delayTime) {
+ this.acknowledgementSetMonitor = acknowledgementSetMonitor;
+ this.delayTime = delayTime;
+ monitorThread = new Thread(new Monitor());
+ monitorThread.setDaemon(true);
+ monitorThread.setName("acknowledgement-monitor");
+ }
+
+ public void start() {
+ monitorThread.start();
+ }
+
+ public void stop() {
+ isStopped = true;
+ }
+
+ private class Monitor implements Runnable {
+ @Override
+ public void run() {
+ while (!isStopped) {
+ acknowledgementSetMonitor.run();
+ try {
+ Thread.sleep(delayTime.toMillis());
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/CallbackTheadFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/CallbackTheadFactory.java
new file mode 100644
index 0000000000..5969994f0d
--- /dev/null
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/CallbackTheadFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.acknowledgements;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CallbackTheadFactory implements ThreadFactory {
+ private final ThreadFactory delegateFactory;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ public CallbackTheadFactory(final ThreadFactory delegateFactory) {
+ this.delegateFactory = Objects.requireNonNull(delegateFactory);
+ }
+
+ @Override
+ public Thread newThread(final Runnable runnable) {
+ final Thread thread = delegateFactory.newThread(runnable);
+ thread.setName("acknowledgement-callback-" + threadNumber.getAndIncrement());
+ return thread;
+ }
+}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java
index e7a7908e90..7c35799140 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java
@@ -5,42 +5,39 @@
package org.opensearch.dataprepper.acknowledgements;
-import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.event.JacksonEvent;
-import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
-
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventHandle;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.Map;
+import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.time.Instant;
-import java.time.Duration;
+import java.util.function.Consumer;
public class DefaultAcknowledgementSet implements AcknowledgementSet {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class);
private final Consumer callback;
private final Instant expiryTime;
- private final ScheduledExecutorService executor;
+ private final ExecutorService executor;
// This lock protects all the non-final members
private final ReentrantLock lock;
private boolean result;
- private Map pendingAcknowledgments;
- private ScheduledFuture callbackFuture;
+ private final Map pendingAcknowledgments;
+ private Future> callbackFuture;
- private AtomicInteger numInvalidAcquires;
- private AtomicInteger numInvalidReleases;
+ private final AtomicInteger numInvalidAcquires;
+ private final AtomicInteger numInvalidReleases;
- public DefaultAcknowledgementSet(final ScheduledExecutorService executor, final Consumer callback, final Duration expiryTime) {
+ public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer callback, final Duration expiryTime) {
this.callback = callback;
this.result = true;
this.executor = executor;
@@ -127,7 +124,7 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (pendingAcknowledgments.size() == 0) {
- callbackFuture = executor.schedule(() -> {callback.accept(this.result);}, 0, TimeUnit.SECONDS);
+ callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
}
}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java
index 52a47f6068..8091334335 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java
@@ -5,36 +5,37 @@
package org.opensearch.dataprepper.acknowledgements;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
-import org.opensearch.dataprepper.model.event.JacksonEvent;
-import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
-import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
-import java.util.function.Consumer;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.time.Duration;
-import javax.inject.Named;
import javax.inject.Inject;
+import javax.inject.Named;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
@Named
public class DefaultAcknowledgementSetManager implements AcknowledgementSetManager {
- private static final int MAX_THREADS = 100;
private static final int DEFAULT_WAIT_TIME_MS = 15 * 1000;
private final AcknowledgementSetMonitor acknowledgementSetMonitor;
- private final ScheduledExecutorService executor;
+ private final ExecutorService executor;
+ private final AcknowledgementSetMonitorThread acknowledgementSetMonitorThread;
+
@Inject
- public DefaultAcknowledgementSetManager() {
- this(Duration.ofMillis(DEFAULT_WAIT_TIME_MS));
+ public DefaultAcknowledgementSetManager(
+ @Named("acknowledgementCallbackExecutor") final ExecutorService callbackExecutor) {
+ this(callbackExecutor, Duration.ofMillis(DEFAULT_WAIT_TIME_MS));
}
- public DefaultAcknowledgementSetManager(final Duration waitTime) {
- this.executor = Executors.newScheduledThreadPool(MAX_THREADS);
+ public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, final Duration waitTime) {
this.acknowledgementSetMonitor = new AcknowledgementSetMonitor();
- this.executor.scheduleAtFixedRate(this.acknowledgementSetMonitor, waitTime.toMillis(), waitTime.toMillis(), TimeUnit.MILLISECONDS);
+ this.executor = Objects.requireNonNull(callbackExecutor);
+ acknowledgementSetMonitorThread = new AcknowledgementSetMonitorThread(acknowledgementSetMonitor, waitTime);
+ acknowledgementSetMonitorThread.start();
}
public AcknowledgementSet create(final Consumer callback, final Duration timeout) {
@@ -44,7 +45,7 @@ public AcknowledgementSet create(final Consumer callback, final Duratio
}
public void acquireEventReference(final Event event) {
- acquireEventReference(((JacksonEvent)event).getEventHandle());
+ acquireEventReference(event.getEventHandle());
}
public void acquireEventReference(final EventHandle eventHandle) {
@@ -56,11 +57,15 @@ public void releaseEventReference(final EventHandle eventHandle, final boolean s
}
public void shutdown() {
- this.executor.shutdownNow();
+ acknowledgementSetMonitorThread.stop();
}
- // for testing
- public AcknowledgementSetMonitor getAcknowledgementSetMonitor() {
+ /**
+ * For testing only.
+ *
+ * @return the AcknowledgementSetMonitor
+ */
+ AcknowledgementSetMonitor getAcknowledgementSetMonitor() {
return acknowledgementSetMonitor;
}
}
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorThreadTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorThreadTest.java
new file mode 100644
index 0000000000..301e90c02e
--- /dev/null
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorThreadTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.acknowledgements;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class AcknowledgementSetMonitorThreadTest {
+ @Mock
+ private AcknowledgementSetMonitor acknowledgementSetMonitor;
+ private Duration delayTime;
+
+ @BeforeEach
+ void setUp() {
+ delayTime = Duration.ofMillis(10);
+ }
+
+ private AcknowledgementSetMonitorThread createObjectUnderTest() {
+ return new AcknowledgementSetMonitorThread(acknowledgementSetMonitor, delayTime);
+ }
+
+ @Test
+ void run_will_call_monitor_run() {
+ final AcknowledgementSetMonitorThread objectUnderTest = createObjectUnderTest();
+
+ objectUnderTest.start();
+ await().atMost(delayTime.plusMillis(500))
+ .untilAsserted(() -> {
+ verify(acknowledgementSetMonitor, atLeastOnce()).run();
+ });
+
+ verify(acknowledgementSetMonitor, atLeastOnce()).run();
+
+ objectUnderTest.stop();
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/CallbackTheadFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/CallbackTheadFactoryTest.java
new file mode 100644
index 0000000000..4f781cfbd3
--- /dev/null
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/CallbackTheadFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.acknowledgements;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.concurrent.ThreadFactory;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class CallbackTheadFactoryTest {
+ @Mock
+ private ThreadFactory delegateThreadFactory;
+
+ @Mock
+ private Runnable runnable;
+
+ private CallbackTheadFactory createObjectUnderTest() {
+ return new CallbackTheadFactory(delegateThreadFactory);
+ }
+
+ @Test
+ void constructor_throws_with_null_delegate() {
+ delegateThreadFactory = null;
+ assertThrows(NullPointerException.class, this::createObjectUnderTest);
+ }
+
+ @Nested
+ class WithNewThread {
+
+ @Mock
+ private Thread threadFromDelegate;
+
+ @BeforeEach
+ void setUp() {
+ when(delegateThreadFactory.newThread(runnable))
+ .thenReturn(threadFromDelegate);
+ }
+
+ @Test
+ void newThread_returns_thread_from_inner() {
+ assertThat(createObjectUnderTest().newThread(runnable),
+ equalTo(threadFromDelegate));
+ }
+
+ @Test
+ void newThread_assigns_name() {
+ createObjectUnderTest().newThread(runnable);
+ verify(threadFromDelegate).setName("acknowledgement-callback-1");
+ }
+ }
+
+ @Test
+ void newThread_called_multiple_times_uses_new_thread_name() {
+ when(delegateThreadFactory.newThread(runnable))
+ .thenAnswer(a -> mock(Thread.class));
+
+ final CallbackTheadFactory objectUnderTest = createObjectUnderTest();
+
+ final Thread thread1 = objectUnderTest.newThread(runnable);
+ final Thread thread2 = objectUnderTest.newThread(runnable);
+ final Thread thread3 = objectUnderTest.newThread(runnable);
+
+ verify(thread1).setName("acknowledgement-callback-1");
+ verify(thread2).setName("acknowledgement-callback-2");
+ verify(thread3).setName("acknowledgement-callback-3");
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java
index 74227be182..6538e5c89b 100644
--- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java
@@ -22,11 +22,14 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
@ExtendWith(MockitoExtension.class)
class DefaultAcknowledgementSetManagerTests {
private static final Duration TEST_TIMEOUT_MS = Duration.ofMillis(1000);
DefaultAcknowledgementSetManager acknowledgementSetManager;
+ private ExecutorService callbackExecutor;
@Mock
JacksonEvent event1;
@@ -42,22 +45,19 @@ class DefaultAcknowledgementSetManagerTests {
@BeforeEach
void setup() {
+ callbackExecutor = Executors.newFixedThreadPool(2);
event1 = mock(JacksonEvent.class);
- try {
- doAnswer((i) -> {
- eventHandle1 = (EventHandle)i.getArgument(0);
- return null;
- }).when(event1).setEventHandle(any());
- } catch (Exception e){}
+ doAnswer((i) -> {
+ eventHandle1 = i.getArgument(0);
+ return null;
+ }).when(event1).setEventHandle(any());
lenient().when(event1.getEventHandle()).thenReturn(eventHandle1);
event2 = mock(JacksonEvent.class);
- try {
- doAnswer((i) -> {
- eventHandle2 = (EventHandle)i.getArgument(0);
- return null;
- }).when(event2).setEventHandle(any());
- } catch (Exception e){}
+ doAnswer((i) -> {
+ eventHandle2 = i.getArgument(0);
+ return null;
+ }).when(event2).setEventHandle(any());
lenient().when(event2.getEventHandle()).thenReturn(eventHandle2);
acknowledgementSetManager = createObjectUnderTest();
@@ -67,39 +67,33 @@ void setup() {
}
DefaultAcknowledgementSetManager createObjectUnderTest() {
- return new DefaultAcknowledgementSetManager(Duration.ofMillis(TEST_TIMEOUT_MS.toMillis() * 2));
+ return new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(TEST_TIMEOUT_MS.toMillis() * 2));
}
@Test
- void testBasic() {
+ void testBasic() throws InterruptedException {
acknowledgementSetManager.releaseEventReference(eventHandle2, true);
acknowledgementSetManager.releaseEventReference(eventHandle1, true);
- try {
- Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
- } catch (Exception e){}
+ Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(true));
}
@Test
- void testExpirations() {
+ void testExpirations() throws InterruptedException {
acknowledgementSetManager.releaseEventReference(eventHandle2, true);
- try {
- Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
- } catch (Exception e){}
+ Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(null));
}
@Test
- void testMultipleAcknowledgementSets() {
+ void testMultipleAcknowledgementSets() throws InterruptedException {
event3 = mock(JacksonEvent.class);
- try {
- doAnswer((i) -> {
- eventHandle3 = (EventHandle)i.getArgument(0);
- return null;
- }).when(event3).setEventHandle(any());
- } catch (Exception e){}
+ doAnswer((i) -> {
+ eventHandle3 = i.getArgument(0);
+ return null;
+ }).when(event3).setEventHandle(any());
lenient().when(event3.getEventHandle()).thenReturn(eventHandle3);
AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS);
@@ -107,9 +101,7 @@ void testMultipleAcknowledgementSets() {
acknowledgementSetManager.releaseEventReference(eventHandle2, true);
acknowledgementSetManager.releaseEventReference(eventHandle3, true);
- try {
- Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
- } catch (Exception e){}
+ Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(true));
}
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java
index 7c6f3fe822..ba4e19241a 100644
--- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java
@@ -5,26 +5,28 @@
package org.opensearch.dataprepper.acknowledgements;
-import org.opensearch.dataprepper.model.event.JacksonEvent;
-import org.opensearch.dataprepper.event.DefaultEventHandle;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.CoreMatchers.not;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.event.DefaultEventHandle;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
-import org.junit.jupiter.api.extension.ExtendWith;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doAnswer;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.time.Duration;
@ExtendWith(MockitoExtension.class)
class DefaultAcknowledgementSetTests {
@@ -34,7 +36,7 @@ class DefaultAcknowledgementSetTests {
private JacksonEvent event;
private DefaultEventHandle handle;
- private ScheduledExecutorService executor;
+ private ExecutorService executor;
private Boolean acknowledgementSetResult;
private final Duration TEST_TIMEOUT = Duration.ofMillis(5000);
private AtomicBoolean callbackInterrupted;
@@ -50,7 +52,7 @@ private DefaultAcknowledgementSet createObjectUnderTestWithCallback(Consumer {
- handle = (DefaultEventHandle)i.getArgument(0);
- return null;
- }).when(event).setEventHandle(any());
- } catch (Exception e){}
+ doAnswer((i) -> {
+ handle = i.getArgument(0);
+ return null;
+ }).when(event).setEventHandle(any());
lenient().when(event.getEventHandle()).thenReturn(handle);
}
@@ -129,9 +129,9 @@ void testDefaultAcknowledgementSetWithCustomCallback() throws Exception {
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
- while (!defaultAcknowledgementSet.isDone()) {
- Thread.sleep(1000);
- }
+ Awaitility.waitAtMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofMillis(500))
+ .until(() -> defaultAcknowledgementSet.isDone());
assertThat(acknowledgementSetResult, equalTo(true));
}
@@ -152,9 +152,9 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception {
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(false));
assertThat(defaultAcknowledgementSet.release(handle, false), equalTo(false));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
- while (!defaultAcknowledgementSet.isDone()) {
- Thread.sleep(1000);
- }
+ Awaitility.waitAtMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofMillis(500))
+ .until(() -> defaultAcknowledgementSet.isDone());
assertThat(acknowledgementSetResult, equalTo(false));
}
@@ -174,18 +174,13 @@ void testDefaultAcknowledgementSetExpirations() throws Exception {
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
- while (!defaultAcknowledgementSet.isDone()) {
- Thread.sleep(1000);
- }
+ Awaitility.waitAtMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofMillis(500))
+ .until(() -> defaultAcknowledgementSet.isDone());
assertThat(acknowledgementSetResult, equalTo(null));
- final int MAX_TRIES = 10;
- int numTries = 0;
- // Try few times
- while (numTries++ < MAX_TRIES && !callbackInterrupted.get()) {
- try {
- Thread.sleep(1000);
- } catch (Exception e){}
- }
+ Awaitility.waitAtMost(Duration.ofSeconds(20))
+ .pollDelay(Duration.ofMillis(500))
+ .until(() -> callbackInterrupted.get());
assertThat(callbackInterrupted.get(), equalTo(true));
}
}