Skip to content

Commit

Permalink
Fixed kinesis retrieval config argument passing to KCL scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: RashmiRam <[email protected]>
  • Loading branch information
RashmiRam committed Dec 18, 2024
1 parent 76b06ef commit 180e11e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.util.Objects;
Expand Down Expand Up @@ -178,9 +179,10 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
Expand All @@ -196,7 +198,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
retrievalConfig
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -275,6 +276,30 @@ void testCreateSchedulerWithPollingStrategy() {
verify(workerIdentifierGenerator, times(1)).generate();
}

@Test
void testCreateSchedulerWithPollingStrategyAndPollingConfig() {
when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING);
when(kinesisSourceConfig.getPollingConfig()).thenReturn(kinesisStreamPollingConfig);
KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer);

assertEquals(kinesisService.getApplicationName(), pipelineName);
assertNotNull(schedulerObjectUnderTest);
assertNotNull(schedulerObjectUnderTest.checkpointConfig());
assertNotNull(schedulerObjectUnderTest.leaseManagementConfig());
assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
assertNotNull(schedulerObjectUnderTest.lifecycleConfig());
assertNotNull(schedulerObjectUnderTest.metricsConfig());
assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED);
assertNotNull(schedulerObjectUnderTest.processorConfig());
assertNotNull(schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig());
assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).maxRecords(), kinesisStreamPollingConfig.getMaxPollingRecords());
assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).idleTimeBetweenReadsInMillis(), kinesisStreamPollingConfig.getIdleTimeBetweenReads().toMillis());
assertNotNull(schedulerObjectUnderTest.retrievalConfig());
verify(workerIdentifierGenerator, times(1)).generate();
}

@Test
void testServiceStartNullBufferThrows() {
KinesisService kinesisService = createObjectUnderTest();
Expand Down

0 comments on commit 180e11e

Please sign in to comment.