Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS source: JSON codec support to split an SQS message into multiple events #5330

Merged
merged 4 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Static helpers that turn the attributes of an SQS {@link Message} into
 * event metadata.
 */
public class AttributeHandler {

    /**
     * Collects the queue URL, the system attributes and the custom message
     * attributes of an SQS message into a single map.
     *
     * <p>Entries are inserted in this order: {@code "queueUrl"}, then system
     * attributes, then custom message attributes. Because the result is a plain
     * map, a later entry replaces an earlier one that maps to the same key.
     * Every attribute name has its first character lower-cased before it is
     * used as a key.</p>
     *
     * @param message  the SQS message whose attributes are read
     * @param queueUrl the value stored under the {@code "queueUrl"} key
     * @return a new mutable map of metadata keys to string values
     */
    public static Map<String, String> collectMetadataAttributes(final Message message, final String queueUrl) {
        final Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put("queueUrl", queueUrl);

        for (final Map.Entry<MessageSystemAttributeName, String> entry : message.attributes().entrySet()) {
            metadataMap.put(lowerCaseFirstCharacter(entry.getKey().toString()), entry.getValue());
        }

        for (final Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
            // Only the string form of a custom attribute is copied.
            metadataMap.put(lowerCaseFirstCharacter(entry.getKey()), entry.getValue().stringValue());
        }
        return metadataMap;
    }

    /**
     * Copies every entry of {@code attributes} onto the metadata of {@code event}.
     *
     * @param event      the event whose metadata is updated
     * @param attributes the key/value pairs passed to {@code EventMetadata#setAttribute}
     */
    public static void applyMetadataAttributes(final Event event, final Map<String, String> attributes) {
        final EventMetadata metadata = event.getMetadata();
        attributes.forEach(metadata::setAttribute);
    }

    /**
     * Lower-cases the first character of {@code name} and leaves the rest untouched.
     *
     * <p>{@link Locale#ROOT} is used so the result does not depend on the JVM's
     * default locale (for example, a Turkish default locale would otherwise map
     * {@code 'I'} to a dotless {@code 'ı'}). An empty name is returned as is
     * instead of failing in {@code substring}.</p>
     */
    private static String lowerCaseFirstCharacter(final String name) {
        if (name.isEmpty()) {
            return name;
        }
        return name.substring(0, 1).toLowerCase(Locale.ROOT) + name.substring(1);
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * {@link MessageFieldStrategy} that hands the SQS message body to an
 * {@link InputCodec} and returns every event the codec emits.
 */
public class CodecBulkMessageFieldStrategy implements MessageFieldStrategy {

    private final InputCodec codec;

    /**
     * @param codec the codec used to decode each message body
     */
    public CodecBulkMessageFieldStrategy(final InputCodec codec) {
        this.codec = codec;
    }

    /**
     * Decodes the UTF-8 bytes of {@code messageBody} with the codec and gathers
     * the data of each emitted record.
     *
     * @param messageBody the SQS message body
     * @return the events produced by the codec, in the order they were emitted
     * @throws RuntimeException wrapping any exception raised while the codec parses the body
     */
    @Override
    public List<Event> parseEvents(final String messageBody) {
        final List<Event> parsedEvents = new ArrayList<>();
        final byte[] bodyBytes = messageBody.getBytes(StandardCharsets.UTF_8);
        // Each record handed over by the codec contributes its event to the result.
        final Consumer<Record<Event>> collector = record -> parsedEvents.add(record.getData());

        try {
            codec.parse(new ByteArrayInputStream(bodyBytes), collector);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse events from SQS body.", e);
        }
        return parsedEvents;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import java.util.List;

/**
 * Strategy for turning the body of an SQS message into events.
 */
public interface MessageFieldStrategy {
/**
 * Converts the SQS message body into one or more events.
 *
 * @param messageBody the body of the SQS message, as a string
 * @return the events derived from the body
 */
List<Event> parseEvents(String messageBody);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -15,6 +20,7 @@

import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.configuration.PluginModel;

public class QueueConfig {

Expand Down Expand Up @@ -62,6 +68,9 @@ public class QueueConfig {
@DurationMax(seconds = 20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

@JsonProperty("codec")
private PluginModel codec = null;

/**
 * @return the value of the {@code url} field of this queue configuration
 */
public String getUrl() {
return url;
}
Expand Down Expand Up @@ -93,5 +102,10 @@ public Duration getWaitTime() {
/**
 * @return the value of the {@code pollDelay} field of this queue configuration
 */
public Duration getPollDelay() {
return pollDelay;
}

/**
 * Returns the plugin model bound to the {@code codec} JSON property.
 *
 * @return the codec configuration, or {@code null} (the field's default) when none was set
 */
public PluginModel getCodec() {
return codec;
}

}

Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

public class RawSqsMessageHandler implements SqsMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class);
private final MessageFieldStrategy messageFieldStrategy;

/**
 * Creates a handler that uses the given strategy to parse message bodies into events.
 *
 * @param messageFieldStrategy the strategy stored by this handler
 */
public RawSqsMessageHandler(final MessageFieldStrategy messageFieldStrategy) {
this.messageFieldStrategy = messageFieldStrategy;
}

@Override
public void handleMessage(final Message message,
Expand All @@ -31,32 +38,18 @@ public void handleMessage(final Message message,
final int bufferTimeoutMillis,
final AcknowledgementSet acknowledgementSet) {
try {
final Map<MessageSystemAttributeName, String> systemAttributes = message.attributes();
final Map<String, MessageAttributeValue> customAttributes = message.messageAttributes();
final Event event = JacksonEvent.builder()
.withEventType("DOCUMENT")
.withData(Collections.singletonMap("message", message.body()))
.build();

final EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute("queueUrl", url);

for (Map.Entry<MessageSystemAttributeName, String> entry : systemAttributes.entrySet()) {
String originalKey = entry.getKey().toString();
String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue());
List<Event> events = messageFieldStrategy.parseEvents(message.body());
Map<String, String> metadataMap = AttributeHandler.collectMetadataAttributes(message, url);
List<Record<Event>> records = new ArrayList<>();
for (Event event : events) {
AttributeHandler.applyMetadataAttributes(event, metadataMap);
if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
records.add(new Record<>(event));
}
buffer.writeAll(records, bufferTimeoutMillis);

for (Map.Entry<String, MessageAttributeValue> entry : customAttributes.entrySet()) {
String originalKey = entry.getKey();
String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue());
}

if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
buffer.write(new Record<>(event), bufferTimeoutMillis);
} catch (Exception e) {
LOG.error("Error processing SQS message: {}", e.getMessage(), e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -9,6 +14,10 @@
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -35,9 +44,9 @@ public class SqsService {
static final double JITTER_RATE = 0.20;

private final SqsSourceConfig sqsSourceConfig;
private final SqsEventProcessor sqsEventProcessor;
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final PluginFactory pluginFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<ExecutorService> allSqsUrlExecutorServices;
private final List<SqsWorker> sqsWorkers;
Expand All @@ -46,13 +55,13 @@ public class SqsService {
public SqsService(final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager,
final SqsSourceConfig sqsSourceConfig,
final SqsEventProcessor sqsEventProcessor,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final AwsCredentialsProvider credentialsProvider) {

this.sqsSourceConfig = sqsSourceConfig;
this.sqsEventProcessor = sqsEventProcessor;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.allSqsUrlExecutorServices = new ArrayList<>();
this.sqsWorkers = new ArrayList<>();
Expand All @@ -70,8 +79,19 @@ public void start() {
sqsSourceConfig.getQueues().forEach(queueConfig -> {
String queueUrl = queueConfig.getUrl();
String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1);

int numWorkers = queueConfig.getNumWorkers();
SqsEventProcessor sqsEventProcessor;
MessageFieldStrategy strategy;
if (queueConfig.getCodec() != null) {
final PluginModel codecConfiguration = queueConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
strategy = new CodecBulkMessageFieldStrategy(codec);
} else {
strategy = new StandardMessageFieldStrategy();
}

sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(strategy));
ExecutorService executorService = Executors.newFixedThreadPool(
numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName));
allSqsUrlExecutorServices.add(executorService);
Expand All @@ -80,10 +100,10 @@ public void start() {
buffer,
acknowledgementSetManager,
sqsClient,
sqsEventProcessor,
sqsSourceConfig,
queueConfig,
pluginMetrics,
sqsEventProcessor,
backoff))
.collect(Collectors.toList());

Expand Down
Loading
Loading