Skip to content

Commit

Permalink
Fixes for test failures
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 27, 2023
1 parent 915e84d commit 8d4787b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class PipelinesWithAcksIT {
private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml";
private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml";
private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml";
private static final int WAIT_TIME = 20000;
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;
Expand Down Expand Up @@ -65,7 +66,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -81,7 +82,7 @@ void simple_pipeline_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -96,7 +97,7 @@ void two_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -111,7 +112,7 @@ void three_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -126,7 +127,7 @@ void three_pipelines_with_route_and_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -141,7 +142,7 @@ void two_parallel_pipelines_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -156,7 +157,7 @@ void three_pipelines_multi_sink_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -171,7 +172,7 @@ void one_pipeline_three_sinks_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -186,7 +187,7 @@ void one_pipeline_ack_expiry_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -202,7 +203,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() {
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySinkAccessor.setResult(false);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void run() {
{
inMemorySourceAccessor.setAckReceived(result);
},
Duration.ofSeconds(15));
Duration.ofSeconds(25));
records.stream().forEach((record) -> { ackSet.add(record.getData()); });
ackSet.complete();
writeToBuffer(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted
final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS);

objectUnderTest.doExecute(eventBatch);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000);

for (int i = 0; i < NUM_THREADS; i++) {
executorService.execute(() -> {
Expand Down Expand Up @@ -274,7 +274,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc
final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS);

objectUnderTest.doExecute(eventBatch);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000);
AtomicInteger allowedEventsCount = new AtomicInteger(0);

for (int i = 0; i < NUM_THREADS; i++) {
Expand All @@ -285,7 +285,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc
});
}

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
assertThat((double)allowedEventsCount.get(), closeTo(NUM_THREADS * NUM_EVENTS_PER_BATCH * testPercent/100, 1.0));
Expand Down Expand Up @@ -355,7 +355,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException {
});
}

boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
// Expect all events to be received even with rate limiting because no events are dropped
Expand Down Expand Up @@ -429,12 +429,13 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio
countDownLatch.countDown();
});
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));

Collection<Record<Event>> results = objectUnderTest.doExecute(new ArrayList<Record<Event>>());
System.out.println("======"+results);
assertThat(results.size(), equalTo(1));

Map<String, Object> expectedEventMap = new HashMap<>(getEventMap(testValue));
Expand Down Expand Up @@ -488,7 +489,7 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch
countDownLatch.countDown();
});
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));
Expand Down

0 comments on commit 8d4787b

Please sign in to comment.