diff --git a/Cargo.toml b/Cargo.toml
index 3170254..9c834a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,10 +11,10 @@ authors = [
 anyhow = "1.0.75"
 aws-config = "0.56.0"
 aws-sdk-s3 = "0.29.0"
-aws_lambda_events = { version = "0.10.0", default-features = false, features = ["s3"] }
-chrono = "0.4.26"
+aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
+chrono = "0.4.31"
 liquid = "0.26"
-
+serde = { version = "=1", features = ["rc"] }
 lambda_runtime = "0.8.1"
 routefinder = "0.5.3"
 serde_json = "1.0.105"
@@ -22,4 +22,4 @@ tokio = { version = "1", features = ["macros"] }
 tracing = { version = "0.1", features = ["log"] }
 tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
 urlencoding = "2.1.3"
-
+url = { version = "2.3", features = ["serde"] }
diff --git a/src/main.rs b/src/main.rs
index 2982927..59c661c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,5 @@
 use aws_lambda_events::event::s3::{S3Entity, S3Event};
+use aws_lambda_events::sqs::SqsEvent;
 use aws_sdk_s3::Client as S3Client;
 use lambda_runtime::{run, service_fn, Error, LambdaEvent};
 use routefinder::Router;
@@ -6,7 +7,58 @@ use tracing::log::*;
 
 use std::collections::HashMap;
 
-async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Result<(), Error> {
+/// A simple structure to make deserializing test events for identification easier
+///
+/// See
+#[derive(serde::Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct TestEvent {
+    event: String,
+}
+
+/// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
+/// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
+/// Notifications which have been passed into SQS
+///
+/// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent`, which is
+/// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
+/// errors in the processing pipeline
+fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
+    let mut records = vec![];
+    for record in event.records.iter() {
+        /* each record is an SqsMessage */
+        if let Some(body) = &record.body {
+            match serde_json::from_str::<S3Event>(body) {
+                Ok(s3event) => {
+                    for s3record in s3event.records {
+                        records.push(s3record.clone());
+                    }
+                }
+                Err(err) => {
+                    // if we cannot deserialize and the event is an s3::TestEvent, then we should
+                    // just return empty records.
+                    let test_event = serde_json::from_str::<TestEvent>(body);
+                    // Early exit with the original error if we cannot parse the JSON at all
+                    if test_event.is_err() {
+                        return Err(err.into());
+                    }
+
+                    // Ignore the error on deserialization if the event ends up being an S3
+                    // TestEvent which is fired when bucket notifications are originally configured
+                    if "s3:TestEvent" != test_event.unwrap().event {
+                        return Err(err.into());
+                    }
+                }
+            };
+        }
+    }
+    Ok(aws_lambda_events::s3::S3Event { records })
+}
+
+async fn function_handler(
+    event: LambdaEvent<serde_json::Value>,
+    client: &S3Client,
+) -> Result<(), Error> {
     let input_pattern =
         std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
     let output_template = std::env::var("OUTPUT_TEMPLATE")
@@ -18,9 +70,13 @@ async fn function_handler(event: LambdaEvent, client: &S3Client) -> Res
         .parse(&output_template)?;
     router.add(input_pattern, 1)?;
 
-    info!("Processing records: {event:?}");
-    for entity in entities_from(event.payload)? {
+    let records = match serde_json::from_value::<SqsEvent>(event.payload.clone()) {
+        Ok(sqs_event) => s3_from_sqs(sqs_event)?,
+        Err(_) => serde_json::from_value(event.payload)?,
+    };
+
+    for entity in entities_from(records)? {
         debug!("Processing {entity:?}");
 
         if let Some(source_key) = entity.object.key {
@@ -62,13 +118,10 @@ async fn main() -> Result<(), Error> {
     run(func).await
 }
 
-/**
- * Return the deserialized and useful objects from the event payload
- *
- * This function will apply a filter to make sure that it is only return objects which have been
- * put in this invocation
- */
-
+/// Return the deserialized and useful objects from the event payload
+///
+/// This function will apply a filter to make sure that it only returns objects which have been
+/// put in this invocation
 fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
     Ok(event
         .records
@@ -79,10 +132,8 @@ fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
         .collect())
 }
 
-/**
- * Take the source key and the already configured router in order to access a collection of
- * captured parameters in a HashMap format
- */
+/// Take the source key and the already configured router in order to access a collection of
+/// captured parameters in a HashMap format
 fn captured_parameters(
     router: &Router,
     source_key: &str,
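A quick way to exercise the new conversion path locally is a unit test that hands s3_from_sqs an SQS message whose body is shaped like the s3:TestEvent notification S3 emits when bucket notifications are first configured, and asserts that it is ignored rather than treated as an error. This is a minimal sketch, not part of the diff above: it assumes the Result<S3Event, anyhow::Error> signature shown above and that SqsEvent/SqsMessage implement Default as in recent aws_lambda_events releases, and the JSON values are placeholders.

#[cfg(test)]
mod tests {
    use super::*;
    use aws_lambda_events::sqs::{SqsEvent, SqsMessage};

    #[test]
    fn s3_from_sqs_ignores_test_event() {
        // Body shaped like the s3:TestEvent S3 sends when notifications are enabled;
        // the concrete values are placeholders for illustration only.
        let body = r#"{
            "Service": "Amazon S3",
            "Event": "s3:TestEvent",
            "Time": "2023-10-01T00:00:00.000Z",
            "Bucket": "example-bucket",
            "RequestId": "EXAMPLEREQUEST",
            "HostId": "EXAMPLEHOST"
        }"#;

        let message = SqsMessage {
            body: Some(body.into()),
            ..Default::default()
        };
        let event = SqsEvent {
            records: vec![message],
        };

        // A TestEvent should be silently dropped, yielding an empty set of records
        let converted = s3_from_sqs(event).expect("a TestEvent should not be an error");
        assert!(converted.records.is_empty());
    }
}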