Skip to content

Commit

Permalink
feat(kafka): Add better error message (#4464)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Jan 21, 2025
1 parent 8d47ad5 commit 7f4d035
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 9 deletions.
6 changes: 3 additions & 3 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts}
use relay_system::{channel, Addr, Service, ServiceRunner};

/// Indicates the type of failure of the server.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
/// GeoIp construction failed.
#[error("could not load the Geoip Db")]
GeoIp,

/// Initializing the Kafka producer failed.
#[cfg(feature = "processing")]
#[error("could not initialize kafka producer")]
Kafka,
#[error("could not initialize kafka producer: {0}")]
Kafka(String),

/// Initializing the Redis cluster client failed.
#[cfg(feature = "processing")]
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fmt, mem};

#[cfg(feature = "processing")]
use anyhow::Context;
use chrono::{DateTime, SecondsFormat, Utc};
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
Expand Down Expand Up @@ -817,10 +815,12 @@ impl KafkaOutcomesProducer {
let mut client_builder = KafkaClient::builder();

for topic in &[KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
let kafka_config = &config.kafka_config(*topic).context(ServiceError::Kafka)?;
let kafka_config = &config
.kafka_config(*topic)
.map_err(|e| ServiceError::Kafka(e.to_string()))?;
client_builder = client_builder
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics())
.context(ServiceError::Kafka)?;
.map_err(|e| ServiceError::Kafka(e.to_string()))?;
}

Ok(Self {
Expand Down
3 changes: 1 addition & 2 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry
use anyhow::Context;
use relay_base_schema::organization::OrganizationId;
use serde::ser::SerializeMap;
use std::borrow::Cow;
Expand Down Expand Up @@ -69,7 +68,7 @@ impl Producer {
let kafka_config = &config.kafka_config(*topic)?;
client_builder = client_builder
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics())
.context(ServiceError::Kafka)?;
.map_err(|e| ServiceError::Kafka(e.to_string()))?;
}

Ok(Self {
Expand Down

0 comments on commit 7f4d035

Please sign in to comment.