Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sqs source #2355

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open

feat: add sqs source #2355

wants to merge 9 commits into from

Conversation

cosmic-chichu
Copy link

@cosmic-chichu cosmic-chichu commented Jan 23, 2025

resolves #2367

  • Add SQS source
  • Add unit tests for sqs source
  • Add end to end test for sqs source

vigith and others added 2 commits January 16, 2025 16:14
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Comment on lines 1 to 13
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use aws_config::{meta::region::RegionProviderChain, BehaviorVersion};
use aws_sdk_sqs::config::Region;
use aws_sdk_sqs::types::{MessageSystemAttributeName, QueueAttributeName};
use aws_sdk_sqs::{Client};
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};

use crate::{Error, Result};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group imports in std-external-crate order https://rust-lang.github.io/rustfmt/?version=master&search=#StdExternalCrate%5C%3A
We try to follow this for new changes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added --config group_imports=StdExternalCrate to my Rust Rover rustfmt arguments. Should we add a rustfmt.toml?

@vigith
Copy link
Member

vigith commented Jan 25, 2025

please resolve conflicts

Copy link

codecov bot commented Jan 27, 2025

Codecov Report

Attention: Patch coverage is 89.29987% with 81 lines in your changes missing coverage. Please review.

Project coverage is 69.87%. Comparing base (8e9bafb) to head (d0e426e).

Files with missing lines Patch % Lines
rust/extns/numaflow-sqs/src/source.rs 88.31% 50 Missing ⚠️
rust/numaflow-core/src/source/sqs.rs 88.21% 31 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2355      +/-   ##
==========================================
+ Coverage   69.84%   69.87%   +0.03%     
==========================================
  Files         361      364       +3     
  Lines       49935    50823     +888     
==========================================
+ Hits        34878    35515     +637     
- Misses      13979    14229     +250     
- Partials     1078     1079       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

tonic = "0.12.3"
prost = "0.11.9"
thiserror = "1.0.69"
http = "1.2.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need http?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cosmic-chichu You can use cargo-udeps to identify crates that are not used.

# Install
cargo install cargo-udeps

# Go to the crates directory and run it. It needs nightly compiler
cd rust/extns/numaflow-sqs
cargo +nightly udeps --all-targets

Comment on lines +18 to +21
chrono = "0.4.38"
tonic = "0.12.3"
prost = "0.11.9"
thiserror = "1.0.69"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workspace

Comment on lines +18 to +21
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed with SQS error - {0}")]
SQS(aws_sdk_sqs::Error),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to handle this error in the core, we will have to import aws_sdk_sqs, right? so convert this to numaflow error.

rust/extns/numaflow-sqs/src/lib.rs Outdated Show resolved Hide resolved
@BulkBeing
Copy link
Contributor

@cosmic-chichu Run clippy for the crates in which you made changes and make the changes suggested by clippy:

cd rust/extns/numaflow-sqs

cargo clippy --tests --all-features --no-deps

Comment on lines +28 to +30
pub struct SQSSourceConfig {
pub region: String,
pub queue_name: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might have to enhance it with more sqs source related configs like visibility timeout, poll timeout etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add support for all the configurations that is supported by SQS Client.

Comment on lines +25 to +26
/// Used to initialize the SQS client with region and queue settings.
/// Implements serde::Deserialize to support loading from configuration files.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might need to support different ways to authenticate (example role-based)

rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
respond_to: tx,
queue_name: self.queue_name.clone(),
count: self.batch_size as i32,
timeout_at: Instant::now() + self.timeout,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
timeout_at: Instant::now() + self.timeout,
timeout_at: start + self.timeout,

timeout_at,
} => {
let messages = self.get_messages(queue_name, count, timeout_at).await;
let _ = respond_to.send(messages);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use expect?

}
_ => {
tracing::error!("Unsupported attribute name");
let _ = respond_to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use expect?

let status = self.delete_messages(queue_name, offsets).await;
let _ = respond_to.send(status);
}
SQSActorMessage::GetQueueAttributes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid this actor message and move this functionality inside pending?

Comment on lines 51 to 55
GetQueueAttributes {
respond_to: oneshot::Sender<Result<Option<usize>>>,
queue_name: String,
attribute_name: QueueAttributeName,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this pending?

}

/// Retrieves messages from SQS with timeout and batching.
///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make queue static to avoid dynamic queue url creation and to avoid caching?

@yhl25 yhl25 added this to the 1.5 milestone Jan 31, 2025
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
event_time: message.event_time,
watermark: None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not set Watermark based of SentTimestamp in the Attributes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SQS source implementation in rust
4 participants