diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java new file mode 100644 index 0000000000..338d482f85 --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; +import java.util.HashMap; +import java.util.Map; + +public class AttributeHandler { + public static Map collectMetadataAttributes(final Message message, final String queueUrl) { + final Map metadataMap = new HashMap<>(); + metadataMap.put("queueUrl", queueUrl); + + for (Map.Entry entry : message.attributes().entrySet()) { + String originalKey = entry.getKey().toString(); + String key = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); + metadataMap.put(key, entry.getValue()); + } + + for (Map.Entry entry : message.messageAttributes().entrySet()) { + String originalKey = entry.getKey().toString(); + String key = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); + metadataMap.put(key, entry.getValue().stringValue()); + } + return metadataMap; + } + + public static void applyMetadataAttributes(final Event event, final Map attributes) { + final EventMetadata metadata = event.getMetadata(); + attributes.forEach(metadata::setAttribute); + } +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java index 08600cba13..e22fa68652 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java index 99da366de3..3ac9cec011 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/CodecBulkMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/CodecBulkMessageFieldStrategy.java new file mode 100644 index 0000000000..15581169d9 --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/CodecBulkMessageFieldStrategy.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class CodecBulkMessageFieldStrategy implements MessageFieldStrategy { + + private final InputCodec codec; + + public CodecBulkMessageFieldStrategy(final InputCodec codec) { + this.codec = codec; + } + + @Override + public List parseEvents(final String messageBody) { + final List events = new ArrayList<>(); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(messageBody.getBytes(StandardCharsets.UTF_8)); + try { + codec.parse(inputStream, (Consumer>) record -> events.add(record.getData())); + } catch (Exception e) { + throw new RuntimeException("Failed to parse events from SQS body.", e); + } + return events; + } +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java new file mode 100644 index 0000000000..f7decd3f91 --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.event.Event; +import java.util.List; + +public interface MessageFieldStrategy { + /** + * Converts the SQS message body into one or more events. + */ + List parseEvents(String messageBody); +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java index ca5566d6cd..47e417bc27 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -15,6 +20,7 @@ import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; +import org.opensearch.dataprepper.model.configuration.PluginModel; public class QueueConfig { @@ -62,6 +68,9 @@ public class QueueConfig { @DurationMax(seconds = 20) private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS; + @JsonProperty("codec") + private PluginModel codec = null; + public String getUrl() { return url; } @@ -93,5 +102,10 @@ public Duration getWaitTime() { public Duration getPollDelay() { return pollDelay; } + + public PluginModel getCodec() { + return codec; + } + } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java index 493b7ab7d7..6fc02f238a 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -8,21 +13,23 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; -import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.ArrayList; public class RawSqsMessageHandler implements SqsMessageHandler { private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class); + private final MessageFieldStrategy messageFieldStrategy; + + public RawSqsMessageHandler(final MessageFieldStrategy messageFieldStrategy) { + this.messageFieldStrategy = messageFieldStrategy; + } @Override public void handleMessage(final Message message, @@ -31,32 +38,18 @@ public void handleMessage(final Message message, final int bufferTimeoutMillis, final AcknowledgementSet acknowledgementSet) { try { - final Map systemAttributes = message.attributes(); - final Map customAttributes = message.messageAttributes(); - final Event event = JacksonEvent.builder() - .withEventType("DOCUMENT") - .withData(Collections.singletonMap("message", message.body())) - .build(); - - final EventMetadata eventMetadata = event.getMetadata(); - eventMetadata.setAttribute("queueUrl", url); - - for (Map.Entry entry : systemAttributes.entrySet()) { - String originalKey = entry.getKey().toString(); - String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); - eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue()); + List events = messageFieldStrategy.parseEvents(message.body()); + Map metadataMap = AttributeHandler.collectMetadataAttributes(message, url); + List> records = new ArrayList<>(); + for (Event event : events) { + AttributeHandler.applyMetadataAttributes(event, metadataMap); + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + records.add(new Record<>(event)); } + buffer.writeAll(records, bufferTimeoutMillis); - for (Map.Entry entry : customAttributes.entrySet()) { - String originalKey = entry.getKey(); - String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); - eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue()); - } - - if (acknowledgementSet != null) { - acknowledgementSet.add(event); - } - buffer.write(new Record<>(event), bufferTimeoutMillis); } catch (Exception e) { LOG.error("Error processing SQS message: {}", e.getMessage(), e); throw new RuntimeException(e); diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java index a03c485c37..c3669c92cb 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java index 79012b5e00..a9585a4a62 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java @@ -1,7 +1,13 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java index 4e1f9507e6..e1fd536cb7 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index d53f269323..672ee9874c 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -9,6 +14,10 @@ import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; + import org.opensearch.dataprepper.model.codec.InputCodec; + import org.opensearch.dataprepper.model.configuration.PluginModel; + import org.opensearch.dataprepper.model.configuration.PluginSetting; + import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -35,9 +44,9 @@ public class SqsService { static final double JITTER_RATE = 0.20; private final SqsSourceConfig sqsSourceConfig; - private final SqsEventProcessor sqsEventProcessor; private final SqsClient sqsClient; private final PluginMetrics pluginMetrics; + private final PluginFactory pluginFactory; private final AcknowledgementSetManager acknowledgementSetManager; private final List allSqsUrlExecutorServices; private final List sqsWorkers; @@ -46,13 +55,13 @@ public class SqsService { public SqsService(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsSourceConfig sqsSourceConfig, - final SqsEventProcessor sqsEventProcessor, final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, final AwsCredentialsProvider credentialsProvider) { this.sqsSourceConfig = sqsSourceConfig; - this.sqsEventProcessor = sqsEventProcessor; this.pluginMetrics = pluginMetrics; + this.pluginFactory = pluginFactory; this.acknowledgementSetManager = acknowledgementSetManager; this.allSqsUrlExecutorServices = new ArrayList<>(); this.sqsWorkers = new ArrayList<>(); @@ -70,8 +79,19 @@ public void start() { sqsSourceConfig.getQueues().forEach(queueConfig -> { String queueUrl = queueConfig.getUrl(); String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1); - int numWorkers = queueConfig.getNumWorkers(); + SqsEventProcessor sqsEventProcessor; + MessageFieldStrategy strategy; + if (queueConfig.getCodec() != null) { + final PluginModel codecConfiguration = queueConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); + strategy = new CodecBulkMessageFieldStrategy(codec); + } else { + strategy = new StandardMessageFieldStrategy(); + } + + sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(strategy)); ExecutorService executorService = Executors.newFixedThreadPool( numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName)); allSqsUrlExecutorServices.add(executorService); @@ -80,10 +100,10 @@ public void start() { buffer, acknowledgementSetManager, sqsClient, - sqsEventProcessor, sqsSourceConfig, queueConfig, pluginMetrics, + sqsEventProcessor, backoff)) .collect(Collectors.toList()); diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java index 980e59048b..e722b76780 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -12,6 +17,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -21,6 +27,7 @@ public class SqsSource implements Source> { private final PluginMetrics pluginMetrics; + private final PluginFactory pluginFactory; private final SqsSourceConfig sqsSourceConfig; private SqsService sqsService; private final AcknowledgementSetManager acknowledgementSetManager; @@ -31,10 +38,12 @@ public class SqsSource implements Source> { @DataPrepperPluginConstructor public SqsSource(final PluginMetrics pluginMetrics, final SqsSourceConfig sqsSourceConfig, + final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final AwsCredentialsSupplier awsCredentialsSupplier) { this.pluginMetrics = pluginMetrics; + this.pluginFactory = pluginFactory; this.sqsSourceConfig = sqsSourceConfig; this.acknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); this.acknowledgementSetManager = acknowledgementSetManager; @@ -49,9 +58,7 @@ public void start(Buffer> buffer) { } final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, sqsSourceConfig); final AwsCredentialsProvider credentialsProvider = awsAuthenticationAdapter.getCredentialsProvider(); - final SqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); - final SqsEventProcessor sqsEventProcessor = new SqsEventProcessor(rawSqsMessageHandler); - sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider); + sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider); sqsService.start(); } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java index c84a3a3d69..1f90d1a711 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java index 3f58906b33..f6de0b9ee1 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -68,21 +73,20 @@ public class SqsWorker implements Runnable { public SqsWorker(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsClient sqsClient, - final SqsEventProcessor sqsEventProcessor, final SqsSourceConfig sqsSourceConfig, final QueueConfig queueConfig, final PluginMetrics pluginMetrics, + final SqsEventProcessor sqsEventProcessor, final Backoff backoff) { this.sqsClient = sqsClient; - this.sqsEventProcessor = sqsEventProcessor; this.queueConfig = queueConfig; this.acknowledgementSetManager = acknowledgementSetManager; this.standardBackoff = backoff; this.endToEndAcknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); this.buffer = buffer; this.bufferTimeoutMillis = (int) sqsSourceConfig.getBufferTimeout().toMillis(); - + this.sqsEventProcessor = sqsEventProcessor; messageVisibilityTimesMap = new HashMap<>(); failedAttemptCount = 0; sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME); diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java new file mode 100644 index 0000000000..defbbf7baa --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import java.util.Collections; +import java.util.List; + +public class StandardMessageFieldStrategy implements MessageFieldStrategy { + @Override + public List parseEvents(final String messageBody) { + final Event event = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(Collections.singletonMap("message", messageBody)) + .build(); + return Collections.singletonList(event); + } +} diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java index 04806ff4d3..0fa8cfec6a 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java index 77eeeb519a..6f485d1a9e 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java index f312d8abc6..c6bdb8b644 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -22,6 +27,7 @@ void testDefaultValues() { assertEquals(1, queueConfig.getNumWorkers(), "Number of workers should default to 1"); assertNull(queueConfig.getMaximumMessages(), "Maximum messages should be null by default"); assertEquals(Duration.ofSeconds(0), queueConfig.getPollDelay(), "Poll delay should default to 0 seconds"); + assertNull(queueConfig.getCodec(), "Codec should be null by default"); assertNull(queueConfig.getVisibilityTimeout(), "Visibility timeout should be null by default"); assertFalse(queueConfig.getVisibilityDuplicateProtection(), "Visibility duplicate protection should default to false"); assertEquals(Duration.ofHours(2), queueConfig.getVisibilityDuplicateProtectionTimeout(), diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java index 4606df45c6..ae6205457b 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -8,37 +13,89 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.model.Message; +import org.opensearch.dataprepper.model.codec.InputCodec; +import java.io.InputStream; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singletonMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; class RawSqsMessageHandlerTest { - private final RawSqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); private Buffer> mockBuffer; private int mockBufferTimeoutMillis; @BeforeEach void setUp() { - mockBuffer = mock(Buffer.class); + mockBuffer = Mockito.mock(Buffer.class); mockBufferTimeoutMillis = 10000; } @Test - void handleMessage_callsBufferWriteOnce() throws Exception { - Message message = Message.builder().body("{\"key\":\"value\"}").build(); + void handleMessage_standardStrategy_callsBufferWriteAllOnce() throws Exception { + MessageFieldStrategy standardMessageFieldStrategy = new StandardMessageFieldStrategy(); + RawSqsMessageHandler handler = new RawSqsMessageHandler(standardMessageFieldStrategy); + Message message = Message.builder() + .body("{\"key\":\"value\"}") + .build(); String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - rawSqsMessageHandler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); - ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(mockBuffer, times(1)).write(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); - Record capturedRecord = argumentCaptor.getValue(); - assertEquals("DOCUMENT", capturedRecord.getData().getMetadata().getEventType(), "Event type should be 'DOCUMENT'"); + handler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); + + @SuppressWarnings("unchecked") + ArgumentCaptor>> argumentCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(mockBuffer, Mockito.times(1)).writeAll(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); + List> capturedRecords = argumentCaptor.getValue(); + assertEquals(1, capturedRecords.size(), "Raw strategy should produce exactly one record"); + assertEquals("DOCUMENT", capturedRecords.get(0).getData().getMetadata().getEventType(), + "Event type should be 'DOCUMENT'"); + } + + @Test + void handleMessage_bulkStrategy_callsBufferWriteAllWithMultipleEvents() throws Exception { + InputCodec mockCodec = Mockito.mock(InputCodec.class); + Mockito.doAnswer(invocation -> { + InputStream inputStream = invocation.getArgument(0); + @SuppressWarnings("unchecked") + Consumer> eventConsumer = invocation.getArgument(1); + Event event1 = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(singletonMap("key1", "val1")) + .build(); + Event event2 = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(singletonMap("key2", "val2")) + .build(); + eventConsumer.accept(new Record<>(event1)); + eventConsumer.accept(new Record<>(event2)); + return null; + }).when(mockCodec).parse(any(InputStream.class), any()); + MessageFieldStrategy bulkStrategy = new CodecBulkMessageFieldStrategy(mockCodec); + RawSqsMessageHandler handler = new RawSqsMessageHandler(bulkStrategy); + String messageBody = "{\"events\":[{\"foo\":\"bar1\"},{\"foo\":\"bar2\"}]}"; + Message message = Message.builder() + .body(messageBody) + .build(); + String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; + handler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); + @SuppressWarnings("unchecked") + ArgumentCaptor>> argumentCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(mockBuffer, Mockito.times(1)).writeAll(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); + + List> capturedRecords = argumentCaptor.getValue(); + assertEquals(2, capturedRecords.size(), "Bulk strategy should produce two records"); + for (Record record : capturedRecords) { + assertEquals("DOCUMENT", record.getData().getMetadata().getEventType(), + "Event type should be 'DOCUMENT'"); + } } } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java index e10b8f471f..630e1e8f10 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java index 3bbc44bbe6..695164db82 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -11,6 +16,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -27,9 +33,9 @@ class SqsServiceTest { private SqsSourceConfig sqsSourceConfig; - private SqsEventProcessor sqsEventProcessor; private SqsClient sqsClient; private PluginMetrics pluginMetrics; + private PluginFactory pluginFactory; private AcknowledgementSetManager acknowledgementSetManager; private Buffer> buffer; private AwsCredentialsProvider credentialsProvider; @@ -37,9 +43,9 @@ class SqsServiceTest { @BeforeEach void setUp() { sqsSourceConfig = mock(SqsSourceConfig.class); - sqsEventProcessor = mock(SqsEventProcessor.class); sqsClient = mock(SqsClient.class, withSettings()); pluginMetrics = mock(PluginMetrics.class); + pluginFactory = mock(PluginFactory.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); buffer = mock(Buffer.class); credentialsProvider = mock(AwsCredentialsProvider.class); @@ -55,7 +61,7 @@ void start_with_single_queue_starts_workers() { when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getNumWorkers()).thenReturn(2); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); - SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider)); + SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider)); doReturn(sqsClient).when(sqsService).createSqsClient(credentialsProvider); sqsService.start(); // if no exception is thrown here, then workers have been started } @@ -67,7 +73,7 @@ void stop_should_shutdown_executors_and_workers_and_close_client() throws Interr when(queueConfig.getNumWorkers()).thenReturn(1); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); SqsClient sqsClient = mock(SqsClient.class); - SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider) { + SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider) { @Override SqsClient createSqsClient(final AwsCredentialsProvider credentialsProvider) { return sqsClient; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java index 29f0443670..6922c9dbd4 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java index cf130c102e..b028c67177 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -12,6 +17,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -27,6 +33,7 @@ class SqsSourceTest { private final String TEST_PIPELINE_NAME = "test_pipeline"; private SqsSource sqsSource; private PluginMetrics pluginMetrics; + private PluginFactory pluginFactory; private SqsSourceConfig sqsSourceConfig; private AcknowledgementSetManager acknowledgementSetManager; private AwsCredentialsSupplier awsCredentialsSupplier; @@ -36,10 +43,11 @@ class SqsSourceTest { @BeforeEach void setUp() { pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + pluginFactory = mock(PluginFactory.class); sqsSourceConfig = mock(SqsSourceConfig.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); - sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, acknowledgementSetManager, awsCredentialsSupplier); + sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, pluginFactory, acknowledgementSetManager, awsCredentialsSupplier); buffer = mock(Buffer.class); } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java index 7bb8e082cc..22bf48596f 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -19,6 +24,7 @@ import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; @@ -72,6 +78,8 @@ class SqsWorkerTest { @Mock private PluginMetrics pluginMetrics; @Mock + private PluginFactory pluginFactory; + @Mock private Backoff backoff; @Mock private Counter sqsMessagesReceivedCounter; @@ -87,17 +95,17 @@ class SqsWorkerTest { private Counter sqsVisibilityTimeoutChangedCount; @Mock private Counter sqsVisibilityTimeoutChangeFailedCount; - private int mockBufferTimeoutMillis = 10000; + private final int mockBufferTimeoutMillis = 10000; private SqsWorker createObjectUnderTest() { return new SqsWorker( buffer, acknowledgementSetManager, sqsClient, - sqsEventProcessor, sqsSourceConfig, queueConfig, pluginMetrics, + sqsEventProcessor, backoff); } @@ -216,7 +224,7 @@ void acknowledgementsEnabled_and_visibilityDuplicateProtectionEnabled_should_cre when(sqsSourceConfig.getAcknowledgements()).thenReturn(true); when(queueConfig.getVisibilityDuplicateProtection()).thenReturn(true); - SqsWorker worker = new SqsWorker(buffer, acknowledgementSetManager, sqsClient, sqsEventProcessor, sqsSourceConfig, queueConfig, pluginMetrics, backoff); + SqsWorker worker = new SqsWorker(buffer, acknowledgementSetManager, sqsClient, sqsSourceConfig, queueConfig, pluginMetrics, sqsEventProcessor, backoff); Message message = Message.builder().messageId("msg-dup").receiptHandle("handle-dup").build(); ReceiveMessageResponse response = ReceiveMessageResponse.builder().messages(message).build(); when(sqsClient.receiveMessage((ReceiveMessageRequest) any())).thenReturn(response);