Skip to content

Commit

Permalink
Fix build failures
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 28, 2023
1 parent 9449f24 commit b150cce
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import static org.opensearch.dataprepper.plugins.InMemorySource.ACK_EXPIRY_TIME;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.Assert.assertFalse;
import org.slf4j.Logger;
Expand All @@ -38,6 +39,7 @@ class PipelinesWithAcksIT {
private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml";
private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml";
private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml";
private static final long WAIT_TIME_MS = ACK_EXPIRY_TIME.minusMillis(5000L).toMillis();
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;
Expand Down Expand Up @@ -65,7 +67,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -81,7 +83,7 @@ void simple_pipeline_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -96,7 +98,7 @@ void two_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -111,7 +113,7 @@ void three_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -126,7 +128,7 @@ void three_pipelines_with_route_and_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -141,7 +143,7 @@ void two_parallel_pipelines_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -156,7 +158,7 @@ void three_pipelines_multi_sink_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -171,7 +173,7 @@ void one_pipeline_three_sinks_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -186,7 +188,7 @@ void one_pipeline_ack_expiry_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -202,7 +204,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() {
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySinkAccessor.setResult(false);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
@DataPrepperPlugin(name = "in_memory", pluginType = Source.class, pluginConfigurationType = InMemoryConfig.class)
public class InMemorySource implements Source<Record<Event>> {
public static final Duration ACK_EXPIRY_TIME = Duration.ofSeconds(35);
private static final Logger LOG = LoggerFactory.getLogger(InMemorySource.class);

private final String testingKey;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void run() {
{
inMemorySourceAccessor.setAckReceived(result);
},
Duration.ofSeconds(15));
ACK_EXPIRY_TIME);
records.stream().forEach((record) -> { ackSet.add(record.getData()); });
ackSet.complete();
writeToBuffer(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void aggregateWithNoConcludingGroupsReturnsExpectedResult() throws InterruptedEx
});
}

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
assertThat(aggregatedResult.size(), equalTo(NUM_UNIQUE_EVENTS_PER_BATCH));
Expand Down Expand Up @@ -198,7 +198,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted
});
}

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
assertThat(aggregatedResult.size(), equalTo(NUM_UNIQUE_EVENTS_PER_BATCH));
Expand Down Expand Up @@ -249,7 +249,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException {
});
}

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
assertThat(aggregatedResult.size(), equalTo(NUM_UNIQUE_EVENTS_PER_BATCH/2));
Expand Down Expand Up @@ -285,10 +285,10 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc
});
}

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
assertThat((double)allowedEventsCount.get(), closeTo(NUM_THREADS * NUM_EVENTS_PER_BATCH * testPercent/100, 1.0));
assertThat((double)allowedEventsCount.get(), closeTo(NUM_THREADS * NUM_EVENTS_PER_BATCH * testPercent/100, 5.0));
}

@RepeatedTest(value = 2)
Expand Down Expand Up @@ -320,7 +320,7 @@ void aggregateWithRateLimiterAction() throws InterruptedException {
});
}

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
// Expect less number of events to be received, because of rate limiting
Expand Down Expand Up @@ -355,7 +355,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException {
});
}

boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);

assertThat(allThreadsFinished, equalTo(true));
// Expect all events to be received even with rate limiting because no events are dropped
Expand Down Expand Up @@ -385,7 +385,7 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));

Collection<Record<Event>> results = objectUnderTest.doExecute(new ArrayList<Record<Event>>());
Expand Down Expand Up @@ -431,7 +431,7 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));

Collection<Record<Event>> results = objectUnderTest.doExecute(new ArrayList<Record<Event>>());
Expand Down Expand Up @@ -490,7 +490,7 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));

Collection<Record<Event>> results = objectUnderTest.doExecute(new ArrayList<Record<Event>>());
Expand Down Expand Up @@ -544,7 +544,7 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc
});
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
boolean allThreadsFinished = countDownLatch.await(20L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));
List<Event> errorEventList = eventBatch.stream().map(Record::getData).filter(event -> {
Event ev = ((Event)event);
Expand Down

0 comments on commit b150cce

Please sign in to comment.