From cc8ea10024f799bc8f023ef52e0df5015f254e62 Mon Sep 17 00:00:00 2001 From: Jeremy Michael Date: Tue, 14 Jan 2025 10:52:07 -0800 Subject: [PATCH 1/4] added json codec support and functionality to split message into multiple events Signed-off-by: Jeremy Michael --- .../source/sqs/BulkSqsMessageHandler.java | 73 +++++++++++++++++++ .../plugins/source/sqs/QueueConfig.java | 9 +++ .../plugins/source/sqs/SqsService.java | 22 ++++-- .../plugins/source/sqs/SqsSource.java | 8 +- .../plugins/source/sqs/SqsWorker.java | 5 +- .../source/sqs/BulkSqsMessageHandlerTest.java | 72 ++++++++++++++++++ .../plugins/source/sqs/QueueConfigTest.java | 1 + .../plugins/source/sqs/SqsServiceTest.java | 9 ++- .../plugins/source/sqs/SqsSourceTest.java | 5 +- .../plugins/source/sqs/SqsWorkerTest.java | 9 ++- 10 files changed, 194 insertions(+), 19 deletions(-) create mode 100644 data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java create mode 100644 data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java new file mode 100644 index 0000000000..76d7c75c69 --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +public class BulkSqsMessageHandler implements SqsMessageHandler { + private static final Logger LOG = LoggerFactory.getLogger(BulkSqsMessageHandler.class); + private final InputCodec codec; + + public BulkSqsMessageHandler(final InputCodec codec) { + this.codec = codec; + } + + @Override + public void handleMessage(final Message message, + final String url, + final Buffer> buffer, + final int bufferTimeoutMillis, + final AcknowledgementSet acknowledgementSet) { + try { + final String sqsBody = message.body(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(sqsBody.getBytes(StandardCharsets.UTF_8)); + codec.parse(inputStream, record -> { + final Event event = record.getData(); + final EventMetadata eventMetadata = event.getMetadata(); + eventMetadata.setAttribute("queueUrl", url); + for (Map.Entry entry : message.attributes().entrySet()) { + final String originalKey = entry.getKey().toString(); + final String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);; + eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue()); + } + + for (Map.Entry entry : message.messageAttributes().entrySet()) { + final String originalKey = entry.getKey(); + final String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);; + eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue()); + } + + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + + try { + buffer.write(record, bufferTimeoutMillis); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + }); + } catch (final Exception e) { + LOG.error("Error processing SQS message: {}", e.getMessage(), e); + throw new RuntimeException(e); + } + } +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java index ca5566d6cd..8427f59b83 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java @@ -15,6 +15,7 @@ import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; +import org.opensearch.dataprepper.model.configuration.PluginModel; public class QueueConfig { @@ -62,6 +63,9 @@ public class QueueConfig { @DurationMax(seconds = 20) private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS; + @JsonProperty("codec") + private PluginModel codec = null; + public String getUrl() { return url; } @@ -93,5 +97,10 @@ public Duration getWaitTime() { public Duration getPollDelay() { return pollDelay; } + + public PluginModel getCodec() { + return codec; + } + } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index d53f269323..d5e0d9705d 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -9,6 +9,10 @@ import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; + import org.opensearch.dataprepper.model.codec.InputCodec; + import org.opensearch.dataprepper.model.configuration.PluginModel; + import org.opensearch.dataprepper.model.configuration.PluginSetting; + import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -35,9 +39,9 @@ public class SqsService { static final double JITTER_RATE = 0.20; private final SqsSourceConfig sqsSourceConfig; - private final SqsEventProcessor sqsEventProcessor; private final SqsClient sqsClient; private final PluginMetrics pluginMetrics; + private final PluginFactory pluginFactory; private final AcknowledgementSetManager acknowledgementSetManager; private final List allSqsUrlExecutorServices; private final List sqsWorkers; @@ -46,13 +50,13 @@ public class SqsService { public SqsService(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsSourceConfig sqsSourceConfig, - final SqsEventProcessor sqsEventProcessor, final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, final AwsCredentialsProvider credentialsProvider) { this.sqsSourceConfig = sqsSourceConfig; - this.sqsEventProcessor = sqsEventProcessor; this.pluginMetrics = pluginMetrics; + this.pluginFactory = pluginFactory; this.acknowledgementSetManager = acknowledgementSetManager; this.allSqsUrlExecutorServices = new ArrayList<>(); this.sqsWorkers = new ArrayList<>(); @@ -70,8 +74,16 @@ public void start() { sqsSourceConfig.getQueues().forEach(queueConfig -> { String queueUrl = queueConfig.getUrl(); String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1); - int numWorkers = queueConfig.getNumWorkers(); + SqsEventProcessor sqsEventProcessor; + if (queueConfig.getCodec() != null) { + final PluginModel codecConfiguration = queueConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); + sqsEventProcessor = new SqsEventProcessor(new BulkSqsMessageHandler(codec)); + } else { + sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler()); + } ExecutorService executorService = Executors.newFixedThreadPool( numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName)); allSqsUrlExecutorServices.add(executorService); @@ -80,10 +92,10 @@ public void start() { buffer, acknowledgementSetManager, sqsClient, - sqsEventProcessor, sqsSourceConfig, queueConfig, pluginMetrics, + sqsEventProcessor, backoff)) .collect(Collectors.toList()); diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java index 980e59048b..efe770f23d 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -21,6 +22,7 @@ public class SqsSource implements Source> { private final PluginMetrics pluginMetrics; + private final PluginFactory pluginFactory; private final SqsSourceConfig sqsSourceConfig; private SqsService sqsService; private final AcknowledgementSetManager acknowledgementSetManager; @@ -31,10 +33,12 @@ public class SqsSource implements Source> { @DataPrepperPluginConstructor public SqsSource(final PluginMetrics pluginMetrics, final SqsSourceConfig sqsSourceConfig, + final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final AwsCredentialsSupplier awsCredentialsSupplier) { this.pluginMetrics = pluginMetrics; + this.pluginFactory = pluginFactory; this.sqsSourceConfig = sqsSourceConfig; this.acknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); this.acknowledgementSetManager = acknowledgementSetManager; @@ -49,9 +53,7 @@ public void start(Buffer> buffer) { } final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, sqsSourceConfig); final AwsCredentialsProvider credentialsProvider = awsAuthenticationAdapter.getCredentialsProvider(); - final SqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); - final SqsEventProcessor sqsEventProcessor = new SqsEventProcessor(rawSqsMessageHandler); - sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider); + sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider); sqsService.start(); } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java index 3f58906b33..a0dfb03624 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java @@ -68,21 +68,20 @@ public class SqsWorker implements Runnable { public SqsWorker(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsClient sqsClient, - final SqsEventProcessor sqsEventProcessor, final SqsSourceConfig sqsSourceConfig, final QueueConfig queueConfig, final PluginMetrics pluginMetrics, + final SqsEventProcessor sqsEventProcessor, final Backoff backoff) { this.sqsClient = sqsClient; - this.sqsEventProcessor = sqsEventProcessor; this.queueConfig = queueConfig; this.acknowledgementSetManager = acknowledgementSetManager; this.standardBackoff = backoff; this.endToEndAcknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); this.buffer = buffer; this.bufferTimeoutMillis = (int) sqsSourceConfig.getBufferTimeout().toMillis(); - + this.sqsEventProcessor = sqsEventProcessor; messageVisibilityTimesMap = new HashMap<>(); failedAttemptCount = 0; sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME); diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java new file mode 100644 index 0000000000..8cf1c01965 --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.record.Record; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.InputStream; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class BulkSqsMessageHandlerTest { + + private InputCodec mockCodec; + private Buffer> mockBuffer; + private BulkSqsMessageHandler bulkSqsMessageHandler; + private int bufferTimeoutMillis; + + @BeforeEach + void setUp() { + mockCodec = mock(InputCodec.class); + mockBuffer = mock(Buffer.class); + bulkSqsMessageHandler = new BulkSqsMessageHandler(mockCodec); + bufferTimeoutMillis = 10000; + } + + @Test + void handleMessage_callsBufferWriteOnce() throws Exception { + final Message message = Message.builder() + .body("{\"someKey\":\"someValue\"}") + .build(); + final String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; + + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + Consumer> eventConsumer = invocation.getArgument(1); + final Event mockEvent = mock(Event.class); + final EventMetadata mockMetadata = mock(EventMetadata.class); + when(mockEvent.getMetadata()).thenReturn(mockMetadata); + when(mockMetadata.getEventType()).thenReturn("DOCUMENT"); + eventConsumer.accept(new Record<>(mockEvent)); + return null; + }).when(mockCodec).parse(any(InputStream.class), any()); + + bulkSqsMessageHandler.handleMessage(message, queueUrl, mockBuffer, bufferTimeoutMillis, null); + ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(mockBuffer, times(1)).write(argumentCaptor.capture(), eq(bufferTimeoutMillis)); + Record capturedRecord = argumentCaptor.getValue(); + assertEquals( + "DOCUMENT", + capturedRecord.getData().getMetadata().getEventType(), + "Event type should be 'DOCUMENT'" + ); + } +} diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java index f312d8abc6..867a84dfe0 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java @@ -22,6 +22,7 @@ void testDefaultValues() { assertEquals(1, queueConfig.getNumWorkers(), "Number of workers should default to 1"); assertNull(queueConfig.getMaximumMessages(), "Maximum messages should be null by default"); assertEquals(Duration.ofSeconds(0), queueConfig.getPollDelay(), "Poll delay should default to 0 seconds"); + assertNull(queueConfig.getCodec(), "Codec should be null by default"); assertNull(queueConfig.getVisibilityTimeout(), "Visibility timeout should be null by default"); assertFalse(queueConfig.getVisibilityDuplicateProtection(), "Visibility duplicate protection should default to false"); assertEquals(Duration.ofHours(2), queueConfig.getVisibilityDuplicateProtectionTimeout(), diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java index 3bbc44bbe6..5fdb6ca3b4 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -27,9 +28,9 @@ class SqsServiceTest { private SqsSourceConfig sqsSourceConfig; - private SqsEventProcessor sqsEventProcessor; private SqsClient sqsClient; private PluginMetrics pluginMetrics; + private PluginFactory pluginFactory; private AcknowledgementSetManager acknowledgementSetManager; private Buffer> buffer; private AwsCredentialsProvider credentialsProvider; @@ -37,9 +38,9 @@ class SqsServiceTest { @BeforeEach void setUp() { sqsSourceConfig = mock(SqsSourceConfig.class); - sqsEventProcessor = mock(SqsEventProcessor.class); sqsClient = mock(SqsClient.class, withSettings()); pluginMetrics = mock(PluginMetrics.class); + pluginFactory = mock(PluginFactory.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); buffer = mock(Buffer.class); credentialsProvider = mock(AwsCredentialsProvider.class); @@ -55,7 +56,7 @@ void start_with_single_queue_starts_workers() { when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getNumWorkers()).thenReturn(2); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); - SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider)); + SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider)); doReturn(sqsClient).when(sqsService).createSqsClient(credentialsProvider); sqsService.start(); // if no exception is thrown here, then workers have been started } @@ -67,7 +68,7 @@ void stop_should_shutdown_executors_and_workers_and_close_client() throws Interr when(queueConfig.getNumWorkers()).thenReturn(1); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); SqsClient sqsClient = mock(SqsClient.class); - SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider) { + SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider) { @Override SqsClient createSqsClient(final AwsCredentialsProvider credentialsProvider) { return sqsClient; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java index cf130c102e..2709484a34 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -27,6 +28,7 @@ class SqsSourceTest { private final String TEST_PIPELINE_NAME = "test_pipeline"; private SqsSource sqsSource; private PluginMetrics pluginMetrics; + private PluginFactory pluginFactory; private SqsSourceConfig sqsSourceConfig; private AcknowledgementSetManager acknowledgementSetManager; private AwsCredentialsSupplier awsCredentialsSupplier; @@ -36,10 +38,11 @@ class SqsSourceTest { @BeforeEach void setUp() { pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + pluginFactory = mock(PluginFactory.class); sqsSourceConfig = mock(SqsSourceConfig.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); - sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, acknowledgementSetManager, awsCredentialsSupplier); + sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, pluginFactory, acknowledgementSetManager, awsCredentialsSupplier); buffer = mock(Buffer.class); } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java index 7bb8e082cc..a580a8660a 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; @@ -72,6 +73,8 @@ class SqsWorkerTest { @Mock private PluginMetrics pluginMetrics; @Mock + private PluginFactory pluginFactory; + @Mock private Backoff backoff; @Mock private Counter sqsMessagesReceivedCounter; @@ -87,17 +90,17 @@ class SqsWorkerTest { private Counter sqsVisibilityTimeoutChangedCount; @Mock private Counter sqsVisibilityTimeoutChangeFailedCount; - private int mockBufferTimeoutMillis = 10000; + private final int mockBufferTimeoutMillis = 10000; private SqsWorker createObjectUnderTest() { return new SqsWorker( buffer, acknowledgementSetManager, sqsClient, - sqsEventProcessor, sqsSourceConfig, queueConfig, pluginMetrics, + sqsEventProcessor, backoff); } @@ -216,7 +219,7 @@ void acknowledgementsEnabled_and_visibilityDuplicateProtectionEnabled_should_cre when(sqsSourceConfig.getAcknowledgements()).thenReturn(true); when(queueConfig.getVisibilityDuplicateProtection()).thenReturn(true); - SqsWorker worker = new SqsWorker(buffer, acknowledgementSetManager, sqsClient, sqsEventProcessor, sqsSourceConfig, queueConfig, pluginMetrics, backoff); + SqsWorker worker = new SqsWorker(buffer, acknowledgementSetManager, sqsClient, sqsSourceConfig, queueConfig, pluginMetrics, sqsEventProcessor, backoff); Message message = Message.builder().messageId("msg-dup").receiptHandle("handle-dup").build(); ReceiveMessageResponse response = ReceiveMessageResponse.builder().messages(message).build(); when(sqsClient.receiveMessage((ReceiveMessageRequest) any())).thenReturn(response); From 685090738fcaa22e99c5cf71a84215afdb85326a Mon Sep 17 00:00:00 2001 From: Jeremy Michael Date: Tue, 14 Jan 2025 14:11:31 -0800 Subject: [PATCH 2/4] Added message strategy and improved metadata handler efficiency Signed-off-by: Jeremy Michael --- .../plugins/source/sqs/AttributeHandler.java | 34 +++++++++ .../source/sqs/BulkSqsMessageHandler.java | 73 ------------------ .../sqs/JsonBulkMessageFieldStrategy.java | 31 ++++++++ .../source/sqs/MessageFieldStrategy.java | 9 +++ .../source/sqs/RawSqsMessageHandler.java | 46 +++++------ .../plugins/source/sqs/SqsService.java | 6 +- .../sqs/StandardMessageFieldStrategy.java | 17 +++++ .../source/sqs/BulkSqsMessageHandlerTest.java | 72 ------------------ .../source/sqs/RawSqsMessageHandlerTest.java | 76 ++++++++++++++++--- 9 files changed, 176 insertions(+), 188 deletions(-) create mode 100644 data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java delete mode 100644 data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java create mode 100644 data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java create mode 100644 data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java create mode 100644 data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java delete mode 100644 data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java new file mode 100644 index 0000000000..86dda2e03a --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java @@ -0,0 +1,34 @@ +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; +import java.util.HashMap; +import java.util.Map; + +public class AttributeHandler { + public static Map collectMetadataAttributes(final Message message, final String queueUrl) { + final Map metadataMap = new HashMap<>(); + metadataMap.put("queueUrl", queueUrl); + + for (Map.Entry entry : message.attributes().entrySet()) { + String originalKey = entry.getKey().toString(); + String key = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); + metadataMap.put(key, entry.getValue()); + } + + for (Map.Entry entry : message.messageAttributes().entrySet()) { + String originalKey = entry.getKey().toString(); + String key = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); + metadataMap.put(key, entry.getValue().stringValue()); + } + return metadataMap; + } + + public static void applyMetadataAttributes(final Event event, final Map attributes) { + final EventMetadata metadata = event.getMetadata(); + attributes.forEach(metadata::setAttribute); + } +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java deleted file mode 100644 index 76d7c75c69..0000000000 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandler.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.sqs; - -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.record.Record; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; - -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -public class BulkSqsMessageHandler implements SqsMessageHandler { - private static final Logger LOG = LoggerFactory.getLogger(BulkSqsMessageHandler.class); - private final InputCodec codec; - - public BulkSqsMessageHandler(final InputCodec codec) { - this.codec = codec; - } - - @Override - public void handleMessage(final Message message, - final String url, - final Buffer> buffer, - final int bufferTimeoutMillis, - final AcknowledgementSet acknowledgementSet) { - try { - final String sqsBody = message.body(); - ByteArrayInputStream inputStream = new ByteArrayInputStream(sqsBody.getBytes(StandardCharsets.UTF_8)); - codec.parse(inputStream, record -> { - final Event event = record.getData(); - final EventMetadata eventMetadata = event.getMetadata(); - eventMetadata.setAttribute("queueUrl", url); - for (Map.Entry entry : message.attributes().entrySet()) { - final String originalKey = entry.getKey().toString(); - final String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);; - eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue()); - } - - for (Map.Entry entry : message.messageAttributes().entrySet()) { - final String originalKey = entry.getKey(); - final String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);; - eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue()); - } - - if (acknowledgementSet != null) { - acknowledgementSet.add(event); - } - - try { - buffer.write(record, bufferTimeoutMillis); - } catch (TimeoutException e) { - throw new RuntimeException(e); - } - }); - } catch (final Exception e) { - LOG.error("Error processing SQS message: {}", e.getMessage(), e); - throw new RuntimeException(e); - } - } -} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java new file mode 100644 index 0000000000..0642694f7a --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java @@ -0,0 +1,31 @@ +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class JsonBulkMessageFieldStrategy implements MessageFieldStrategy { + + private final InputCodec codec; + + public JsonBulkMessageFieldStrategy(final InputCodec codec) { + this.codec = codec; + } + + @Override + public List parseEvents(final String messageBody) { + final List events = new ArrayList<>(); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(messageBody.getBytes(StandardCharsets.UTF_8)); + try { + codec.parse(inputStream, (Consumer>) record -> events.add(record.getData())); + } catch (Exception e) { + throw new RuntimeException("Failed to parse events from SQS body.", e); + } + return events; + } +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java new file mode 100644 index 0000000000..c75780719c --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java @@ -0,0 +1,9 @@ +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.event.Event; +import java.util.List; + +public interface MessageFieldStrategy { + // Convert the SQS message body into one or more events. + List parseEvents(String messageBody); +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java index 493b7ab7d7..5cec4f4713 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java @@ -8,21 +8,23 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; -import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.ArrayList; public class RawSqsMessageHandler implements SqsMessageHandler { private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class); + private final MessageFieldStrategy messageFieldStrategy; + + public RawSqsMessageHandler(final MessageFieldStrategy messageFieldStrategy) { + this.messageFieldStrategy = messageFieldStrategy; + } @Override public void handleMessage(final Message message, @@ -31,32 +33,18 @@ public void handleMessage(final Message message, final int bufferTimeoutMillis, final AcknowledgementSet acknowledgementSet) { try { - final Map systemAttributes = message.attributes(); - final Map customAttributes = message.messageAttributes(); - final Event event = JacksonEvent.builder() - .withEventType("DOCUMENT") - .withData(Collections.singletonMap("message", message.body())) - .build(); - - final EventMetadata eventMetadata = event.getMetadata(); - eventMetadata.setAttribute("queueUrl", url); - - for (Map.Entry entry : systemAttributes.entrySet()) { - String originalKey = entry.getKey().toString(); - String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); - eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue()); + List events = messageFieldStrategy.parseEvents(message.body()); + Map metadataMap = AttributeHandler.collectMetadataAttributes(message, url); + List> records = new ArrayList<>(); + for (Event event : events) { + AttributeHandler.applyMetadataAttributes(event, metadataMap); + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + records.add(new Record<>(event)); } + buffer.writeAll(records, bufferTimeoutMillis); - for (Map.Entry entry : customAttributes.entrySet()) { - String originalKey = entry.getKey(); - String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1); - eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue()); - } - - if (acknowledgementSet != null) { - acknowledgementSet.add(event); - } - buffer.write(new Record<>(event), bufferTimeoutMillis); } catch (Exception e) { LOG.error("Error processing SQS message: {}", e.getMessage(), e); throw new RuntimeException(e); diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index d5e0d9705d..cf708f786a 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -80,9 +80,11 @@ public void start() { final PluginModel codecConfiguration = queueConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); - sqsEventProcessor = new SqsEventProcessor(new BulkSqsMessageHandler(codec)); + MessageFieldStrategy bulkStrategy = new JsonBulkMessageFieldStrategy(codec); + sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(bulkStrategy)); } else { - sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler()); + MessageFieldStrategy standardStrategy = new StandardMessageFieldStrategy(); + sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(standardStrategy)); } ExecutorService executorService = Executors.newFixedThreadPool( numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName)); diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java new file mode 100644 index 0000000000..63d573836f --- /dev/null +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.plugins.source.sqs; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import java.util.Collections; +import java.util.List; + +public class StandardMessageFieldStrategy implements MessageFieldStrategy { + @Override + public List parseEvents(final String messageBody) { + final Event event = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(Collections.singletonMap("message", messageBody)) + .build(); + return Collections.singletonList(event); + } +} diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java deleted file mode 100644 index 8cf1c01965..0000000000 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/BulkSqsMessageHandlerTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.sqs; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.record.Record; -import software.amazon.awssdk.services.sqs.model.Message; - -import java.io.InputStream; -import java.util.function.Consumer; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class BulkSqsMessageHandlerTest { - - private InputCodec mockCodec; - private Buffer> mockBuffer; - private BulkSqsMessageHandler bulkSqsMessageHandler; - private int bufferTimeoutMillis; - - @BeforeEach - void setUp() { - mockCodec = mock(InputCodec.class); - mockBuffer = mock(Buffer.class); - bulkSqsMessageHandler = new BulkSqsMessageHandler(mockCodec); - bufferTimeoutMillis = 10000; - } - - @Test - void handleMessage_callsBufferWriteOnce() throws Exception { - final Message message = Message.builder() - .body("{\"someKey\":\"someValue\"}") - .build(); - final String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - - doAnswer(invocation -> { - @SuppressWarnings("unchecked") - Consumer> eventConsumer = invocation.getArgument(1); - final Event mockEvent = mock(Event.class); - final EventMetadata mockMetadata = mock(EventMetadata.class); - when(mockEvent.getMetadata()).thenReturn(mockMetadata); - when(mockMetadata.getEventType()).thenReturn("DOCUMENT"); - eventConsumer.accept(new Record<>(mockEvent)); - return null; - }).when(mockCodec).parse(any(InputStream.class), any()); - - bulkSqsMessageHandler.handleMessage(message, queueUrl, mockBuffer, bufferTimeoutMillis, null); - ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(mockBuffer, times(1)).write(argumentCaptor.capture(), eq(bufferTimeoutMillis)); - Record capturedRecord = argumentCaptor.getValue(); - assertEquals( - "DOCUMENT", - capturedRecord.getData().getMetadata().getEventType(), - "Event type should be 'DOCUMENT'" - ); - } -} diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java index 4606df45c6..7b68a3e900 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java @@ -8,37 +8,89 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.model.Message; +import org.opensearch.dataprepper.model.codec.InputCodec; +import java.io.InputStream; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singletonMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; class RawSqsMessageHandlerTest { - private final RawSqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); private Buffer> mockBuffer; private int mockBufferTimeoutMillis; @BeforeEach void setUp() { - mockBuffer = mock(Buffer.class); + mockBuffer = Mockito.mock(Buffer.class); mockBufferTimeoutMillis = 10000; } @Test - void handleMessage_callsBufferWriteOnce() throws Exception { - Message message = Message.builder().body("{\"key\":\"value\"}").build(); + void handleMessage_standardStrategy_callsBufferWriteAllOnce() throws Exception { + MessageFieldStrategy standardMessageFieldStrategy = new StandardMessageFieldStrategy(); + RawSqsMessageHandler handler = new RawSqsMessageHandler(standardMessageFieldStrategy); + Message message = Message.builder() + .body("{\"key\":\"value\"}") + .build(); String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - rawSqsMessageHandler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); - ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(mockBuffer, times(1)).write(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); - Record capturedRecord = argumentCaptor.getValue(); - assertEquals("DOCUMENT", capturedRecord.getData().getMetadata().getEventType(), "Event type should be 'DOCUMENT'"); + handler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); + + @SuppressWarnings("unchecked") + ArgumentCaptor>> argumentCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(mockBuffer, Mockito.times(1)).writeAll(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); + List> capturedRecords = argumentCaptor.getValue(); + assertEquals(1, capturedRecords.size(), "Raw strategy should produce exactly one record"); + assertEquals("DOCUMENT", capturedRecords.get(0).getData().getMetadata().getEventType(), + "Event type should be 'DOCUMENT'"); + } + + @Test + void handleMessage_bulkStrategy_callsBufferWriteAllWithMultipleEvents() throws Exception { + InputCodec mockCodec = Mockito.mock(InputCodec.class); + Mockito.doAnswer(invocation -> { + InputStream inputStream = invocation.getArgument(0); + @SuppressWarnings("unchecked") + Consumer> eventConsumer = invocation.getArgument(1); + Event event1 = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(singletonMap("key1", "val1")) + .build(); + Event event2 = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(singletonMap("key2", "val2")) + .build(); + eventConsumer.accept(new Record<>(event1)); + eventConsumer.accept(new Record<>(event2)); + return null; + }).when(mockCodec).parse(any(InputStream.class), any()); + MessageFieldStrategy bulkStrategy = new JsonBulkMessageFieldStrategy(mockCodec); + RawSqsMessageHandler handler = new RawSqsMessageHandler(bulkStrategy); + String messageBody = "{\"events\":[{\"foo\":\"bar1\"},{\"foo\":\"bar2\"}]}"; + Message message = Message.builder() + .body(messageBody) + .build(); + String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; + handler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); + @SuppressWarnings("unchecked") + ArgumentCaptor>> argumentCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(mockBuffer, Mockito.times(1)).writeAll(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); + + List> capturedRecords = argumentCaptor.getValue(); + assertEquals(2, capturedRecords.size(), "Bulk strategy should produce two records"); + for (Record record : capturedRecords) { + assertEquals("DOCUMENT", record.getData().getMetadata().getEventType(), + "Event type should be 'DOCUMENT'"); + } } } From 0197a67a5592906f425cb6c49d643e12821bbe93 Mon Sep 17 00:00:00 2001 From: Jeremy Michael Date: Tue, 14 Jan 2025 14:16:57 -0800 Subject: [PATCH 3/4] updated license Signed-off-by: Jeremy Michael --- .../plugins/source/sqs/AttributeHandler.java | 10 ++++++++++ .../plugins/source/sqs/AwsAuthenticationAdapter.java | 5 +++++ .../plugins/source/sqs/AwsAuthenticationOptions.java | 5 +++++ .../source/sqs/JsonBulkMessageFieldStrategy.java | 10 ++++++++++ .../plugins/source/sqs/MessageFieldStrategy.java | 10 ++++++++++ .../dataprepper/plugins/source/sqs/QueueConfig.java | 5 +++++ .../plugins/source/sqs/RawSqsMessageHandler.java | 5 +++++ .../plugins/source/sqs/SqsEventProcessor.java | 5 +++++ .../plugins/source/sqs/SqsMessageHandler.java | 6 ++++++ .../source/sqs/SqsRetriesExhaustedException.java | 5 +++++ .../dataprepper/plugins/source/sqs/SqsService.java | 5 +++++ .../dataprepper/plugins/source/sqs/SqsSource.java | 5 +++++ .../plugins/source/sqs/SqsSourceConfig.java | 5 +++++ .../dataprepper/plugins/source/sqs/SqsWorker.java | 5 +++++ .../source/sqs/StandardMessageFieldStrategy.java | 10 ++++++++++ .../source/sqs/AwsAuthenticationAdapterTest.java | 5 +++++ .../source/sqs/AwsAuthenticationOptionsTest.java | 5 +++++ .../plugins/source/sqs/QueueConfigTest.java | 5 +++++ .../plugins/source/sqs/RawSqsMessageHandlerTest.java | 5 +++++ .../plugins/source/sqs/SqsEventProcessorTest.java | 5 +++++ .../dataprepper/plugins/source/sqs/SqsServiceTest.java | 5 +++++ .../plugins/source/sqs/SqsSourceConfigTest.java | 5 +++++ .../dataprepper/plugins/source/sqs/SqsSourceTest.java | 5 +++++ .../dataprepper/plugins/source/sqs/SqsWorkerTest.java | 5 +++++ 24 files changed, 141 insertions(+) diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java index 86dda2e03a..338d482f85 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AttributeHandler.java @@ -1,3 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.model.event.Event; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java index 08600cba13..e22fa68652 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapter.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java index 99da366de3..3ac9cec011 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java index 0642694f7a..cf7daaed84 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java @@ -1,3 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.model.codec.InputCodec; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java index c75780719c..bf8d6092cb 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java @@ -1,3 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.model.event.Event; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java index 8427f59b83..47e417bc27 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java index 5cec4f4713..6fc02f238a 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java index a03c485c37..c3669c92cb 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java index 79012b5e00..a9585a4a62 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java @@ -1,7 +1,13 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java index 4e1f9507e6..e1fd536cb7 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index cf708f786a..9b2023f686 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java index efe770f23d..e722b76780 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java index c84a3a3d69..1f90d1a711 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java index a0dfb03624..f6de0b9ee1 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java index 63d573836f..defbbf7baa 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/StandardMessageFieldStrategy.java @@ -1,3 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.model.event.Event; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java index 04806ff4d3..0fa8cfec6a 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationAdapterTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java index 77eeeb519a..6f485d1a9e 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java index 867a84dfe0..c6bdb8b644 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java index 7b68a3e900..2560c8cef6 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java index e10b8f471f..630e1e8f10 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java index 5fdb6ca3b4..695164db82 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java index 29f0443670..6922c9dbd4 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java index 2709484a34..b028c67177 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java index a580a8660a..22bf48596f 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.sqs; From 3641fb7b3e77e20e7e0eff9e02455fb528c47cca Mon Sep 17 00:00:00 2001 From: Jeremy Michael Date: Wed, 15 Jan 2025 09:35:35 -0800 Subject: [PATCH 4/4] minor changes Signed-off-by: Jeremy Michael --- ...dStrategy.java => CodecBulkMessageFieldStrategy.java} | 4 ++-- .../plugins/source/sqs/MessageFieldStrategy.java | 4 +++- .../dataprepper/plugins/source/sqs/SqsService.java | 9 +++++---- .../plugins/source/sqs/RawSqsMessageHandlerTest.java | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) rename data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/{JsonBulkMessageFieldStrategy.java => CodecBulkMessageFieldStrategy.java} (89%) diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/CodecBulkMessageFieldStrategy.java similarity index 89% rename from data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java rename to data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/CodecBulkMessageFieldStrategy.java index cf7daaed84..15581169d9 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/JsonBulkMessageFieldStrategy.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/CodecBulkMessageFieldStrategy.java @@ -19,11 +19,11 @@ import java.util.List; import java.util.function.Consumer; -public class JsonBulkMessageFieldStrategy implements MessageFieldStrategy { +public class CodecBulkMessageFieldStrategy implements MessageFieldStrategy { private final InputCodec codec; - public JsonBulkMessageFieldStrategy(final InputCodec codec) { + public CodecBulkMessageFieldStrategy(final InputCodec codec) { this.codec = codec; } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java index bf8d6092cb..f7decd3f91 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/MessageFieldStrategy.java @@ -14,6 +14,8 @@ import java.util.List; public interface MessageFieldStrategy { - // Convert the SQS message body into one or more events. + /** + * Converts the SQS message body into one or more events. + */ List parseEvents(String messageBody); } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index 9b2023f686..672ee9874c 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -81,16 +81,17 @@ public void start() { String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1); int numWorkers = queueConfig.getNumWorkers(); SqsEventProcessor sqsEventProcessor; + MessageFieldStrategy strategy; if (queueConfig.getCodec() != null) { final PluginModel codecConfiguration = queueConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); - MessageFieldStrategy bulkStrategy = new JsonBulkMessageFieldStrategy(codec); - sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(bulkStrategy)); + strategy = new CodecBulkMessageFieldStrategy(codec); } else { - MessageFieldStrategy standardStrategy = new StandardMessageFieldStrategy(); - sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(standardStrategy)); + strategy = new StandardMessageFieldStrategy(); } + + sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(strategy)); ExecutorService executorService = Executors.newFixedThreadPool( numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName)); allSqsUrlExecutorServices.add(executorService); diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java index 2560c8cef6..ae6205457b 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java @@ -79,7 +79,7 @@ void handleMessage_bulkStrategy_callsBufferWriteAllWithMultipleEvents() throws E eventConsumer.accept(new Record<>(event2)); return null; }).when(mockCodec).parse(any(InputStream.class), any()); - MessageFieldStrategy bulkStrategy = new JsonBulkMessageFieldStrategy(mockCodec); + MessageFieldStrategy bulkStrategy = new CodecBulkMessageFieldStrategy(mockCodec); RawSqsMessageHandler handler = new RawSqsMessageHandler(bulkStrategy); String messageBody = "{\"events\":[{\"foo\":\"bar1\"},{\"foo\":\"bar2\"}]}"; Message message = Message.builder()