Skip to content

Commit

Permalink
Add queueUrl and sentTimestamp metadata to SQS events
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy Michael <[email protected]>
  • Loading branch information
Jeremy Michael committed Dec 18, 2024
1 parent 51db627 commit 5e5e84d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;
import java.time.Instant;
import java.util.Objects;

/**
Expand All @@ -30,17 +32,27 @@ public class RawSqsMessageHandler implements SqsMessageHandler {
* Processes the SQS message, attempting to parse it as JSON, and adds it to the buffer.
*
* @param message - the SQS message for processing
* @param url - the SQS queue url
* @param bufferAccumulator - the buffer accumulator
* @param acknowledgementSet - the acknowledgement set for end-to-end acknowledgements
*/
@Override
public void handleMessage(final Message message,
final String url,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) {
try {
ObjectNode dataNode = objectMapper.createObjectNode();
dataNode.set("message", parseMessageBody(message.body()));
dataNode.put("queueUrl", url);

Instant now = Instant.now();
int unixTimestamp = (int) now.getEpochSecond();
dataNode.put("sentTimestamp", unixTimestamp);

final Record<Event> event = new Record<Event>(JacksonEvent.builder()
.withEventType("sqs-event")
.withData(objectMapper.createObjectNode().set("message", parseMessageBody(message.body())))
.withData(dataNode)
.build());


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ public class SqsEventProcessor {
}

void addSqsObject(final Message message,
final String url,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) throws IOException {
sqsMessageHandler.handleMessage(message, bufferAccumulator, acknowledgementSet);
sqsMessageHandler.handleMessage(message, url, bufferAccumulator, acknowledgementSet);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

public interface SqsMessageHandler {
void handleMessage(final Message message,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) throws IOException ;
final String url,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) throws IOException ;
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private Optional<DeleteMessageBatchRequestEntry> processSqsObject(
final Message message,
final AcknowledgementSet acknowledgementSet) {
try {
sqsEventProcessor.addSqsObject(message, bufferAccumulator, acknowledgementSet);
sqsEventProcessor.addSqsObject(message, queueConfig.getUrl(), bufferAccumulator, acknowledgementSet);
// TODO: see implementation in s3
return Optional.of(buildDeleteMessageBatchRequestEntry(message));
} catch (final Exception e) {
Expand Down

0 comments on commit 5e5e84d

Please sign in to comment.