Skip to content

Commit

Permalink
sqs source: json codec support to split sqs message into multiple events (#5330)
Browse files Browse the repository at this point in the history

* added json codec support and functionality to split message into multiple events

Signed-off-by: Jeremy Michael <[email protected]>

* Added message strategy and improved metadata handler efficiency

Signed-off-by: Jeremy Michael <[email protected]>

* updated license

Signed-off-by: Jeremy Michael <[email protected]>

* minor changes

Signed-off-by: Jeremy Michael <[email protected]>

---------

Signed-off-by: Jeremy Michael <[email protected]>
Co-authored-by: Jeremy Michael <[email protected]>
  • Loading branch information
jmsusanto and Jeremy Michael authored Jan 15, 2025
1 parent b13a645 commit 7c3681f
Show file tree
Hide file tree
Showing 24 changed files with 367 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers that gather the attributes of an SQS {@link Message} into a flat
 * string map and copy such a map onto an {@link Event}'s metadata.
 */
public class AttributeHandler {

    /**
     * Collects the queue URL together with every system attribute and every custom
     * message attribute of {@code message} into a single map.
     *
     * <p>Attribute names are stored with their first character lower-cased
     * (for example {@code SenderId} becomes {@code senderId}). Entries are written in
     * the order queue URL, system attributes, custom attributes, so a later entry that
     * normalises to an already-present key replaces the earlier value.
     *
     * @param message  the SQS message whose attributes are read
     * @param queueUrl the value stored under the {@code "queueUrl"} key
     * @return a new mutable map of normalised attribute names to their string values
     */
    public static Map<String, String> collectMetadataAttributes(final Message message, final String queueUrl) {
        final Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put("queueUrl", queueUrl);

        for (final Map.Entry<MessageSystemAttributeName, String> entry : message.attributes().entrySet()) {
            metadataMap.put(toLowerCamelCase(entry.getKey().toString()), entry.getValue());
        }

        for (final Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
            // NOTE(review): stringValue() is presumably null for non-string (e.g. binary)
            // attributes, in which case a null value is stored - confirm against the SDK.
            metadataMap.put(toLowerCamelCase(entry.getKey()), entry.getValue().stringValue());
        }
        return metadataMap;
    }

    /**
     * Copies every entry of {@code attributes} onto the metadata of {@code event}.
     *
     * @param event      the event whose metadata receives the attributes
     * @param attributes attribute names and values to set
     */
    public static void applyMetadataAttributes(final Event event, final Map<String, String> attributes) {
        final EventMetadata metadata = event.getMetadata();
        attributes.forEach(metadata::setAttribute);
    }

    /**
     * Lower-cases the first character of {@code key} and leaves the rest untouched.
     * An empty key is returned unchanged.
     */
    private static String toLowerCamelCase(final String key) {
        if (key.isEmpty()) {
            return key;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (a Turkish default locale would otherwise map 'I' to a dotless 'ı').
        return key.substring(0, 1).toLowerCase(java.util.Locale.ROOT) + key.substring(1);
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * {@link MessageFieldStrategy} that hands the whole SQS message body to an
 * {@link InputCodec} and returns every event the codec emits.
 */
public class CodecBulkMessageFieldStrategy implements MessageFieldStrategy {

    private final InputCodec codec;

    /**
     * @param codec the codec used to decode each message body
     */
    public CodecBulkMessageFieldStrategy(final InputCodec codec) {
        this.codec = codec;
    }

    /**
     * Decodes {@code messageBody} as UTF-8 bytes through the codec and gathers the
     * data of every record it produces, in emission order.
     *
     * @param messageBody the raw SQS message body
     * @return the events produced by the codec for this body
     * @throws RuntimeException wrapping any exception raised by the codec while parsing
     */
    @Override
    public List<Event> parseEvents(final String messageBody) {
        final List<Event> parsed = new ArrayList<>();
        final Consumer<Record<Event>> collector = record -> parsed.add(record.getData());
        final byte[] bodyBytes = messageBody.getBytes(StandardCharsets.UTF_8);
        try {
            codec.parse(new ByteArrayInputStream(bodyBytes), collector);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse events from SQS body.", e);
        }
        return parsed;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import java.util.List;

/**
 * Strategy that decides how the body of a single SQS message is turned into
 * {@link Event}s.
 */
public interface MessageFieldStrategy {
/**
* Converts the SQS message body into one or more events.
*
* @param messageBody the body text of one SQS message
* @return the events derived from {@code messageBody}
*/
List<Event> parseEvents(String messageBody);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -15,6 +20,7 @@

import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.configuration.PluginModel;

public class QueueConfig {

Expand Down Expand Up @@ -62,6 +68,9 @@ public class QueueConfig {
@DurationMax(seconds = 20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

@JsonProperty("codec")
private PluginModel codec = null;

public String getUrl() {
return url;
}
Expand Down Expand Up @@ -93,5 +102,10 @@ public Duration getWaitTime() {
public Duration getPollDelay() {
return pollDelay;
}

public PluginModel getCodec() {
return codec;
}

}

Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

public class RawSqsMessageHandler implements SqsMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class);
private final MessageFieldStrategy messageFieldStrategy;

public RawSqsMessageHandler(final MessageFieldStrategy messageFieldStrategy) {
this.messageFieldStrategy = messageFieldStrategy;
}

@Override
public void handleMessage(final Message message,
Expand All @@ -31,32 +38,18 @@ public void handleMessage(final Message message,
final int bufferTimeoutMillis,
final AcknowledgementSet acknowledgementSet) {
try {
final Map<MessageSystemAttributeName, String> systemAttributes = message.attributes();
final Map<String, MessageAttributeValue> customAttributes = message.messageAttributes();
final Event event = JacksonEvent.builder()
.withEventType("DOCUMENT")
.withData(Collections.singletonMap("message", message.body()))
.build();

final EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute("queueUrl", url);

for (Map.Entry<MessageSystemAttributeName, String> entry : systemAttributes.entrySet()) {
String originalKey = entry.getKey().toString();
String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue());
List<Event> events = messageFieldStrategy.parseEvents(message.body());
Map<String, String> metadataMap = AttributeHandler.collectMetadataAttributes(message, url);
List<Record<Event>> records = new ArrayList<>();
for (Event event : events) {
AttributeHandler.applyMetadataAttributes(event, metadataMap);
if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
records.add(new Record<>(event));
}
buffer.writeAll(records, bufferTimeoutMillis);

for (Map.Entry<String, MessageAttributeValue> entry : customAttributes.entrySet()) {
String originalKey = entry.getKey();
String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue());
}

if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
buffer.write(new Record<>(event), bufferTimeoutMillis);
} catch (Exception e) {
LOG.error("Error processing SQS message: {}", e.getMessage(), e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -9,6 +14,10 @@
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -35,9 +44,9 @@ public class SqsService {
static final double JITTER_RATE = 0.20;

private final SqsSourceConfig sqsSourceConfig;
private final SqsEventProcessor sqsEventProcessor;
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final PluginFactory pluginFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<ExecutorService> allSqsUrlExecutorServices;
private final List<SqsWorker> sqsWorkers;
Expand All @@ -46,13 +55,13 @@ public class SqsService {
public SqsService(final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager,
final SqsSourceConfig sqsSourceConfig,
final SqsEventProcessor sqsEventProcessor,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final AwsCredentialsProvider credentialsProvider) {

this.sqsSourceConfig = sqsSourceConfig;
this.sqsEventProcessor = sqsEventProcessor;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.allSqsUrlExecutorServices = new ArrayList<>();
this.sqsWorkers = new ArrayList<>();
Expand All @@ -70,8 +79,19 @@ public void start() {
sqsSourceConfig.getQueues().forEach(queueConfig -> {
String queueUrl = queueConfig.getUrl();
String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1);

int numWorkers = queueConfig.getNumWorkers();
SqsEventProcessor sqsEventProcessor;
MessageFieldStrategy strategy;
if (queueConfig.getCodec() != null) {
final PluginModel codecConfiguration = queueConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
strategy = new CodecBulkMessageFieldStrategy(codec);
} else {
strategy = new StandardMessageFieldStrategy();
}

sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(strategy));
ExecutorService executorService = Executors.newFixedThreadPool(
numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName));
allSqsUrlExecutorServices.add(executorService);
Expand All @@ -80,10 +100,10 @@ public void start() {
buffer,
acknowledgementSetManager,
sqsClient,
sqsEventProcessor,
sqsSourceConfig,
queueConfig,
pluginMetrics,
sqsEventProcessor,
backoff))
.collect(Collectors.toList());

Expand Down
Loading

0 comments on commit 7c3681f

Please sign in to comment.