From 21f3526d64b24d04d2b7305c67bdf8261942acff Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 2 Nov 2023 06:32:45 +0000 Subject: [PATCH 1/9] Add ProgressCheck callbacks to end-to-end acknowledgements Signed-off-by: Krishna Kondaka --- .../acknowledgements/AcknowledgementSet.java | 15 ++ .../AcknowledgementAppConfig.java | 6 +- .../DefaultAcknowledgementSet.java | 47 ++++- .../DefaultAcknowledgementSetManager.java | 12 +- ...DefaultAcknowledgementSetManagerTests.java | 135 ++++++++++++-- .../DefaultAcknowledgementSetTests.java | 51 +++++- .../consumer/KafkaCustomConsumerTest.java | 6 +- .../source/s3/S3ScanObjectWorkerIT.java | 10 +- .../plugins/source/s3/SqsWorkerIT.java | 164 +++++++++++++++++- .../plugins/source/s3/SqsWorker.java | 58 ++++++- .../source/s3/configuration/SqsOptions.java | 18 ++ .../plugins/source/s3/SqsWorkerTest.java | 39 ++++- 12 files changed, 507 insertions(+), 54 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index c95c2e5f88..7ce038fdb8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -8,6 +8,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import java.time.Duration; +import java.util.function.Consumer; + /** * AcknowledgmentSet keeps track of set of events that * belong to the batch of events that a source creates. @@ -58,4 +61,16 @@ public interface AcknowledgementSet { * initial events are going through the pipeline line. */ public void complete(); + + /** + * adds progress check callback to the acknowledgement set. When added + * the callback is called every progressCheckInterval time with the + * indication of current progress as a ratio of pending number of + * acknowledgements over total acknowledgements + * + * @param progressCheckCallback progress check callback to be called + * @param progressCheckInterval frequency of invocation of progress check callback + * @since 2.6 + */ + public void addProgressCheck(final Consumer progressCheckCallback, final Duration progressCheckInterval); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java index 21032873a4..2d32cb116c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java @@ -8,7 +8,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -23,7 +23,7 @@ CallbackTheadFactory callbackTheadFactory() { } @Bean(name = "acknowledgementCallbackExecutor") - ExecutorService acknowledgementCallbackExecutor(final CallbackTheadFactory callbackTheadFactory) { - return Executors.newFixedThreadPool(MAX_THREADS, callbackTheadFactory); + ScheduledExecutorService acknowledgementCallbackExecutor(final CallbackTheadFactory callbackTheadFactory) { + return Executors.newScheduledThreadPool(MAX_THREADS, callbackTheadFactory); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 741f08939d..17c338d89b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -18,37 +18,61 @@ import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class DefaultAcknowledgementSet implements AcknowledgementSet { private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class); private final Consumer callback; + private Consumer progressCheckCallback; private final Instant expiryTime; - private final ExecutorService executor; + private final ScheduledExecutorService scheduledExecutor; // This lock protects all the non-final members private final ReentrantLock lock; private boolean result; private final Map pendingAcknowledgments; private Future callbackFuture; private final DefaultAcknowledgementSetMetrics metrics; + private ScheduledFuture progressCheckFuture; private boolean completed; + private AtomicInteger totalEventsAdded; - public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) { + public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecutor, + final Consumer callback, + final Duration expiryTime, + final DefaultAcknowledgementSetMetrics metrics) { this.callback = callback; this.result = true; - this.executor = executor; + this.totalEventsAdded = new AtomicInteger(0); + this.scheduledExecutor = scheduledExecutor; this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis()); this.callbackFuture = null; this.metrics = metrics; this.completed = false; + this.progressCheckCallback = null; pendingAcknowledgments = new HashMap<>(); lock = new ReentrantLock(true); } + public void addProgressCheck(final Consumer progressCheckCallback, final Duration progressCheckInterval) { + this.progressCheckCallback = progressCheckCallback; + this.progressCheckFuture = scheduledExecutor.scheduleAtFixedRate(this::checkProgress, 0L, progressCheckInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + public void checkProgress() { + lock.lock(); + int numberOfEventsPending = pendingAcknowledgments.size(); + lock.unlock(); + if (progressCheckCallback != null) { + progressCheckCallback.accept((double)numberOfEventsPending/totalEventsAdded.get()); + } + } + @Override public void add(Event event) { lock.lock(); @@ -59,6 +83,7 @@ public void add(Event event) { InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; internalEventHandle.setAcknowledgementSet(this); pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + totalEventsAdded.incrementAndGet(); } } } finally { @@ -88,11 +113,15 @@ public boolean isDone() { return true; } if (Instant.now().isAfter(expiryTime)) { + if (progressCheckFuture != null) { + progressCheckFuture.cancel(false); + } if (callbackFuture != null) { callbackFuture.cancel(true); callbackFuture = null; LOG.warn("AcknowledgementSet expired"); } + System.out.println("=======EXPIRED======="); metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME); return true; } @@ -112,7 +141,10 @@ public void complete() { try { completed = true; if (pendingAcknowledgments.size() == 0) { - callbackFuture = executor.submit(() -> callback.accept(this.result)); + if (progressCheckFuture != null) { + progressCheckFuture.cancel(false); + } + callbackFuture = scheduledExecutor.submit(() -> callback.accept(this.result)); } } finally { lock.unlock(); @@ -136,7 +168,10 @@ public boolean release(final EventHandle eventHandle, final boolean result) { if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) { pendingAcknowledgments.remove(eventHandle); if (completed && pendingAcknowledgments.size() == 0) { - callbackFuture = executor.submit(() -> callback.accept(this.result)); + if (progressCheckFuture != null) { + progressCheckFuture.cancel(false); + } + callbackFuture = scheduledExecutor.submit(() -> callback.accept(this.result)); return true; } else if (pendingAcknowledgments.size() == 0) { LOG.warn("Acknowledgement set is not completed. Delaying callback until it is completed"); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java index 104945960e..3f2e3761bd 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java @@ -15,27 +15,27 @@ import javax.inject.Named; import java.time.Duration; import java.util.Objects; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; @Named public class DefaultAcknowledgementSetManager implements AcknowledgementSetManager { private static final int DEFAULT_WAIT_TIME_MS = 15 * 1000; private final AcknowledgementSetMonitor acknowledgementSetMonitor; - private final ExecutorService executor; + private final ScheduledExecutorService scheduledExecutor; private final AcknowledgementSetMonitorThread acknowledgementSetMonitorThread; private PluginMetrics pluginMetrics; private DefaultAcknowledgementSetMetrics metrics; @Inject public DefaultAcknowledgementSetManager( - @Named("acknowledgementCallbackExecutor") final ExecutorService callbackExecutor) { + @Named("acknowledgementCallbackExecutor") final ScheduledExecutorService callbackExecutor) { this(callbackExecutor, Duration.ofMillis(DEFAULT_WAIT_TIME_MS)); } - public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, final Duration waitTime) { + public DefaultAcknowledgementSetManager(final ScheduledExecutorService callbackExecutor, final Duration waitTime) { this.acknowledgementSetMonitor = new AcknowledgementSetMonitor(); - this.executor = Objects.requireNonNull(callbackExecutor); + this.scheduledExecutor = Objects.requireNonNull(callbackExecutor); acknowledgementSetMonitorThread = new AcknowledgementSetMonitorThread(acknowledgementSetMonitor, waitTime); acknowledgementSetMonitorThread.start(); pluginMetrics = PluginMetrics.fromNames("acknowledgementSetManager", "acknowledgements"); @@ -43,7 +43,7 @@ public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, } public AcknowledgementSet create(final Consumer callback, final Duration timeout) { - AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(executor, callback, timeout, metrics); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutor, callback, timeout, metrics); acknowledgementSetMonitor.add(acknowledgementSet); metrics.increment(DefaultAcknowledgementSetMetrics.CREATED_METRIC_NAME); return acknowledgementSet; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index c9fd556214..6cdd77a39b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.junit.jupiter.api.BeforeEach; @@ -22,14 +23,14 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import java.time.Duration; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; @ExtendWith(MockitoExtension.class) class DefaultAcknowledgementSetManagerTests { private static final Duration TEST_TIMEOUT = Duration.ofMillis(400); - DefaultAcknowledgementSetManager acknowledgementSetManager; - private ExecutorService callbackExecutor; + private DefaultAcknowledgementSetManager acknowledgementSetManager; + private ScheduledExecutorService callbackExecutor; @Mock JacksonEvent event1; @@ -38,17 +39,24 @@ class DefaultAcknowledgementSetManagerTests { @Mock JacksonEvent event3; - DefaultEventHandle eventHandle1; - DefaultEventHandle eventHandle2; - DefaultEventHandle eventHandle3; - Boolean result; + private PluginMetrics pluginMetrics; + private DefaultEventHandle eventHandle1; + private DefaultEventHandle eventHandle2; + private DefaultEventHandle eventHandle3; + private DefaultEventHandle eventHandle4; + private DefaultEventHandle eventHandle5; + private DefaultEventHandle eventHandle6; + private Boolean result; + private double currentRatio; @BeforeEach void setup() { - callbackExecutor = Executors.newFixedThreadPool(2); + currentRatio = 0; + callbackExecutor = Executors.newScheduledThreadPool(2); event1 = mock(JacksonEvent.class); eventHandle1 = mock(DefaultEventHandle.class); lenient().when(event1.getEventHandle()).thenReturn(eventHandle1); + pluginMetrics = mock(PluginMetrics.class); event2 = mock(JacksonEvent.class); eventHandle2 = mock(DefaultEventHandle.class); @@ -76,8 +84,6 @@ void testBasic() { assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); assertThat(result, equalTo(true)); }); - assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); - assertThat(result, equalTo(true)); } @Test @@ -85,7 +91,10 @@ void testExpirations() throws InterruptedException { acknowledgementSetManager.releaseEventReference(eventHandle2, true); Thread.sleep(TEST_TIMEOUT.multipliedBy(5).toMillis()); assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); - assertThat(result, equalTo(null)); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(result, equalTo(null)); + }); } @Test @@ -106,7 +115,107 @@ void testMultipleAcknowledgementSets() { assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); assertThat(result, equalTo(true)); }); - assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); - assertThat(result, equalTo(true)); } + + @Test + void testWithProgressCheckCallbacks() { + eventHandle3 = mock(DefaultEventHandle.class); + lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); + + eventHandle4 = mock(DefaultEventHandle.class); + JacksonEvent event4 = mock(JacksonEvent.class); + lenient().when(event4.getEventHandle()).thenReturn(eventHandle4); + + eventHandle5 = mock(DefaultEventHandle.class); + JacksonEvent event5 = mock(JacksonEvent.class); + lenient().when(event5.getEventHandle()).thenReturn(eventHandle5); + + eventHandle6 = mock(DefaultEventHandle.class); + JacksonEvent event6 = mock(JacksonEvent.class); + lenient().when(event6.getEventHandle()).thenReturn(eventHandle6); + + AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofMillis(10000)); + acknowledgementSet2.addProgressCheck((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1)); + acknowledgementSet2.add(event3); + acknowledgementSet2.add(event4); + acknowledgementSet2.add(event5); + acknowledgementSet2.add(event6); + lenient().when(eventHandle3.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + lenient().when(eventHandle4.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + lenient().when(eventHandle5.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + lenient().when(eventHandle6.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + acknowledgementSet2.complete(); + acknowledgementSetManager.releaseEventReference(eventHandle3, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.75)); + }); + acknowledgementSetManager.releaseEventReference(eventHandle4, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.5)); + }); + acknowledgementSetManager.releaseEventReference(eventHandle5, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.25)); + }); + acknowledgementSetManager.releaseEventReference(eventHandle6, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(result, equalTo(true)); + }); + + } + + @Test + void testWithProgressCheckCallbacks_AcksExpire() { + eventHandle3 = mock(DefaultEventHandle.class); + lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); + + eventHandle4 = mock(DefaultEventHandle.class); + JacksonEvent event4 = mock(JacksonEvent.class); + lenient().when(event4.getEventHandle()).thenReturn(eventHandle4); + + eventHandle5 = mock(DefaultEventHandle.class); + JacksonEvent event5 = mock(JacksonEvent.class); + lenient().when(event5.getEventHandle()).thenReturn(eventHandle5); + + eventHandle6 = mock(DefaultEventHandle.class); + JacksonEvent event6 = mock(JacksonEvent.class); + lenient().when(event6.getEventHandle()).thenReturn(eventHandle6); + + AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofSeconds(10)); + acknowledgementSet2.addProgressCheck((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1)); + acknowledgementSet2.add(event3); + acknowledgementSet2.add(event4); + acknowledgementSet2.add(event5); + acknowledgementSet2.add(event6); + lenient().when(eventHandle3.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + lenient().when(eventHandle4.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + lenient().when(eventHandle5.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + lenient().when(eventHandle6.getAcknowledgementSet()).thenReturn(acknowledgementSet2); + acknowledgementSet2.complete(); + acknowledgementSetManager.releaseEventReference(eventHandle3, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.75)); + }); + acknowledgementSetManager.releaseEventReference(eventHandle4, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.5)); + }); + acknowledgementSetManager.releaseEventReference(eventHandle5, true); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.25)); + }); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(result, equalTo(null)); + }); + + } + } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java index 8a4aa1485a..660e6f84d1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.acknowledgements; import org.awaitility.Awaitility; +import static org.awaitility.Awaitility.await; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -16,7 +17,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.time.Duration; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -41,7 +42,9 @@ class DefaultAcknowledgementSetTests { @Mock private DefaultEventHandle handle2; - private ExecutorService executor; + private double currentRatio; + + private ScheduledExecutorService executor; private Boolean acknowledgementSetResult; private final Duration TEST_TIMEOUT = Duration.ofMillis(5000); private AtomicBoolean callbackInterrupted; @@ -76,7 +79,7 @@ private DefaultAcknowledgementSet createObjectUnderTestWithCallback(Consumer callbackInterrupted.get()); assertThat(callbackInterrupted.get(), equalTo(true)); } + + @Test + void testDefaultAcknowledgementSetWithProgressCheck() throws Exception { + defaultAcknowledgementSet = createObjectUnderTestWithCallback( + (flag) -> { + acknowledgementSetResult = flag; + } + ); + defaultAcknowledgementSet.addProgressCheck( + (ratio) -> { + currentRatio = ratio; + }, + Duration.ofSeconds(1) + ); + defaultAcknowledgementSet.add(event); + defaultAcknowledgementSet.add(event2); + defaultAcknowledgementSet.complete(); + lenient().doAnswer(a -> { + AcknowledgementSet acknowledgementSet = a.getArgument(0); + lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); + return null; + }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); + assertThat(handle, not(equalTo(null))); + assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet)); + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(1.0)); + }); + assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(false)); + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(currentRatio, equalTo(0.5)); + }); + assertThat(defaultAcknowledgementSet.release(handle2, true), equalTo(true)); + Awaitility.waitAtMost(Duration.ofSeconds(10)) + .pollDelay(Duration.ofMillis(500)) + .until(() -> defaultAcknowledgementSet.isDone()); + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(acknowledgementSetResult, equalTo(true)); + }); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index e32ce36836..a5913042de 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -45,7 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,7 +72,7 @@ public class KafkaCustomConsumerTest { @Mock private KafkaConsumerConfig sourceConfig; - private ExecutorService callbackExecutor; + private ScheduledExecutorService callbackExecutor; private AcknowledgementSetManager acknowledgementSetManager; @Mock @@ -134,7 +134,7 @@ public void setUp() { }).when(negCounter).increment(); doAnswer((i)-> {return posCount;}).when(posCounter).count(); doAnswer((i)-> {return negCount;}).when(negCounter).count(); - callbackExecutor = Executors.newFixedThreadPool(2); + callbackExecutor = Executors.newScheduledThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000)); sourceConfig = mock(KafkaConsumerConfig.class); diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java index 49f5c687b6..48c5862155 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java @@ -56,7 +56,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; @@ -209,7 +209,7 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen lenient().when(s3ScanSchedulingOptions.getInterval()).thenReturn(Duration.ofHours(1)); lenient().when(s3ScanSchedulingOptions.getCount()).thenReturn(1); - ExecutorService executor = Executors.newFixedThreadPool(2); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); return new ScanObjectWorker(s3Client,List.of(scanOptions),createObjectUnderTest(s3ObjectRequest) @@ -257,7 +257,7 @@ void parseS3Object_parquet_correctly_with_bucket_scan_and_loads_data_into_Buffer startTimeAndRangeScanOptions, Boolean.TRUE); - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.submit(objectUnderTest::run); await().atMost(Duration.ofSeconds(30)).until(() -> waitForAllRecordsToBeProcessed(numberOfRecords)); @@ -299,7 +299,7 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer( final int expectedWrites = numberOfRecords / numberOfRecordsToAccumulate + (numberOfRecords % numberOfRecordsToAccumulate != 0 ? 1 : 0); - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.submit(scanObjectWorker::run); await().atMost(Duration.ofSeconds(30)).until(() -> waitForAllRecordsToBeProcessed(numberOfRecords)); @@ -346,7 +346,7 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer_and_del final int expectedWrites = numberOfRecords / numberOfRecordsToAccumulate; - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.submit(scanObjectWorker::run); diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java index 474929f71e..49dfa38ae7 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java @@ -35,11 +35,15 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.List; +import java.util.ArrayList; import java.util.UUID; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -60,6 +64,8 @@ class SqsWorkerIT { private SqsClient sqsClient; @Mock private S3Service s3Service; + @Mock + private SqsOptions sqsOptions; private S3SourceConfig s3SourceConfig; private PluginMetrics pluginMetrics; private S3ObjectGenerator s3ObjectGenerator; @@ -69,8 +75,11 @@ class SqsWorkerIT { private Double receivedCount = 0.0; private Double deletedCount = 0.0; private Double ackCallbackCount = 0.0; + private Double visibilityTimeoutChangedCount = 0.0; private Event event; private AtomicBoolean ready = new AtomicBoolean(false); + private int numEventsAdded; + private List events; @BeforeEach void setUp() { @@ -80,6 +89,7 @@ void setUp() { .build(); bucket = System.getProperty("tests.s3source.bucket"); s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket); + events = new ArrayList<>(); sqsClient = SqsClient.builder() .region(Region.of(System.getProperty("tests.s3source.region"))) @@ -100,14 +110,14 @@ void setUp() { lenient().when(pluginMetrics.summary(anyString())).thenReturn(distributionSummary); when(pluginMetrics.timer(anyString())).thenReturn(sqsMessageDelayTimer); - final SqsOptions sqsOptions = mock(SqsOptions.class); + sqsOptions = mock(SqsOptions.class); when(sqsOptions.getSqsUrl()).thenReturn(System.getProperty("tests.s3source.queue.url")); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(60)); when(sqsOptions.getMaximumMessages()).thenReturn(10); when(sqsOptions.getWaitTime()).thenReturn(Duration.ofSeconds(10)); when(s3SourceConfig.getSqsOptions()).thenReturn(sqsOptions); lenient().when(s3SourceConfig.getOnErrorOption()).thenReturn(OnErrorOption.DELETE_MESSAGES); - when(s3SourceConfig.getNotificationSource()).thenReturn(NotificationSourceOption.S3); + lenient().when(s3SourceConfig.getNotificationSource()).thenReturn(NotificationSourceOption.S3); } private SqsWorker createObjectUnderTest() { @@ -183,7 +193,7 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba ackSet.add(event); return null; }).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class)); - ExecutorService executor = Executors.newFixedThreadPool(2); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); final SqsWorker objectUnderTest = createObjectUnderTest(); Thread sinkThread = new Thread(() -> { @@ -256,7 +266,7 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba return null; }).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class)); - ExecutorService executor = Executors.newFixedThreadPool(2); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); final SqsWorker objectUnderTest = createObjectUnderTest(); Thread sinkThread = new Thread(() -> { @@ -281,6 +291,150 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba assertThat(ackCallbackCount, equalTo((double)1.0)); } + @ParameterizedTest + @ValueSource(ints = {1}) + void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberOfObjectsToWrite) throws IOException, InterruptedException { + writeToS3(numberOfObjectsToWrite); + + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); + final Counter receivedCounter = mock(Counter.class); + final Counter deletedCounter = mock(Counter.class); + final Counter ackCallbackCounter = mock(Counter.class); + final Counter visibilityTimeoutChangedCounter = mock(Counter.class); + when(pluginMetrics.counter(SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME)).thenReturn(receivedCounter); + when(pluginMetrics.counter(SqsWorker.SQS_MESSAGES_DELETED_METRIC_NAME)).thenReturn(deletedCounter); + when(pluginMetrics.counter(SqsWorker.ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME)).thenReturn(ackCallbackCounter); + when(pluginMetrics.counter(SqsWorker.SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME)).thenReturn(visibilityTimeoutChangedCounter); + lenient().doAnswer((val) -> { + receivedCount += (double)val.getArgument(0); + return null; + }).when(receivedCounter).increment(any(Double.class)); + + lenient().doAnswer((val) -> { + if (val.getArgument(0) != null) { + delCount.getAndAdd((int)(double)val.getArgument(0)); + } + return null; + }).when(deletedCounter).increment(any(Double.class)); + ackCallbackCount = 0.0; + lenient().doAnswer((val) -> { + ackCallbackCount += 1; + return null; + }).when(ackCallbackCounter).increment(); + lenient().doAnswer((val) -> { + visibilityTimeoutChangedCount += 1; + return null; + }).when(visibilityTimeoutChangedCounter).increment(); + numEventsAdded = 0; + + doAnswer((val) -> { + AcknowledgementSet ackSet = val.getArgument(1); + S3ObjectReference s3ObjectReference = val.getArgument(0); + assertThat(s3ObjectReference.getBucketName(), equalTo(bucket)); + assertThat(s3ObjectReference.getKey(), startsWith("s3 source/sqs/")); + event = (Event)JacksonEvent.fromMessage(val.getArgument(0).toString()); + + ackSet.add(event); + synchronized(events) { + events.add(event); + } + try { + Thread.sleep(4000); + } catch (Exception e) {} + return null; + }).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class)); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); + when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); + when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60)); + when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); + final SqsWorker objectUnderTest = createObjectUnderTest(); + final int sqsMessagesProcessed = objectUnderTest.processSqsMessages(); + synchronized(events) { + for (Event e: events) { + if (e.getEventHandle() != null) { + e.getEventHandle().release(true); + } + } + } + await().atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> { + assertThat(visibilityTimeoutChangedCount, greaterThanOrEqualTo((double)numberOfObjectsToWrite)); + assertThat(delCount.get(), equalTo(numberOfObjectsToWrite)); + assertThat(ackCallbackCount, equalTo((double)numberOfObjectsToWrite)); + }); + } + + @ParameterizedTest + @ValueSource(ints = {1}) + void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int numberOfObjectsToWrite) throws IOException, InterruptedException { + writeToS3(numberOfObjectsToWrite); + + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); + final Counter receivedCounter = mock(Counter.class); + final Counter deletedCounter = mock(Counter.class); + final Counter ackCallbackCounter = mock(Counter.class); + final Counter visibilityTimeoutChangedCounter = mock(Counter.class); + when(pluginMetrics.counter(SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME)).thenReturn(receivedCounter); + when(pluginMetrics.counter(SqsWorker.SQS_MESSAGES_DELETED_METRIC_NAME)).thenReturn(deletedCounter); + when(pluginMetrics.counter(SqsWorker.ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME)).thenReturn(ackCallbackCounter); + when(pluginMetrics.counter(SqsWorker.SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME)).thenReturn(visibilityTimeoutChangedCounter); + lenient().doAnswer((val) -> { + receivedCount += (double)val.getArgument(0); + return null; + }).when(receivedCounter).increment(any(Double.class)); + + lenient().doAnswer((val) -> { + if (val.getArgument(0) != null) { + deletedCount += (double)val.getArgument(0); + delCount.getAndAdd((int)(double)val.getArgument(0)); + } + return null; + }).when(deletedCounter).increment(any(Double.class)); + lenient().when(deletedCounter.count()).thenReturn(deletedCount); + ackCallbackCount = 0.0; + lenient().doAnswer((val) -> { + ackCallbackCount += 1; + return null; + }).when(ackCallbackCounter).increment(); + lenient().doAnswer((val) -> { + visibilityTimeoutChangedCount += 1; + return null; + }).when(visibilityTimeoutChangedCounter).increment(); + numEventsAdded = 0; + + doAnswer((val) -> { + AcknowledgementSet ackSet = val.getArgument(1); + S3ObjectReference s3ObjectReference = val.getArgument(0); + assertThat(s3ObjectReference.getBucketName(), equalTo(bucket)); + assertThat(s3ObjectReference.getKey(), startsWith("s3 source/sqs/")); + event = (Event)JacksonEvent.fromMessage(val.getArgument(0).toString()); + + ackSet.add(event); + synchronized(events) { + events.add(event); + } + try { + Thread.sleep(2000); + } catch (Exception e) {} + return null; + }).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class)); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); + when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); + when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60)); + when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); + final SqsWorker objectUnderTest = createObjectUnderTest(); + final int sqsMessagesProcessed = objectUnderTest.processSqsMessages(); + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { + assertThat(visibilityTimeoutChangedCount, greaterThanOrEqualTo((double)numberOfObjectsToWrite)); + assertThat(delCount.get(), equalTo(0)); + assertThat(ackCallbackCount, equalTo(0.0)); + }); + + } + /** The EventBridge test is disabled by default * To run this test run only this one test with S3 bucket configured to use EventBridge to send notifications to SQS */ diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 06a38d2393..d02d8f214c 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -28,6 +28,7 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.Message; @@ -40,7 +41,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -51,6 +54,7 @@ public class SqsWorker implements Runnable { static final String SQS_MESSAGES_FAILED_METRIC_NAME = "sqsMessagesFailed"; static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed"; static final String SQS_MESSAGE_DELAY_METRIC_NAME = "sqsMessageDelay"; + static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount"; static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; private final S3SourceConfig s3SourceConfig; @@ -64,6 +68,7 @@ public class SqsWorker implements Runnable { private final Counter sqsMessagesFailedCounter; private final Counter sqsMessagesDeleteFailedCounter; private final Counter acknowledgementSetCallbackCounter; + private final Counter sqsVisibilityTimeoutChangedCount; private final Timer sqsMessageDelayTimer; private final Backoff standardBackoff; private int failedAttemptCount; @@ -72,6 +77,7 @@ public class SqsWorker implements Runnable { private final ObjectMapper objectMapper = new ObjectMapper(); private volatile boolean isStopped = false; + private Map parsedMessageVisibilityTimesMap; public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, final SqsClient sqsClient, @@ -89,6 +95,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, objectCreatedFilter = new S3ObjectCreatedFilter(); evenBridgeObjectCreatedFilter = new EventBridgeObjectCreatedFilter(); failedAttemptCount = 0; + parsedMessageVisibilityTimesMap = new HashMap<>(); sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME); sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME); @@ -96,6 +103,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, sqsMessagesDeleteFailedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME); sqsMessageDelayTimer = pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME); acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME); + sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME); } @Override @@ -226,16 +234,48 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { for (ParsedMessage parsedMessage : parsedMessagesToRead) { List waitingForAcknowledgements = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; + final int visibilityTimeout = (int)sqsOptions.getVisibilityTimeout().getSeconds(); + final int maxVisibilityTimeout = (int)sqsOptions.getMaxVisibilityTimeoutExtension().getSeconds(); + final int progressCheckInterval = visibilityTimeout/2 - 1; if (endToEndAcknowledgementsEnabled) { - // Acknowledgement Set timeout is slightly smaller than the visibility timeout; - int timeout = (int) sqsOptions.getVisibilityTimeout().getSeconds() - 2; - acknowledgementSet = acknowledgementSetManager.create((result) -> { - acknowledgementSetCallbackCounter.increment(); - // Delete only if this is positive acknowledgement - if (result == true) { - deleteSqsMessages(waitingForAcknowledgements); - } - }, Duration.ofSeconds(timeout)); + int expiryTimeout = visibilityTimeout - 2; + final boolean extendedVisibilityTimeoutEnabled = sqsOptions.getExtendVisibilityTimeout(); + if (extendedVisibilityTimeoutEnabled) { + expiryTimeout = maxVisibilityTimeout; + } + acknowledgementSet = acknowledgementSetManager.create( + (result) -> { + acknowledgementSetCallbackCounter.increment(); + // Delete only if this is positive acknowledgement + if (extendedVisibilityTimeoutEnabled) { + parsedMessageVisibilityTimesMap.remove(parsedMessage); + } + if (result == true) { + deleteSqsMessages(waitingForAcknowledgements); + } + }, + Duration.ofSeconds(expiryTimeout)); + if (extendedVisibilityTimeoutEnabled) { + acknowledgementSet.addProgressCheck( + (ratio) -> { + final int newVisibilityTimeoutSeconds = visibilityTimeout; + int newValue = parsedMessageVisibilityTimesMap.getOrDefault(parsedMessage, visibilityTimeout) + progressCheckInterval; + if (newValue >= maxVisibilityTimeout) { + return; + } + parsedMessageVisibilityTimesMap.put(parsedMessage, newValue); + final ChangeMessageVisibilityRequest changeMessageVisibilityRequest = ChangeMessageVisibilityRequest.builder() + .visibilityTimeout(newVisibilityTimeoutSeconds) + .queueUrl(sqsOptions.getSqsUrl()) + .receiptHandle(parsedMessage.getMessage().receiptHandle()) + .build(); + + LOG.info("Setting visibility timeout for message {} to {}", parsedMessage.getMessage().messageId(), newVisibilityTimeoutSeconds); + sqsClient.changeMessageVisibility(changeMessageVisibilityRequest); + sqsVisibilityTimeoutChangedCount.increment(); + }, + Duration.ofSeconds(progressCheckInterval)); + } } final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey()); final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java index ee2cbd0395..50ee22c7ff 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java @@ -16,7 +16,9 @@ public class SqsOptions { private static final int DEFAULT_MAXIMUM_MESSAGES = 10; + private static final Boolean DEFAULT_EXTEND_VISIBILITY_TIMEOUT = false; private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30); + private static final Duration DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION = Duration.ofSeconds(1800); // 30 minutes private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20); private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0); @@ -34,6 +36,14 @@ public class SqsOptions { @DurationMax(seconds = 43200) private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS; + @JsonProperty("extend_visibility_timeout") + private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT; + + @JsonProperty("max_visibility_timeout_extesion") + @DurationMin(seconds = 30) + @DurationMax(seconds = 3600) + private Duration maxVisibilityTimeoutExtension = DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION; + @JsonProperty("wait_time") @DurationMin(seconds = 0) @DurationMax(seconds = 20) @@ -55,6 +65,14 @@ public Duration getVisibilityTimeout() { return visibilityTimeout; } + public Duration getMaxVisibilityTimeoutExtension() { + return maxVisibilityTimeoutExtension; + } + + public Boolean getExtendVisibilityTimeout() { + return extendVisibilityTimeout; + } + public Duration getWaitTime() { return waitTime; } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java index 61ac59308d..dcab3184c5 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java @@ -91,6 +91,7 @@ class SqsWorkerTest { private Timer sqsMessageDelayTimer; private AcknowledgementSetManager acknowledgementSetManager; private AcknowledgementSet acknowledgementSet; + private SqsOptions sqsOptions; @BeforeEach void setUp() { @@ -105,7 +106,7 @@ void setUp() { AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); - SqsOptions sqsOptions = mock(SqsOptions.class); + sqsOptions = mock(SqsOptions.class); when(sqsOptions.getSqsUrl()).thenReturn("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"); when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); @@ -217,6 +218,42 @@ void processSqsMessages_should_return_number_of_messages_processed_with_acknowle assertThat(actualDelay, greaterThanOrEqualTo(Duration.ofHours(1).minus(Duration.ofSeconds(5)))); } + @ParameterizedTest + @ValueSource(strings = {"ObjectCreated:Put", "ObjectCreated:Post", "ObjectCreated:Copy", "ObjectCreated:CompleteMultipartUpload"}) + void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements_and_progress_check(final String eventName) throws IOException { + when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); + when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); + sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff); + Instant startTime = Instant.now().minus(1, ChronoUnit.HOURS); + final Message message = mock(Message.class); + when(message.body()).thenReturn(createEventNotification(eventName, startTime)); + final String testReceiptHandle = UUID.randomUUID().toString(); + when(message.messageId()).thenReturn(testReceiptHandle); + when(message.receiptHandle()).thenReturn(testReceiptHandle); + + final ReceiveMessageResponse receiveMessageResponse = mock(ReceiveMessageResponse.class); + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResponse); + when(receiveMessageResponse.messages()).thenReturn(Collections.singletonList(message)); + + final int messagesProcessed = sqsWorker.processSqsMessages(); + final ArgumentCaptor deleteMessageBatchRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + + final ArgumentCaptor durationArgumentCaptor = ArgumentCaptor.forClass(Duration.class); + verify(sqsMessageDelayTimer).record(durationArgumentCaptor.capture()); + Duration actualDelay = durationArgumentCaptor.getValue(); + + assertThat(messagesProcessed, equalTo(1)); + verify(s3Service).addS3Object(any(S3ObjectReference.class), any()); + verify(acknowledgementSetManager).create(any(), any(Duration.class)); + verify(acknowledgementSet).addProgressCheck(any(), any(Duration.class)); + verify(sqsMessagesReceivedCounter).increment(1); + verifyNoInteractions(sqsMessagesDeletedCounter); + assertThat(actualDelay, lessThanOrEqualTo(Duration.ofHours(1).plus(Duration.ofSeconds(5)))); + assertThat(actualDelay, greaterThanOrEqualTo(Duration.ofHours(1).minus(Duration.ofSeconds(5)))); + } + @ParameterizedTest @ValueSource(strings = {"", "{\"foo\": \"bar\""}) void processSqsMessages_should_not_interact_with_S3Service_if_input_is_not_valid_JSON(String inputString) { From 5a9ab5c689e96cbde29addba3f6732b2ae5ea964 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 2 Nov 2023 15:11:38 +0000 Subject: [PATCH 2/9] Removed unnecessary logs Signed-off-by: Krishna Kondaka --- .../dataprepper/acknowledgements/DefaultAcknowledgementSet.java | 1 - .../dataprepper/plugins/sink/opensearch/OpenSearchSink.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 17c338d89b..4445256cb3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -121,7 +121,6 @@ public boolean isDone() { callbackFuture = null; LOG.warn("AcknowledgementSet expired"); } - System.out.println("=======EXPIRED======="); metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME); return true; } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 8820de4afe..b4c18c2bbd 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -236,7 +236,7 @@ private void doInitializeInternal() throws IOException { .setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id") .build()); bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient); - bulkRetryStrategy = new BulkRetryStrategy( + bulkRetryStrategy = new BulkRetryStrategy(this, bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()), this::logFailureForBulkRequests, pluginMetrics, From 057b9d1be12c677f5d6632ed9e9c3a5855fcc14f Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 2 Nov 2023 15:17:21 +0000 Subject: [PATCH 3/9] Fixed unnecessary change Signed-off-by: Krishna Kondaka --- .../dataprepper/plugins/sink/opensearch/OpenSearchSink.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index b4c18c2bbd..8853674d90 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -236,8 +236,7 @@ private void doInitializeInternal() throws IOException { .setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id") .build()); bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient); - bulkRetryStrategy = new BulkRetryStrategy(this, - bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()), + bulkRetryStrategy = new BulkRetryStrategy(bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()), this::logFailureForBulkRequests, pluginMetrics, maxRetries, From 211ac55160cf72ce2ea47445f47dd2ca3b7f0b1d Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 2 Nov 2023 15:50:07 +0000 Subject: [PATCH 4/9] Fixed build failure in KafkaSourceJsonTypeIT Signed-off-by: Krishna Kondaka --- .../plugins/kafka/source/KafkaSourceJsonTypeIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 17ab0d85bb..6cd7a5215f 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -42,7 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -107,7 +107,7 @@ public void setup() throws Throwable { buffer = mock(Buffer.class); encryptionConfig = mock(EncryptionConfig.class); receivedRecords = new ArrayList<>(); - ExecutorService executor = Executors.newFixedThreadPool(2); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); pipelineDescription = mock(PipelineDescription.class); when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false); From e8455cfc1a591af578443a5d3a8d254ff6e36a75 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 2 Nov 2023 19:21:26 +0000 Subject: [PATCH 5/9] Fix sqs worker integration test failure Signed-off-by: Krishna Kondaka --- .../dataprepper/plugins/source/s3/SqsWorkerIT.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java index 49dfa38ae7..58a4be9182 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java @@ -312,7 +312,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO lenient().doAnswer((val) -> { if (val.getArgument(0) != null) { - delCount.getAndAdd((int)(double)val.getArgument(0)); + deletedCount += (double)val.getArgument(0); } return null; }).when(deletedCounter).increment(any(Double.class)); @@ -360,7 +360,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO await().atMost(Duration.ofSeconds(20)) .untilAsserted(() -> { assertThat(visibilityTimeoutChangedCount, greaterThanOrEqualTo((double)numberOfObjectsToWrite)); - assertThat(delCount.get(), equalTo(numberOfObjectsToWrite)); + assertThat(deletedCount, equalTo((double)numberOfObjectsToWrite)); assertThat(ackCallbackCount, equalTo((double)numberOfObjectsToWrite)); }); } @@ -387,7 +387,6 @@ void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int lenient().doAnswer((val) -> { if (val.getArgument(0) != null) { deletedCount += (double)val.getArgument(0); - delCount.getAndAdd((int)(double)val.getArgument(0)); } return null; }).when(deletedCounter).increment(any(Double.class)); @@ -429,7 +428,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int await().atMost(Duration.ofSeconds(10)) .untilAsserted(() -> { assertThat(visibilityTimeoutChangedCount, greaterThanOrEqualTo((double)numberOfObjectsToWrite)); - assertThat(delCount.get(), equalTo(0)); + assertThat(deletedCount, equalTo(0.0)); assertThat(ackCallbackCount, equalTo(0.0)); }); From 740cb4b4540e455f569379b00fe5fe8ff3933074 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 3 Nov 2023 19:06:48 +0000 Subject: [PATCH 6/9] Addressed review comments Signed-off-by: Krishna Kondaka --- .../model/acknowledgements/AcknowledgementSet.java | 2 +- .../acknowledgements/DefaultAcknowledgementSet.java | 7 ++++--- .../DefaultAcknowledgementSetManagerTests.java | 4 ++-- .../acknowledgements/DefaultAcknowledgementSetTests.java | 4 ++-- .../dataprepper/plugins/source/s3/SqsWorkerIT.java | 4 ++-- .../plugins/source/s3/configuration/SqsOptions.java | 2 +- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index 7ce038fdb8..efd36e123d 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -72,5 +72,5 @@ public interface AcknowledgementSet { * @param progressCheckInterval frequency of invocation of progress check callback * @since 2.6 */ - public void addProgressCheck(final Consumer progressCheckCallback, final Duration progressCheckInterval); + public void addProgressCheck(final Consumer progressCheckCallback, final Duration progressCheckInterval); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 4445256cb3..2ee9cc6ba8 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.acknowledgements; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.InternalEventHandle; @@ -29,7 +30,7 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet { private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class); private final Consumer callback; - private Consumer progressCheckCallback; + private Consumer progressCheckCallback; private final Instant expiryTime; private final ScheduledExecutorService scheduledExecutor; // This lock protects all the non-final members @@ -59,7 +60,7 @@ public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecuto lock = new ReentrantLock(true); } - public void addProgressCheck(final Consumer progressCheckCallback, final Duration progressCheckInterval) { + public void addProgressCheck(final Consumer progressCheckCallback, final Duration progressCheckInterval) { this.progressCheckCallback = progressCheckCallback; this.progressCheckFuture = scheduledExecutor.scheduleAtFixedRate(this::checkProgress, 0L, progressCheckInterval.toMillis(), TimeUnit.MILLISECONDS); } @@ -69,7 +70,7 @@ public void checkProgress() { int numberOfEventsPending = pendingAcknowledgments.size(); lock.unlock(); if (progressCheckCallback != null) { - progressCheckCallback.accept((double)numberOfEventsPending/totalEventsAdded.get()); + progressCheckCallback.accept(new DefaultProgressCheck((double)numberOfEventsPending/totalEventsAdded.get())); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index 6cdd77a39b..1b87d6c849 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -135,7 +135,7 @@ void testWithProgressCheckCallbacks() { lenient().when(event6.getEventHandle()).thenReturn(eventHandle6); AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofMillis(10000)); - acknowledgementSet2.addProgressCheck((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1)); + acknowledgementSet2.addProgressCheck((progressCheck) -> {currentRatio = progressCheck.getRatio();}, Duration.ofSeconds(1)); acknowledgementSet2.add(event3); acknowledgementSet2.add(event4); acknowledgementSet2.add(event5); @@ -186,7 +186,7 @@ void testWithProgressCheckCallbacks_AcksExpire() { lenient().when(event6.getEventHandle()).thenReturn(eventHandle6); AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofSeconds(10)); - acknowledgementSet2.addProgressCheck((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1)); + acknowledgementSet2.addProgressCheck((progressCheck) -> {currentRatio = progressCheck.getRatio();}, Duration.ofSeconds(1)); acknowledgementSet2.add(event3); acknowledgementSet2.add(event4); acknowledgementSet2.add(event5); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java index 660e6f84d1..5deba46be8 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java @@ -244,8 +244,8 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception { } ); defaultAcknowledgementSet.addProgressCheck( - (ratio) -> { - currentRatio = ratio; + (progressCheck) -> { + currentRatio = progressCheck.getRatio(); }, Duration.ofSeconds(1) ); diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java index 58a4be9182..2987158bed 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java @@ -261,7 +261,7 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba this.notify(); } try { - Thread.sleep(4000); + Thread.sleep(2000); } catch (Exception e){} return null; @@ -339,7 +339,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO events.add(event); } try { - Thread.sleep(4000); + Thread.sleep(2000); } catch (Exception e) {} return null; }).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class)); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java index 50ee22c7ff..6c34c715db 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java @@ -39,7 +39,7 @@ public class SqsOptions { @JsonProperty("extend_visibility_timeout") private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT; - @JsonProperty("max_visibility_timeout_extesion") + @JsonProperty("max_visibility_timeout_extension") @DurationMin(seconds = 30) @DurationMax(seconds = 3600) private Duration maxVisibilityTimeoutExtension = DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION; From d3bfac401bf3c13dc4ff711438df07195cec04ff Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 3 Nov 2023 19:26:44 +0000 Subject: [PATCH 7/9] Added Progress Check interface and test files Signed-off-by: Krishna Kondaka --- .../model/acknowledgements/ProgressCheck.java | 17 +++++++++++++++ .../DefaultProgressCheck.java | 21 +++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ProgressCheck.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultProgressCheck.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ProgressCheck.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ProgressCheck.java new file mode 100644 index 0000000000..07a2f18c03 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ProgressCheck.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.acknowledgements; + +public interface ProgressCheck { + /** + * Returns the pending ratio + * + * @return returns the ratio of pending to the total acknowledgements + * @since 2.6 + */ + Double getRatio(); +} + diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultProgressCheck.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultProgressCheck.java new file mode 100644 index 0000000000..87b7a8226d --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultProgressCheck.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.acknowledgements; + +import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; + +public class DefaultProgressCheck implements ProgressCheck { + double ratio; + + public DefaultProgressCheck(double ratio) { + this.ratio = ratio; + } + + @Override + public Double getRatio() { + return ratio; + } +} From cf6e8f84defe908af3e6d2801c7778f28becb404 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 3 Nov 2023 20:47:38 +0000 Subject: [PATCH 8/9] Addressed review comments Signed-off-by: Krishna Kondaka --- .../opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java index 2987158bed..60d48c97d3 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.ArrayList; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; From 1b8fad62cd233ae9b4fc93807060a93bbac507b0 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 3 Nov 2023 21:20:25 +0000 Subject: [PATCH 9/9] Addressed review comments Signed-off-by: Krishna Kondaka --- .../dataprepper/plugins/source/s3/SqsWorkerIT.java | 4 ++-- .../dataprepper/plugins/source/s3/SqsWorker.java | 8 ++++---- .../plugins/source/s3/configuration/SqsOptions.java | 10 +++++----- .../dataprepper/plugins/source/s3/SqsWorkerTest.java | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java index 60d48c97d3..dc09cd989c 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java @@ -345,7 +345,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60)); - when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); final SqsWorker objectUnderTest = createObjectUnderTest(); final int sqsMessagesProcessed = objectUnderTest.processSqsMessages(); @@ -420,7 +420,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60)); - when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); final SqsWorker objectUnderTest = createObjectUnderTest(); final int sqsMessagesProcessed = objectUnderTest.processSqsMessages(); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index d02d8f214c..a0f7a0bb16 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -239,15 +239,15 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { final int progressCheckInterval = visibilityTimeout/2 - 1; if (endToEndAcknowledgementsEnabled) { int expiryTimeout = visibilityTimeout - 2; - final boolean extendedVisibilityTimeoutEnabled = sqsOptions.getExtendVisibilityTimeout(); - if (extendedVisibilityTimeoutEnabled) { + final boolean visibilityDuplicateProtectionEnabled = sqsOptions.getVisibilityDuplicateProtection(); + if (visibilityDuplicateProtectionEnabled) { expiryTimeout = maxVisibilityTimeout; } acknowledgementSet = acknowledgementSetManager.create( (result) -> { acknowledgementSetCallbackCounter.increment(); // Delete only if this is positive acknowledgement - if (extendedVisibilityTimeoutEnabled) { + if (visibilityDuplicateProtectionEnabled) { parsedMessageVisibilityTimesMap.remove(parsedMessage); } if (result == true) { @@ -255,7 +255,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } }, Duration.ofSeconds(expiryTimeout)); - if (extendedVisibilityTimeoutEnabled) { + if (visibilityDuplicateProtectionEnabled) { acknowledgementSet.addProgressCheck( (ratio) -> { final int newVisibilityTimeoutSeconds = visibilityTimeout; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java index 6c34c715db..c4acd3abfd 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java @@ -16,7 +16,7 @@ public class SqsOptions { private static final int DEFAULT_MAXIMUM_MESSAGES = 10; - private static final Boolean DEFAULT_EXTEND_VISIBILITY_TIMEOUT = false; + private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false; private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30); private static final Duration DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION = Duration.ofSeconds(1800); // 30 minutes private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20); @@ -36,8 +36,8 @@ public class SqsOptions { @DurationMax(seconds = 43200) private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS; - @JsonProperty("extend_visibility_timeout") - private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT; + @JsonProperty("visibility_duplication_protection") + private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION; @JsonProperty("max_visibility_timeout_extension") @DurationMin(seconds = 30) @@ -69,8 +69,8 @@ public Duration getMaxVisibilityTimeoutExtension() { return maxVisibilityTimeoutExtension; } - public Boolean getExtendVisibilityTimeout() { - return extendVisibilityTimeout; + public Boolean getVisibilityDuplicateProtection() { + return visibilityDuplicateProtection; } public Duration getWaitTime() { diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java index dcab3184c5..9fc800eac0 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java @@ -221,7 +221,7 @@ void processSqsMessages_should_return_number_of_messages_processed_with_acknowle @ParameterizedTest @ValueSource(strings = {"ObjectCreated:Put", "ObjectCreated:Post", "ObjectCreated:Copy", "ObjectCreated:CompleteMultipartUpload"}) void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements_and_progress_check(final String eventName) throws IOException { - when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); when(s3SourceConfig.getAcknowledgements()).thenReturn(true);