From 31772879c5b04f14043d56bd98c08c8fa1f65c26 Mon Sep 17 00:00:00 2001
From: kuntalkumarbasu
Date: Tue, 26 Mar 2024 14:11:37 -0400
Subject: [PATCH 1/3] feat!: enabling sqs event processing

---
 Cargo.toml  |  9 ++++----
 src/main.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 3170254..c6e369b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,10 +11,11 @@ authors = [
 anyhow = "1.0.75"
 aws-config = "0.56.0"
 aws-sdk-s3 = "0.29.0"
-aws_lambda_events = { version = "0.10.0", default-features = false, features = ["s3"] }
-chrono = "0.4.26"
+aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
+deltalake = { version = "0.16.5", features = ["s3", "json"]}
+chrono = "0.4.31"
 liquid = "0.26"
-
+serde = { version = "=1", features = ["rc"] }
 lambda_runtime = "0.8.1"
 routefinder = "0.5.3"
 serde_json = "1.0.105"
@@ -22,4 +23,4 @@ tokio = { version = "1", features = ["macros"] }
 tracing = { version = "0.1", features = ["log"] }
 tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
 urlencoding = "2.1.3"
-
+url = { version = "2.3", features = ["serde"] }
diff --git a/src/main.rs b/src/main.rs
index 2982927..a4b6504 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,10 +3,64 @@ use aws_sdk_s3::Client as S3Client;
 use lambda_runtime::{run, service_fn, Error, LambdaEvent};
 use routefinder::Router;
 use tracing::log::*;
+use aws_lambda_events::sqs::SqsEvent;
+// use deltalake::{DeltaResult};
 
 use std::collections::HashMap;
 
-async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Result<(), Error> {
+
+/// A simple structure to make deserializing test events for identification easier
+///
+/// See
+#[derive(serde::Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct TestEvent {
+    event: String,
+}
+
+
+/// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
+/// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
+/// Notifications which have been passed into SQS
+///
+/// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent` which is
+/// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
+/// errors in the processing pipeline
+async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
+    let mut records = vec![];
+    for record in event.records.iter() {
+        /* each record is an SqsMessage */
+        if let Some(body) = &record.body {
+            match serde_json::from_str::<S3Event>(body) {
+                Ok(s3event) => {
+                    for s3record in s3event.records {
+                        records.push(s3record.clone());
+                    }
+                }
+                Err(err) => {
+                    // if we cannot deserialize and the event is an s3::TestEvent, then we should
+                    // just return empty records.
+                    let test_event = serde_json::from_str::<TestEvent>(body);
+                    // Early exit with the original error if we cannot parse the JSON at all
+                    if test_event.is_err() {
+                        return Err(err.into());
+                    }
+
+                    // Ignore the error on deserialization if the event ends up being an S3
+                    // TestEvent which is fired when bucket notifications are originally configured
+                    if "s3:TestEvent" != test_event.unwrap().event {
+                        return Err(err.into());
+                    }
+                }
+            };
+        }
+    }
+    Ok(aws_lambda_events::s3::S3Event { records: records })
+}
+
+
+
+async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Result<(), Error> {
     let input_pattern =
         std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
     let output_template = std::env::var("OUTPUT_TEMPLATE")
@@ -18,9 +72,9 @@
         .parse(&output_template)?;
     router.add(input_pattern, 1)?;
 
-    info!("Processing records: {event:?}");
+    let records = s3_from_sqs(event.payload);
 
-    for entity in entities_from(event.payload)? {
+    for entity in entities_from(records.await?)? {
         debug!("Processing {entity:?}");
 
         if let Some(source_key) = entity.object.key {
@@ -221,4 +275,4 @@ mod tests {
             "databases/oltp/a_table/ds=2023-09-05/some.parquet"
         );
     }
-}
+}
\ No newline at end of file

From f46eaf1ccaeea0864e4b2f5102c82529b1f1fd94 Mon Sep 17 00:00:00 2001
From: kuntalkumarbasu <42655948+kuntalkumarbasu@users.noreply.github.com>
Date: Thu, 28 Mar 2024 12:36:34 -0400
Subject: [PATCH 2/3] Update Cargo.toml

Co-authored-by: R. Tyler Croy
---
 Cargo.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index c6e369b..9c834a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,6 @@ anyhow = "1.0.75"
 aws-config = "0.56.0"
 aws-sdk-s3 = "0.29.0"
 aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
-deltalake = { version = "0.16.5", features = ["s3", "json"]}
 chrono = "0.4.31"
 liquid = "0.26"
 serde = { version = "=1", features = ["rc"] }

From c7177b2325156871efa4d60bb98a341a43a4bff3 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Thu, 28 Mar 2024 11:51:28 -0700
Subject: [PATCH 3/3] Allow the handling of S3 Event Notifications and SQS events directly in the lambda

---
 src/main.rs | 43 ++++++++++++++++++++-----------------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index a4b6504..59c661c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,14 +1,12 @@
 use aws_lambda_events::event::s3::{S3Entity, S3Event};
+use aws_lambda_events::sqs::SqsEvent;
 use aws_sdk_s3::Client as S3Client;
 use lambda_runtime::{run, service_fn, Error, LambdaEvent};
 use routefinder::Router;
 use tracing::log::*;
-use aws_lambda_events::sqs::SqsEvent;
-// use deltalake::{DeltaResult};
 
 use std::collections::HashMap;
 
-
 /// A simple structure to make deserializing test events for identification easier
 ///
 /// See
 #[derive(serde::Deserialize)]
 #[serde(rename_all = "PascalCase")]
 struct TestEvent {
     event: String,
 }
 
-
 /// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
 /// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
 /// Notifications which have been passed into SQS
@@ -26,7 +23,7 @@ struct TestEvent {
 /// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent` which is
 /// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
 /// errors in the processing pipeline
-async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
+fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
     let mut records = vec![];
     for record in event.records.iter() {
         /* each record is an SqsMessage */
         if let Some(body) = &record.body {
@@ -55,12 +52,13 @@ async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
             };
         }
     }
-    Ok(aws_lambda_events::s3::S3Event { records: records })
+    Ok(aws_lambda_events::s3::S3Event { records })
 }
 
-
-
-async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Result<(), Error> {
+async fn function_handler(
+    event: LambdaEvent<serde_json::Value>,
+    client: &S3Client,
+) -> Result<(), Error> {
     let input_pattern =
         std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
     let output_template = std::env::var("OUTPUT_TEMPLATE")
@@ -72,9 +70,9 @@ async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Re
         .parse(&output_template)?;
     router.add(input_pattern, 1)?;
 
-    let records = s3_from_sqs(event.payload);
-
-    for entity in entities_from(records.await?)? {
+    let records = match serde_json::from_value::<SqsEvent>(event.payload.clone()) {
+        Ok(sqs_event) => s3_from_sqs(sqs_event)?,
+        Err(_) => serde_json::from_value(event.payload)?,
+    };
+
+    for entity in entities_from(records)? {
         debug!("Processing {entity:?}");
 
         if let Some(source_key) = entity.object.key {
@@ -116,13 +118,10 @@ async fn main() -> Result<(), Error> {
     run(func).await
 }
 
-/**
- * Return the deserialized and useful objects from the event payload
- *
- * This function will apply a filter to make sure that it is only return objects which have been
- * put in this invocation
- */
-
+/// Return the deserialized and useful objects from the event payload
+///
+/// This function will apply a filter to make sure that it only returns objects which have been
+/// put in this invocation
 fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
     Ok(event
         .records
@@ -133,10 +132,8 @@ fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
         .collect())
 }
 
-/**
- * Take the source key and the already configured router in order to access a collection of
- * captured parameters in a HashMap format
- */
+/// Take the source key and the already configured router in order to access a collection of
+/// captured parameters in a HashMap format
 fn captured_parameters(
     router: &Router<i32>,
     source_key: &str,
@@ -275,4 +272,4 @@ mod tests {
             "databases/oltp/a_table/ds=2023-09-05/some.parquet"
        );
     }
-}
\ No newline at end of file
+}
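
Note for reviewers: the `s3_from_sqs` conversion introduced by these patches can be exercised locally by deserializing a hand-written SQS payload, with no AWS infrastructure involved. The sketch below is illustrative only and is not part of the patches; it targets the final synchronous signature from PATCH 3/3, assumes `s3_from_sqs` and `SqsEvent` are reachable via `use super::*`, and assumes that `SqsMessage` fields omitted from the JSON fall back to their serde defaults, which is how `aws_lambda_events` 0.12 generally behaves.

    #[cfg(test)]
    mod sqs_conversion_sketch {
        use super::*;

        #[test]
        fn s3_from_sqs_skips_test_event() {
            // Body of the s3:TestEvent that S3 publishes when bucket notifications are
            // first configured; the field values here are placeholders.
            let body = r#"{"Service":"Amazon S3","Event":"s3:TestEvent","Bucket":"example-bucket"}"#;
            // Wrap that body in a minimal SQS envelope, the shape this lambda now receives.
            let payload = serde_json::json!({ "Records": [{ "body": body }] });
            let event: SqsEvent =
                serde_json::from_value(payload).expect("a minimal SqsEvent should deserialize");

            // The test event must be ignored rather than surfaced as a processing error.
            let converted = s3_from_sqs(event).expect("an s3:TestEvent should be skipped");
            assert!(converted.records.is_empty());
        }
    }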
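Note for reviewers: PATCH 3/3 makes the handler accept either invocation style by taking `LambdaEvent<serde_json::Value>` and probing the payload. The sketch below restates that dispatch as a free-standing helper; the name `records_from_payload` is hypothetical, and the fallback order mirrors the patch: try the SQS envelope first, otherwise treat the payload as a bare S3 event notification so a direct S3 trigger still works.

    // Hypothetical helper mirroring the dispatch inside function_handler.
    fn records_from_payload(payload: serde_json::Value) -> Result<S3Event, Error> {
        match serde_json::from_value::<SqsEvent>(payload.clone()) {
            // The payload was an SQS envelope: unwrap the S3 records from the message bodies.
            Ok(sqs_event) => Ok(s3_from_sqs(sqs_event)?),
            // Otherwise assume the lambda was invoked by S3 directly with a bare S3Event.
            Err(_) => Ok(serde_json::from_value(payload)?),
        }
    }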