Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sqs source #2355

Merged
merged 14 commits into from
Feb 11, 2025
12 changes: 7 additions & 5 deletions rust/numaflow-core/src/source/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use numaflow_sqs::source::{SQSMessage, SQSSource, SQSSourceBuilder, SQSSourceConfig};

use crate::config::get_vertex_name;
use crate::config::{get_vertex_name, get_vertex_replica};
use crate::error::Error;
use crate::message::{Message, MessageID, Offset, StringOffset};
use crate::source;
Expand All @@ -12,13 +12,14 @@ impl TryFrom<SQSMessage> for Message {
type Error = Error;

fn try_from(message: SQSMessage) -> crate::Result<Self> {
let offset = Offset::String(StringOffset::new(message.offset, 0));
let offset = Offset::String(StringOffset::new(message.offset, *get_vertex_replica()));
Ok(Message {
keys: Arc::from(vec![message.key]),
tags: None,
value: message.payload,
offset: Some(offset.clone()),
offset: offset.clone(),
event_time: message.event_time,
watermark: None,
cosmic-chichu marked this conversation as resolved.
Show resolved Hide resolved
id: MessageID {
vertex_name: get_vertex_name().to_string().into(),
offset: offset.to_string().into(),
Expand Down Expand Up @@ -134,7 +135,7 @@ pub mod tests {
assert_eq!(message.value, "value");
assert_eq!(
message.offset,
Some(Offset::String(StringOffset::new("offset".to_string(), 0)))
Offset::String(StringOffset::new("offset".to_string(), 0)),
);
assert_eq!(message.event_time, ts);
assert_eq!(message.headers, headers);
Expand Down Expand Up @@ -174,13 +175,14 @@ pub mod tests {

// create SQS source with test client
use crate::tracker::TrackerHandle;
let tracker_handle = TrackerHandle::new(None);
let tracker_handle = TrackerHandle::new(None, None);
let source = Source::new(
1,
SourceType::SQS(sqs_source),
tracker_handle.clone(),
true,
None,
None,
);

// create sink writer
Expand Down
Loading