Skip to content

Commit

Permalink
Make backtrace optional in dlq
Browse files Browse the repository at this point in the history
  • Loading branch information
toelo3 committed Nov 28, 2023
1 parent 8569b97 commit 1e1b1e5
Showing 1 changed file with 20 additions and 94 deletions.
114 changes: 20 additions & 94 deletions src/dlq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,91 +9,19 @@
//!
//! It is up to the user to implement the strategy and logic for retrying messages.
//!
//! ## How to use
//! On top of your error's you want to send to the DLQ, implement the `ErrorToDlq` trait.
//!
//! ### Example:
//!
//! Example on what to implement in your service:
//!
//! #### ErrorToDlq trait
//! ```
//! use dsh_sdk::dlq;
//! use std::backtrace::Backtrace;
//! use thiserror::Error;
//!
//! #[derive(Error, Debug)]
//! enum ConsumerError {
//! #[error("Deserialization error: {0}")]
//! DeserializeError(String),
//! }
//!
//! impl dlq::ErrorToDlq for ConsumerError {
//! fn to_dlq(&self, kafka_message: rdkafka::message::OwnedMessage) -> dlq::SendToDlq {
//! let backtrace = Backtrace::force_capture();
//! dlq::SendToDlq::new(kafka_message, self.retryable(), self.to_string(), backtrace.to_string())
//! }
//! // Definition if error is retryable or not
//! fn retryable(&self) -> dlq::Retryable {
//! match self {
//! ConsumerError::DeserializeError(e) => dlq::Retryable::NonRetryable,
//! }
//! }
//! }
//! ```
//!
//! #### Main
//!
//! In your service, create a `dlq::Dlq` struct and run it in a separate tokio task.
//! ```no_run
//! use dsh_sdk::dlq::Dlq;
//! use dsh_sdk::graceful_shutdown::Shutdown;
//! use dsh_sdk::bootstrap::Bootstrap;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let bootstrap = Bootstrap::new().await.expect("Error creating bootstrap");
//! let shutdown = Shutdown::new();
//! let mut dlq = Dlq::new(&bootstrap, shutdown.clone())?;
//! // get the dlq channel sender to send messages to the dlq
//! // for example in your consumer
//! let dlq_tx = dlq.get_dlq_tx();
//! // run the dlq in a separate tokio task
//! let dlq_handle = tokio::spawn(async move {
//! dlq.run().await;
//! });
//! Ok(())
//! }
//! ```
//! ### How it works
//! The DLQ struct can
//!
//! #### Consumer
//!
//! Example of how to handle the errors in your consumer, it uses a hypothetical deserialize function that returns a `Result<String, ConsumerError>`:
//! ```ignore
//! use rdkafka::consumer::stream_consumer::StreamConsumer;
//! use tokio::sync::mpsc;
//! ## How to use
//! 1. Implement the `ErrorToDlq` trait on top your (custom) error type.
//! 2. Initialize the `Dlq` struct in your service in main.
//! 3. Get the dlq channel sender from the `Dlq` struct and use this channel to communicate with the `Dlq` struct from other threads.
//! 4. Run the `Dlq` struct in a separate tokio thread. This will run the producer that will produce towards the dead/retry topics.
//!
//! fn deserialize(msg: rdkafka::message::OwnedMessage) -> Result<String, ConsumerError> {
//! match msg.payload() {
//! Some(payload) => Ok(String::from_utf8(payload.to_vec())?),
//! None => Err(ConsumerError::DeserializeError("No payload".to_string())),
//! }
//! }
//! The topics are set via environment variables DLQ_DEAD_TOPIC and DLQ_RETRY_TOPIC.
//!
//! async fn consume(consumer: &mut StreamConsumer, dlq_tx: &mut mpsc::Sender<dlq::SendToDlq>) {
//! while let Some(message) = consumer.recv().await {
//! match message {
//! Err(e) => {}
//! Ok(m) => {
//! match deserialize(m) {
//! Err(e) => e.to_dlq(msg.detach()).send(&mut dlq_tx).await,
//! Ok(payload) => {println!("Payload: {}", payload)}
//! }
//! }
//! }
//! }
//! }
//! ```
//! ### Example:
//! See the examples folder on github for a working example.
#[cfg(not(any(feature = "rdkafka-ssl", feature = "rdkafka-ssl-vendored")))]
compile_error!("feature \"dlq\" requires feature \"rdkafka-ssl\" or \"rdkafka-ssl-vendored\"");
Expand Down Expand Up @@ -152,7 +80,7 @@ pub struct SendToDlq {
kafka_message: OwnedMessage,
retryable: Retryable,
error: String,
stack_trace: String,
stack_trace: Option<String>,
}

impl SendToDlq {
Expand All @@ -161,7 +89,7 @@ impl SendToDlq {
kafka_message: OwnedMessage,
retryable: Retryable,
error: String,
stack_trace: String,
stack_trace: Option<String>,
) -> Self {
Self {
kafka_message,
Expand Down Expand Up @@ -223,7 +151,7 @@ impl Dlq {
let dlq_dead_topic = env::var("DLQ_DEAD_TOPIC")?;
let dlq_retry_topic = env::var("DLQ_RETRY_TOPIC")?;
bootstrap.kafka_properties().verify_list_of_topics(
vec![&dlq_dead_topic, &dlq_retry_topic],
&vec![&dlq_dead_topic, &dlq_retry_topic],
ReadWriteAccess::Write,
)?;
Ok(Self {
Expand Down Expand Up @@ -375,10 +303,9 @@ impl DlqHeaders for OwnedMessage {
"dlq_error",
Some(dlq_message.error.to_string().as_bytes().to_vec()),
);
hashmap_headers.insert(
"dlq_stack_trace",
Some(dlq_message.stack_trace.as_bytes().to_vec()),
);
if let Some(stack_trace) = &dlq_message.stack_trace {
hashmap_headers.insert("dlq_stack_trace", Some(stack_trace.as_bytes().to_vec()));
}
// update dlq_retries with +1 if exists, else add dlq_retries wiith 1
let retries = hashmap_headers
.get("dlq_retries")
Expand Down Expand Up @@ -452,7 +379,7 @@ mod tests {
kafka_message,
self.retryable(),
self.to_string(),
backtrace.to_string(),
Some(backtrace.to_string()),
)
}

Expand Down Expand Up @@ -602,10 +529,9 @@ mod tests {
);
expected_headers.insert("dlq_retries", Some(1_i32.to_be_bytes().to_vec()));
expected_headers.insert("dlq_error", Some(error.to_string().as_bytes().to_vec()));
expected_headers.insert(
"dlq_stack_trace",
Some(dlq_message.stack_trace.as_bytes().to_vec()),
);
if let Some(stack_trace) = &dlq_message.stack_trace {
expected_headers.insert("dlq_stack_trace", Some(stack_trace.as_bytes().to_vec()));
}

let result = owned_message.generate_dlq_headers(&mut dlq_message);
for header in result.iter() {
Expand Down

0 comments on commit 1e1b1e5

Please sign in to comment.