diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java index ae09e953ff..1681563675 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java @@ -21,6 +21,7 @@ */ public class ByteCount { private static final Pattern BYTE_PATTERN = Pattern.compile("^(?\\d+\\.?\\d*)(?[a-z]+)?\\z"); + private static final ByteCount ZERO_BYTES = new ByteCount(0); private final long bytes; private ByteCount(final long bytes) { @@ -94,6 +95,10 @@ public static ByteCount parse(final String string) { return new ByteCount(byteCount.longValue()); } + public static ByteCount zeroBytes() { + return ZERO_BYTES; + } + private static BigDecimal scaleToBytes(final BigDecimal value, final Unit unit) { return value.multiply(BigDecimal.valueOf(unit.multiplier)); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java index eda34eae69..b717289a7f 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.model.types; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; @@ -13,6 +14,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; class ByteCountTest { @@ -145,4 +147,16 @@ 
void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byte assertThat(byteCount, notNullValue()); assertThat(byteCount.getBytes(), equalTo(expectedBytes)); } + + @Test + void zeroBytes_returns_bytes_with_getBytes_equal_to_0() { + assertThat(ByteCount.zeroBytes(), notNullValue()); + assertThat(ByteCount.zeroBytes().getBytes(), equalTo(0L)); + } + + @Test + void zeroBytes_returns_same_instance() { + assertThat(ByteCount.zeroBytes(), notNullValue()); + assertThat(ByteCount.zeroBytes(), sameInstance(ByteCount.zeroBytes())); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 4de082d415..9f2a782ddf 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.buffer; import org.apache.commons.lang3.RandomStringUtils; @@ -15,8 +20,6 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +53,7 @@ public class KafkaBufferIT { @Mock private AcknowledgementSetManager acknowledgementSetManager; @Mock - private 
TopicConfig topicConfig; + private BufferTopicConfig topicConfig; private PluginMetrics pluginMetrics; private String bootstrapServersCommaDelimited; @@ -66,7 +69,7 @@ void setUp() { String topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5); when(topicConfig.getName()).thenReturn(topicName); when(topicConfig.getGroupId()).thenReturn("buffergroup-" + RandomStringUtils.randomAlphabetic(6)); - when(topicConfig.isCreate()).thenReturn(true); + when(topicConfig.isCreateTopic()).thenReturn(true); when(topicConfig.getSerdeFormat()).thenReturn(messageFormat); when(topicConfig.getWorkers()).thenReturn(1); when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java index 3b75384581..da901ed241 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java @@ -36,14 +36,12 @@ import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import java.io.IOException; -import 
java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -71,7 +69,7 @@ public class KafkaSinkAvroTypeIT { private KafkaSinkConfig kafkaSinkConfig; @Mock - private TopicConfig topicConfig; + private TopicProducerConfig topicConfig; private KafkaSink kafkaSink; @@ -145,18 +143,10 @@ public void setup() throws RestClientException, IOException { when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.AVRO.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); - final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); - topicConfig = mock(TopicConfig.class); + topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); - when(topicConfig.getGroupId()).thenReturn(testGroup); - when(topicConfig.getWorkers()).thenReturn(1); - when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45)); - when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); - when(topicConfig.getAutoCommit()).thenReturn(false); - when(topicConfig.getAutoOffsetReset()).thenReturn("earliest"); - when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); @@ -171,8 +161,7 @@ public void TestPollRecordsAvroSASLPlainText() throws Exception { configureJasConfForSASLPlainText(); final int numRecords = 1; - when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(topicConfig.isCreate()).thenReturn(false); + when(topicConfig.isCreateTopic()).thenReturn(false); when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); @@ -250,16 +239,8 @@ private void deleteTopic(AtomicBoolean created, String topicName) throws 
Interru } private void consumeTestMessages(List> recList) { - - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - topicConfig.getCommitInterval().toSecondsPart()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - topicConfig.getAutoOffsetReset()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - topicConfig.getAutoCommit()); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - topicConfig.getConsumerMaxPollRecords()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); + final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); + props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java index 3d825925ad..bf2d1cb3df 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java @@ -34,12 +34,10 @@ import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import 
java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -63,7 +61,7 @@ public class KafkaSinkJsonTypeIT { private KafkaSinkConfig kafkaSinkConfig; @Mock - private TopicConfig topicConfig; + private TopicProducerConfig topicConfig; private KafkaSink kafkaSink; @@ -123,18 +121,10 @@ public void setup() { when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); - final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); - topicConfig = mock(TopicConfig.class); + topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); - when(topicConfig.getGroupId()).thenReturn(testGroup); - when(topicConfig.getWorkers()).thenReturn(1); - when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45)); - when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); - when(topicConfig.getAutoCommit()).thenReturn(false); - when(topicConfig.getAutoOffsetReset()).thenReturn("earliest"); - when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -145,8 +135,7 @@ public void TestPollRecordsJsonSASLPlainText() throws Exception { configureJasConfForSASLPlainText(); final int numRecords = 1; - when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(topicConfig.isCreate()).thenReturn(false); + when(topicConfig.isCreateTopic()).thenReturn(false); when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); kafkaSink = createObjectUnderTest(); @@ -223,16 +212,8 @@ 
private void configureJasConfForSASLPlainText() { } private void consumeTestMessages(List> recList) { - - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - topicConfig.getCommitInterval().toSecondsPart()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - topicConfig.getAutoOffsetReset()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - topicConfig.getAutoCommit()); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - topicConfig.getConsumerMaxPollRecords()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); + final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); + props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java index cd7ac9526f..c00304d4e8 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java @@ -32,12 +32,10 @@ import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import 
org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -61,7 +59,7 @@ public class KafkaSinkPlainTextTypeIT { private KafkaSinkConfig kafkaSinkConfig; @Mock - private TopicConfig topicConfig; + private TopicProducerConfig topicConfig; private KafkaSink kafkaSink; @@ -121,18 +119,10 @@ public void setup() { when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); - final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); - topicConfig = mock(TopicConfig.class); + topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); - when(topicConfig.getGroupId()).thenReturn(testGroup); - when(topicConfig.getWorkers()).thenReturn(1); - when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45)); - when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); - when(topicConfig.getAutoCommit()).thenReturn(false); - when(topicConfig.getAutoOffsetReset()).thenReturn("earliest"); - when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -144,8 +134,7 @@ public void TestPollRecordsPlainText() throws Exception { configureJasConfForSASLPlainText(); final int numRecords = 1; - when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(topicConfig.isCreate()).thenReturn(false); + when(topicConfig.isCreateTopic()).thenReturn(false); when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); 
when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); kafkaSink = createObjectUnderTest(); @@ -221,16 +210,9 @@ private void deleteTopic(AtomicBoolean created, String topicName) throws Interru } private void consumeTestMessages(List> recList) { + final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - topicConfig.getCommitInterval().toSecondsPart()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - topicConfig.getAutoOffsetReset()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - topicConfig.getAutoCommit()); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - topicConfig.getConsumerMaxPollRecords()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java index 0b0c8ef85e..ea11ec22b6 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java @@ -5,49 +5,45 @@ package org.opensearch.dataprepper.plugins.kafka.source; +import io.micrometer.core.instrument.Counter; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; import 
org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; - -import static org.mockito.Mockito.when; -import org.mockito.Mock; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.awaitility.Awaitility.await; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import io.micrometer.core.instrument.Counter; -import java.util.List; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import 
java.util.Collection; +import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; -import java.util.Iterator; - -import java.time.Duration; -import java.time.Instant; -import org.apache.kafka.common.errors.SerializationException; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ConfluentKafkaProducerConsumerIT { @Mock @@ -78,7 +74,7 @@ public class ConfluentKafkaProducerConsumerIT { private PlainTextAuthConfig plainTextAuthConfig; private KafkaSource kafkaSource; - private TopicConfig topicConfig; + private TopicConsumerConfig topicConfig; private Counter counter; private List receivedRecords; @@ -129,7 +125,7 @@ public void setup() { topicName = System.getProperty("tests.kafka.topic_name"); username = System.getProperty("tests.kafka.username"); password = System.getProperty("tests.kafka.password"); - topicConfig = mock(TopicConfig.class); + topicConfig = mock(TopicConsumerConfig.class); when(topicConfig.getName()).thenReturn(topicName); when(topicConfig.getGroupId()).thenReturn("testGroupConf"); when(topicConfig.getWorkers()).thenReturn(1); @@ -147,7 +143,7 @@ public void setup() { when(topicConfig.getFetchMaxWait()).thenReturn(500); when(topicConfig.getMaxPartitionFetchBytes()).thenReturn(1024L*1024); when(topicConfig.getReconnectBackoff()).thenReturn(Duration.ofSeconds(10)); - when(sourceConfig.getTopics()).thenReturn(List.of(topicConfig)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(topicConfig)); 
when(sourceConfig.getBootstrapServers()).thenReturn(List.of(bootstrapServers)); encryptionConfig = mock(EncryptionConfig.class); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java index e37bf67799..4ffc153c4e 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java @@ -5,56 +5,52 @@ package org.opensearch.dataprepper.plugins.kafka.source; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.micrometer.core.instrument.Counter; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; +import 
org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.apache.avro.Schema; - -import static org.mockito.Mockito.when; -import org.mockito.Mock; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.awaitility.Awaitility.await; -import org.apache.commons.lang3.RandomStringUtils; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import io.micrometer.core.instrument.Counter; -import java.util.List; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import java.time.Duration; - -import 
org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericData; -import org.apache.kafka.common.errors.SerializationException; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import com.fasterxml.jackson.annotation.JsonProperty; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ConfluentKafkaProducerConsumerWithSchemaRegistryIT { public static class AvroRecord { @@ -131,8 +127,8 @@ public UserRecord(String name, Integer id, Number value) { private PlainTextAuthConfig plainTextAuthConfig; private KafkaSource kafkaSource; - private TopicConfig jsonTopicConfig; - private TopicConfig avroTopicConfig; + private TopicConsumerConfig jsonTopicConfig; + private TopicConsumerConfig avroTopicConfig; private Counter counter; private List receivedRecords; @@ -196,7 +192,7 @@ public void setup() { username = System.getProperty("tests.kafka.username"); password = System.getProperty("tests.kafka.password"); - jsonTopicConfig = mock(TopicConfig.class); + jsonTopicConfig = mock(TopicConsumerConfig.class); jsonTopicName = System.getProperty("tests.kafka.json_topic_name"); when(jsonTopicConfig.getName()).thenReturn(jsonTopicName); when(jsonTopicConfig.getGroupId()).thenReturn("testGroupConf"); @@ -210,7 +206,7 @@ public void setup() { when(jsonTopicConfig.getConsumerMaxPollRecords()).thenReturn(100); when(jsonTopicConfig.getMaxPollInterval()).thenReturn(Duration.ofSeconds(15)); - avroTopicConfig = mock(TopicConfig.class); + avroTopicConfig = mock(TopicConsumerConfig.class); avroTopicName = System.getProperty("tests.kafka.avro_topic_name"); when(avroTopicConfig.getName()).thenReturn(avroTopicName); 
when(avroTopicConfig.getGroupId()).thenReturn("testGroupConf"); @@ -229,14 +225,14 @@ public void setup() { when(jsonSourceConfig.getAuthConfig()).thenReturn(authConfig); when(jsonSourceConfig.getAcknowledgementsEnabled()).thenReturn(false); when(jsonSourceConfig.getSchemaConfig()).thenReturn(schemaConfig); - when(jsonSourceConfig.getTopics()).thenReturn(List.of(jsonTopicConfig)); + when(jsonSourceConfig.getTopics()).thenReturn((List) List.of(jsonTopicConfig)); when(jsonSourceConfig.getBootstrapServers()).thenReturn(List.of(bootstrapServers)); when(jsonSourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); when(avroSourceConfig.getAuthConfig()).thenReturn(authConfig); when(avroSourceConfig.getAcknowledgementsEnabled()).thenReturn(false); when(avroSourceConfig.getSchemaConfig()).thenReturn(schemaConfig); - when(avroSourceConfig.getTopics()).thenReturn(List.of(avroTopicConfig)); + when(avroSourceConfig.getTopics()).thenReturn((List) List.of(avroTopicConfig)); when(avroSourceConfig.getBootstrapServers()).thenReturn(List.of(bootstrapServers)); when(avroSourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java index cc777b25df..7f9c30f58f 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java @@ -24,18 +24,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; import 
org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; import java.io.IOException; import java.io.Reader; import java.io.StringReader; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -44,8 +41,6 @@ public class JSONConsumerIT { private PluginMetrics pluginMetrics; @Mock - TopicConfig topicConfig; - @Mock private SchemaConfig schemaConfig; private KafkaSourceConfig kafkaSourceConfig; @@ -75,8 +70,6 @@ public void configure() throws IOException { String json = mapper.writeValueAsString(kafkaConfigMap); Reader reader = new StringReader(json); kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - List topicConfigList = kafkaSourceConfig.getTopics(); - topicConfig = topicConfigList.get(0); schemaConfig = kafkaSourceConfig.getSchemaConfig(); } } diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 36fd90a100..17ab0d85bb 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -25,10 +25,10 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.record.Record; +import 
org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; @@ -82,7 +82,7 @@ public class KafkaSourceJsonTypeIT { private KafkaClusterConfigSupplier kafkaClusterConfigSupplier; @Mock - private TopicConfig jsonTopic; + private TopicConsumerConfig jsonTopic; private KafkaSource kafkaSource; @@ -126,7 +126,7 @@ public void setup() throws Throwable { testKey = RandomStringUtils.randomAlphabetic(5); testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(6); testTopic = "TestJsonTopic_" + RandomStringUtils.randomAlphabetic(5); - jsonTopic = mock(TopicConfig.class); + jsonTopic = mock(TopicConsumerConfig.class); when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); when(jsonTopic.getWorkers()).thenReturn(1); @@ -182,7 +182,7 @@ public void TestJsonRecordsWithNullKey() throws Exception { when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_FIELD); - when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(jsonTopic)); when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); @@ -214,7 +214,7 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception { 
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.DISCARD); - when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(jsonTopic)); when(sourceConfig.getAuthConfig()).thenReturn(null); when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(true); kafkaSource = createObjectUnderTest(); @@ -264,7 +264,7 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception { when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.DISCARD); - when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(jsonTopic)); when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); @@ -294,7 +294,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception { when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_FIELD); - when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(jsonTopic)); when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); @@ -325,7 +325,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception { when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_METADATA); - when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(jsonTopic)); 
when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index 693c7dc8af..acc4ca5a46 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -23,9 +23,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier; @@ -67,7 +67,7 @@ public class KafkaSourceMultipleAuthTypeIT { private List topicList; @Mock - private TopicConfig plainTextTopic; + private TopicConsumerConfig plainTextTopic; @Mock private AuthConfig authConfig; @@ -128,7 +128,7 @@ public void setup() { final String testGroup = "TestGroup_"+RandomStringUtils.randomAlphabetic(6); final String testTopic = "TestTopic_"+RandomStringUtils.randomAlphabetic(5); - plainTextTopic = mock(TopicConfig.class); + plainTextTopic = 
mock(TopicConsumerConfig.class); when(plainTextTopic.getName()).thenReturn(testTopic); when(plainTextTopic.getGroupId()).thenReturn(testGroup); when(plainTextTopic.getWorkers()).thenReturn(1); @@ -154,7 +154,7 @@ public void TestPlainTextWithNoAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() t final int numRecords = 1; when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(plainTextTopic)); when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); @@ -203,7 +203,7 @@ public void TestPlainTextWithAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() thr saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(plainTextTopic)); plainTextAuthConfig = mock(PlainTextAuthConfig.class); when(plainTextAuthConfig.getUsername()).thenReturn(kafkaUsername); when(plainTextAuthConfig.getPassword()).thenReturn(kafkaPassword); @@ -262,7 +262,7 @@ public void TestPlainTextWithNoAuthKafkaEncryptionWithNoAuthSchemaRegistry() thr when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL); when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(sslBootstrapServers)); - when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(plainTextTopic)); kafkaSource = createObjectUnderTest(); Properties props = new Properties(); @@ -318,7 +318,7 @@ public void TestPlainTextWithAuthKafkaEncryptionWithNoAuthSchemaRegistry() throw 
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL); when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(saslsslBootstrapServers)); - when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(plainTextTopic)); kafkaSource = createObjectUnderTest(); Properties props = new Properties(); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java index 4091a66966..b7e680daae 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java @@ -36,7 +36,6 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.MskBrokerConnectionType; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; @@ -105,8 +104,8 @@ public class MskGlueRegistryMultiTypeIT { private KafkaClusterConfigSupplier kafkaClusterConfigSupplier; private KafkaSource kafkaSource; - private TopicConfig jsonTopic; - private TopicConfig avroTopic; + private SourceTopicConfig jsonTopic; + private SourceTopicConfig avroTopic; private Counter counter; @@ 
-162,8 +161,8 @@ public void setup() { final String testGroup = "TestGroup_"+RandomStringUtils.randomAlphabetic(6); final String testTopic = "TestTopic_"+RandomStringUtils.randomAlphabetic(5); - avroTopic = mock(TopicConfig.class); - jsonTopic = mock(TopicConfig.class); + avroTopic = mock(SourceTopicConfig.class); + jsonTopic = mock(SourceTopicConfig.class); when(avroTopic.getName()).thenReturn(testTopic); when(avroTopic.getGroupId()).thenReturn(testGroup); when(avroTopic.getWorkers()).thenReturn(1); @@ -201,7 +200,7 @@ public void TestJsonRecordConsumer() throws Exception { final int numRecords = 1; when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL); when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(jsonTopic)); when(sourceConfig.getAuthConfig()).thenReturn(authConfig); when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); when(saslAuthConfig.getAwsIamAuthConfig()).thenReturn(AwsIamAuthConfig.DEFAULT); @@ -268,7 +267,7 @@ public void TestAvroRecordConsumer() throws Exception { final int numRecords = 1; when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL); when(avroTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(sourceConfig.getTopics()).thenReturn(List.of(avroTopic)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(avroTopic)); when(sourceConfig.getAuthConfig()).thenReturn(authConfig); when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); when(saslAuthConfig.getAwsIamAuthConfig()).thenReturn(AwsIamAuthConfig.DEFAULT); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java index a5118e64c5..7936cbadcf 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java @@ -19,18 +19,15 @@ import org.mockito.Mock; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; import java.io.IOException; import java.io.Reader; import java.io.StringReader; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -39,8 +36,6 @@ public class PlainTextConsumerIT { private PluginMetrics pluginMetrics; @Mock - TopicConfig topicConfig; - @Mock private SchemaConfig schemaConfig; private KafkaSourceConfig kafkaSourceConfig; @@ -70,8 +65,6 @@ public void configure() throws IOException { String json = mapper.writeValueAsString(kafkaConfigMap); Reader reader = new StringReader(json); kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - List topicConfigList = kafkaSourceConfig.getTopics(); - topicConfig = topicConfigList.get(0); schemaConfig = kafkaSourceConfig.getSchemaConfig(); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java new file mode 100644 index 0000000000..87c92975b6 --- /dev/null +++ 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java @@ -0,0 +1,229 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.kafka.configuration.CommonTopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; +import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; + +import java.time.Duration; + +class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig, TopicConsumerConfig { + static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); + private static final Integer DEFAULT_NUM_OF_PARTITIONS = 1; + private static final Short DEFAULT_REPLICATION_FACTOR = 1; + private static final Long DEFAULT_RETENTION_PERIOD = 604800000L; + static final boolean DEFAULT_AUTO_COMMIT = false; + static final ByteCount DEFAULT_FETCH_MAX_BYTES = ByteCount.parse("50mb"); + static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(500); + static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("1b"); + static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb"); + static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); + static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; + static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5); + static final Duration DEFAULT_MAX_POLL_INTERVAL = 
Duration.ofSeconds(300); + static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500; + static final Integer DEFAULT_NUM_OF_WORKERS = 2; + static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); + + + @JsonProperty("encryption_key") + private String encryptionKey; + + @JsonProperty("kms") + private KmsConfig kmsConfig; + + @JsonProperty("commit_interval") + @Valid + @Size(min = 1) + private Duration commitInterval = DEFAULT_COMMIT_INTERVAL; + + @JsonProperty("number_of_partitions") + private Integer numberOfPartitions = DEFAULT_NUM_OF_PARTITIONS; + + @JsonProperty("replication_factor") + private Short replicationFactor = DEFAULT_REPLICATION_FACTOR; + + @JsonProperty("retention_period") + private Long retentionPeriod = DEFAULT_RETENTION_PERIOD; + + @JsonProperty("create_topic") + private boolean isCreateTopic = false; + + @JsonProperty("group_id") + @Valid + @Size(min = 1, max = 255, message = "size of group id should be between 1 and 255") + private String groupId; + + @JsonProperty("workers") + @Valid + @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200") + private Integer workers = DEFAULT_NUM_OF_WORKERS; + + @JsonProperty("session_timeout") + @Valid + @Size(min = 1) + private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT; + + @JsonProperty("auto_offset_reset") + private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET; + + @JsonProperty("thread_waiting_time") + private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME; + + @JsonProperty("max_poll_interval") + private Duration maxPollInterval = DEFAULT_MAX_POLL_INTERVAL; + + @JsonProperty("consumer_max_poll_records") + private Integer consumerMaxPollRecords = DEFAULT_CONSUMER_MAX_POLL_RECORDS; + + @JsonProperty("heart_beat_interval") + @Valid + @Size(min = 1) + private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION; + + @JsonProperty("auto_commit") + private Boolean autoCommit = DEFAULT_AUTO_COMMIT; + + 
@JsonProperty("max_partition_fetch_bytes") + private ByteCount maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES; + + @JsonProperty("fetch_max_bytes") + private ByteCount fetchMaxBytes = DEFAULT_FETCH_MAX_BYTES; + + @JsonProperty("fetch_max_wait") + @Valid + private Duration fetchMaxWait = DEFAULT_FETCH_MAX_WAIT; + + @JsonProperty("fetch_min_bytes") + private ByteCount fetchMinBytes = DEFAULT_FETCH_MIN_BYTES; + + @Override + public MessageFormat getSerdeFormat() { + return MessageFormat.BYTES; + } + + @Override + public String getEncryptionKey() { + return encryptionKey; + } + + @Override + public KmsConfig getKmsConfig() { + return kmsConfig; + } + + @Override + public KafkaKeyMode getKafkaKeyMode() { + return KafkaKeyMode.DISCARD; + } + + @Override + public String getGroupId() { + return groupId; + } + + @Override + public Duration getCommitInterval() { + return commitInterval; + } + + + @Override + public Integer getNumberOfPartitions() { + return numberOfPartitions; + } + + @Override + public Short getReplicationFactor() { + return replicationFactor; + } + + @Override + public Long getRetentionPeriod() { + return retentionPeriod; + } + + @Override + public boolean isCreateTopic() { + return isCreateTopic; + } + + @Override + public Boolean getAutoCommit() { + return autoCommit; + } + + @Override + public Duration getSessionTimeOut() { + return sessionTimeOut; + } + + @Override + public String getAutoOffsetReset() { + return autoOffsetReset; + } + + @Override + public Duration getThreadWaitingTime() { + return threadWaitingTime; + } + + @Override + public Duration getMaxPollInterval() { + return maxPollInterval; + } + + @Override + public Integer getConsumerMaxPollRecords() { + return consumerMaxPollRecords; + } + + @Override + public Integer getWorkers() { + return workers; + } + + @Override + public Duration getHeartBeatInterval() { + return heartBeatInterval; + } + + @Override + public long getFetchMaxBytes() { + long value = 
fetchMaxBytes.getBytes(); + if (value < 1 || value > 50*1024*1024) { + throw new RuntimeException("Invalid Fetch Max Bytes"); + } + return value; + } + + @Override + public Integer getFetchMaxWait() { + return Math.toIntExact(fetchMaxWait.toMillis()); + } + + @Override + public long getFetchMinBytes() { + long value = fetchMinBytes.getBytes(); + if (value < 1) { + throw new RuntimeException("Invalid Fetch Min Bytes"); + } + return value; + } + + @Override + public long getMaxPartitionFetchBytes() { + return maxPartitionFetchBytes.getBytes(); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index b139fe5db9..86a934e972 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.buffer; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; @@ -14,7 +19,6 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; diff --git 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferConfig.java similarity index 72% rename from data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java rename to data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferConfig.java index 1cab8d7133..0e0eea2837 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferConfig.java @@ -1,4 +1,9 @@ -package org.opensearch.dataprepper.plugins.kafka.configuration; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.buffer; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; @@ -6,13 +11,21 @@ import jakarta.validation.constraints.Size; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; 
import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.Optional; -public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig { +class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig { private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30); @JsonProperty("bootstrap_servers") @@ -21,11 +34,7 @@ public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConf @JsonProperty("topics") @NotNull @Size(min = 1, max = 1, message = "Only one topic currently supported for Kafka buffer") - private List topics; - - @JsonProperty("schema") - @Valid - private SchemaConfig schemaConfig; + private List topics; @Valid @JsonProperty("authentication") @@ -59,7 +68,7 @@ public AuthConfig getAuthConfig() { @Override public SchemaConfig getSchemaConfig() { - return schemaConfig; + return null; } @Override @@ -68,12 +77,12 @@ public String getSerdeFormat() { } @Override - public TopicConfig getTopic() { + public BufferTopicConfig getTopic() { return topics.get(0); } @Override - public List getTopics() { + public List getTopics() { return topics; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java index e7d38bf8ba..c4be81fe26 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java @@ -1,13 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.common; import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory; +import 
org.opensearch.dataprepper.plugins.kafka.configuration.CommonTopicConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import java.util.function.Supplier; /** - * Adapts a {@link TopicConfig} to a {@link KafkaDataConfig}. + * Adapts a {@link CommonTopicConfig} to a {@link KafkaDataConfig}. */ public class KafkaDataConfigAdapter implements KafkaDataConfig { private final KeyFactory keyFactory; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/CommonTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/CommonTopicConfig.java new file mode 100644 index 0000000000..c0ad938c80 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/CommonTopicConfig.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +import java.time.Duration; + +/** + * This class has topic configurations which are common to all Kafka plugins - source, buffer, and sink. + *

+ * Be sure to only add to this configuration if the setting is applicable for all three types. + */ +public abstract class CommonTopicConfig implements TopicConfig { + static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10); + static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10); + + @JsonProperty("name") + @NotNull + @Valid + private String name; + + @JsonProperty("retry_backoff") + private Duration retryBackoff = DEFAULT_RETRY_BACKOFF; + + @JsonProperty("reconnect_backoff") + private Duration reconnectBackoff = DEFAULT_RECONNECT_BACKOFF; + + + @Override + public Duration getRetryBackoff() { + return retryBackoff; + } + + @Override + public Duration getReconnectBackoff() { + return reconnectBackoff; + } + + @Override + public String getName() { + return name; + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java index 27b16feb53..c52573bb6b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java @@ -1,14 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; import java.util.List; public interface KafkaConsumerConfig extends KafkaConnectionConfig { - String getClientDnsLookup(); boolean getAcknowledgementsEnabled(); SchemaConfig getSchemaConfig(); - List getTopics(); + List getTopics(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java index b08f97aca2..a46deb3412 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -15,7 +20,7 @@ public interface KafkaProducerConfig extends KafkaConnectionConfig { String getSerdeFormat(); - TopicConfig getTopic(); + TopicProducerConfig getTopic(); KafkaProducerProperties getKafkaProducerProperties(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java index 546587a15a..97358e3b1c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java @@ -5,281 +5,25 @@ package org.opensearch.dataprepper.plugins.kafka.configuration; -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotNull; -import jakarta.validation.constraints.Size; -import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import java.time.Duration; /** - * * A helper class that helps to read consumer configuration values from - * pipelines.yaml + * Represents a topic configuration 
to use throughout the code. See the + * {@link TopicConsumerConfig} and {@link TopicProducerConfig} for configurations + * which are specific for those use-cases. */ -public class TopicConfig { - static final boolean DEFAULT_AUTO_COMMIT = false; - static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); - static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); - static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; - static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5); - static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4); - static final String DEFAULT_FETCH_MAX_BYTES = "50mb"; - static final Integer DEFAULT_FETCH_MAX_WAIT = 500; - static final String DEFAULT_FETCH_MIN_BYTES = "1b"; - static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10); - static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10); - static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "1mb"; - static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300); - static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500; - static final Integer DEFAULT_NUM_OF_WORKERS = 2; - static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); +public interface TopicConfig { + String getName(); + MessageFormat getSerdeFormat(); - private static final Integer DEFAULT_NUM_OF_PARTITIONS = 1; - private static final Short DEFAULT_REPLICATION_FACTOR = 1; - private static final Long DEFAULT_RETENTION_PERIOD=604800000L; + String getEncryptionKey(); + KmsConfig getKmsConfig(); - @JsonProperty("name") - @NotNull - @Valid - private String name; + Duration getRetryBackoff(); - @JsonProperty("group_id") - @Valid - @Size(min = 1, max = 255, message = "size of group id should be between 1 and 255") - private String groupId; - - @JsonProperty("workers") - @Valid - @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200") - private Integer 
workers = DEFAULT_NUM_OF_WORKERS; - - @JsonProperty("serde_format") - private MessageFormat serdeFormat= MessageFormat.PLAINTEXT; - - @JsonProperty("auto_commit") - private Boolean autoCommit = DEFAULT_AUTO_COMMIT; - - @JsonProperty("commit_interval") - @Valid - @Size(min = 1) - private Duration commitInterval = DEFAULT_COMMIT_INTERVAL; - - @JsonProperty("session_timeout") - @Valid - @Size(min = 1) - private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT; - - @JsonProperty("auto_offset_reset") - private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET; - - @JsonProperty("thread_waiting_time") - private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME; - - @JsonProperty("max_partition_fetch_bytes") - private String maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES; - - @JsonProperty("fetch_max_bytes") - private String fetchMaxBytes = DEFAULT_FETCH_MAX_BYTES; - - @JsonProperty("fetch_max_wait") - @Valid - @Size(min = 1) - private Integer fetchMaxWait = DEFAULT_FETCH_MAX_WAIT; - - @JsonProperty("fetch_min_bytes") - private String fetchMinBytes = DEFAULT_FETCH_MIN_BYTES; - - @JsonProperty("key_mode") - private KafkaKeyMode kafkaKeyMode = KafkaKeyMode.INCLUDE_AS_FIELD; - - @JsonProperty("retry_backoff") - private Duration retryBackoff = DEFAULT_RETRY_BACKOFF; - - @JsonProperty("reconnect_backoff") - private Duration reconnectBackoff = DEFAULT_RECONNECT_BACKOFF; - - @JsonProperty("max_poll_interval") - private Duration maxPollInterval = DEFAULT_MAX_POLL_INTERVAL; - - @JsonProperty("consumer_max_poll_records") - private Integer consumerMaxPollRecords = DEFAULT_CONSUMER_MAX_POLL_RECORDS; - - @JsonProperty("heart_beat_interval") - @Valid - @Size(min = 1) - private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION; - - @JsonProperty("is_topic_create") - private Boolean isTopicCreate =Boolean.FALSE; - - @JsonProperty("number_of_partitions") - private Integer numberOfPartions = DEFAULT_NUM_OF_PARTITIONS; - - 
@JsonProperty("replication_factor") - private Short replicationFactor = DEFAULT_REPLICATION_FACTOR; - - @JsonProperty("retention_period") - private Long retentionPeriod=DEFAULT_RETENTION_PERIOD; - - @JsonProperty("encryption_key") - private String encryptionKey; - - @JsonProperty("kms") - private KmsConfig kmsConfig; - - public Long getRetentionPeriod() { - return retentionPeriod; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public MessageFormat getSerdeFormat() { - return serdeFormat; - } - - public String getEncryptionKey() { - return encryptionKey; - } - - public KmsConfig getKmsConfig() { - return kmsConfig; - } - - public Boolean getAutoCommit() { - return autoCommit; - } - - public Duration getCommitInterval() { - return commitInterval; - } - - public void setCommitInterval(Duration commitInterval) { - this.commitInterval = commitInterval; - } - - public Duration getSessionTimeOut() { - return sessionTimeOut; - } - - public String getAutoOffsetReset() { - return autoOffsetReset; - } - - public void setAutoOffsetReset(String autoOffsetReset) { - this.autoOffsetReset = autoOffsetReset; - } - - public Duration getThreadWaitingTime() { - return threadWaitingTime; - } - - public void setThreadWaitingTime(Duration threadWaitingTime) { - this.threadWaitingTime = threadWaitingTime; - } - - public long getMaxPartitionFetchBytes() { - return ByteCount.parse(maxPartitionFetchBytes).getBytes(); - } - - public long getFetchMaxBytes() { - long value = ByteCount.parse(fetchMaxBytes).getBytes(); - if (value < 1 || value > 50*1024*1024) { - throw new RuntimeException("Invalid Fetch Max Bytes"); - } - return value; - } - - public void setAutoCommit(Boolean autoCommit) { - this.autoCommit = autoCommit; - } - - public Integer getFetchMaxWait() { - return fetchMaxWait; - } - - public long getFetchMinBytes() { - long value = ByteCount.parse(fetchMinBytes).getBytes(); - if (value < 1) { - 
throw new RuntimeException("Invalid Fetch Min Bytes"); - } - return value; - } - - public Duration getRetryBackoff() { - return retryBackoff; - } - - public Duration getReconnectBackoff() { - return reconnectBackoff; - } - - public void setRetryBackoff(Duration retryBackoff) { - this.retryBackoff = retryBackoff; - } - - public Duration getMaxPollInterval() { - return maxPollInterval; - } - - public void setMaxPollInterval(Duration maxPollInterval) { - this.maxPollInterval = maxPollInterval; - } - - public Integer getConsumerMaxPollRecords() { - return consumerMaxPollRecords; - } - - public void setConsumerMaxPollRecords(Integer consumerMaxPollRecords) { - this.consumerMaxPollRecords = consumerMaxPollRecords; - } - - public Integer getWorkers() { - return workers; - } - - public void setWorkers(Integer workers) { - this.workers = workers; - } - - public Duration getHeartBeatInterval() { - return heartBeatInterval; - } - - public void setHeartBeatInterval(Duration heartBeatInterval) { - this.heartBeatInterval = heartBeatInterval; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public KafkaKeyMode getKafkaKeyMode() { - return kafkaKeyMode; - } - - public Boolean isCreate() { - return isTopicCreate; - } - - public Integer getNumberOfPartions() { - return numberOfPartions; - } - - public Short getReplicationFactor() { - return replicationFactor; - } + Duration getReconnectBackoff(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java new file mode 100644 index 0000000000..0ae2126cbe --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch 
Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import java.time.Duration; + +/** + * An extension of the {@link TopicConfig} specifically for + * consumers from Kafka topics. + */ +public interface TopicConsumerConfig extends TopicConfig { + KafkaKeyMode getKafkaKeyMode(); + + String getGroupId(); + + Boolean getAutoCommit(); + + String getAutoOffsetReset(); + + Duration getCommitInterval(); + + long getMaxPartitionFetchBytes(); + + long getFetchMaxBytes(); + + Integer getFetchMaxWait(); + + long getFetchMinBytes(); + + Duration getThreadWaitingTime(); + + Duration getSessionTimeOut(); + + Duration getHeartBeatInterval(); + + Duration getMaxPollInterval(); + + Integer getConsumerMaxPollRecords(); + + Integer getWorkers(); +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicProducerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicProducerConfig.java new file mode 100644 index 0000000000..bceced39f0 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicProducerConfig.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +/** + * An extension of the {@link TopicConfig} specifically for + * producers to Kafka topics. 
+ */ +public interface TopicProducerConfig extends TopicConfig { + Integer getNumberOfPartitions(); + Short getReplicationFactor(); + Long getRetentionPeriod(); + boolean isCreateTopic(); +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 79e50f0647..e36e93e2d9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -24,14 +24,14 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import org.opensearch.dataprepper.plugins.kafka.util.LogRateLimiter; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; @@ -72,7 +72,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener private KafkaConsumer consumer= null; private 
AtomicBoolean shutdownInProgress; private final String topicName; - private final TopicConfig topicConfig; + private final TopicConsumerConfig topicConfig; private MessageFormat schema; private final BufferAccumulator> bufferAccumulator; private final Buffer> buffer; @@ -97,7 +97,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, final Buffer> buffer, final KafkaConsumerConfig consumerConfig, - final TopicConfig topicConfig, + final TopicConsumerConfig topicConfig, final String schemaType, final AcknowledgementSetManager acknowledgementSetManager, final ByteDecoder byteDecoder, diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index b686dcd113..de553afc99 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.consumer; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; @@ -26,6 +31,7 @@ import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; import 
org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; @@ -62,7 +68,7 @@ public KafkaCustomConsumerFactory(SerializationFactory serializationFactory, Aws this.awsCredentialsSupplier = awsCredentialsSupplier; } - public List createConsumersForTopic(final KafkaConsumerConfig kafkaConsumerConfig, final TopicConfig topic, + public List createConsumersForTopic(final KafkaConsumerConfig kafkaConsumerConfig, final TopicConsumerConfig topic, final Buffer> buffer, final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, final ByteDecoder byteDecoder, @@ -109,7 +115,7 @@ public List createConsumersForTopic(final KafkaConsumerConf return consumers; } - private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig, final TopicConfig topicConfig, final Properties authProperties) { + private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig, final TopicConsumerConfig topicConfig, final Properties authProperties) { Properties properties = (Properties)authProperties.clone(); if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) { ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup()); @@ -131,7 +137,7 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig, return properties; } - private void setConsumerTopicProperties(final Properties properties, final TopicConfig topicConfig) { + private void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig) { properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes()); properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue()); diff --git 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java index 97c1794658..ccd28eba35 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -14,6 +19,7 @@ import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; @@ -75,10 +81,10 @@ private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig } private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig kafkaProducerConfig) { - final TopicConfig topic = kafkaProducerConfig.getTopic(); - if (!topic.isCreate()) { + final TopicProducerConfig topic = kafkaProducerConfig.getTopic(); + if (!topic.isCreateTopic()) { final TopicService topicService = new TopicService(kafkaProducerConfig); - topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartions(), topic.getReplicationFactor()); + 
topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor()); topicService.closeAdminClient(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index ea273c370d..5cf46ea71a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -18,9 +18,8 @@ import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory; import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; @@ -145,10 +144,10 @@ private void prepareTopicAndSchema() { } private void checkTopicCreationCriteriaAndCreateTopic() { - final TopicConfig topic = kafkaSinkConfig.getTopic(); - if (topic.isCreate()) { + final TopicProducerConfig topic = kafkaSinkConfig.getTopic(); + if (topic.isCreateTopic()) { final TopicService topicService = new TopicService(kafkaSinkConfig); - topicService.createTopic(kafkaSinkConfig.getTopic().getName(), topic.getNumberOfPartions(), topic.getReplicationFactor()); + topicService.createTopic(kafkaSinkConfig.getTopic().getName(), 
topic.getNumberOfPartitions(), topic.getReplicationFactor()); topicService.closeAdminClient(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfig.java similarity index 82% rename from data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java rename to data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfig.java index b4eb00c2a1..cb573003c8 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.kafka.configuration; +package org.opensearch.dataprepper.plugins.kafka.sink; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; @@ -13,6 +13,13 @@ import org.apache.commons.lang3.ObjectUtils; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import java.util.LinkedHashMap; import 
java.util.List; @@ -24,8 +31,7 @@ * * A helper class that helps to read user configuration values from * pipelines.yaml */ - -public class KafkaSinkConfig implements KafkaProducerConfig{ +public class KafkaSinkConfig implements KafkaProducerConfig { public static final String DLQ = "dlq"; @@ -61,7 +67,7 @@ public void setDlqConfig(final PluginSetting pluginSetting) { @JsonProperty("topic") - TopicConfig topic; + SinkTopicConfig topic; @JsonProperty("authentication") private AuthConfig authConfig; @@ -140,11 +146,11 @@ public void setSchemaConfig(SchemaConfig schemaConfig) { this.schemaConfig = schemaConfig; } - public TopicConfig getTopic() { + public TopicProducerConfig getTopic() { return topic; } - public void setTopic(TopicConfig topic) { + public void setTopic(SinkTopicConfig topic) { this.topic = topic; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/SinkTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/SinkTopicConfig.java new file mode 100644 index 0000000000..adb7ced442 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/SinkTopicConfig.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.sink; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.plugins.kafka.configuration.CommonTopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; + +public class SinkTopicConfig extends CommonTopicConfig implements TopicProducerConfig { + private static final Integer DEFAULT_NUM_OF_PARTITIONS = 1; + private static final Short DEFAULT_REPLICATION_FACTOR = 1; + 
private static final Long DEFAULT_RETENTION_PERIOD = 604800000L; + + @JsonProperty("serde_format") + private MessageFormat serdeFormat = MessageFormat.PLAINTEXT; + + @JsonProperty("number_of_partitions") + private Integer numberOfPartitions = DEFAULT_NUM_OF_PARTITIONS; + + @JsonProperty("replication_factor") + private Short replicationFactor = DEFAULT_REPLICATION_FACTOR; + + @JsonProperty("retention_period") + private Long retentionPeriod = DEFAULT_RETENTION_PERIOD; + + @JsonProperty("create_topic") + private boolean isCreateTopic = false; + + @Override + public MessageFormat getSerdeFormat() { + return serdeFormat; + } + + @Override + public String getEncryptionKey() { + return null; + } + + @Override + public KmsConfig getKmsConfig() { + return null; + } + + @Override + public Integer getNumberOfPartitions() { + return numberOfPartitions; + } + + @Override + public Short getReplicationFactor() { + return replicationFactor; + } + + @Override + public Long getRetentionPeriod() { + return retentionPeriod; + } + + @Override + public boolean isCreateTopic() { + return isCreateTopic; + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 3321e0d2c2..76219bf951 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -30,7 +30,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import 
org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; @@ -204,7 +204,7 @@ public void stopExecutor(final ExecutorService executorService, final long shutd } private long calculateLongestThreadWaitingTime() { - List topicsList = sourceConfig.getTopics(); + List topicsList = sourceConfig.getTopics(); return topicsList.stream(). map( topics -> topics.getThreadWaitingTime().toSeconds() @@ -217,7 +217,7 @@ KafkaConsumer getConsumer() { return kafkaConsumer; } - private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) { + private Properties getConsumerProperties(final TopicConsumerConfig topicConfig, final Properties authProperties) { Properties properties = (Properties) authProperties.clone(); if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) { ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup()); @@ -306,7 +306,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic } } - private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) { + private void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig) { properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID); properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes()); properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue()); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfig.java similarity index 79% rename from 
data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java rename to data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfig.java index ba95e75723..a7274d3987 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfig.java @@ -3,12 +3,18 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.kafka.configuration; +package org.opensearch.dataprepper.plugins.kafka.source; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import java.util.List; import java.util.Objects; @@ -26,7 +32,7 @@ public class KafkaSourceConfig implements KafkaConsumerConfig { @JsonProperty("topics") @NotNull @Size(min = 1, max = 10, message = "The number of Topics should be between 1 and 10") - private List topics; + private List topics; @JsonProperty("schema") @Valid @@ -57,11 +63,11 @@ public boolean getAcknowledgementsEnabled() { return acknowledgementsEnabled; } - public List getTopics() { + public List getTopics() { return topics; } - public void setTopics(List topics) { + public void setTopics(List topics) { this.topics = topics; } diff --git 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java new file mode 100644 index 0000000000..adcf030f1f --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java @@ -0,0 +1,200 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.source; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.kafka.configuration.CommonTopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; +import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; + +import java.time.Duration; + +class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig { + static final boolean DEFAULT_AUTO_COMMIT = false; + static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); + static final String DEFAULT_FETCH_MAX_BYTES = "50mb"; + static final Integer DEFAULT_FETCH_MAX_WAIT = 500; + static final String DEFAULT_FETCH_MIN_BYTES = "1b"; + static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "1mb"; + static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); + static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; + static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5); + static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300); + static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 
500; + static final Integer DEFAULT_NUM_OF_WORKERS = 2; + static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); + + + @JsonProperty("serde_format") + private MessageFormat serdeFormat = MessageFormat.PLAINTEXT; + + @JsonProperty("commit_interval") + @Valid + @Size(min = 1) + private Duration commitInterval = DEFAULT_COMMIT_INTERVAL; + + @JsonProperty("key_mode") + private KafkaKeyMode kafkaKeyMode = KafkaKeyMode.INCLUDE_AS_FIELD; + + @JsonProperty("group_id") + @Valid + @Size(min = 1, max = 255, message = "size of group id should be between 1 and 255") + private String groupId; + + @JsonProperty("workers") + @Valid + @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200") + private Integer workers = DEFAULT_NUM_OF_WORKERS; + + @JsonProperty("session_timeout") + @Valid + @Size(min = 1) + private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT; + + @JsonProperty("auto_offset_reset") + private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET; + + @JsonProperty("thread_waiting_time") + private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME; + + @JsonProperty("max_poll_interval") + private Duration maxPollInterval = DEFAULT_MAX_POLL_INTERVAL; + + @JsonProperty("consumer_max_poll_records") + private Integer consumerMaxPollRecords = DEFAULT_CONSUMER_MAX_POLL_RECORDS; + + @JsonProperty("heart_beat_interval") + @Valid + @Size(min = 1) + private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION; + + @JsonProperty("auto_commit") + private Boolean autoCommit = DEFAULT_AUTO_COMMIT; + + @JsonProperty("max_partition_fetch_bytes") + private String maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES; + + @JsonProperty("fetch_max_bytes") + private String fetchMaxBytes = DEFAULT_FETCH_MAX_BYTES; + + @JsonProperty("fetch_max_wait") + @Valid + @Size(min = 1) + private Integer fetchMaxWait = DEFAULT_FETCH_MAX_WAIT; + + @JsonProperty("fetch_min_bytes") + private String fetchMinBytes = 
DEFAULT_FETCH_MIN_BYTES; + + + @Override + public String getEncryptionKey() { + return null; + } + + @Override + public KmsConfig getKmsConfig() { + return null; + } + + + @Override + public Duration getCommitInterval() { + return commitInterval; + } + + + @Override + public KafkaKeyMode getKafkaKeyMode() { + return kafkaKeyMode; + } + + @Override + public String getGroupId() { + return groupId; + } + + @Override + public MessageFormat getSerdeFormat() { + return serdeFormat; + } + + @Override + public Boolean getAutoCommit() { + return autoCommit; + } + + public void setAutoCommit(Boolean autoCommit) { + this.autoCommit = autoCommit; + } + + @Override + public long getFetchMaxBytes() { + long value = ByteCount.parse(fetchMaxBytes).getBytes(); + if (value < 1 || value > 50*1024*1024) { + throw new RuntimeException("Invalid Fetch Max Bytes"); + } + return value; + } + + @Override + public Integer getFetchMaxWait() { + return fetchMaxWait; + } + + @Override + public long getFetchMinBytes() { + long value = ByteCount.parse(fetchMinBytes).getBytes(); + if (value < 1) { + throw new RuntimeException("Invalid Fetch Min Bytes"); + } + return value; + } + + @Override + public long getMaxPartitionFetchBytes() { + return ByteCount.parse(maxPartitionFetchBytes).getBytes(); + } + + @Override + public Duration getSessionTimeOut() { + return sessionTimeOut; + } + + @Override + public String getAutoOffsetReset() { + return autoOffsetReset; + } + + @Override + public Duration getThreadWaitingTime() { + return threadWaitingTime; + } + + @Override + public Duration getMaxPollInterval() { + return maxPollInterval; + } + + @Override + public Integer getConsumerMaxPollRecords() { + return consumerMaxPollRecords; + } + + @Override + public Integer getWorkers() { + return workers; + } + + @Override + public Duration getHeartBeatInterval() { + return heartBeatInterval; + } +} diff --git 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java index db01c919cf..779aefcab0 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java @@ -9,7 +9,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfigTest.java new file mode 100644 index 0000000000..60187fe41d --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfigTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; + +import static org.hamcrest.CoreMatchers.equalTo; +import static 
org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + +class BufferTopicConfigTest { + private BufferTopicConfig createObjectUnderTest() { + return new BufferTopicConfig(); + } + + @Test + void verify_default_values() { + BufferTopicConfig objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getAutoCommit(), equalTo(BufferTopicConfig.DEFAULT_AUTO_COMMIT)); + assertThat(objectUnderTest.getCommitInterval(), equalTo(BufferTopicConfig.DEFAULT_COMMIT_INTERVAL)); + assertThat(objectUnderTest.getFetchMaxWait(), equalTo((int) BufferTopicConfig.DEFAULT_FETCH_MAX_WAIT.toMillis())); + assertThat(objectUnderTest.getFetchMinBytes(), equalTo(BufferTopicConfig.DEFAULT_FETCH_MIN_BYTES.getBytes())); + assertThat(objectUnderTest.getFetchMaxBytes(), equalTo(BufferTopicConfig.DEFAULT_FETCH_MAX_BYTES.getBytes())); + assertThat(objectUnderTest.getMaxPartitionFetchBytes(), equalTo(BufferTopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES.getBytes())); + + assertThat(objectUnderTest.getSessionTimeOut(), equalTo(BufferTopicConfig.DEFAULT_SESSION_TIMEOUT)); + assertThat(objectUnderTest.getAutoOffsetReset(), equalTo(BufferTopicConfig.DEFAULT_AUTO_OFFSET_RESET)); + assertThat(objectUnderTest.getThreadWaitingTime(), equalTo(BufferTopicConfig.DEFAULT_THREAD_WAITING_TIME)); + assertThat(objectUnderTest.getMaxPollInterval(), equalTo(BufferTopicConfig.DEFAULT_MAX_POLL_INTERVAL)); + assertThat(objectUnderTest.getConsumerMaxPollRecords(), equalTo(BufferTopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS)); + assertThat(objectUnderTest.getWorkers(), equalTo(BufferTopicConfig.DEFAULT_NUM_OF_WORKERS)); + assertThat(objectUnderTest.getHeartBeatInterval(), equalTo(BufferTopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION)); + } + + @Test + void getFetchMaxBytes_on_large_value() throws NoSuchFieldException, IllegalAccessException { + BufferTopicConfig objectUnderTest 
= createObjectUnderTest(); + + setField(BufferTopicConfig.class, objectUnderTest, "fetchMaxBytes", ByteCount.parse("60mb")); + assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes()); + } + + @Test + void invalid_getFetchMaxBytes_zero_bytes() throws NoSuchFieldException, IllegalAccessException { + BufferTopicConfig objectUnderTest = createObjectUnderTest(); + + setField(BufferTopicConfig.class, objectUnderTest, "fetchMaxBytes", ByteCount.zeroBytes()); + assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 6747ab4894..06c44d3599 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.buffer; import org.junit.jupiter.api.BeforeEach; @@ -22,9 +27,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory; import 
org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; @@ -83,7 +86,7 @@ class KafkaBufferTest { private PluginFactory pluginFactory; @Mock - TopicConfig topic1; + BufferTopicConfig topic1; @Mock AuthConfig authConfig; @Mock @@ -136,7 +139,7 @@ void setUp() { pluginMetrics = mock(PluginMetrics.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); when(topic1.getName()).thenReturn("topic1"); - when(topic1.isCreate()).thenReturn(true); + when(topic1.isCreateTopic()).thenReturn(true); when(topic1.getWorkers()).thenReturn(2); when(topic1.getCommitInterval()).thenReturn(Duration.ofSeconds(1)); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java index 8e5d0546b7..ed23995a64 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; @@ -11,6 +16,7 @@ import static org.hamcrest.Matchers.equalTo; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/CommonTopicConfigTest.java 
b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/CommonTopicConfigTest.java new file mode 100644 index 0000000000..ab71db191b --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/CommonTopicConfigTest.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class CommonTopicConfigTest { + private TopicConfig topicConfig; + + private static final String YAML_FILE_WITH_CONSUMER_CONFIG = "sample-pipelines.yaml"; + + private static final String YAML_FILE_WITH_MISSING_CONSUMER_CONFIG = "sample-pipelines-1.yaml"; + + @BeforeEach + void setUp(TestInfo testInfo) throws IOException { + String fileName = testInfo.getTags().stream().findFirst().orElse(""); + Yaml yaml = new Yaml(); + FileReader fileReader = new FileReader(getClass().getClassLoader().getResource(fileName).getFile()); + Object data = yaml.load(fileReader); + ObjectMapper mapper = new ObjectMapper(); + if (data instanceof Map) { + Map propertyMap = (Map) data; + Map logPipelineMap = 
(Map) propertyMap.get("log-pipeline"); + Map sourceMap = (Map) logPipelineMap.get("source"); + Map kafkaConfigMap = (Map) sourceMap.get("kafka"); + mapper.registerModule(new JavaTimeModule()); + String json = mapper.writeValueAsString(kafkaConfigMap); + Reader reader = new StringReader(json); + KafkaSourceConfig kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); + List topicConfigList = kafkaSourceConfig.getTopics(); + topicConfig = topicConfigList.get(0); + } + } + + @Test + @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) + void test_topicsConfig_not_null() { + assertThat(topicConfig, notNullValue()); + } + + @Test + @Tag(YAML_FILE_WITH_MISSING_CONSUMER_CONFIG) + void testConfigValues_default() { + assertEquals("my-topic-2", topicConfig.getName()); + assertEquals(CommonTopicConfig.DEFAULT_RETRY_BACKOFF, topicConfig.getRetryBackoff()); + assertEquals(CommonTopicConfig.DEFAULT_RECONNECT_BACKOFF, topicConfig.getReconnectBackoff()); + } + + @Test + @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) + void testConfigValues_from_yaml() { + assertEquals("my-topic-1", topicConfig.getName()); + assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff()); + } + + @Test + @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) + void testConfigValues_from_yaml_not_null() { + assertNotNull(topicConfig.getName()); + assertNotNull(topicConfig.getRetryBackoff()); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java index 25228da2ba..d33b67973b 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; import com.fasterxml.jackson.databind.ObjectMapper; @@ -5,6 +10,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java index 560f0042eb..44a5e88554 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; import com.fasterxml.jackson.databind.ObjectMapper; @@ -5,6 +10,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/SchemaConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/SchemaConfigTest.java index cc9c3f69dd..e02604a19f 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/SchemaConfigTest.java +++ 
b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/SchemaConfigTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; import com.fasterxml.jackson.databind.ObjectMapper; @@ -5,6 +10,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java deleted file mode 100644 index a85ed92727..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.kafka.configuration; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.mockito.Mock; -import org.yaml.snakeyaml.Yaml; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; - -import java.io.FileReader; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.time.Duration; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static 
org.junit.jupiter.api.Assertions.assertThrows; - -import org.opensearch.dataprepper.model.types.ByteCount; -import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; - -class TopicConfigTest { - - @Mock - TopicConfig topicConfig; - - private static final String YAML_FILE_WITH_CONSUMER_CONFIG = "sample-pipelines.yaml"; - - private static final String YAML_FILE_WITH_MISSING_CONSUMER_CONFIG = "sample-pipelines-1.yaml"; - - @BeforeEach - void setUp(TestInfo testInfo) throws IOException { - String fileName = testInfo.getTags().stream().findFirst().orElse(""); - topicConfig = new TopicConfig(); - Yaml yaml = new Yaml(); - FileReader fileReader = new FileReader(getClass().getClassLoader().getResource(fileName).getFile()); - Object data = yaml.load(fileReader); - ObjectMapper mapper = new ObjectMapper(); - if (data instanceof Map) { - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); - Map sourceMap = (Map) logPipelineMap.get("source"); - Map kafkaConfigMap = (Map) sourceMap.get("kafka"); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kafkaConfigMap); - Reader reader = new StringReader(json); - KafkaSourceConfig kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - List topicConfigList = kafkaSourceConfig.getTopics(); - topicConfig = topicConfigList.get(0); - } - } - - @Test - @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) - void test_topicsConfig_not_null() { - assertThat(topicConfig, notNullValue()); - } - - @Test - @Tag(YAML_FILE_WITH_MISSING_CONSUMER_CONFIG) - void testConfigValues_default() { - assertEquals("my-topic-2", topicConfig.getName()); - assertEquals("my-test-group", topicConfig.getGroupId()); - assertEquals(TopicConfig.DEFAULT_AUTO_COMMIT, topicConfig.getAutoCommit()); - assertEquals(TopicConfig.DEFAULT_COMMIT_INTERVAL, topicConfig.getCommitInterval()); - assertEquals(TopicConfig.DEFAULT_SESSION_TIMEOUT, topicConfig.getSessionTimeOut()); - 
assertEquals(TopicConfig.DEFAULT_AUTO_OFFSET_RESET, topicConfig.getAutoOffsetReset()); - assertEquals(TopicConfig.DEFAULT_THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime()); - assertEquals(ByteCount.parse(TopicConfig.DEFAULT_FETCH_MAX_BYTES).getBytes(), topicConfig.getFetchMaxBytes()); - assertEquals(TopicConfig.DEFAULT_FETCH_MAX_WAIT, topicConfig.getFetchMaxWait()); - assertEquals(ByteCount.parse(TopicConfig.DEFAULT_FETCH_MIN_BYTES).getBytes(), topicConfig.getFetchMinBytes()); - assertEquals(TopicConfig.DEFAULT_RETRY_BACKOFF, topicConfig.getRetryBackoff()); - assertEquals(TopicConfig.DEFAULT_RECONNECT_BACKOFF, topicConfig.getReconnectBackoff()); - assertEquals(TopicConfig.DEFAULT_MAX_POLL_INTERVAL, topicConfig.getMaxPollInterval()); - assertEquals(TopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS, topicConfig.getConsumerMaxPollRecords()); - assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers()); - assertEquals(TopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval()); - assertEquals(ByteCount.parse(TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES).getBytes(), topicConfig.getMaxPartitionFetchBytes()); - } - - @Test - @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) - void testConfigValues_from_yaml() { - assertEquals("my-topic-1", topicConfig.getName()); - assertEquals(false, topicConfig.getAutoCommit()); - assertEquals(Duration.ofSeconds(5), topicConfig.getCommitInterval()); - assertEquals(45000, topicConfig.getSessionTimeOut().toMillis()); - assertEquals("earliest", topicConfig.getAutoOffsetReset()); - assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime()); - assertEquals(52428800L, topicConfig.getFetchMaxBytes()); - assertEquals(500L, topicConfig.getFetchMaxWait().longValue()); - assertEquals(1L, topicConfig.getFetchMinBytes()); - assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff()); - assertEquals(Duration.ofSeconds(300), topicConfig.getMaxPollInterval()); - assertEquals(500L, 
topicConfig.getConsumerMaxPollRecords().longValue()); - assertEquals(5, topicConfig.getWorkers().intValue()); - assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval()); - assertEquals(10*ByteCount.parse(TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES).getBytes(), topicConfig.getMaxPartitionFetchBytes()); - } - - @Test - @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) - void testConfigValues_from_yaml_not_null() { - assertNotNull(topicConfig.getName()); - assertNotNull(topicConfig.getAutoCommit()); - assertNotNull(topicConfig.getCommitInterval()); - assertNotNull(topicConfig.getSessionTimeOut()); - assertNotNull(topicConfig.getAutoOffsetReset()); - assertNotNull(topicConfig.getThreadWaitingTime()); - assertNotNull(topicConfig.getFetchMaxBytes()); - assertNotNull(topicConfig.getFetchMaxWait()); - assertNotNull(topicConfig.getFetchMinBytes()); - assertNotNull(topicConfig.getRetryBackoff()); - assertNotNull(topicConfig.getMaxPollInterval()); - assertNotNull(topicConfig.getConsumerMaxPollRecords()); - assertNotNull(topicConfig.getWorkers()); - assertNotNull(topicConfig.getHeartBeatInterval()); - } - - @Test - @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) - void TestInvalidConfigValues() throws NoSuchFieldException, IllegalAccessException { - setField(TopicConfig.class, topicConfig, "fetchMaxBytes", "60mb"); - assertThrows(RuntimeException.class, () -> topicConfig.getFetchMaxBytes()); - setField(TopicConfig.class, topicConfig, "fetchMaxBytes", "0b"); - assertThrows(RuntimeException.class, () -> topicConfig.getFetchMaxBytes()); - setField(TopicConfig.class, topicConfig, "fetchMinBytes", "0b"); - assertThrows(RuntimeException.class, () -> topicConfig.getFetchMinBytes()); - } - -} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index 0d443e4413..e32ce36836 
100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -32,9 +32,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.awaitility.Awaitility.await; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -71,13 +70,13 @@ public class KafkaCustomConsumerTest { private Buffer> buffer; @Mock - private KafkaSourceConfig sourceConfig; + private KafkaConsumerConfig sourceConfig; private ExecutorService callbackExecutor; private AcknowledgementSetManager acknowledgementSetManager; @Mock - private TopicConfig topicConfig; + private TopicConsumerConfig topicConfig; @Mock private KafkaTopicMetrics topicMetrics; @@ -115,7 +114,7 @@ public void setUp() { counter = mock(Counter.class); posCounter = mock(Counter.class); negCounter = mock(Counter.class); - topicConfig = mock(TopicConfig.class); + topicConfig = mock(TopicConsumerConfig.class); 
when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(posCounter); when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(negCounter); when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter); @@ -138,7 +137,7 @@ public void setUp() { callbackExecutor = Executors.newFixedThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000)); - sourceConfig = mock(KafkaSourceConfig.class); + sourceConfig = mock(KafkaConsumerConfig.class); buffer = getBuffer(); shutdownInProgress = new AtomicBoolean(false); when(topicConfig.getName()).thenReturn("topic1"); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index 5216537f9f..13f9879660 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -26,9 +26,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; import java.io.IOException; @@ -47,13 +47,12 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = 
Strictness.LENIENT) - public class KafkaCustomProducerTest { private KafkaCustomProducer producer; @Mock - private KafkaSinkConfig kafkaSinkConfig; + private KafkaProducerConfig kafkaSinkConfig; private Record record; @@ -70,8 +69,8 @@ public void setUp() { DefaultEventHandle defaultEventHandle = mock(DefaultEventHandle.class); event.setEventHandle(defaultEventHandle); record = new Record<>(event); - final TopicConfig topicConfig = new TopicConfig(); - topicConfig.setName("test-topic"); + final TopicProducerConfig topicConfig = mock(TopicProducerConfig.class); + when(topicConfig.getName()).thenReturn("test-topic"); when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); when(kafkaSinkConfig.getSchemaConfig()).thenReturn(mock(SchemaConfig.class)); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java index 09f697cfda..d01c62be11 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java @@ -16,7 +16,6 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.springframework.test.util.ReflectionTestUtils; import org.yaml.snakeyaml.Yaml; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java similarity index 92% rename from 
data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java rename to data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java index e3c4d6fde8..1a297b5175 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java @@ -3,11 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.kafka.configuration; +package org.opensearch.dataprepper.plugins.kafka.sink; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import java.util.Arrays; import java.util.HashMap; @@ -35,7 +37,7 @@ void setUp() { kafkaSinkConfig = new KafkaSinkConfig(); kafkaSinkConfig.setBootStrapServers(Arrays.asList("127.0.0.1:9093")); kafkaSinkConfig.setAuthConfig(mock(AuthConfig.class)); - kafkaSinkConfig.setTopic(mock(TopicConfig.class)); + kafkaSinkConfig.setTopic(mock(SinkTopicConfig.class)); kafkaSinkConfig.setSchemaConfig((mock(SchemaConfig.class))); kafkaSinkConfig.setThreadWaitTime(10L); // kafkaSinkConfig.setSerdeFormat("JSON"); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java index 8758b13881..d61eb22df0 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java +++ 
b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java @@ -26,9 +26,7 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.SinkContext; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; import org.springframework.test.util.ReflectionTestUtils; import org.yaml.snakeyaml.Yaml; @@ -198,8 +196,8 @@ public void isReadyTest() { @Test public void doOutputTestForAutoTopicCreate() { - TopicConfig topicConfig = mock(TopicConfig.class); - when(topicConfig.isCreate()).thenReturn(true); + SinkTopicConfig topicConfig = mock(SinkTopicConfig.class); + when(topicConfig.isCreateTopic()).thenReturn(true); SchemaConfig schemaConfig = mock(SchemaConfig.class); when(schemaConfig.isCreate()).thenReturn(true); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java similarity index 86% rename from data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java rename to data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java index b2634659cc..5cb86c507d 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java @@ 
-1,10 +1,18 @@ -package org.opensearch.dataprepper.plugins.kafka.configuration; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.source; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; @@ -27,7 +35,7 @@ class KafkaSourceConfigTest { @Mock - KafkaSourceConfig kafkaSourceConfig; + KafkaSourceConfig kafkaSourceConfig; List bootstrapServers; @@ -73,14 +81,14 @@ void test_setters() throws NoSuchFieldException, IllegalAccessException { kafkaSourceConfig = new KafkaSourceConfig(); EncryptionConfig encryptionConfig = kafkaSourceConfig.getEncryptionConfig(); kafkaSourceConfig.setBootStrapServers(new ArrayList<>(Arrays.asList("127.0.0.1:9092"))); - TopicConfig topicConfig = mock(TopicConfig.class); + SourceTopicConfig topicConfig = mock(SourceTopicConfig.class); kafkaSourceConfig.setTopics(Collections.singletonList(topicConfig)); assertEquals(Collections.singletonList("127.0.0.1:9092"), kafkaSourceConfig.getBootstrapServers()); assertEquals(Collections.singletonList(topicConfig), kafkaSourceConfig.getTopics()); setField(KafkaSourceConfig.class, kafkaSourceConfig, "acknowledgementsEnabled", true); assertEquals(true, kafkaSourceConfig.getAcknowledgementsEnabled()); - assertEquals(EncryptionType.SSL, kafkaSourceConfig.getEncryptionConfig().getType()); + Assertions.assertEquals(EncryptionType.SSL, kafkaSourceConfig.getEncryptionConfig().getType()); setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.NONE); assertEquals(EncryptionType.NONE, 
encryptionConfig.getType()); setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.SSL); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index d9c78ab0af..a851b7209c 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -6,48 +6,44 @@ package org.opensearch.dataprepper.plugins.kafka.source; import org.apache.kafka.common.config.ConfigException; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.event.Event; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import 
org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Arrays; -import java.util.List; -import java.util.Collections; -import java.util.Objects; -import java.time.Duration; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -73,7 +69,7 @@ class KafkaSourceTest { private PipelineDescription 
pipelineDescription; @Mock - TopicConfig topic1, topic2; + TopicConsumerConfig topic1, topic2; @Mock private Buffer<Record<Event>> buffer; @@ -117,7 +113,7 @@ void setUp() throws Exception { when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList("http://localhost:1234")); - when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2)); + when(sourceConfig.getTopics()).thenReturn((List) List.of(topic1, topic2)); when(sourceConfig.getSchemaConfig()).thenReturn(null); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java new file mode 100644 index 0000000000..b5ad09b853 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.source; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + +class SourceTopicConfigTest { + private SourceTopicConfig createObjectUnderTest() { + return new SourceTopicConfig(); + } + + @Test + void verify_default_values() { + SourceTopicConfig objectUnderTest = createObjectUnderTest(); + 
assertThat(objectUnderTest.getAutoCommit(), equalTo(SourceTopicConfig.DEFAULT_AUTO_COMMIT)); + assertThat(objectUnderTest.getCommitInterval(), equalTo(SourceTopicConfig.DEFAULT_COMMIT_INTERVAL)); + assertThat(objectUnderTest.getFetchMaxWait(), equalTo(SourceTopicConfig.DEFAULT_FETCH_MAX_WAIT)); + assertThat(objectUnderTest.getFetchMinBytes(), equalTo(ByteCount.parse(SourceTopicConfig.DEFAULT_FETCH_MIN_BYTES).getBytes())); + assertThat(objectUnderTest.getFetchMaxBytes(), equalTo(ByteCount.parse(SourceTopicConfig.DEFAULT_FETCH_MAX_BYTES).getBytes())); + assertThat(objectUnderTest.getMaxPartitionFetchBytes(), equalTo(ByteCount.parse(SourceTopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES).getBytes())); + + assertThat(objectUnderTest.getSessionTimeOut(), equalTo(SourceTopicConfig.DEFAULT_SESSION_TIMEOUT)); + assertThat(objectUnderTest.getAutoOffsetReset(), equalTo(SourceTopicConfig.DEFAULT_AUTO_OFFSET_RESET)); + assertThat(objectUnderTest.getThreadWaitingTime(), equalTo(SourceTopicConfig.DEFAULT_THREAD_WAITING_TIME)); + assertThat(objectUnderTest.getMaxPollInterval(), equalTo(SourceTopicConfig.DEFAULT_MAX_POLL_INTERVAL)); + assertThat(objectUnderTest.getConsumerMaxPollRecords(), equalTo(SourceTopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS)); + assertThat(objectUnderTest.getWorkers(), equalTo(SourceTopicConfig.DEFAULT_NUM_OF_WORKERS)); + assertThat(objectUnderTest.getHeartBeatInterval(), equalTo(SourceTopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION)); + } + + @Test + void getFetchMaxBytes_on_large_value() throws NoSuchFieldException, IllegalAccessException { + SourceTopicConfig objectUnderTest = createObjectUnderTest(); + + setField(SourceTopicConfig.class, objectUnderTest, "fetchMaxBytes", "60mb"); + assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes()); + } + + @Test + void invalid_getFetchMaxBytes_zero_bytes() throws NoSuchFieldException, IllegalAccessException { + SourceTopicConfig objectUnderTest = createObjectUnderTest(); + + 
setField(SourceTopicConfig.class, objectUnderTest, "fetchMaxBytes", "0b"); + assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java index 0656fe96ee..ac6f78bb87 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java @@ -11,7 +11,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.sink.KafkaSinkConfig; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java index 4319085f97..36f32736c5 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.util; import com.fasterxml.jackson.databind.ObjectMapper; @@ -9,8 +14,8 @@ import 
org.junit.jupiter.api.extension.ExtendWith; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.sink.KafkaSinkConfig; import org.springframework.test.util.ReflectionTestUtils; import org.yaml.snakeyaml.Yaml; @@ -25,8 +30,6 @@ @ExtendWith(MockitoExtension.class) public class SinkPropertyConfigurerTest { - - KafkaSinkConfig kafkaSinkConfig; MockedStatic<AuthenticationPropertyConfigurer> authenticationPropertyConfigurerMockedStatic; diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml index e417522854..47e5c7d3d2 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml @@ -15,7 +15,7 @@ log-pipeline : serde_format: plaintext topic: name: plaintext_test_topic - is_topic_create: false + create_topic: false producer_properties: buffer_memory: 10mb compression_type: gzip