Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: enabling sqs event processing #2

Merged
merged 3 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ authors = [
anyhow = "1.0.75"
aws-config = "0.56.0"
aws-sdk-s3 = "0.29.0"
aws_lambda_events = { version = "0.10.0", default-features = false, features = ["s3"] }
chrono = "0.4.26"
aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
deltalake = { version = "0.16.5", features = ["s3", "json"]}
kuntalkumarbasu marked this conversation as resolved.
Show resolved Hide resolved
chrono = "0.4.31"
liquid = "0.26"

serde = { version = "=1", features = ["rc"] }
lambda_runtime = "0.8.1"
routefinder = "0.5.3"
serde_json = "1.0.105"
tokio = { version = "1", features = ["macros"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
urlencoding = "2.1.3"

url = { version = "2.3", features = ["serde"] }
62 changes: 58 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,64 @@ use aws_sdk_s3::Client as S3Client;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use routefinder::Router;
use tracing::log::*;
use aws_lambda_events::sqs::SqsEvent;
// use deltalake::{DeltaResult};

use std::collections::HashMap;

async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Result<(), Error> {

/// A simple structure to make deserializing test events for identification easier
///
/// See <fhttps://github.com/buoyant-data/oxbow/issues/8>
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct TestEvent {
event: String,
}


/// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
/// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
/// Notifications which have been passed into SQS
///
/// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent` which is
/// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
/// errorsin the processing pipeline
async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event,anyhow::Error> {
let mut records = vec![];
for record in event.records.iter() {
/* each record is an SqsMessage */
if let Some(body) = &record.body {
match serde_json::from_str::<S3Event>(body) {
Ok(s3event) => {
for s3record in s3event.records {
records.push(s3record.clone());
}
}
Err(err) => {
// if we cannot deserialize and the event is an s3::TestEvent, then we should
// just return empty records.
let test_event = serde_json::from_str::<TestEvent>(body);
// Early exit with the original error if we cannot parse the JSON at all
if test_event.is_err() {
return Err(err.into());
}

// Ignore the error on deserialization if the event ends up being an S3
// TestEvent which is fired when bucket notifications are originally configured
if "s3:TestEvent" != test_event.unwrap().event {
return Err(err.into());
}
}
};
}
}
Ok(aws_lambda_events::s3::S3Event { records: records })
}



async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Result<(), Error> {
let input_pattern =
std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
let output_template = std::env::var("OUTPUT_TEMPLATE")
Expand All @@ -18,9 +72,9 @@ async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Res
.parse(&output_template)?;

router.add(input_pattern, 1)?;
info!("Processing records: {event:?}");
let records = s3_from_sqs(event.payload);

for entity in entities_from(event.payload)? {
for entity in entities_from(records.await?)? {
debug!("Processing {entity:?}");

if let Some(source_key) = entity.object.key {
Expand Down Expand Up @@ -221,4 +275,4 @@ mod tests {
"databases/oltp/a_table/ds=2023-09-05/some.parquet"
);
}
}
}
Loading