Skip to content

Commit

Permalink
Implementation of sqs-common plugin and refactored sqs and s3 source (#…
Browse files Browse the repository at this point in the history
…5361)

* initial refactoring

Signed-off-by: Jeremy Michael <[email protected]>

* refactored sqs-source to use sqs-common

Signed-off-by: Jeremy Michael <[email protected]>

* refactored SqsWorker to use the common library

Signed-off-by: Jeremy Michael <[email protected]>

* minor changes

Signed-off-by: Jeremy Michael <[email protected]>

* another small fix

Signed-off-by: Jeremy Michael <[email protected]>

* added unit tests for sqs-common

Signed-off-by: Jeremy Michael <[email protected]>

* updated tests

Signed-off-by: Jeremy Michael <[email protected]>

---------

Signed-off-by: Jeremy Michael <[email protected]>
Co-authored-by: Jeremy Michael <[email protected]>
  • Loading branch information
jmsusanto and Jeremy Michael authored Feb 1, 2025
1 parent 03f5539 commit 7a3cf87
Show file tree
Hide file tree
Showing 18 changed files with 581 additions and 279 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:sqs-common')

implementation libs.armeria.core
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.awssdk:s3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption;
import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -154,8 +155,7 @@ public void test_sqsService(int numWorkers) throws IOException {
}

private void clearSqsQueue() {
Backoff backoff = Backoff.exponential(SqsService.INITIAL_DELAY, SqsService.MAXIMUM_DELAY).withJitter(SqsService.JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
Backoff backoff = SqsBackoff.createExponentialBackoff();
final SqsWorker sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
//final SqsService objectUnderTest = createObjectUnderTest();
int sqsMessagesProcessed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.source.s3.configuration.NotificationSourceOption;
import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption;
import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sqs.SqsClient;
Expand Down Expand Up @@ -93,8 +94,7 @@ void setUp() {
.region(Region.of(System.getProperty("tests.s3source.region")))
.build();

backoff = Backoff.exponential(SqsService.INITIAL_DELAY, SqsService.MAXIMUM_DELAY).withJitter(SqsService.JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
backoff = SqsBackoff.createExponentialBackoff();

s3SourceConfig = mock(S3SourceConfig.class);
s3Service = mock(S3Service.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.sqs.SqsClient;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsClientFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
Expand All @@ -27,17 +26,14 @@
public class SqsService {
private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);
static final long SHUTDOWN_TIMEOUT = 30L;
static final long INITIAL_DELAY = Duration.ofSeconds(20).toMillis();
static final long MAXIMUM_DELAY = Duration.ofMinutes(5).toMillis();
static final double JITTER_RATE = 0.20;

private final S3SourceConfig s3SourceConfig;
private final S3Service s3Accessor;
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final ExecutorService executorService;
private final List<SqsWorker> sqsWorkers;
private final Backoff backoff;

public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
final S3SourceConfig s3SourceConfig,
Expand All @@ -48,11 +44,9 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
this.s3Accessor = s3Accessor;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sqsClient = createSqsClient(credentialsProvider);
this.sqsClient = SqsClientFactory.createSqsClient(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion(), credentialsProvider);
executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-sqs"));

final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
backoff = SqsBackoff.createExponentialBackoff();
sqsWorkers = IntStream.range(0, s3SourceConfig.getNumWorkers())
.mapToObj(i -> new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff))
.collect(Collectors.toList());
Expand All @@ -62,17 +56,6 @@ public void start() {
sqsWorkers.forEach(executorService::submit);
}

SqsClient createSqsClient(final AwsCredentialsProvider credentialsProvider) {
LOG.debug("Creating SQS client");
return SqsClient.builder()
.region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}

public void stop() {
executorService.shutdown();
sqsWorkers.forEach(SqsWorker::stop);
Expand Down
28 changes: 28 additions & 0 deletions data-prepper-plugins/sqs-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:arns'
implementation 'software.amazon.awssdk:sts'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
}
test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.sqs.common;

import com.linecorp.armeria.client.retry.Backoff;
import java.time.Duration;

public final class SqsBackoff {
private static final long INITIAL_DELAY_MILLIS = Duration.ofSeconds(20).toMillis();
private static final long MAX_DELAY_MILLIS = Duration.ofMinutes(5).toMillis();
private static final double JITTER_RATE = 0.20;

private SqsBackoff() {}

public static Backoff createExponentialBackoff() {
return Backoff.exponential(INITIAL_DELAY_MILLIS, MAX_DELAY_MILLIS)
.withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs.common;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

/**
* A common factory to create SQS clients
*/
public final class SqsClientFactory {

private SqsClientFactory() {
}

public static SqsClient createSqsClient(
final Region region,
final AwsCredentialsProvider credentialsProvider) {

return SqsClient.builder()
.region(region)
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
package org.opensearch.dataprepper.plugins.source.sqs.common;

/**
* This exception is thrown when SQS retries are exhausted
Expand Down
Loading

0 comments on commit 7a3cf87

Please sign in to comment.