Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqs source: json codec support to split sqs message into multiple events #5330

Merged
merged 4 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers that gather the attributes of an SQS {@link Message} into a
 * flat string map and copy such a map onto an {@link Event}'s metadata.
 */
public class AttributeHandler {

    /**
     * Collects the queue URL, the system attributes and the custom message
     * attributes of an SQS message into a single map.
     *
     * <p>Attribute names have their first character lower-cased before being
     * used as keys. Entries are written in the order: {@code "queueUrl"},
     * system attributes, custom message attributes; a later entry that maps to
     * the same key overwrites an earlier one.
     *
     * @param message  the SQS message whose attributes are read
     * @param queueUrl the value stored under the {@code "queueUrl"} key
     * @return a new mutable map of metadata keys to attribute values
     */
    public static Map<String, String> collectMetadataAttributes(final Message message, final String queueUrl) {
        final Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put("queueUrl", queueUrl);

        for (final Map.Entry<MessageSystemAttributeName, String> entry : message.attributes().entrySet()) {
            metadataMap.put(toLowerCamelCase(entry.getKey().toString()), entry.getValue());
        }

        for (final Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
            // NOTE(review): stringValue() is presumably null for non-string (e.g. binary)
            // attributes; the null is stored as-is, matching the previous behavior.
            metadataMap.put(toLowerCamelCase(entry.getKey()), entry.getValue().stringValue());
        }
        return metadataMap;
    }

    /**
     * Copies every entry of {@code attributes} onto the metadata of {@code event}.
     *
     * @param event      the event whose metadata receives the attributes
     * @param attributes the key/value pairs to set on the event metadata
     */
    public static void applyMetadataAttributes(final Event event, final Map<String, String> attributes) {
        final EventMetadata metadata = event.getMetadata();
        attributes.forEach(metadata::setAttribute);
    }

    /**
     * Lower-cases the first character of {@code key} and leaves the rest untouched.
     *
     * <p>{@link java.util.Locale#ROOT} is used so the result does not depend on the
     * JVM default locale (e.g. under a Turkish locale {@code "I"} would otherwise
     * become a dotless {@code "ı"}). An empty key is returned unchanged instead of
     * failing in {@code substring}.
     */
    private static String toLowerCamelCase(final String key) {
        if (key.isEmpty()) {
            return key;
        }
        return key.substring(0, 1).toLowerCase(java.util.Locale.ROOT) + key.substring(1);
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class JsonBulkMessageFieldStrategy implements MessageFieldStrategy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just rename this to CodecBulkMessageFieldStrategy because it is not bound to JSON.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private final InputCodec codec;

/**
 * Creates a strategy that uses the given codec to split a message body into events.
 *
 * @param codec the input codec used to parse each message body; must not be {@code null}
 * @throws NullPointerException if {@code codec} is {@code null}
 */
public JsonBulkMessageFieldStrategy(final InputCodec codec) {
    // Fail fast here: a null codec would otherwise only surface later, once per
    // message, wrapped in the generic "Failed to parse events" RuntimeException.
    this.codec = java.util.Objects.requireNonNull(codec, "codec must not be null");
}

/**
 * Feeds the UTF-8 bytes of the message body through the configured codec and
 * returns the data of every record the codec emits, in emission order.
 *
 * @param messageBody the SQS message body to parse
 * @return the events produced by the codec for this body
 * @throws RuntimeException wrapping any exception raised while the codec parses
 */
@Override
public List<Event> parseEvents(final String messageBody) {
    final byte[] bodyBytes = messageBody.getBytes(StandardCharsets.UTF_8);
    final List<Event> parsedEvents = new ArrayList<>();
    // Each record handed back by the codec contributes its payload to the result.
    final Consumer<Record<Event>> collector = parsedRecord -> parsedEvents.add(parsedRecord.getData());
    try {
        codec.parse(new ByteArrayInputStream(bodyBytes), collector);
    } catch (Exception e) {
        throw new RuntimeException("Failed to parse events from SQS body.", e);
    }
    return parsedEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import java.util.List;

public interface MessageFieldStrategy {
// Convert the SQS message body into one or more events.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this into a Javadoc comment.

/**
 * Converts the body of an SQS message into one or more events.
 *
 * @param messageBody the SQS message body to convert
 * @return the events produced from the message body
 */
List<Event> parseEvents(String messageBody);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -15,6 +20,7 @@

import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.configuration.PluginModel;

public class QueueConfig {

Expand Down Expand Up @@ -62,6 +68,9 @@ public class QueueConfig {
@DurationMax(seconds = 20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

@JsonProperty("codec")
private PluginModel codec = null;

/**
 * Returns the URL configured for this queue.
 *
 * @return the value of the {@code url} field
 */
public String getUrl() {
return url;
}
Expand Down Expand Up @@ -93,5 +102,10 @@ public Duration getWaitTime() {
/**
 * Returns the poll delay configured for this queue.
 *
 * @return the value of the {@code pollDelay} field
 */
public Duration getPollDelay() {
return pollDelay;
}

/**
 * Returns the codec plugin configuration bound to the {@code codec} property.
 *
 * @return the codec plugin model, or {@code null} (the field's default) when no codec was set
 */
public PluginModel getCodec() {
return codec;
}

}

Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

public class RawSqsMessageHandler implements SqsMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class);
private final MessageFieldStrategy messageFieldStrategy;

/**
 * Creates a handler that uses the given strategy to turn each message body into events.
 *
 * @param messageFieldStrategy the strategy applied to every message body; must not be {@code null}
 * @throws NullPointerException if {@code messageFieldStrategy} is {@code null}
 */
public RawSqsMessageHandler(final MessageFieldStrategy messageFieldStrategy) {
    // Fail fast at construction instead of hitting a NullPointerException on every handled message.
    this.messageFieldStrategy = java.util.Objects.requireNonNull(
            messageFieldStrategy, "messageFieldStrategy must not be null");
}

@Override
public void handleMessage(final Message message,
Expand All @@ -31,32 +38,18 @@ public void handleMessage(final Message message,
final int bufferTimeoutMillis,
final AcknowledgementSet acknowledgementSet) {
try {
final Map<MessageSystemAttributeName, String> systemAttributes = message.attributes();
final Map<String, MessageAttributeValue> customAttributes = message.messageAttributes();
final Event event = JacksonEvent.builder()
.withEventType("DOCUMENT")
.withData(Collections.singletonMap("message", message.body()))
.build();

final EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute("queueUrl", url);

for (Map.Entry<MessageSystemAttributeName, String> entry : systemAttributes.entrySet()) {
String originalKey = entry.getKey().toString();
String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue());
List<Event> events = messageFieldStrategy.parseEvents(message.body());
Map<String, String> metadataMap = AttributeHandler.collectMetadataAttributes(message, url);
List<Record<Event>> records = new ArrayList<>();
for (Event event : events) {
AttributeHandler.applyMetadataAttributes(event, metadataMap);
if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
records.add(new Record<>(event));
}
buffer.writeAll(records, bufferTimeoutMillis);

for (Map.Entry<String, MessageAttributeValue> entry : customAttributes.entrySet()) {
String originalKey = entry.getKey();
String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue());
}

if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
buffer.write(new Record<>(event), bufferTimeoutMillis);
} catch (Exception e) {
LOG.error("Error processing SQS message: {}", e.getMessage(), e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -9,6 +14,10 @@
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -35,9 +44,9 @@ public class SqsService {
static final double JITTER_RATE = 0.20;

private final SqsSourceConfig sqsSourceConfig;
private final SqsEventProcessor sqsEventProcessor;
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final PluginFactory pluginFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<ExecutorService> allSqsUrlExecutorServices;
private final List<SqsWorker> sqsWorkers;
Expand All @@ -46,13 +55,13 @@ public class SqsService {
public SqsService(final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager,
final SqsSourceConfig sqsSourceConfig,
final SqsEventProcessor sqsEventProcessor,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final AwsCredentialsProvider credentialsProvider) {

this.sqsSourceConfig = sqsSourceConfig;
this.sqsEventProcessor = sqsEventProcessor;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.allSqsUrlExecutorServices = new ArrayList<>();
this.sqsWorkers = new ArrayList<>();
Expand All @@ -70,8 +79,18 @@ public void start() {
sqsSourceConfig.getQueues().forEach(queueConfig -> {
String queueUrl = queueConfig.getUrl();
String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1);

int numWorkers = queueConfig.getNumWorkers();
SqsEventProcessor sqsEventProcessor;
if (queueConfig.getCodec() != null) {
final PluginModel codecConfiguration = queueConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
MessageFieldStrategy bulkStrategy = new JsonBulkMessageFieldStrategy(codec);
sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(bulkStrategy));
} else {
MessageFieldStrategy standardStrategy = new StandardMessageFieldStrategy();
sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(standardStrategy));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line and line 89 are the same. Refactor to use the same line. Just get the MessageFieldStrategy from the conditional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense; I made the change.

}
ExecutorService executorService = Executors.newFixedThreadPool(
numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName));
allSqsUrlExecutorServices.add(executorService);
Expand All @@ -80,10 +99,10 @@ public void start() {
buffer,
acknowledgementSetManager,
sqsClient,
sqsEventProcessor,
sqsSourceConfig,
queueConfig,
pluginMetrics,
sqsEventProcessor,
backoff))
.collect(Collectors.toList());

Expand Down
Loading
Loading