Skip to content

Commit

Permalink
Assign individual BufferAccumulators to each SQS worker
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy Michael <[email protected]>
  • Loading branch information
Jeremy Michael committed Dec 18, 2024
1 parent 5e5e84d commit b79f0d8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class SqsService {
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<ExecutorService> allSqsUrlExecutorServices;
private final List<SqsWorker> sqsWorkers;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final Buffer<Record<Event>> buffer;

public SqsService(final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager,
Expand All @@ -58,8 +58,7 @@ public SqsService(final Buffer<Record<Event>> buffer,
this.allSqsUrlExecutorServices = new ArrayList<>();
this.sqsWorkers = new ArrayList<>();
this.sqsClient = createSqsClient(credentialsProvider);
this.bufferAccumulator = BufferAccumulator.create(buffer, sqsSourceConfig.getNumberOfRecordsToAccumulate(), sqsSourceConfig.getBufferTimeout());

this.buffer = buffer;
}


Expand All @@ -77,7 +76,7 @@ public void start() {
allSqsUrlExecutorServices.add(executorService);
List<SqsWorker> workers = IntStream.range(0, numWorkers)
.mapToObj(i -> new SqsWorker(
bufferAccumulator,
buffer,
acknowledgementSetManager,
sqsClient,
sqsEventProcessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.awssdk.services.sqs.model.SqsException;
import software.amazon.awssdk.services.sts.model.StsException;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.buffer.Buffer;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class SqsWorker implements Runnable {
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private Map<Message, Integer> messageVisibilityTimesMap;

public SqsWorker(final BufferAccumulator<Record<Event>> bufferAccumulator,
public SqsWorker(final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager,
final SqsClient sqsClient,
final SqsEventProcessor sqsEventProcessor,
Expand All @@ -76,13 +77,14 @@ public SqsWorker(final BufferAccumulator<Record<Event>> bufferAccumulator,
final PluginMetrics pluginMetrics,
final Backoff backoff) {

this.bufferAccumulator = bufferAccumulator;
this.sqsClient = sqsClient;
this.sqsEventProcessor = sqsEventProcessor;
this.queueConfig = queueConfig;
this.acknowledgementSetManager = acknowledgementSetManager;
this.standardBackoff = backoff;
this.endToEndAcknowledgementsEnabled = sqsSourceConfig.getAcknowledgements();
this.bufferAccumulator = BufferAccumulator.create(buffer, sqsSourceConfig.getNumberOfRecordsToAccumulate(), sqsSourceConfig.getBufferTimeout());

messageVisibilityTimesMap = new HashMap<>();

failedAttemptCount = 0;
Expand Down

0 comments on commit b79f0d8

Please sign in to comment.