diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index 2dd63d5268..457696810d 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -42,7 +42,7 @@ public class SqsService { private final AcknowledgementSetManager acknowledgementSetManager; private final List allSqsUrlExecutorServices; private final List sqsWorkers; - private final BufferAccumulator> bufferAccumulator; + private final Buffer> buffer; public SqsService(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, @@ -58,8 +58,7 @@ public SqsService(final Buffer> buffer, this.allSqsUrlExecutorServices = new ArrayList<>(); this.sqsWorkers = new ArrayList<>(); this.sqsClient = createSqsClient(credentialsProvider); - this.bufferAccumulator = BufferAccumulator.create(buffer, sqsSourceConfig.getNumberOfRecordsToAccumulate(), sqsSourceConfig.getBufferTimeout()); - + this.buffer = buffer; } @@ -77,7 +76,7 @@ public void start() { allSqsUrlExecutorServices.add(executorService); List workers = IntStream.range(0, numWorkers) .mapToObj(i -> new SqsWorker( - bufferAccumulator, + buffer, acknowledgementSetManager, sqsClient, sqsEventProcessor, diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java index c47cf24765..27cfd54f84 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java @@ -27,6 +27,7 @@ import software.amazon.awssdk.services.sqs.model.SqsException; import software.amazon.awssdk.services.sts.model.StsException; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.buffer.Buffer; import java.time.Duration; import java.util.ArrayList; @@ -67,7 +68,7 @@ public class SqsWorker implements Runnable { private final BufferAccumulator> bufferAccumulator; private Map messageVisibilityTimesMap; - public SqsWorker(final BufferAccumulator> bufferAccumulator, + public SqsWorker(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsClient sqsClient, final SqsEventProcessor sqsEventProcessor, @@ -76,13 +77,14 @@ public SqsWorker(final BufferAccumulator> bufferAccumulator, final PluginMetrics pluginMetrics, final Backoff backoff) { - this.bufferAccumulator = bufferAccumulator; this.sqsClient = sqsClient; this.sqsEventProcessor = sqsEventProcessor; this.queueConfig = queueConfig; this.acknowledgementSetManager = acknowledgementSetManager; this.standardBackoff = backoff; this.endToEndAcknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); + this.bufferAccumulator = BufferAccumulator.create(buffer, sqsSourceConfig.getNumberOfRecordsToAccumulate(), sqsSourceConfig.getBufferTimeout()); + messageVisibilityTimesMap = new HashMap<>(); failedAttemptCount = 0;