diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index 4e0eae55cac..c4c137faa6a 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -1,9 +1,9 @@ //! This module contains the kafka producer related code. use std::borrow::Cow; -use std::cell::Cell; use std::collections::{BTreeMap, HashMap}; use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -22,7 +22,7 @@ use utils::{Context, ThreadedProducer}; #[cfg(feature = "schemas")] mod schemas; -const REPORT_FREQUENCY: Duration = Duration::from_secs(1); +const REPORT_FREQUENCY_SECS: u64 = 1; const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30); /// Kafka producer errors. @@ -87,20 +87,20 @@ pub trait Message { /// Single kafka producer config with assigned topic. struct Producer { - /// Time of the latest collection of stats. - last_report: Cell, /// Kafka topic name. topic_name: String, /// Real kafka producer. producer: Arc, + /// Debouncer for metrics. 
+ metrics: Debounced, } impl Producer { fn new(topic_name: String, producer: Arc) -> Self { Self { - last_report: Cell::new(Instant::now()), topic_name, producer, + metrics: Debounced::new(REPORT_FREQUENCY_SECS), } } @@ -124,10 +124,9 @@ impl Producer { impl fmt::Debug for Producer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Producer") - .field("last_report", &self.last_report) .field("topic_name", &self.topic_name) .field("producer", &"") - .finish() + .finish_non_exhaustive() } } @@ -136,7 +135,7 @@ impl fmt::Debug for Producer { pub struct KafkaClient { producers: HashMap, #[cfg(feature = "schemas")] - schema_validator: std::cell::RefCell, + schema_validator: schemas::Validator, } impl KafkaClient { @@ -156,7 +155,6 @@ impl KafkaClient { let serialized = message.serialize()?; #[cfg(feature = "schemas")] self.schema_validator - .borrow_mut() .validate_message_schema(topic, &serialized) .map_err(ClientError::SchemaValidationFailed)?; let key = message.key(); @@ -261,7 +259,7 @@ impl KafkaClientBuilder { KafkaClient { producers: self.producers, #[cfg(feature = "schemas")] - schema_validator: schemas::Validator::default().into(), + schema_validator: schemas::Validator::default(), } } } @@ -304,14 +302,13 @@ impl Producer { record = record.headers(kafka_headers); } - if self.last_report.get().elapsed() > REPORT_FREQUENCY { - self.last_report.replace(Instant::now()); + self.metrics.debounce(|| { metric!( gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64, variant = variant, topic = topic_name ); - } + }); self.producer .send(record) @@ -332,3 +329,67 @@ impl Producer { }) } } + +struct Debounced { + /// Time of last activation in seconds. + last_activation: AtomicU64, + /// Debounce interval in seconds. + interval: u64, + /// Relative instant used for measurements. 
+ instant: Instant, +} + +impl Debounced { + pub fn new(interval: u64) -> Self { + Self { + last_activation: AtomicU64::new(0), + interval, + instant: Instant::now(), + } + } + + fn debounce(&self, f: impl FnOnce()) -> bool { + // Add interval to make sure it always triggers immediately. + let now = self.instant.elapsed().as_secs() + self.interval; + + let prev = self.last_activation.load(Ordering::Relaxed); + if now.saturating_sub(prev) < self.interval { + return false; + } + + if self + .last_activation + .compare_exchange(prev, now, Ordering::SeqCst, Ordering::Acquire) + .is_ok() + { + f(); + return true; + } + + false + } +} + +#[cfg(test)] +mod tests { + use std::thread; + + use super::*; + + #[test] + fn test_debounce() { + let d = Debounced::new(1); + + assert!(d.debounce(|| {})); + for _ in 0..10 { + assert!(!d.debounce(|| {})); + } + + thread::sleep(Duration::from_secs(1)); + + assert!(d.debounce(|| {})); + for _ in 0..10 { + assert!(!d.debounce(|| {})); + } + } +} diff --git a/relay-kafka/src/producer/schemas.rs b/relay-kafka/src/producer/schemas.rs index 8d8e531036a..1821882829c 100644 --- a/relay-kafka/src/producer/schemas.rs +++ b/relay-kafka/src/producer/schemas.rs @@ -1,5 +1,6 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; use sentry_kafka_schemas::{Schema as SentrySchema, SchemaError as SentrySchemaError}; use thiserror::Error; @@ -39,13 +40,13 @@ pub enum SchemaError { #[derive(Debug, Default)] pub struct Validator { /// Caches the schema for given topics. - schemas: BTreeMap>, + schemas: Mutex>>>, } impl Validator { /// Validate a message for a given topic's schema. 
pub fn validate_message_schema( - &mut self, + &self, topic: KafkaTopic, message: &[u8], ) -> Result<(), SchemaError> { @@ -59,8 +60,10 @@ impl Validator { .map(drop) } - fn get_schema(&mut self, topic: KafkaTopic) -> Result, SchemaError> { - Ok(match self.schemas.entry(topic) { + fn get_schema(&self, topic: KafkaTopic) -> Result>, SchemaError> { + let mut schemas = self.schemas.lock().unwrap_or_else(|e| e.into_inner()); + + Ok(match schemas.entry(topic) { Entry::Vacant(entry) => entry.insert({ let default_assignments = TopicAssignments::default(); let logical_topic_name = match default_assignments.get(topic) { @@ -69,13 +72,14 @@ impl Validator { }; match sentry_kafka_schemas::get_schema(logical_topic_name, None) { - Ok(schema) => Some(schema), + Ok(schema) => Some(Arc::new(schema)), Err(SentrySchemaError::TopicNotFound) => None, Err(err) => return Err(SchemaError::SchemaCompiled(err)), } }), Entry::Occupied(entry) => entry.into_mut(), } - .as_ref()) + .as_ref() + .map(Arc::clone)) } } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 651dedf15d3..51a01434128 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -115,6 +115,24 @@ fn create_processor_pool(config: &Config) -> Result { Ok(pool) } +#[cfg(feature = "processing")] +fn create_store_pool(config: &Config) -> Result { + // Spawn a store worker for every 12 threads in the processor pool. + // This ratio was found empirically and may need adjustments in the future. + // + // Ideally in the future the store will be single threaded again, after we move + // all the heavy processing (de- and re-serialization) into the processor. 
+ let thread_count = config.cpu_concurrency().div_ceil(12); + relay_log::info!("starting {thread_count} store workers"); + + let pool = crate::utils::ThreadPoolBuilder::new("store") + .num_threads(thread_count) + .runtime(tokio::runtime::Handle::current()) + .build()?; + + Ok(pool) +} + #[derive(Debug)] struct StateInner { config: Arc, @@ -188,6 +206,7 @@ impl ServiceState { .processing_enabled() .then(|| { StoreService::create( + create_store_pool(&config)?, config.clone(), global_config_handle.clone(), outcome_aggregator.clone(), diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 5a834fe9702..996ae998049 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -44,7 +44,6 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; use smallvec::{smallvec, SmallVec}; -use tokio::sync::Semaphore; #[cfg(feature = "processing")] use { @@ -85,6 +84,7 @@ use crate::services::upstream::{ use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; use crate::utils::{ self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope, + WorkerGroup, }; use crate::{http, metrics}; @@ -1113,7 +1113,7 @@ impl Default for Addrs { } struct InnerProcessor { - pool: ThreadPool, + workers: WorkerGroup, config: Arc, global_config: GlobalConfigHandle, cogs: Cogs, @@ -1152,7 +1152,7 @@ impl EnvelopeProcessorService { }); let inner = InnerProcessor { - pool, + workers: WorkerGroup::new(pool), global_config, cogs, #[cfg(feature = "processing")] @@ -2814,31 +2814,13 @@ impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; fn spawn_handler(self, mut rx: relay_system::Receiver) { - let semaphore = Arc::new(Semaphore::new(self.inner.pool.current_num_threads())); - tokio::spawn(async move { - loop { - let next_msg = async { - let permit_result = semaphore.clone().acquire_owned().await; - // 
`permit_result` might get dropped when this future is cancelled while awaiting - // `rx.recv()`. This is OK though: No envelope is received so the permit is not - // required. - (rx.recv().await, permit_result) - }; - - tokio::select! { - biased; - - (Some(message), Ok(permit)) = next_msg => { - let service = self.clone(); - self.inner.pool.spawn(move || { - service.handle_message(message); - drop(permit); - }); - }, - - else => break - } + while let Some(message) = rx.recv().await { + let service = self.clone(); + self.inner + .workers + .spawn(move || service.handle_message(message)) + .await; } }); } @@ -3602,39 +3584,6 @@ mod tests { "#); } - /// This is a stand-in test to assert panicking behavior for spawn_blocking. - /// - /// [`EnvelopeProcessorService`] relies on tokio to restart the worker threads for blocking - /// tasks if there is a panic during processing. Tokio does not explicitly mention this behavior - /// in documentation, though the `spawn_blocking` contract suggests that this is intentional. - /// - /// This test should be moved if the worker pool is extracted into a utility. - #[test] - fn test_processor_panics() { - let future = async { - let semaphore = Arc::new(Semaphore::new(1)); - - // loop multiple times to prove that the runtime creates new threads - for _ in 0..3 { - // the previous permit should have been released during panic unwind - let permit = semaphore.clone().acquire_owned().await.unwrap(); - - let handle = tokio::task::spawn_blocking(move || { - let _permit = permit; // drop(permit) after panic!() would warn as "unreachable" - panic!("ignored"); - }); - - assert!(handle.await.is_err()); - } - }; - - tokio::runtime::Builder::new_current_thread() - .max_blocking_threads(1) - .build() - .unwrap() - .block_on(future); - } - /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is /// correct. 
Unit test is placed here because it has dependencies to relay-server and therefore /// cannot be called from relay-metrics. diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 763ce3d7a12..1e7d3ec2c49 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -36,7 +36,7 @@ use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::Processed; use crate::statsd::{RelayCounters, RelayTimers}; -use crate::utils::{is_rolled_out, FormDataIter, TypedEnvelope}; +use crate::utils::{is_rolled_out, FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup}; /// Fallback name used for attachment items without a `filename` header. const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment"; @@ -140,6 +140,7 @@ impl FromMessage for Store { /// Service implementing the [`Store`] interface. pub struct StoreService { + workers: WorkerGroup, config: Arc, global_config: GlobalConfigHandle, outcome_aggregator: Addr, @@ -149,6 +150,7 @@ pub struct StoreService { impl StoreService { pub fn create( + pool: ThreadPool, config: Arc, global_config: GlobalConfigHandle, outcome_aggregator: Addr, @@ -156,6 +158,7 @@ impl StoreService { ) -> anyhow::Result { let producer = Producer::create(&config)?; Ok(Self { + workers: WorkerGroup::new(pool), config, global_config, outcome_aggregator, @@ -1083,11 +1086,16 @@ impl Service for StoreService { type Interface = Store; fn spawn_handler(self, mut rx: relay_system::Receiver) { + let this = Arc::new(self); + tokio::spawn(async move { relay_log::info!("store forwarder started"); while let Some(message) = rx.recv().await { - self.handle_message(message); + let service = Arc::clone(&this); + this.workers + .spawn(move || service.handle_message(message)) + .await; } relay_log::info!("store forwarder stopped"); diff --git a/relay-server/src/utils/garbage.rs b/relay-server/src/utils/garbage.rs index 
f99198f0ada..37bc96ba462 100644 --- a/relay-server/src/utils/garbage.rs +++ b/relay-server/src/utils/garbage.rs @@ -58,6 +58,12 @@ impl GarbageDisposal { } } +impl Default for GarbageDisposal { + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index 0cf4be7b139..205a1f0dbf5 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; use std::thread; use tokio::runtime::Handle; pub use rayon::{ThreadPool, ThreadPoolBuildError}; +use tokio::sync::Semaphore; /// Used to create a new [`ThreadPool`] thread pool. pub struct ThreadPoolBuilder { @@ -63,18 +65,74 @@ impl ThreadPoolBuilder { } } +/// A [`WorkerGroup`] adds an async backpressure mechanism to a [`ThreadPool`]. +pub struct WorkerGroup { + pool: ThreadPool, + semaphore: Arc, +} + +impl WorkerGroup { + /// Creates a new worker group from a thread pool. + pub fn new(pool: ThreadPool) -> Self { + // Use `current_num_threads() * 2` to guarantee all threads immediately have a new item to work on. + let semaphore = Arc::new(Semaphore::new(pool.current_num_threads() * 2)); + Self { pool, semaphore } + } + + /// Spawns an asynchronous task on the thread pool. + /// + /// If the thread pool is saturated the returned future is pending until + /// the thread pool has capacity to work on the task. 
+ /// + /// # Examples: + /// + /// ```ignore + /// # async fn test(mut messages: tokio::sync::mpsc::Receiver<()>) { + /// # use relay_server::utils::{WorkerGroup, ThreadPoolBuilder}; + /// # use std::thread; + /// # use std::time::Duration; + /// # let pool = ThreadPoolBuilder::new("test").num_threads(1).build().unwrap(); + /// let workers = WorkerGroup::new(pool); + /// + /// while let Some(message) = messages.recv().await { + /// workers.spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// println!("worked on message {message:?}") + /// }).await; + /// } + /// # } + /// ``` + pub async fn spawn(&self, op: impl FnOnce() + Send + 'static) { + let semaphore = Arc::clone(&self.semaphore); + let permit = semaphore + .acquire_owned() + .await + .expect("the semaphore is never closed"); + + self.pool.spawn(move || { + op(); + drop(permit); + }); + } +} + #[cfg(test)] mod tests { + use std::sync::Barrier; + use std::time::Duration; + + use futures::FutureExt; + use super::*; #[test] - fn test_num_threads() { + fn test_thread_pool_num_threads() { let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap(); assert_eq!(pool.current_num_threads(), 3); } #[test] - fn test_runtime() { + fn test_thread_pool_runtime() { let rt = tokio::runtime::Runtime::new().unwrap(); let pool = ThreadPoolBuilder::new("s") @@ -88,10 +146,67 @@ mod tests { } #[test] - fn test_no_runtime() { + fn test_thread_pool_no_runtime() { let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); assert!(!has_runtime); } + + #[test] + fn test_thread_pool_panic() { + let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); + let barrier = Arc::new(Barrier::new(2)); + + pool.spawn({ + let barrier = Arc::clone(&barrier); + move || { + barrier.wait(); + panic!(); + } + }); + barrier.wait(); + + pool.spawn({ + let barrier = Arc::clone(&barrier); + move || { + barrier.wait(); + } 
+ }); + barrier.wait(); + } + + #[test] + fn test_worker_group_backpressure() { + let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); + let workers = WorkerGroup::new(pool); + + // Num Threads * 2 is the limit after which backpressure kicks in + let barrier = Arc::new(Barrier::new(2)); + + let spawn = || { + let barrier = Arc::clone(&barrier); + workers + .spawn(move || { + barrier.wait(); + }) + .now_or_never() + .is_some() + }; + + for _ in 0..15 { + // Pool should accept two immediately. + assert!(spawn()); + assert!(spawn()); + // Pool should reject because there are already 2 tasks active. + assert!(!spawn()); + + // Unblock the barrier + barrier.wait(); // first spawn + barrier.wait(); // second spawn + + // wait a tiny bit to make sure the semaphore handle is dropped + thread::sleep(Duration::from_millis(50)); + } + } }