Skip to content

Commit

Permalink
Move more fields from the common TopicConfig into TopicConsumerConfig…
Browse files Browse the repository at this point in the history
… along with the corresponding subclasses.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Oct 30, 2023
1 parent 62fe0cc commit 6dcd581
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -144,17 +143,10 @@ public void setup() throws RestClientException, IOException {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.AVRO.toString());
when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}");

final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5);

topicConfig = mock(TopicProducerConfig.class);
when(topicConfig.getName()).thenReturn(testTopic);
when(topicConfig.getGroupId()).thenReturn(testGroup);
when(topicConfig.getWorkers()).thenReturn(1);
when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45));
when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));

bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
Expand All @@ -169,7 +161,6 @@ public void TestPollRecordsAvroSASLPlainText() throws Exception {
configureJasConfForSASLPlainText();

final int numRecords = 1;
when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(topicConfig.isCreateTopic()).thenReturn(false);
when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig);

Expand Down Expand Up @@ -248,11 +239,8 @@ private void deleteTopic(AtomicBoolean created, String topicName) throws Interru
}

private void consumeTestMessages(List<Record<Event>> recList) {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -122,17 +121,10 @@ public void setup() {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON.toString());
when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}");

final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5);

topicConfig = mock(TopicProducerConfig.class);
when(topicConfig.getName()).thenReturn(testTopic);
when(topicConfig.getGroupId()).thenReturn(testGroup);
when(topicConfig.getWorkers()).thenReturn(1);
when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45));
when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Expand All @@ -143,7 +135,6 @@ public void TestPollRecordsJsonSASLPlainText() throws Exception {
configureJasConfForSASLPlainText();

final int numRecords = 1;
when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(topicConfig.isCreateTopic()).thenReturn(false);
when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig);
when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -221,11 +212,8 @@ private void configureJasConfForSASLPlainText() {
}

private void consumeTestMessages(List<Record<Event>> recList) {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -120,17 +119,10 @@ public void setup() {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT.toString());
when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}");

final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5);

topicConfig = mock(TopicProducerConfig.class);
when(topicConfig.getName()).thenReturn(testTopic);
when(topicConfig.getGroupId()).thenReturn(testGroup);
when(topicConfig.getWorkers()).thenReturn(1);
when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45));
when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Expand All @@ -142,7 +134,6 @@ public void TestPollRecordsPlainText() throws Exception {
configureJasConfForSASLPlainText();

final int numRecords = 1;
when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(topicConfig.isCreateTopic()).thenReturn(false);
when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig);
when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -219,11 +210,9 @@ private void deleteTopic(AtomicBoolean created, String topicName) throws Interru
}

private void consumeTestMessages(List<Record<Event>> recList) {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);

props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(500);
static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("1b");
static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb");
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);


@JsonProperty("encryption_key")
Expand All @@ -53,6 +60,38 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
@JsonProperty("create_topic")
private boolean isCreateTopic = false;

@JsonProperty("group_id")
@Valid
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
private String groupId;

@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
private Integer workers = DEFAULT_NUM_OF_WORKERS;

@JsonProperty("session_timeout")
@Valid
@Size(min = 1)
private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT;

@JsonProperty("auto_offset_reset")
private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET;

@JsonProperty("thread_waiting_time")
private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME;

@JsonProperty("max_poll_interval")
private Duration maxPollInterval = DEFAULT_MAX_POLL_INTERVAL;

@JsonProperty("consumer_max_poll_records")
private Integer consumerMaxPollRecords = DEFAULT_CONSUMER_MAX_POLL_RECORDS;

@JsonProperty("heart_beat_interval")
@Valid
@Size(min = 1)
private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION;

@JsonProperty("auto_commit")
private Boolean autoCommit = DEFAULT_AUTO_COMMIT;

Expand Down Expand Up @@ -89,6 +128,11 @@ public KafkaKeyMode getKafkaKeyMode() {
return KafkaKeyMode.DISCARD;
}

@Override
public String getGroupId() {
return groupId;
}

@Override
public Duration getCommitInterval() {
return commitInterval;
Expand Down Expand Up @@ -120,6 +164,41 @@ public Boolean getAutoCommit() {
return autoCommit;
}

@Override
public Duration getSessionTimeOut() {
return sessionTimeOut;
}

@Override
public String getAutoOffsetReset() {
return autoOffsetReset;
}

@Override
public Duration getThreadWaitingTime() {
return threadWaitingTime;
}

@Override
public Duration getMaxPollInterval() {
return maxPollInterval;
}

@Override
public Integer getConsumerMaxPollRecords() {
return consumerMaxPollRecords;
}

@Override
public Integer getWorkers() {
return workers;
}

@Override
public Duration getHeartBeatInterval() {
return heartBeatInterval;
}

@Override
public long getFetchMaxBytes() {
long value = fetchMaxBytes.getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

import java.time.Duration;

Expand All @@ -18,80 +17,20 @@
* Be sure to only add to this configuration if the setting is applicable for all three types.
*/
public abstract class CommonTopicConfig implements TopicConfig {
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);


@JsonProperty("name")
@NotNull
@Valid
private String name;

@JsonProperty("group_id")
@Valid
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
private String groupId;

@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
private Integer workers = DEFAULT_NUM_OF_WORKERS;

@JsonProperty("session_timeout")
@Valid
@Size(min = 1)
private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT;

@JsonProperty("auto_offset_reset")
private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET;

@JsonProperty("thread_waiting_time")
private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME;

@JsonProperty("retry_backoff")
private Duration retryBackoff = DEFAULT_RETRY_BACKOFF;

@JsonProperty("reconnect_backoff")
private Duration reconnectBackoff = DEFAULT_RECONNECT_BACKOFF;

@JsonProperty("max_poll_interval")
private Duration maxPollInterval = DEFAULT_MAX_POLL_INTERVAL;

@JsonProperty("consumer_max_poll_records")
private Integer consumerMaxPollRecords = DEFAULT_CONSUMER_MAX_POLL_RECORDS;

@JsonProperty("heart_beat_interval")
@Valid
@Size(min = 1)
private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION;


@Override
public String getGroupId() {
return groupId;
}

@Override
public Duration getSessionTimeOut() {
return sessionTimeOut;
}

@Override
public String getAutoOffsetReset() {
return autoOffsetReset;
}

@Override
public Duration getThreadWaitingTime() {
return threadWaitingTime;
}

@Override
public Duration getRetryBackoff() {
Expand All @@ -103,26 +42,6 @@ public Duration getReconnectBackoff() {
return reconnectBackoff;
}

@Override
public Duration getMaxPollInterval() {
return maxPollInterval;
}

@Override
public Integer getConsumerMaxPollRecords() {
return consumerMaxPollRecords;
}

@Override
public Integer getWorkers() {
return workers;
}

@Override
public Duration getHeartBeatInterval() {
return heartBeatInterval;
}

@Override
public String getName() {
return name;
Expand Down
Loading

0 comments on commit 6dcd581

Please sign in to comment.