diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java index 7c325a18b6..994176b7ce 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java @@ -19,9 +19,9 @@ class ReadBufferHelper { static Map.Entry>, CheckpointState> awaitRead(final KafkaBuffer objectUnderTest) { final Map.Entry>, CheckpointState>[] lastReadResult = new Map.Entry[1]; await() - .atMost(Duration.ofSeconds(30)) + .atMost(Duration.ofSeconds(45)) .until(() -> { - lastReadResult[0] = objectUnderTest.read(500); + lastReadResult[0] = objectUnderTest.read(2000); return lastReadResult[0] != null && lastReadResult[0].getKey() != null && lastReadResult[0].getKey().size() >= 1; }); return lastReadResult[0]; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java index a97b68d0f3..a049c21774 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java @@ -27,7 +27,7 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig static final boolean DEFAULT_AUTO_COMMIT = false; static final ByteCount DEFAULT_FETCH_MAX_BYTES = ByteCount.parse("50mb"); static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(1000); - static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("2kb"); + static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("64kb"); static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb"); static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";