diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 7f712d9e73..1b4d90d895 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -10,6 +10,7 @@ package org.opensearch.dataprepper.plugins.kinesis.source; +import com.amazonaws.SdkClientException; import lombok.Setter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -35,6 +36,8 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; +import software.amazon.kinesis.exceptions.ThrottlingException; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.polling.PollingConfig; @@ -134,20 +137,21 @@ public void shutDown() { } public Scheduler getScheduler(final Buffer> buffer) { - - int numRetries = 0; - while (scheduler == null && numRetries++ < kinesisSourceConfig.getMaxInitializationAttempts()) { + int maxAttempts = kinesisSourceConfig.getMaxInitializationAttempts(); + while (scheduler == null && maxAttempts-- > 0 ) { try { scheduler = createScheduler(buffer); + } catch (SdkClientException | KinesisClientLibDependencyException | ThrottlingException ex) { + LOG.error(NOISY, "Caught exception when initializing KCL Scheduler due to {}. Number of remaining retries: {}", ex.getMessage(), maxAttempts); } catch (Exception ex) { - LOG.error(NOISY, "Caught exception when initializing KCL Scheduler. Will retry", ex); + LOG.error(NOISY, "Caught exception when initializing KCL Scheduler. Number of remaining retries: {}", maxAttempts, ex); } if (scheduler == null) { try { Thread.sleep(kinesisSourceConfig.getInitializationBackoffTime().toMillis()); } catch (InterruptedException e){ - LOG.debug("Interrupted exception!"); + LOG.debug("Interrupted exception."); } } }