From 180e11ed8f334de6eee5a1b4fe72725b2f88741d Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Wed, 18 Dec 2024 09:47:02 +0530 Subject: [PATCH] Fixed kinesis retrieval config argument passing to KCL scheduler Signed-off-by: RashmiRam --- .../kinesis/source/KinesisService.java | 6 +++-- .../kinesis/source/KinesisServiceTest.java | 25 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 10a33b071d..93d6b2329b 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -40,6 +40,7 @@ import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.ThrottlingException; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.util.Objects; @@ -178,9 +179,10 @@ public Scheduler createScheduler(final Buffer> buffer) { .tableName(tableName) .namespace(kclMetricsNamespaceName); + RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy(); if (consumerStrategy == ConsumerStrategy.POLLING) { - configsBuilder.retrievalConfig().retrievalSpecificConfig( + retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig( new PollingConfig(kinesisClient) .maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords()) .idleTimeBetweenReadsInMillis( @@ -196,7 +198,7 @@ public Scheduler createScheduler(final Buffer> buffer) { configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), - configsBuilder.retrievalConfig() + retrievalConfig ); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 0bd7dfb217..1d4207594d 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -42,6 +42,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.time.Duration; import java.time.Instant; @@ -275,6 +276,30 @@ void testCreateSchedulerWithPollingStrategy() { verify(workerIdentifierGenerator, times(1)).generate(); } + @Test + void testCreateSchedulerWithPollingStrategyAndPollingConfig() { + when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); + when(kinesisSourceConfig.getPollingConfig()).thenReturn(kinesisStreamPollingConfig); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertEquals(kinesisService.getApplicationName(), pipelineName); + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.checkpointConfig()); + assertNotNull(schedulerObjectUnderTest.leaseManagementConfig()); + assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + assertNotNull(schedulerObjectUnderTest.lifecycleConfig()); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); + assertNotNull(schedulerObjectUnderTest.processorConfig()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()); + assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).maxRecords(), kinesisStreamPollingConfig.getMaxPollingRecords()); + assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).idleTimeBetweenReadsInMillis(), kinesisStreamPollingConfig.getIdleTimeBetweenReads().toMillis()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + verify(workerIdentifierGenerator, times(1)).generate(); + } + @Test void testServiceStartNullBufferThrows() { KinesisService kinesisService = createObjectUnderTest();