Skip to content

Commit

Permalink
Fixed test errors by adding await
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Dec 18, 2024
1 parent 98d0644 commit 7cb44d2
Showing 1 changed file with 77 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;

import static org.awaitility.Awaitility.await;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class AggregateProcessorITWithAcks {
private static final int GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE = 2;
private static final int NUM_UNIQUE_EVENTS_PER_BATCH = 8;
private static final int NUM_EVENTS_PER_BATCH = 5;
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);

@Mock
private Pipeline pipeline;
Expand Down Expand Up @@ -205,9 +207,12 @@ public void testHistogramAggregation() throws Exception {

processWorker.run();
}
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}

@Test
Expand All @@ -232,10 +237,13 @@ public void testPercentSamplerAggregation() throws Exception {

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), greaterThanOrEqualTo(1));
assertThat(aggregatedResults.size(), lessThan(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), greaterThanOrEqualTo(1));
assertThat(aggregatedResults.size(), lessThan(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}


Expand All @@ -260,9 +268,12 @@ public void testPutAllAggregation() throws Exception {

processWorker.run();
}
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}


Expand All @@ -288,9 +299,12 @@ public void testRateLimiterDropAggregation() throws Exception {

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}

@Test
Expand All @@ -311,9 +325,12 @@ public void testRemoveDuplicatesAggregation() {

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}

@Test
Expand All @@ -338,9 +355,12 @@ public void testRateLimiterNoDropAggregation() throws Exception {

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}


Expand Down Expand Up @@ -391,9 +411,12 @@ public void testRateLimiterNoDropAggregationWithMultipleAcknowledgementSets() th

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}


Expand Down Expand Up @@ -443,11 +466,14 @@ public void testCountAggregationWithMultipleAcknowledgementSets() throws Excepti

processWorker.run();
}
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet2).isDone());
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet3).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet2).isDone());
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet3).isDone());
});
}

@Test
Expand All @@ -471,9 +497,12 @@ public void testCountAggregation() throws Exception {

processWorker.run();
}
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}

@Test
Expand Down Expand Up @@ -501,9 +530,12 @@ public void testTailSamplerAggregationWithNoErrors() throws Exception {

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}


Expand Down Expand Up @@ -533,9 +565,12 @@ public void testTailSamplerAggregation() throws Exception {

processWorker.run();
}
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertFalse(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(5));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}

@Test
Expand All @@ -558,9 +593,12 @@ public void testAppendAggregation() throws Exception {

processWorker.run();
}
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
await().atMost(TEST_TIMEOUT)
.untilAsserted(() -> {
assertTrue(aggregatedResultReceived);
assertThat(aggregatedResults.size(), equalTo(1));
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
});
}


Expand Down

0 comments on commit 7cb44d2

Please sign in to comment.