Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sqs source #2355

Merged
merged 14 commits into from
Feb 11, 2025
687 changes: 665 additions & 22 deletions rust/Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ members = [
"numaflow-core",
"numaflow-pb",
"extns/numaflow-pulsar",
"numaflow",
"extns/numaflow-sqs",
"numaflow"
]

[workspace.lints.rust]
Expand Down Expand Up @@ -59,6 +60,7 @@ numaflow-models = { path = "numaflow-models" }
backoff = { path = "backoff" }
numaflow-pb = { path = "numaflow-pb" }
numaflow-pulsar = { path = "extns/numaflow-pulsar" }
numaflow-sqs = { path = "extns/numaflow-sqs" }
tokio = "1.43.0"
bytes = "1.9.0"
tracing = "0.1.41"
Expand Down
25 changes: 25 additions & 0 deletions rust/extns/numaflow-sqs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "numaflow-sqs"
version = "0.1.0"
edition = "2021"

[lints]
workspace = true

[dependencies]
tokio.workspace = true
tracing.workspace = true
bytes.workspace = true
serde.workspace = true
aws-config = "1.5.11"
aws-sdk-sqs = "1.51.0"
aws-smithy-runtime = { version = "1.7.6", features = ["test-util"] }
aws-smithy-types = "1.2.11"
chrono = "0.4.38"
tonic = "0.12.3"
prost = "0.11.9"
thiserror = "1.0.69"
Comment on lines +18 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workspace



[dev-dependencies]
aws-smithy-mocks-experimental = "0.2.1"
100 changes: 100 additions & 0 deletions rust/extns/numaflow-sqs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! Library for robust SQS message handling using an actor-based architecture.
//!
//! This module provides a fault-tolerant interface for interacting with Amazon SQS,
//! with a focus on:
//! - Error propagation and handling for AWS SDK errors
//! - Actor-based concurrency model for thread safety
//! - Clean abstraction of SQS operations
use tokio::sync::oneshot;
pub mod source;

/// Custom error types for the SQS client library.
///
/// Design goals:
/// - Ergonomic error handling with thiserror
/// - Clear error propagation from AWS SDK
/// - Explicit handling of actor communication failures
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed with SQS error - {0}")]
Sqs(#[from] aws_sdk_sqs::Error),

#[error("Failed to receive message from channel. Actor task is terminated: {0:?}")]
ActorTaskTerminated(oneshot::error::RecvError),

#[error("{0}")]
Other(String),
}

pub type Result<T> = core::result::Result<T, Error>;

impl From<String> for Error {
fn from(value: String) -> Self {
Error::Other(value)
}
}

#[cfg(test)]
mod tests {
use aws_config::BehaviorVersion;
use aws_smithy_mocks_experimental::{mock, MockResponseInterceptor, RuleMode};
use aws_smithy_types::error::ErrorMetadata;

use super::*;

#[tokio::test]
async fn test_sqs_error_conversion() {
let modeled_error = mock!(aws_sdk_sqs::Client::get_queue_url).then_error(|| {
aws_sdk_sqs::operation::get_queue_url::GetQueueUrlError::generic(
ErrorMetadata::builder().code("InvalidAddress").build(),
)
});

let get_object_mocks = MockResponseInterceptor::new()
.rule_mode(RuleMode::MatchAny)
.with_rule(&modeled_error);

let sqs = aws_sdk_sqs::Client::from_conf(
aws_sdk_sqs::Config::builder()
.behavior_version(BehaviorVersion::latest())
.region(aws_sdk_sqs::config::Region::new("us-east-1"))
.credentials_provider(make_sqs_test_credentials())
.interceptor(get_object_mocks)
.build(),
);
let err = sqs.get_queue_url().send().await.unwrap_err();

let converted_error = Error::Sqs(err.into());
assert!(matches!(converted_error, Error::Sqs(_)));
assert!(converted_error
.to_string()
.contains("Failed with SQS error"));
}

#[test]
fn test_string_error_conversion() {
let str_err = "custom error message".to_string();
let err: Error = str_err.into();
assert!(matches!(err, Error::Other(_)));
assert_eq!(err.to_string(), "custom error message");
}

#[tokio::test]
async fn test_actor_task_terminated() {
let (tx, rx) = oneshot::channel::<()>();
drop(tx); // Force the error
let err = Error::ActorTaskTerminated(rx.await.unwrap_err());
assert!(matches!(err, Error::ActorTaskTerminated(_)));
assert!(err.to_string().contains("Actor task is terminated"));
}

fn make_sqs_test_credentials() -> aws_sdk_sqs::config::Credentials {
aws_sdk_sqs::config::Credentials::new(
"ATESTCLIENT",
"astestsecretkey",
Some("atestsessiontoken".to_string()),
None,
"",
)
}
}
Loading
Loading