Skip to content

Commit

Permalink
Fixes an issue with the end-to-end acknowledgements where the sch…
Browse files Browse the repository at this point in the history
…eduled monitor thread holds a user thread and prevents Data Prepper from shutting down correctly. The monitor now runs in a dedicated daemon thread and the callback methods are submitted to a distinct executor service with a lower bound of available threads. Includes various test improvements as well. (#2483)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Apr 13, 2023
1 parent 5326d91 commit 068c571
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Spring configuration that supplies the beans used to run acknowledgement callbacks.
 */
@Configuration
class AcknowledgementAppConfig {
    private static final int MAX_THREADS = 12;

    /**
     * Creates the thread factory for callback threads, wrapping the JDK default thread factory.
     *
     * @return a new {@link CallbackTheadFactory}
     */
    @Bean
    CallbackTheadFactory callbackTheadFactory() {
        return new CallbackTheadFactory(Executors.defaultThreadFactory());
    }

    /**
     * Creates the executor registered under the bean name "acknowledgementCallbackExecutor".
     *
     * @param callbackTheadFactory the factory used to create the pool's threads
     * @return a fixed-size thread pool holding {@code MAX_THREADS} threads
     */
    @Bean(name = "acknowledgementCallbackExecutor")
    ExecutorService acknowledgementCallbackExecutor(final CallbackTheadFactory callbackTheadFactory) {
        final ExecutorService callbackExecutor = Executors.newFixedThreadPool(MAX_THREADS, callbackTheadFactory);
        return callbackExecutor;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
import org.slf4j.LoggerFactory;
/**
* AcknowledgementSetMonitor - monitors the acknowledgement sets for completion/expiration
*
* <p>
* Every acknowledgement set must complete (i.e., get acknowledgements from all the events in it)
* by a specified time. If it is not completed, then it is considered 'expired' and it is
* cleaned up. The 'run' method is invoked periodically to clean up the acknowledgement sets
* that are either completed or expired.
*/
public class AcknowledgementSetMonitor implements Runnable {
class AcknowledgementSetMonitor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AcknowledgementSetMonitor.class);
private final Set<AcknowledgementSet> acknowledgementSets;
private final ReentrantLock lock;
private AtomicInteger numInvalidAcquires;
private AtomicInteger numInvalidReleases;
private AtomicInteger numNullHandles;
private final AtomicInteger numInvalidAcquires;
private final AtomicInteger numInvalidReleases;
private final AtomicInteger numNullHandles;

private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet();
Expand Down Expand Up @@ -110,7 +110,10 @@ public void release(final EventHandle eventHandle, final boolean success) {
}
}

// For testing
/**
* for testing
* @return the size
*/
int getSize() {
return acknowledgementSets.size();
}
Expand All @@ -120,7 +123,7 @@ public void run() {
lock.lock();
try {
if (acknowledgementSets.size() > 0) {
acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet)ackSet).isDone());
acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet) ackSet).isDone());
}
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import java.time.Duration;

/**
 * Runs an {@link AcknowledgementSetMonitor} repeatedly on a dedicated daemon thread named
 * "acknowledgement-monitor", sleeping for a fixed delay between consecutive runs.
 */
class AcknowledgementSetMonitorThread {
    private final Thread monitorThread;
    private final AcknowledgementSetMonitor acknowledgementSetMonitor;
    private final Duration delayTime;
    // volatile so that a stop() call from another thread is visible to the monitor loop
    private volatile boolean isStopped = false;

    /**
     * Creates the monitor thread without starting it.
     *
     * @param acknowledgementSetMonitor the monitor whose {@code run} method is invoked on each iteration
     * @param delayTime the time to sleep between iterations
     */
    public AcknowledgementSetMonitorThread(
            final AcknowledgementSetMonitor acknowledgementSetMonitor,
            final Duration delayTime) {
        this.acknowledgementSetMonitor = acknowledgementSetMonitor;
        this.delayTime = delayTime;
        monitorThread = new Thread(new Monitor());
        // A daemon thread does not keep the JVM alive on its own.
        monitorThread.setDaemon(true);
        monitorThread.setName("acknowledgement-monitor");
    }

    /**
     * Starts the monitor thread.
     */
    public void start() {
        monitorThread.start();
    }

    /**
     * Requests the monitor loop to finish. The thread is also interrupted so that a
     * pending sleep ends immediately rather than after the full delay.
     */
    public void stop() {
        isStopped = true;
        monitorThread.interrupt();
    }

    /**
     * The loop body executed by the monitor thread.
     */
    private class Monitor implements Runnable {
        @Override
        public void run() {
            while (!isStopped) {
                acknowledgementSetMonitor.run();
                try {
                    Thread.sleep(delayTime.toMillis());
                } catch (final InterruptedException e) {
                    // An interrupt is a request to stop: restore the flag and leave the loop
                    // quietly instead of dying with an uncaught RuntimeException.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A {@link ThreadFactory} that delegates thread creation to another factory and then
 * renames each created thread to "acknowledgement-callback-N", where N starts at 1 and
 * increases with every thread created by this instance.
 */
public class CallbackTheadFactory implements ThreadFactory {
    private static final String THREAD_NAME_PREFIX = "acknowledgement-callback-";

    private final ThreadFactory delegateFactory;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    /**
     * @param delegateFactory the factory which actually creates the threads
     * @throws NullPointerException if {@code delegateFactory} is null
     */
    public CallbackTheadFactory(final ThreadFactory delegateFactory) {
        this.delegateFactory = Objects.requireNonNull(delegateFactory);
    }

    /**
     * Creates a thread through the delegate factory and assigns it the next callback thread name.
     *
     * @param runnable the task the new thread will execute
     * @return the renamed thread, or {@code null} if the delegate rejected the request
     */
    @Override
    public Thread newThread(final Runnable runnable) {
        final Thread thread = delegateFactory.newThread(runnable);
        if (thread == null) {
            // ThreadFactory permits a null return when creation is rejected; pass it through
            // rather than failing with a NullPointerException, and do not consume a number.
            return null;
        }
        thread.setName(THREAD_NAME_PREFIX + threadNumber.getAndIncrement());
        return thread;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,39 @@

package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.Map;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.time.Instant;
import java.time.Duration;
import java.util.function.Consumer;

public class DefaultAcknowledgementSet implements AcknowledgementSet {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class);
private final Consumer<Boolean> callback;
private final Instant expiryTime;
private final ScheduledExecutorService executor;
private final ExecutorService executor;
// This lock protects all the non-final members
private final ReentrantLock lock;
private boolean result;
private Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private ScheduledFuture callbackFuture;
private final Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private Future<?> callbackFuture;

private AtomicInteger numInvalidAcquires;
private AtomicInteger numInvalidReleases;
private final AtomicInteger numInvalidAcquires;
private final AtomicInteger numInvalidReleases;

public DefaultAcknowledgementSet(final ScheduledExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime) {
public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime) {
this.callback = callback;
this.result = true;
this.executor = executor;
Expand Down Expand Up @@ -127,7 +124,7 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (pendingAcknowledgments.size() == 0) {
callbackFuture = executor.schedule(() -> {callback.accept(this.result);}, 0, TimeUnit.SECONDS);
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,37 @@

package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.function.Consumer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import javax.inject.Named;
import javax.inject.Inject;
import javax.inject.Named;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

@Named
public class DefaultAcknowledgementSetManager implements AcknowledgementSetManager {
private static final int MAX_THREADS = 100;
private static final int DEFAULT_WAIT_TIME_MS = 15 * 1000;
private final AcknowledgementSetMonitor acknowledgementSetMonitor;
private final ScheduledExecutorService executor;
private final ExecutorService executor;
private final AcknowledgementSetMonitorThread acknowledgementSetMonitorThread;


@Inject
public DefaultAcknowledgementSetManager() {
this(Duration.ofMillis(DEFAULT_WAIT_TIME_MS));
public DefaultAcknowledgementSetManager(
@Named("acknowledgementCallbackExecutor") final ExecutorService callbackExecutor) {
this(callbackExecutor, Duration.ofMillis(DEFAULT_WAIT_TIME_MS));
}

public DefaultAcknowledgementSetManager(final Duration waitTime) {
this.executor = Executors.newScheduledThreadPool(MAX_THREADS);
public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, final Duration waitTime) {
this.acknowledgementSetMonitor = new AcknowledgementSetMonitor();
this.executor.scheduleAtFixedRate(this.acknowledgementSetMonitor, waitTime.toMillis(), waitTime.toMillis(), TimeUnit.MILLISECONDS);
this.executor = Objects.requireNonNull(callbackExecutor);
acknowledgementSetMonitorThread = new AcknowledgementSetMonitorThread(acknowledgementSetMonitor, waitTime);
acknowledgementSetMonitorThread.start();
}

public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout) {
Expand All @@ -44,7 +45,7 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
}

public void acquireEventReference(final Event event) {
acquireEventReference(((JacksonEvent)event).getEventHandle());
acquireEventReference(event.getEventHandle());
}

public void acquireEventReference(final EventHandle eventHandle) {
Expand All @@ -56,11 +57,15 @@ public void releaseEventReference(final EventHandle eventHandle, final boolean s
}

public void shutdown() {
this.executor.shutdownNow();
acknowledgementSetMonitorThread.stop();
}

// for testing
public AcknowledgementSetMonitor getAcknowledgementSetMonitor() {
/**
* For testing only.
*
* @return the AcknowledgementSetMonitor
*/
AcknowledgementSetMonitor getAcknowledgementSetMonitor() {
return acknowledgementSetMonitor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Duration;

import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;

/**
 * Tests that {@link AcknowledgementSetMonitorThread} invokes the monitor once started.
 */
@ExtendWith(MockitoExtension.class)
class AcknowledgementSetMonitorThreadTest {
    @Mock
    private AcknowledgementSetMonitor acknowledgementSetMonitor;
    private Duration delayTime;

    @BeforeEach
    void setUp() {
        // Short delay keeps the test fast.
        delayTime = Duration.ofMillis(10);
    }

    private AcknowledgementSetMonitorThread createObjectUnderTest() {
        return new AcknowledgementSetMonitorThread(acknowledgementSetMonitor, delayTime);
    }

    @Test
    void run_will_call_monitor_run() {
        final AcknowledgementSetMonitorThread objectUnderTest = createObjectUnderTest();

        objectUnderTest.start();
        try {
            // A successful await already proves run() was invoked, so no further verify is needed.
            await().atMost(delayTime.plusMillis(500))
                    .untilAsserted(() -> verify(acknowledgementSetMonitor, atLeastOnce()).run());
        } finally {
            // Stop the monitor even when the assertion fails so the thread does not keep
            // running while later tests execute.
            objectUnderTest.stop();
        }
    }
}
Loading

0 comments on commit 068c571

Please sign in to comment.