ref(store): Multithread the store using a rayon pool (#3837)
We've seen that the store is not able to keep up if the processor is parallelized
too heavily. This switches the CPU-heavy processing of the store to its own thread pool.
Dav1dde authored Jul 22, 2024
1 parent 8e31303 commit 4d5941a
Showing 7 changed files with 246 additions and 84 deletions.
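
The core idea of the change, as a minimal standalone sketch (illustration only — the commit itself uses Relay's own ThreadPoolBuilder wrapper and the WorkerGroup utility used in the diffs below): CPU-heavy store work runs on a dedicated, named rayon pool instead of on the async runtime or the processor pool.

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    // A dedicated pool for CPU-heavy store work, separate from the processor pool.
    let store_pool = rayon::ThreadPoolBuilder::new()
        .thread_name(|i| format!("store-{i}"))
        .num_threads(2)
        .build()?;

    // `install` runs the closure on the pool and waits for the result, which keeps
    // this example deterministic; the real service uses fire-and-forget `spawn`
    // from its async message loop instead.
    store_pool.install(|| {
        println!("serializing and producing envelopes happens on the store pool");
    });

    Ok(())
}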
87 changes: 74 additions & 13 deletions relay-kafka/src/producer/mod.rs
@@ -1,9 +1,9 @@
//! This module contains the kafka producer related code.
use std::borrow::Cow;
use std::cell::Cell;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -22,7 +22,7 @@ use utils::{Context, ThreadedProducer};
#[cfg(feature = "schemas")]
mod schemas;

const REPORT_FREQUENCY: Duration = Duration::from_secs(1);
const REPORT_FREQUENCY_SECS: u64 = 1;
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Kafka producer errors.
@@ -87,20 +87,20 @@ pub trait Message {

/// Single kafka producer config with assigned topic.
struct Producer {
/// Time of the latest collection of stats.
last_report: Cell<Instant>,
/// Kafka topic name.
topic_name: String,
/// Real kafka producer.
producer: Arc<ThreadedProducer>,
/// Debouncer for metrics.
metrics: Debounced,
}

impl Producer {
fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
Self {
last_report: Cell::new(Instant::now()),
topic_name,
producer,
metrics: Debounced::new(REPORT_FREQUENCY_SECS),
}
}

@@ -124,10 +124,9 @@ impl Producer {
impl fmt::Debug for Producer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Producer")
.field("last_report", &self.last_report)
.field("topic_name", &self.topic_name)
.field("producer", &"<ThreadedProducer>")
.finish()
.finish_non_exhaustive()
}
}

@@ -136,7 +135,7 @@ impl fmt::Debug for Producer {
pub struct KafkaClient {
producers: HashMap<KafkaTopic, Producer>,
#[cfg(feature = "schemas")]
schema_validator: std::cell::RefCell<schemas::Validator>,
schema_validator: schemas::Validator,
}

impl KafkaClient {
@@ -156,7 +155,6 @@ impl KafkaClient {
let serialized = message.serialize()?;
#[cfg(feature = "schemas")]
self.schema_validator
.borrow_mut()
.validate_message_schema(topic, &serialized)
.map_err(ClientError::SchemaValidationFailed)?;
let key = message.key();
@@ -261,7 +259,7 @@ impl KafkaClientBuilder {
KafkaClient {
producers: self.producers,
#[cfg(feature = "schemas")]
schema_validator: schemas::Validator::default().into(),
schema_validator: schemas::Validator::default(),
}
}
}
@@ -304,14 +302,13 @@ impl Producer {
record = record.headers(kafka_headers);
}

if self.last_report.get().elapsed() > REPORT_FREQUENCY {
self.last_report.replace(Instant::now());
self.metrics.debounce(|| {
metric!(
gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
variant = variant,
topic = topic_name
);
}
});

self.producer
.send(record)
@@ -332,3 +329,67 @@ impl Producer {
})
}
}

struct Debounced {
/// Time of last activation in seconds.
last_activation: AtomicU64,
/// Debounce interval in seconds.
interval: u64,
/// Relative instant used for measurements.
instant: Instant,
}

impl Debounced {
pub fn new(interval: u64) -> Self {
Self {
last_activation: AtomicU64::new(0),
interval,
instant: Instant::now(),
}
}

fn debounce(&self, f: impl FnOnce()) -> bool {
// Add the interval so that the very first call always triggers immediately.
let now = self.instant.elapsed().as_secs() + self.interval;

let prev = self.last_activation.load(Ordering::Relaxed);
if now.saturating_sub(prev) < self.interval {
return false;
}

if self
.last_activation
.compare_exchange(prev, now, Ordering::SeqCst, Ordering::Acquire)
.is_ok()
{
f();
return true;
}

false
}
}

#[cfg(test)]
mod tests {
use std::thread;

use super::*;

#[test]
fn test_debounce() {
let d = Debounced::new(1);

assert!(d.debounce(|| {}));
for _ in 0..10 {
assert!(!d.debounce(|| {}));
}

thread::sleep(Duration::from_secs(1));

assert!(d.debounce(|| {}));
for _ in 0..10 {
assert!(!d.debounce(|| {}));
}
}
}
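
As an aside, the new Debounced helper relies only on an AtomicU64, so unlike the previous Cell<Instant>-based last_report it can be shared by producers across threads. A small illustrative sketch (not part of the commit), assuming it sits next to the Debounced definition above:

fn demo_debounce() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    let debounced = Debounced::new(1);
    let activations = AtomicUsize::new(0);

    // Four threads race to report a metric; the compare_exchange in `debounce`
    // lets only one closure run within the one-second interval.
    thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| {
                debounced.debounce(|| {
                    activations.fetch_add(1, Ordering::Relaxed);
                });
            });
        }
    });

    // Typically prints 1: the other callers see the interval as already active.
    println!("activations: {}", activations.load(Ordering::Relaxed));
}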
16 changes: 10 additions & 6 deletions relay-kafka/src/producer/schemas.rs
@@ -1,5 +1,6 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

use sentry_kafka_schemas::{Schema as SentrySchema, SchemaError as SentrySchemaError};
use thiserror::Error;
@@ -39,13 +40,13 @@ pub enum SchemaError {
#[derive(Debug, Default)]
pub struct Validator {
/// Caches the schema for given topics.
schemas: BTreeMap<KafkaTopic, Option<SentrySchema>>,
schemas: Mutex<BTreeMap<KafkaTopic, Option<Arc<SentrySchema>>>>,
}

impl Validator {
/// Validate a message for a given topic's schema.
pub fn validate_message_schema(
&mut self,
&self,
topic: KafkaTopic,
message: &[u8],
) -> Result<(), SchemaError> {
@@ -59,8 +60,10 @@ impl Validator {
.map(drop)
}

fn get_schema(&mut self, topic: KafkaTopic) -> Result<Option<&SentrySchema>, SchemaError> {
Ok(match self.schemas.entry(topic) {
fn get_schema(&self, topic: KafkaTopic) -> Result<Option<Arc<SentrySchema>>, SchemaError> {
let mut schemas = self.schemas.lock().unwrap_or_else(|e| e.into_inner());

Ok(match schemas.entry(topic) {
Entry::Vacant(entry) => entry.insert({
let default_assignments = TopicAssignments::default();
let logical_topic_name = match default_assignments.get(topic) {
@@ -69,13 +72,14 @@ impl Validator {
};

match sentry_kafka_schemas::get_schema(logical_topic_name, None) {
Ok(schema) => Some(schema),
Ok(schema) => Some(Arc::new(schema)),
Err(SentrySchemaError::TopicNotFound) => None,
Err(err) => return Err(SchemaError::SchemaCompiled(err)),
}
}),
Entry::Occupied(entry) => entry.into_mut(),
}
.as_ref())
.as_ref()
.map(Arc::clone))
}
}
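
The pattern above — a Mutex-guarded cache that hands out Arc clones — keeps the lock held only for the lookup or insert, while the schema validation itself happens on the cloned Arc outside the lock. A stripped-down standalone sketch of that pattern (hypothetical names, error handling omitted):

use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

struct Cache<K: Ord, V> {
    inner: Mutex<BTreeMap<K, Arc<V>>>,
}

impl<K: Ord, V> Cache<K, V> {
    fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> Arc<V> {
        // Recover from a poisoned lock, mirroring `unwrap_or_else(|e| e.into_inner())` above.
        let mut map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        Arc::clone(map.entry(key).or_insert_with(|| Arc::new(init())))
    }
}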
19 changes: 19 additions & 0 deletions relay-server/src/service.rs
@@ -115,6 +115,24 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
Ok(pool)
}

#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<ThreadPool> {
// Spawn a store worker for every 12 threads in the processor pool.
// This ratio was found empirically and may need adjustments in the future.
//
// Ideally in the future the store will be single threaded again, after we move
// all the heavy processing (de- and re-serialization) into the processor.
let thread_count = config.cpu_concurrency().div_ceil(12);
relay_log::info!("starting {thread_count} store workers");

let pool = crate::utils::ThreadPoolBuilder::new("store")
.num_threads(thread_count)
.runtime(tokio::runtime::Handle::current())
.build()?;

Ok(pool)
}
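
Worked out, the sizing rule in create_store_pool above rounds up, so even small hosts get one store worker (illustration only):

assert_eq!(8_usize.div_ceil(12), 1);  //  8 CPUs -> 1 store worker
assert_eq!(12_usize.div_ceil(12), 1); // 12 CPUs -> 1 store worker
assert_eq!(13_usize.div_ceil(12), 2); // 13 CPUs -> 2 store workers
assert_eq!(32_usize.div_ceil(12), 3); // 32 CPUs -> 3 store workers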

#[derive(Debug)]
struct StateInner {
config: Arc<Config>,
@@ -188,6 +206,7 @@ impl ServiceState {
.processing_enabled()
.then(|| {
StoreService::create(
create_store_pool(&config)?,
config.clone(),
global_config_handle.clone(),
outcome_aggregator.clone(),
69 changes: 9 additions & 60 deletions relay-server/src/services/processor.rs
@@ -44,7 +44,6 @@ use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{smallvec, SmallVec};
use tokio::sync::Semaphore;

#[cfg(feature = "processing")]
use {
@@ -85,6 +84,7 @@ use crate::services::upstream::{
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope,
WorkerGroup,
};
use crate::{http, metrics};

@@ -1113,7 +1113,7 @@ impl Default for Addrs {
}

struct InnerProcessor {
pool: ThreadPool,
workers: WorkerGroup,
config: Arc<Config>,
global_config: GlobalConfigHandle,
cogs: Cogs,
@@ -1152,7 +1152,7 @@ impl EnvelopeProcessorService {
});

let inner = InnerProcessor {
pool,
workers: WorkerGroup::new(pool),
global_config,
cogs,
#[cfg(feature = "processing")]
@@ -2814,31 +2814,13 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let semaphore = Arc::new(Semaphore::new(self.inner.pool.current_num_threads()));

tokio::spawn(async move {
loop {
let next_msg = async {
let permit_result = semaphore.clone().acquire_owned().await;
// `permit_result` might get dropped when this future is cancelled while awaiting
// `rx.recv()`. This is OK though: No envelope is received so the permit is not
// required.
(rx.recv().await, permit_result)
};

tokio::select! {
biased;

(Some(message), Ok(permit)) = next_msg => {
let service = self.clone();
self.inner.pool.spawn(move || {
service.handle_message(message);
drop(permit);
});
},

else => break
}
while let Some(message) = rx.recv().await {
let service = self.clone();
self.inner
.workers
.spawn(move || service.handle_message(message))
.await;
}
});
}
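
The WorkerGroup used above is a crate-internal utility not shown in this diff; presumably it pairs the rayon pool with a semaphore sized to the thread count, so the async loop only pulls the next message once a worker is free — much like the hand-rolled semaphore logic it replaces. A hypothetical sketch of that idea (names and details are assumptions, not the actual crate::utils implementation):

use std::sync::Arc;
use tokio::sync::Semaphore;

struct WorkerGroup {
    pool: rayon::ThreadPool,
    semaphore: Arc<Semaphore>,
}

impl WorkerGroup {
    fn new(pool: rayon::ThreadPool) -> Self {
        let semaphore = Arc::new(Semaphore::new(pool.current_num_threads()));
        Self { pool, semaphore }
    }

    async fn spawn(&self, task: impl FnOnce() + Send + 'static) {
        // Back-pressure: wait until a worker slot frees up before accepting more work.
        let permit = self
            .semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed");
        self.pool.spawn(move || {
            task();
            drop(permit); // release the slot once the task has finished
        });
    }
}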
@@ -3602,39 +3584,6 @@ mod tests {
"#);
}

/// This is a stand-in test to assert panicking behavior for spawn_blocking.
///
/// [`EnvelopeProcessorService`] relies on tokio to restart the worker threads for blocking
/// tasks if there is a panic during processing. Tokio does not explicitly mention this behavior
/// in documentation, though the `spawn_blocking` contract suggests that this is intentional.
///
/// This test should be moved if the worker pool is extracted into a utility.
#[test]
fn test_processor_panics() {
let future = async {
let semaphore = Arc::new(Semaphore::new(1));

// loop multiple times to prove that the runtime creates new threads
for _ in 0..3 {
// the previous permit should have been released during panic unwind
let permit = semaphore.clone().acquire_owned().await.unwrap();

let handle = tokio::task::spawn_blocking(move || {
let _permit = permit; // drop(permit) after panic!() would warn as "unreachable"
panic!("ignored");
});

assert!(handle.await.is_err());
}
};

tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(1)
.build()
.unwrap()
.block_on(future);
}

/// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
/// correct. Unit test is placed here because it has dependencies to relay-server and therefore
/// cannot be called from relay-metrics.