From 8d4787b13cfd5af3983efe777a105598a3124589 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Sun, 26 Nov 2023 21:51:09 -0800 Subject: [PATCH] Fixes for test failures Signed-off-by: Kondaka --- .../integration/PipelinesWithAcksIT.java | 21 ++++++++++--------- .../dataprepper/plugins/InMemorySource.java | 2 +- .../aggregate/AggregateProcessorIT.java | 13 ++++++------ 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index b8b12cb56b..00ef47632e 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -38,6 +38,7 @@ class PipelinesWithAcksIT { private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml"; private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml"; private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml"; + private static final int WAIT_TIME = 20000; private DataPrepperTestRunner dataPrepperTestRunner; private InMemorySourceAccessor inMemorySourceAccessor; private InMemorySinkAccessor inMemorySinkAccessor; @@ -65,7 +66,7 @@ void simple_pipeline_with_single_record() { final int numRecords = 1; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -81,7 +82,7 @@ void simple_pipeline_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -96,7 +97,7 @@ void two_pipelines_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -111,7 +112,7 @@ void three_pipelines_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -126,7 +127,7 @@ void three_pipelines_with_route_and_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -141,7 +142,7 @@ void two_parallel_pipelines_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -156,7 +157,7 @@ void three_pipelines_multi_sink_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -171,7 +172,7 @@ void one_pipeline_three_sinks_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -186,7 +187,7 @@ void one_pipeline_ack_expiry_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -202,7 +203,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() { inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); inMemorySinkAccessor.setResult(false); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java index 3afd17554c..9d465a31e9 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java @@ -123,7 +123,7 @@ public void run() { { inMemorySourceAccessor.setAckReceived(result); }, - Duration.ofSeconds(15)); + Duration.ofSeconds(25)); records.stream().forEach((record) -> { ackSet.add(record.getData()); }); ackSet.complete(); writeToBuffer(records); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java index 9e68187255..444cd993e4 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java @@ -185,7 +185,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -274,7 +274,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); AtomicInteger allowedEventsCount = new AtomicInteger(0); for (int i = 0; i < NUM_THREADS; i++) { @@ -285,7 +285,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc }); } - boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); assertThat((double)allowedEventsCount.get(), closeTo(NUM_THREADS * NUM_EVENTS_PER_BATCH * testPercent/100, 1.0)); @@ -355,7 +355,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { }); } - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); // Expect all events to be received even with rate limiting because no events are dropped @@ -429,12 +429,13 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); Collection> results = objectUnderTest.doExecute(new ArrayList>()); + System.out.println("======"+results); assertThat(results.size(), equalTo(1)); Map expectedEventMap = new HashMap<>(getEventMap(testValue)); @@ -488,7 +489,7 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true));