From 1e1b1e58b102f6a7c8e5d327fe3ee729a2aa2134 Mon Sep 17 00:00:00 2001 From: "KPNNL\\hol504" Date: Tue, 28 Nov 2023 09:04:21 +0100 Subject: [PATCH] Make backtrace optional in dlq --- src/dlq.rs | 114 ++++++++++------------------------------------------- 1 file changed, 20 insertions(+), 94 deletions(-) diff --git a/src/dlq.rs b/src/dlq.rs index a90b215..d3eba7d 100644 --- a/src/dlq.rs +++ b/src/dlq.rs @@ -9,91 +9,19 @@ //! //! It is up to the user to implement the strategy and logic for retrying messages. //! -//! ## How to use -//! On top of your error's you want to send to the DLQ, implement the `ErrorToDlq` trait. -//! -//! ### Example: -//! -//! Example on what to implement in your service: -//! -//! #### ErrorToDlq trait -//! ``` -//! use dsh_sdk::dlq; -//! use std::backtrace::Backtrace; -//! use thiserror::Error; -//! -//! #[derive(Error, Debug)] -//! enum ConsumerError { -//! #[error("Deserialization error: {0}")] -//! DeserializeError(String), -//! } -//! -//! impl dlq::ErrorToDlq for ConsumerError { -//! fn to_dlq(&self, kafka_message: rdkafka::message::OwnedMessage) -> dlq::SendToDlq { -//! let backtrace = Backtrace::force_capture(); -//! dlq::SendToDlq::new(kafka_message, self.retryable(), self.to_string(), backtrace.to_string()) -//! } -//! // Definition if error is retryable or not -//! fn retryable(&self) -> dlq::Retryable { -//! match self { -//! ConsumerError::DeserializeError(e) => dlq::Retryable::NonRetryable, -//! } -//! } -//! } -//! ``` -//! -//! #### Main -//! -//! In your service, create a `dlq::Dlq` struct and run it in a separate tokio task. -//! ```no_run -//! use dsh_sdk::dlq::Dlq; -//! use dsh_sdk::graceful_shutdown::Shutdown; -//! use dsh_sdk::bootstrap::Bootstrap; -//! -//! #[tokio::main] -//! async fn main() -> Result<(), Box> { -//! let bootstrap = Bootstrap::new().await.expect("Error creating bootstrap"); -//! let shutdown = Shutdown::new(); -//! let mut dlq = Dlq::new(&bootstrap, shutdown.clone())?; -//! // get the dlq channel sender to send messages to the dlq -//! // for example in your consumer -//! let dlq_tx = dlq.get_dlq_tx(); -//! // run the dlq in a separate tokio task -//! let dlq_handle = tokio::spawn(async move { -//! dlq.run().await; -//! }); -//! Ok(()) -//! } -//! ``` +//! ### How it works +//! The DLQ struct can //! -//! #### Consumer -//! -//! Example of how to handle the errors in your consumer, it uses a hypothetical deserialize function that returns a `Result`: -//! ```ignore -//! use rdkafka::consumer::stream_consumer::StreamConsumer; -//! use tokio::sync::mpsc; +//! ## How to use +//! 1. Implement the `ErrorToDlq` trait on top your (custom) error type. +//! 2. Initialize the `Dlq` struct in your service in main. +//! 3. Get the dlq channel sender from the `Dlq` struct and use this channel to communicate with the `Dlq` struct from other threads. +//! 4. Run the `Dlq` struct in a separate tokio thread. This will run the producer that will produce towards the dead/retry topics. //! -//! fn deserialize(msg: rdkafka::message::OwnedMessage) -> Result { -//! match msg.payload() { -//! Some(payload) => Ok(String::from_utf8(payload.to_vec())?), -//! None => Err(ConsumerError::DeserializeError("No payload".to_string())), -//! } -//! } +//! The topics are set via environment variables DLQ_DEAD_TOPIC and DLQ_RETRY_TOPIC. //! -//! async fn consume(consumer: &mut StreamConsumer, dlq_tx: &mut mpsc::Sender) { -//! while let Some(message) = consumer.recv().await { -//! match message { -//! Err(e) => {} -//! Ok(m) => { -//! match deserialize(m) { -//! Err(e) => e.to_dlq(msg.detach()).send(&mut dlq_tx).await, -//! Ok(payload) => {println!("Payload: {}", payload)} -//! } -//! } -//! } -//! } -//! } -//! ``` +//! ### Example: +//! See the examples folder on github for a working example. #[cfg(not(any(feature = "rdkafka-ssl", feature = "rdkafka-ssl-vendored")))] compile_error!("feature \"dlq\" requires feature \"rdkafka-ssl\" or \"rdkafka-ssl-vendored\""); @@ -152,7 +80,7 @@ pub struct SendToDlq { kafka_message: OwnedMessage, retryable: Retryable, error: String, - stack_trace: String, + stack_trace: Option, } impl SendToDlq { @@ -161,7 +89,7 @@ impl SendToDlq { kafka_message: OwnedMessage, retryable: Retryable, error: String, - stack_trace: String, + stack_trace: Option, ) -> Self { Self { kafka_message, @@ -223,7 +151,7 @@ impl Dlq { let dlq_dead_topic = env::var("DLQ_DEAD_TOPIC")?; let dlq_retry_topic = env::var("DLQ_RETRY_TOPIC")?; bootstrap.kafka_properties().verify_list_of_topics( - vec![&dlq_dead_topic, &dlq_retry_topic], + &vec![&dlq_dead_topic, &dlq_retry_topic], ReadWriteAccess::Write, )?; Ok(Self { @@ -375,10 +303,9 @@ impl DlqHeaders for OwnedMessage { "dlq_error", Some(dlq_message.error.to_string().as_bytes().to_vec()), ); - hashmap_headers.insert( - "dlq_stack_trace", - Some(dlq_message.stack_trace.as_bytes().to_vec()), - ); + if let Some(stack_trace) = &dlq_message.stack_trace { + hashmap_headers.insert("dlq_stack_trace", Some(stack_trace.as_bytes().to_vec())); + } // update dlq_retries with +1 if exists, else add dlq_retries wiith 1 let retries = hashmap_headers .get("dlq_retries") @@ -452,7 +379,7 @@ mod tests { kafka_message, self.retryable(), self.to_string(), - backtrace.to_string(), + Some(backtrace.to_string()), ) } @@ -602,10 +529,9 @@ mod tests { ); expected_headers.insert("dlq_retries", Some(1_i32.to_be_bytes().to_vec())); expected_headers.insert("dlq_error", Some(error.to_string().as_bytes().to_vec())); - expected_headers.insert( - "dlq_stack_trace", - Some(dlq_message.stack_trace.as_bytes().to_vec()), - ); + if let Some(stack_trace) = &dlq_message.stack_trace { + expected_headers.insert("dlq_stack_trace", Some(stack_trace.as_bytes().to_vec())); + } let result = owned_message.generate_dlq_headers(&mut dlq_message); for header in result.iter() {