diff --git a/Cargo.lock b/Cargo.lock index 0eb31ded31..d0ae06388c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2745,8 +2745,7 @@ dependencies = [ [[package]] name = "libmdbx" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4693531e6e24c92e6ac110a52ce45a0f6b4aa61f7b8d6a4127dde411e597a927" +source = "git+https://github.com/paberr/libmdbx-rs#d178a204e15e40f3f8d0e5b03264354002e62a62" dependencies = [ "bitflags 2.6.0", "derive_more", @@ -3336,8 +3335,7 @@ checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "mdbx-sys" version = "12.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f0fd8a9c29c480c11e163225463e0e8c0d11b009c837dfa7643cd20a20ae70" +source = "git+https://github.com/paberr/libmdbx-rs#d178a204e15e40f3f8d0e5b03264354002e62a62" dependencies = [ "bindgen", "cc", @@ -3359,6 +3357,16 @@ dependencies = [ "libc", ] +[[package]] +name = "metrics" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "mime" version = "0.3.17" @@ -3795,10 +3803,15 @@ dependencies = [ "bitflags 2.6.0", "criterion", "libmdbx", + "metrics", "nimiq-database-value", "nimiq-test-log", + "parking_lot", "pprof", "rand", + "rustc-hash", + "strum", + "strum_macros", "tempfile", "thiserror", "tracing", @@ -4169,10 +4182,12 @@ dependencies = [ "http-body-util", "hyper 1.3.1", "hyper-util", + "metrics", "nimiq-blockchain", "nimiq-blockchain-interface", "nimiq-blockchain-proxy", "nimiq-consensus", + "nimiq-database", "nimiq-mempool", "nimiq-network-interface", "nimiq-network-libp2p", @@ -6677,6 +6692,25 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = 
"strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.68", +] + [[package]] name = "subtle" version = "2.6.1" diff --git a/client/src/main.rs b/client/src/main.rs index 234d8b636c..4c5dfa1347 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -1,7 +1,6 @@ use std::time::Duration; use log::info; -use nimiq::prover::prover_main; pub use nimiq::{ client::Client, config::{command_line::CommandLine, config::ClientConfig, config_file::ConfigFile}, @@ -13,6 +12,7 @@ pub use nimiq::{ signal_handling::initialize_signal_handler, }, }; +use nimiq::{extras::metrics_server::install_metrics, prover::prover_main}; async fn main_inner() -> Result<(), Error> { // Keep for potential future reactivation @@ -39,7 +39,7 @@ async fn main_inner() -> Result<(), Error> { // Initialize panic hook. initialize_panic_reporting(); - // Initialize signal handler + // Initialize signal handler. initialize_signal_handler(); // Early return in case of a proving process. @@ -63,6 +63,12 @@ async fn main_inner() -> Result<(), Error> { let metrics_config = config.metrics_server.clone(); let metrics_enabled = metrics_config.is_some(); + // Initialize database logging. + let mut metrics_collector = None; + if metrics_enabled { + metrics_collector = Some(install_metrics()); + } + // Create client from config. 
let mut client: Client = Client::from_config(config).await?; @@ -128,6 +134,7 @@ async fn main_inner() -> Result<(), Error> { client.consensus_proxy(), client.network(), &nimiq_task_metric, + metrics_collector.unwrap(), ) } diff --git a/database/Cargo.toml b/database/Cargo.toml index e7f6b8cc80..f6f45241b2 100644 --- a/database/Cargo.toml +++ b/database/Cargo.toml @@ -24,10 +24,15 @@ workspace = true [dependencies] bitflags = "2.6" -libmdbx = "0.5.0" +libmdbx = { git = "https://github.com/paberr/libmdbx-rs" } log = { workspace = true } tempfile = "3" thiserror = "1.0" +metrics = "0.23" +parking_lot = "0.12" +rustc-hash = "1.1" +strum = "0.26" +strum_macros = "0.26" nimiq-database-value = { workspace = true } diff --git a/database/src/lib.rs b/database/src/lib.rs index 7a7c14d842..18b6d61976 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -2,6 +2,7 @@ use bitflags::bitflags; mod error; pub mod mdbx; +mod metrics; /// Database implementation that can handle volatile and persistent storage. pub mod proxy; /// Abstraction for methods related to the database. diff --git a/database/src/mdbx/cursor.rs b/database/src/mdbx/cursor.rs index 40876b4d6c..ce0a5cd5a4 100644 --- a/database/src/mdbx/cursor.rs +++ b/database/src/mdbx/cursor.rs @@ -1,15 +1,20 @@ -use std::{borrow::Cow, marker::PhantomData}; +use std::{borrow::Cow, marker::PhantomData, sync::Arc}; use libmdbx::{TransactionKind, WriteFlags, RO, RW}; use nimiq_database_value::{AsDatabaseBytes, FromDatabaseValue}; use super::{DbKvPair, IntoIter}; -use crate::traits::{ReadCursor, WriteCursor}; +use crate::{ + metrics::{DatabaseEnvMetrics, Operation}, + traits::{ReadCursor, WriteCursor}, +}; /// A cursor for navigating the entries within a table. /// Wraps the libmdbx cursor so that we only expose our own methods. pub struct MdbxCursor<'txn, K: TransactionKind> { cursor: libmdbx::Cursor<'txn, K>, + metrics: Option>, + table_name: String, } /// Instantiation of the `MdbxCursor` for read transactions. 
pub type MdbxReadCursor<'txn> = MdbxCursor<'txn, RO>; @@ -20,8 +25,33 @@ impl<'txn, Kind> MdbxCursor<'txn, Kind> where Kind: TransactionKind, { - pub(crate) fn new(cursor: libmdbx::Cursor<'txn, Kind>) -> Self { - MdbxCursor { cursor } + pub(crate) fn new( + table_name: &str, + cursor: libmdbx::Cursor<'txn, Kind>, + metrics: Option>, + ) -> Self { + MdbxCursor { + table_name: table_name.to_string(), + cursor, + metrics, + } + } + + /// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value + /// size. + /// + /// Otherwise, just execute the closure. + fn execute_with_operation_metric( + &mut self, + operation: Operation, + value_size: Option, + f: impl FnOnce(&mut Self) -> R, + ) -> R { + if let Some(metrics) = self.metrics.as_ref().cloned() { + metrics.record_operation(&self.table_name.clone(), operation, value_size, || f(self)) + } else { + f(self) + } } } @@ -246,14 +276,18 @@ where { fn clone(&self) -> Self { Self { + table_name: self.table_name.clone(), cursor: self.cursor.clone(), + metrics: self.metrics.as_ref().cloned(), } } } impl<'txn> WriteCursor<'txn> for MdbxWriteCursor<'txn> { fn remove(&mut self) { - self.cursor.del(WriteFlags::empty()).unwrap(); + self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |cursor| { + cursor.cursor.del(WriteFlags::empty()).unwrap(); + }); } fn append(&mut self, key: &K, value: &V) diff --git a/database/src/mdbx/database.rs b/database/src/mdbx/database.rs index 019191c140..65d41a34c2 100644 --- a/database/src/mdbx/database.rs +++ b/database/src/mdbx/database.rs @@ -1,18 +1,27 @@ -use std::{borrow::Cow, fs, path::Path, sync::Arc}; +use std::{borrow::Cow, fmt, fs, path::Path, sync::Arc}; use libmdbx::NoWriteMap; use log::info; use super::{MdbxReadTransaction, MdbxWriteTransaction}; -use crate::{traits::Database, DatabaseProxy, Error, TableFlags}; +use crate::{metrics::DatabaseEnvMetrics, traits::Database, DatabaseProxy, Error, TableFlags}; pub(super) type DbKvPair<'a> = 
(Cow<'a, [u8]>, Cow<'a, [u8]>); /// Wrapper around the mdbx database handle. /// A database can hold multiple tables. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct MdbxDatabase { pub(super) db: Arc>, + pub(super) metrics: Option>, +} + +impl fmt::Debug for MdbxDatabase { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdbxDatabase") + .field("db", &self.db) + .finish() + } } impl Database for MdbxDatabase { @@ -49,15 +58,22 @@ impl Database for MdbxDatabase { txn.create_table(Some(&name), table_flags).unwrap(); txn.commit().unwrap(); + if let Some(ref metrics) = self.metrics { + metrics.register_table(&name); + } + MdbxTable { name } } fn read_transaction(&self) -> Self::ReadTransaction<'_> { - MdbxReadTransaction::new(self.db.begin_ro_txn().unwrap()) + MdbxReadTransaction::new(self.db.begin_ro_txn().unwrap(), self.metrics.clone()) } fn write_transaction(&self) -> Self::WriteTransaction<'_> { - MdbxWriteTransaction::new(self.db.begin_rw_txn().unwrap()) + MdbxWriteTransaction::new( + self.db.begin_rw_txn().unwrap(), + self.metrics.as_ref().cloned(), + ) } } @@ -91,6 +107,30 @@ impl MdbxDatabase { )?)) } + #[allow(clippy::new_ret_no_self)] + pub fn new_with_metrics>( + path: P, + size: usize, + max_tables: u32, + ) -> Result { + let mut db = MdbxDatabase::new_mdbx_database(path.as_ref(), size, max_tables, None)?; + db.with_metrics(); + Ok(DatabaseProxy::Persistent(db)) + } + + #[allow(clippy::new_ret_no_self)] + pub fn new_with_max_readers_and_metrics>( + path: P, + size: usize, + max_tables: u32, + max_readers: u32, + ) -> Result { + let mut db = + MdbxDatabase::new_mdbx_database(path.as_ref(), size, max_tables, Some(max_readers))?; + db.with_metrics(); + Ok(DatabaseProxy::Persistent(db)) + } + pub(crate) fn new_mdbx_database( path: &Path, size: usize, @@ -119,47 +159,16 @@ impl MdbxDatabase { let cur_mapsize = info.map_size(); info!(cur_mapsize, "MDBX memory map size"); - let mdbx = MdbxDatabase { db: Arc::new(db) }; - if 
mdbx.need_resize(0) { - info!("MDBX memory needs to be resized."); - } + let mdbx = MdbxDatabase { + db: Arc::new(db), + metrics: None, + }; Ok(mdbx) } - pub fn need_resize(&self, threshold_size: usize) -> bool { - let info = self.db.info().unwrap(); - let stat = self.db.stat().unwrap(); - - let size_used = (stat.page_size() as usize) * (info.last_pgno() + 1); - - if threshold_size > 0 && info.map_size() - size_used < threshold_size { - info!( - size_used, - threshold_size, - map_size = info.map_size(), - space_remaining = info.map_size() - size_used, - "DB settings (threshold-based)" - ); - return true; - } - - // Resize is currently not supported. So don't let the resize happen - // if a specific percentage is reached. - let resize_percent: f64 = 1_f64; - - if (size_used as f64) / (info.map_size() as f64) > resize_percent { - info!( - map_size = info.map_size(), - size_used, - space_remaining = info.map_size() - size_used, - percent_used = (size_used as f64) / (info.map_size() as f64), - "DB resize (percent-based)" - ); - return true; - } - - false + pub(crate) fn with_metrics(&mut self) { + self.metrics = Some(DatabaseEnvMetrics::new().into()); } } diff --git a/database/src/mdbx/metrics_handler.rs b/database/src/mdbx/metrics_handler.rs new file mode 100644 index 0000000000..66ff0f66be --- /dev/null +++ b/database/src/mdbx/metrics_handler.rs @@ -0,0 +1,132 @@ +// This file has been ported and adapted from reth (https://github.com/paradigmxyz/reth). +// Commit: 87cdfb185eaa721f18bc691ace87456fa348dbad +// License: MIT OR Apache-2.0 + +use std::{ + backtrace::Backtrace, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use libmdbx::CommitLatency; +use log::{trace, warn}; + +use crate::metrics::{DatabaseEnvMetrics, TransactionMode, TransactionOutcome}; + +/// Duration after which a transaction is considered long-running and its backtrace is logged. 
+const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60); + +pub struct MetricsHandler { + /// Some identifier of transaction. + txn_id: u64, + /// The time when transaction was opened. + start: Instant, + /// If `true`, the metric about transaction closing has already been recorded and we don't need + /// to do anything on [`Drop::drop`]. + close_recorded: bool, + /// If `true`, the backtrace of transaction will be recorded and logged. + /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`]. + record_backtrace: bool, + /// If `true`, the backtrace of transaction has already been recorded and logged. + /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`]. + backtrace_recorded: AtomicBool, + pub(super) env_metrics: Arc, + transaction_mode: TransactionMode, +} + +impl MetricsHandler { + pub(super) fn new(txn_id: u64, env_metrics: Arc, read_only: bool) -> Self { + Self { + txn_id, + start: Instant::now(), + close_recorded: false, + record_backtrace: true, + backtrace_recorded: AtomicBool::new(false), + env_metrics, + transaction_mode: if read_only { + TransactionMode::ReadOnly + } else { + TransactionMode::ReadWrite + }, + } + } + + pub(super) const fn transaction_mode(&self) -> TransactionMode { + self.transaction_mode + } + + /// Logs the caller location and ID of the transaction that was opened. + #[track_caller] + pub(super) fn log_transaction_opened(&self) { + trace!( + caller = %core::panic::Location::caller(), + id = %self.txn_id, + read_only = %self.transaction_mode().is_read_only(), + "Transaction opened", + ); + } + + /// Logs the backtrace of current call if the duration that the read transaction has been open + /// is more than [`LONG_TRANSACTION_DURATION`] and `record_backtrace == true`. + /// The backtrace is recorded and logged just once, guaranteed by `backtrace_recorded` atomic. + /// + /// NOTE: Backtrace is recorded using [`Backtrace::force_capture`], so `RUST_BACKTRACE` env var + /// is not needed. 
+ pub(super) fn log_backtrace_on_long_read_transaction(&self) { + if self.record_backtrace + && !self.backtrace_recorded.load(Ordering::Relaxed) + && self.transaction_mode().is_read_only() + { + let open_duration = self.start.elapsed(); + if open_duration >= LONG_TRANSACTION_DURATION { + self.backtrace_recorded.store(true, Ordering::Relaxed); + warn!( + target: "storage::db::mdbx", + ?open_duration, + %self.txn_id, + "The database read transaction has been open for too long. Backtrace:\n{}", Backtrace::force_capture() + ); + } + } + } + + #[inline] + pub(super) fn set_close_recorded(&mut self) { + self.close_recorded = true; + } + + #[inline] + pub(super) fn record_close( + &mut self, + outcome: TransactionOutcome, + close_duration: Duration, + commit_latency: Option, + ) { + let open_duration = self.start.elapsed(); + self.env_metrics.record_closed_transaction( + self.transaction_mode(), + outcome, + open_duration, + Some(close_duration), + commit_latency, + ); + } +} + +impl Drop for MetricsHandler { + fn drop(&mut self) { + if !self.close_recorded { + self.log_backtrace_on_long_read_transaction(); + self.env_metrics.record_closed_transaction( + self.transaction_mode(), + TransactionOutcome::Drop, + self.start.elapsed(), + None, + None, + ); + } + } +} diff --git a/database/src/mdbx/mod.rs b/database/src/mdbx/mod.rs index 0de1f9e62a..95e88a54f6 100644 --- a/database/src/mdbx/mod.rs +++ b/database/src/mdbx/mod.rs @@ -1,9 +1,10 @@ mod cursor; mod database; mod iterators; +mod metrics_handler; mod transaction; -pub use self::{cursor::*, database::*, iterators::*, transaction::*}; +pub use self::{cursor::*, database::*, iterators::*, metrics_handler::*, transaction::*}; #[cfg(test)] mod tests { diff --git a/database/src/mdbx/transaction.rs b/database/src/mdbx/transaction.rs index 914b296eef..af47f0e5d2 100644 --- a/database/src/mdbx/transaction.rs +++ b/database/src/mdbx/transaction.rs @@ -1,16 +1,29 @@ -use std::borrow::Cow; +use std::{borrow::Cow, fmt, sync::Arc, 
time::Instant}; -use libmdbx::{NoWriteMap, TransactionKind, WriteFlags, RO, RW}; +use libmdbx::{CommitLatency, NoWriteMap, TransactionKind, WriteFlags, RO, RW}; +use log::debug; use nimiq_database_value::{AsDatabaseBytes, FromDatabaseValue, IntoDatabaseValue}; -use super::{MdbxCursor, MdbxTable, MdbxWriteCursor}; -use crate::traits::{ReadTransaction, WriteTransaction}; +use super::{MdbxCursor, MdbxTable, MdbxWriteCursor, MetricsHandler}; +use crate::{ + metrics::{DatabaseEnvMetrics, Operation, TransactionOutcome}, + traits::{ReadTransaction, WriteTransaction}, +}; /// Wrapper around mdbx transactions that only exposes our own traits. -#[derive(Debug)] pub struct MdbxTransaction<'db, K: TransactionKind> { txn: libmdbx::Transaction<'db, K, NoWriteMap>, + metrics_handler: Option, } + +impl<'db, K: TransactionKind> fmt::Debug for MdbxTransaction<'db, K> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdbxTransaction") + .field("txn", &self.txn) + .finish() + } +} + /// Instantiation for read-only transactions. pub type MdbxReadTransaction<'db> = MdbxTransaction<'db, RO>; /// Instantiation for read-write transactions. 
@@ -20,13 +33,84 @@ impl<'db, Kind> MdbxTransaction<'db, Kind> where Kind: TransactionKind, { - pub(crate) fn new(txn: libmdbx::Transaction<'db, Kind, NoWriteMap>) -> Self { - MdbxTransaction { txn } + pub(crate) fn new( + txn: libmdbx::Transaction<'db, Kind, NoWriteMap>, + metrics: Option>, + ) -> Self { + MdbxTransaction { + metrics_handler: metrics.map(|m| { + let handler = MetricsHandler::new(txn.id(), m, Kind::ONLY_CLEAN); + handler + .env_metrics + .record_opened_transaction(handler.transaction_mode()); + handler.log_transaction_opened(); + handler + }), + txn, + } } pub(super) fn open_table(&self, table: &MdbxTable) -> libmdbx::Table { self.txn.open_table(Some(&table.name)).unwrap() } + + /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and + /// record a metric with the provided transaction outcome. + /// + /// Otherwise, just execute the closure. + fn execute_with_close_transaction_metric( + mut self, + outcome: TransactionOutcome, + f: impl FnOnce(Self) -> Option, + ) { + let run = |tx| { + let start = Instant::now(); + let commit_latency = f(tx); + let total_duration = start.elapsed(); + + if outcome.is_commit() { + debug!( + ?total_duration, + ?commit_latency, + is_read_only = Kind::ONLY_CLEAN, + "Commit" + ); + } + + (commit_latency, total_duration) + }; + + if let Some(mut metrics_handler) = self.metrics_handler.take() { + metrics_handler.set_close_recorded(); + metrics_handler.log_backtrace_on_long_read_transaction(); + + let (commit_latency, close_duration) = run(self); + metrics_handler.record_close(outcome, close_duration, commit_latency); + } else { + run(self); + } + } + + /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and + /// record a metric with the provided operation. + /// + /// Otherwise, just execute the closure. 
+ fn execute_with_operation_metric<'a, R>( + &'a self, + table_name: &str, + operation: Operation, + value_size: Option, + f: impl FnOnce(&'a libmdbx::Transaction<'db, Kind, NoWriteMap>) -> R, + ) -> R { + if let Some(metrics_handler) = &self.metrics_handler { + metrics_handler.log_backtrace_on_long_read_transaction(); + metrics_handler + .env_metrics + .record_operation(table_name, operation, value_size, || f(&self.txn)) + } else { + f(&self.txn) + } + } } impl<'db, Kind> ReadTransaction<'db> for MdbxTransaction<'db, Kind> @@ -41,20 +125,27 @@ where K: AsDatabaseBytes + ?Sized, V: FromDatabaseValue, { - let table = self.open_table(table); + let inner_table = self.open_table(table); - let result: Option> = self - .txn - .get(&table, &AsDatabaseBytes::as_database_bytes(key)) - .unwrap(); + let result: Option> = + self.execute_with_operation_metric(&table.name, Operation::Get, None, |txn| { + txn.get(&inner_table, &AsDatabaseBytes::as_database_bytes(key)) + .unwrap() + }); Some(FromDatabaseValue::copy_from_database(&result?).unwrap()) } fn cursor<'txn>(&'txn self, table: &MdbxTable) -> MdbxCursor<'txn, Kind> { - let table = self.open_table(table); + let inner_table = self.open_table(table); - MdbxCursor::new(self.txn.cursor(&table).unwrap()) + MdbxCursor::new( + &table.name, + self.txn.cursor(&inner_table).unwrap(), + self.metrics_handler + .as_ref() + .map(|h| Arc::clone(&h.env_metrics)), + ) } } @@ -66,15 +157,20 @@ impl<'db> WriteTransaction<'db> for MdbxWriteTransaction<'db> { K: AsDatabaseBytes + ?Sized, V: IntoDatabaseValue + ?Sized, { - let table = self.open_table(table); + let inner_table = self.open_table(table); let key = AsDatabaseBytes::as_database_bytes(key); let value_size = IntoDatabaseValue::database_byte_size(value); - let bytes: &mut [u8] = self - .txn - .reserve(&table, key, value_size, WriteFlags::empty()) - .unwrap(); + let bytes: &mut [u8] = self.execute_with_operation_metric( + &table.name, + Operation::Put, + Some(value_size), + |txn| { + 
txn.reserve(&inner_table, key, value_size, WriteFlags::empty()) + .unwrap() + }, + ); IntoDatabaseValue::copy_into_database(value, bytes); } @@ -84,14 +180,15 @@ impl<'db> WriteTransaction<'db> for MdbxWriteTransaction<'db> { K: AsDatabaseBytes + ?Sized, V: AsDatabaseBytes + ?Sized, { - let table = self.open_table(table); + let inner_table = self.open_table(table); let key = AsDatabaseBytes::as_database_bytes(key); let value = AsDatabaseBytes::as_database_bytes(value); - self.txn - .put(&table, key, value, WriteFlags::empty()) - .unwrap(); + self.execute_with_operation_metric(&table.name, Operation::Put, Some(value.len()), |txn| { + txn.put(&inner_table, key, value, WriteFlags::empty()) + .unwrap() + }); } fn append(&mut self, table: &MdbxTable, key: &K, value: &V) @@ -113,15 +210,16 @@ impl<'db> WriteTransaction<'db> for MdbxWriteTransaction<'db> { where K: AsDatabaseBytes + ?Sized, { - let table = self.open_table(table); + let inner_table = self.open_table(table); - self.txn - .del( - &table, + self.execute_with_operation_metric(&table.name, Operation::Delete, None, |txn| { + txn.del( + &inner_table, AsDatabaseBytes::as_database_bytes(key).as_ref(), None, ) - .unwrap(); + .unwrap() + }); } fn remove_item(&mut self, table: &MdbxTable, key: &K, value: &V) @@ -129,30 +227,49 @@ impl<'db> WriteTransaction<'db> for MdbxWriteTransaction<'db> { K: AsDatabaseBytes + ?Sized, V: AsDatabaseBytes + ?Sized, { - let table = self.open_table(table); + let inner_table = self.open_table(table); - self.txn - .del( - &table, + self.execute_with_operation_metric(&table.name, Operation::Delete, None, |txn| { + txn.del( + &inner_table, AsDatabaseBytes::as_database_bytes(key).as_ref(), Some(AsDatabaseBytes::as_database_bytes(value).as_ref()), ) - .unwrap(); + .unwrap() + }); } fn commit(self) { - self.txn.commit().unwrap(); + self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| { + let (_, latency, _) = 
this.txn.commit_and_rebind_open_dbs_with_latency().unwrap(); + Some(latency) + }); + } + + fn abort(self) { + self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| { + drop(this.txn); + None + }) } fn cursor<'txn>(&'txn self, table: &MdbxTable) -> MdbxWriteCursor<'txn> { - let table = self.open_table(table); + let inner_table = self.open_table(table); - MdbxWriteCursor::new(self.txn.cursor(&table).unwrap()) + MdbxWriteCursor::new( + &table.name, + self.txn.cursor(&inner_table).unwrap(), + self.metrics_handler + .as_ref() + .map(|h| Arc::clone(&h.env_metrics)), + ) } fn clear_database(&mut self, table: &MdbxTable) { - let table = self.open_table(table); + let inner_table = self.open_table(table); - self.txn.clear_table(&table).unwrap(); + self.execute_with_operation_metric(&table.name, Operation::Clear, None, |txn| { + txn.clear_table(&inner_table).unwrap() + }); } } diff --git a/database/src/metrics.rs b/database/src/metrics.rs new file mode 100644 index 0000000000..a608eb9c3d --- /dev/null +++ b/database/src/metrics.rs @@ -0,0 +1,417 @@ +// This file has been ported and adapted from reth (https://github.com/paradigmxyz/reth). +// Commit: 87cdfb185eaa721f18bc691ace87456fa348dbad +// License: MIT OR Apache-2.0 + +use std::{ + collections::HashMap, + hash::BuildHasherDefault, + time::{Duration, Instant}, +}; + +use libmdbx::CommitLatency; +use log::warn; +use metrics::{counter, gauge, histogram, Counter, Gauge, Histogram, IntoLabels}; +use parking_lot::RwLock; +use rustc_hash::{FxHashMap, FxHasher}; +use strum::{EnumCount, IntoEnumIterator}; +use strum_macros::{EnumCount as EnumCountMacro, EnumIter}; + +const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096; + +/// Caches metric handles for database environment to make sure handles are not re-created +/// on every operation. +/// +/// Requires a metric recorder to be registered before creating an instance of this struct. +/// Otherwise, metric recording will no-op. 
+pub(crate) struct DatabaseEnvMetrics { + /// Caches `OperationMetrics` handles for each table and operation tuple. + operations: RwLock>, + /// Caches `TransactionMetrics` handles for counters grouped by only transaction mode. + /// Updated both at tx open and close. + transactions: FxHashMap, + /// Caches `TransactionOutcomeMetrics` handles for counters grouped by transaction mode and + /// outcome. Can only be updated at tx close, as outcome is only known at that point. + transaction_outcomes: + FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>, +} + +impl DatabaseEnvMetrics { + pub(crate) fn new() -> Self { + // Pre-populate metric handle maps with all possible combinations of labels + // to avoid runtime locks on the map when recording metrics. + Self { + operations: Default::default(), + transactions: Self::generate_transaction_handles(), + transaction_outcomes: Self::generate_transaction_outcome_handles(), + } + } + + /// Generate a map of all possible transaction modes to metric handles. + /// Used for tracking a counter of open transactions. + fn generate_transaction_handles() -> FxHashMap { + TransactionMode::iter() + .map(|mode| { + ( + mode, + TransactionMetrics::new_with_labels(&[( + Labels::TransactionMode.as_str(), + mode.as_str(), + )]), + ) + }) + .collect() + } + + /// Generate a map of all possible transaction mode and outcome handles. + /// Used for tracking various stats for finished transactions (e.g. commit duration). 
+ fn generate_transaction_outcome_handles( + ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> { + let mut transaction_outcomes = HashMap::with_capacity_and_hasher( + TransactionMode::COUNT * TransactionOutcome::COUNT, + BuildHasherDefault::::default(), + ); + for mode in TransactionMode::iter() { + for outcome in TransactionOutcome::iter() { + transaction_outcomes.insert( + (mode, outcome), + TransactionOutcomeMetrics::new_with_labels(&[ + (Labels::TransactionMode.as_str(), mode.as_str()), + (Labels::TransactionOutcome.as_str(), outcome.as_str()), + ]), + ); + } + } + transaction_outcomes + } + + /// Registers a new table for metrics recording. + pub(crate) fn register_table(&self, table: &str) { + for operation in Operation::iter() { + self.operations.write().insert( + (table.to_string(), operation), + OperationMetrics::new_with_labels(&[ + (Labels::Table.as_str(), table.to_string()), + (Labels::Operation.as_str(), operation.as_str().to_string()), + ]), + ); + } + } + + /// Record a metric for database operation executed in `f`. + pub(crate) fn record_operation( + &self, + table: &str, + operation: Operation, + value_size: Option, + f: impl FnOnce() -> R, + ) -> R { + if let Some(table_metrics) = self.operations.read().get(&(table.to_string(), operation)) { + table_metrics.record(value_size, f) + } else { + warn!("no metric recorder found for table '{}'", table); + f() + } + } + + /// Record metrics for opening a database transaction. + pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) { + self.transactions + .get(&mode) + .expect("transaction mode metric handle not found") + .record_open(); + } + + /// Record metrics for closing a database transactions. 
+ pub(crate) fn record_closed_transaction( + &self, + mode: TransactionMode, + outcome: TransactionOutcome, + open_duration: Duration, + close_duration: Option, + commit_latency: Option, + ) { + self.transactions + .get(&mode) + .expect("transaction mode metric handle not found") + .record_close(); + + self.transaction_outcomes + .get(&(mode, outcome)) + .expect("transaction outcome metric handle not found") + .record(open_duration, close_duration, commit_latency); + } +} + +/// Transaction mode for the database, either read-only or read-write. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCountMacro, EnumIter)] +pub(crate) enum TransactionMode { + /// Read-only transaction mode. + ReadOnly, + /// Read-write transaction mode. + ReadWrite, +} + +impl TransactionMode { + /// Returns the transaction mode as a string. + pub(crate) const fn as_str(&self) -> &'static str { + match self { + Self::ReadOnly => "read-only", + Self::ReadWrite => "read-write", + } + } + + /// Returns `true` if the transaction mode is read-only. + pub(crate) const fn is_read_only(&self) -> bool { + matches!(self, Self::ReadOnly) + } +} + +/// Transaction outcome after a database operation - commit, abort, or drop. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCountMacro, EnumIter)] +pub(crate) enum TransactionOutcome { + /// Successful commit of the transaction. + Commit, + /// Aborted transaction. + Abort, + /// Dropped transaction. + Drop, +} + +impl TransactionOutcome { + /// Returns the transaction outcome as a string. + pub(crate) const fn as_str(&self) -> &'static str { + match self { + Self::Commit => "commit", + Self::Abort => "abort", + Self::Drop => "drop", + } + } + + /// Returns `true` if the transaction outcome is a commit. + pub(crate) const fn is_commit(&self) -> bool { + matches!(self, Self::Commit) + } +} + +/// Types of operations conducted on the database: get, put, delete, and various cursor operations. 
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCountMacro, EnumIter)] +pub(crate) enum Operation { + /// Database get operation. + Get, + /// Database put operation. + Put, + /// Database delete operation. + Delete, + /// Database clear operation. + Clear, + /// Database cursor delete current operation. + CursorDeleteCurrent, +} + +impl Operation { + /// Returns the operation as a string. + pub(crate) const fn as_str(&self) -> &'static str { + match self { + Self::Get => "get", + Self::Put => "put", + Self::Delete => "delete", + Self::Clear => "clear", + Self::CursorDeleteCurrent => "cursor-delete-current", + } + } +} + +/// Enum defining labels for various aspects used in metrics. +enum Labels { + /// Label representing a table. + Table, + /// Label representing a transaction mode. + TransactionMode, + /// Label representing a transaction outcome. + TransactionOutcome, + /// Label representing a database operation. + Operation, +} + +impl Labels { + /// Converts each label variant into its corresponding string representation. 
+ pub(crate) const fn as_str(&self) -> &'static str { + match self { + Self::Table => "table", + Self::TransactionMode => "mode", + Self::TransactionOutcome => "outcome", + Self::Operation => "operation", + } + } +} + +#[derive(Clone)] +pub(crate) struct TransactionMetrics { + /// Total number of currently open database transactions + open_total: Gauge, +} + +impl TransactionMetrics { + pub(crate) fn new_with_labels(labels: impl IntoLabels + Clone) -> Self { + Self { + open_total: gauge!("open_total", labels), + } + } + + pub(crate) fn record_open(&self) { + self.open_total.increment(1.0); + } + + pub(crate) fn record_close(&self) { + self.open_total.decrement(1.0); + } +} + +#[derive(Clone)] +pub(crate) struct TransactionOutcomeMetrics { + /// The time a database transaction has been open + open_duration_seconds: Histogram, + /// The time it took to close a database transaction + close_duration_seconds: Histogram, + /// The time it took to prepare a transaction commit + commit_preparation_duration_seconds: Histogram, + /// Duration of GC update during transaction commit by wall clock + commit_gc_wallclock_duration_seconds: Histogram, + /// The time it took to conduct audit of a transaction commit + commit_audit_duration_seconds: Histogram, + /// The time it took to write dirty/modified data pages to a filesystem during transaction + /// commit + commit_write_duration_seconds: Histogram, + /// The time it took to sync written data to the disk/storage during transaction commit + commit_sync_duration_seconds: Histogram, + /// The time it took to release resources during transaction commit + commit_ending_duration_seconds: Histogram, + /// The total duration of a transaction commit + commit_whole_duration_seconds: Histogram, + /// User-mode CPU time spent on GC update during transaction commit + commit_gc_cputime_duration_seconds: Histogram, +} + +impl TransactionOutcomeMetrics { + pub(crate) fn new_with_labels(labels: impl IntoLabels + Clone) -> Self { + Self { + 
open_duration_seconds: histogram!("open_duration_seconds", labels.clone()), + close_duration_seconds: histogram!("close_duration_seconds", labels.clone()), + commit_preparation_duration_seconds: histogram!( + "commit_preparation_duration_seconds", + labels.clone() + ), + commit_gc_wallclock_duration_seconds: histogram!( + "commit_gc_wallclock_duration_seconds", + labels.clone() + ), + commit_audit_duration_seconds: histogram!( + "commit_audit_duration_seconds", + labels.clone() + ), + commit_write_duration_seconds: histogram!( + "commit_write_duration_seconds", + labels.clone() + ), + commit_sync_duration_seconds: histogram!( + "commit_sync_duration_seconds", + labels.clone() + ), + commit_ending_duration_seconds: histogram!( + "commit_ending_duration_seconds", + labels.clone() + ), + commit_whole_duration_seconds: histogram!( + "commit_whole_duration_seconds", + labels.clone() + ), + commit_gc_cputime_duration_seconds: histogram!( + "commit_gc_cputime_duration_seconds", + labels + ), + } + } + + /// Record transaction closing with the duration it was open and the duration it took to close + /// it. 
+ pub(crate) fn record( + &self, + open_duration: Duration, + close_duration: Option<Duration>, + commit_latency: Option<CommitLatency>, + ) { + self.open_duration_seconds.record(open_duration); + + if let Some(close_duration) = close_duration { + self.close_duration_seconds.record(close_duration) + } + + if let Some(commit_latency) = commit_latency { + self.commit_preparation_duration_seconds + .record(commit_latency.preparation()); + self.commit_gc_wallclock_duration_seconds + .record(commit_latency.gc_wallclock()); + self.commit_audit_duration_seconds + .record(commit_latency.audit()); + self.commit_write_duration_seconds + .record(commit_latency.write()); + self.commit_sync_duration_seconds + .record(commit_latency.sync()); + self.commit_ending_duration_seconds + .record(commit_latency.ending()); + self.commit_whole_duration_seconds + .record(commit_latency.whole()); + self.commit_gc_cputime_duration_seconds + .record(commit_latency.gc_cputime()); + } + } +} + +impl std::fmt::Debug for TransactionOutcomeMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TransactionOutcomeMetrics").finish() + } +} + +#[derive(Clone)] +pub(crate) struct OperationMetrics { + /// Total number of database operations made + calls_total: Counter, + /// The time it took to execute a database operation (`put/upsert/insert/append/append_dup`) + /// with value larger than [`LARGE_VALUE_THRESHOLD_BYTES`] bytes. + large_value_duration_seconds: Histogram, +} + +impl OperationMetrics { + pub(crate) fn new_with_labels(labels: impl IntoLabels + Clone) -> Self { + Self { + calls_total: counter!("calls_total", labels.clone()), + large_value_duration_seconds: histogram!("large_value_duration_seconds", labels), + } + } + + /// Record operation metric. + /// + /// The duration it took to execute the closure is recorded only if the provided `value_size` is + /// larger than [`LARGE_VALUE_THRESHOLD_BYTES`].
+ pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R { + self.calls_total.increment(1); + + // Record duration only for large values to prevent the performance hit of clock syscall + // on small operations + if value_size.map_or(false, |size| size > LARGE_VALUE_THRESHOLD_BYTES) { + let start = Instant::now(); + let result = f(); + self.large_value_duration_seconds.record(start.elapsed()); + result + } else { + f() + } + } +} + +impl std::fmt::Debug for OperationMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OperationMetrics").finish() + } +} diff --git a/database/src/volatile.rs index 8776727165..c6c524c6f1 100644 --- a/database/src/volatile.rs +++ b/database/src/volatile.rs @@ -78,6 +78,39 @@ impl VolatileDatabase { db, })) } + + pub fn with_metrics(max_tables: u32) -> Result<DatabaseProxy, Error> { + let temp_dir = TempDir::new().map_err(Error::CreateDirectory)?; + let mut db = MdbxDatabase::new_mdbx_database( + temp_dir.path(), + 1024 * 1024 * 1024 * 1024, + max_tables, + None, + )?; + db.with_metrics(); + Ok(DatabaseProxy::Volatile(VolatileDatabase { + temp_dir: Arc::new(temp_dir), + db, + })) + } + + pub fn with_max_readers_and_metrics( + max_tables: u32, + max_readers: u32, + ) -> Result<DatabaseProxy, Error> { + let temp_dir = TempDir::new().map_err(Error::CreateDirectory)?; + let mut db = MdbxDatabase::new_mdbx_database( + temp_dir.path(), + 1024 * 1024 * 1024 * 1024, + max_tables, + Some(max_readers), + )?; + db.with_metrics(); + Ok(DatabaseProxy::Volatile(VolatileDatabase { + temp_dir: Arc::new(temp_dir), + db, + })) + } } pub type VolatileTable = MdbxTable; @@ -95,7 +128,7 @@ mod tests { #[test] fn it_can_save_basic_objects() { - let db = VolatileDatabase::new(1).unwrap(); + let db = VolatileDatabase::with_metrics(1).unwrap(); { let table = db.open_table("test".to_string()); diff --git a/lib/src/config/config.rs index efe2955232..7f9ab501c2 100644 ---
a/lib/src/config/config.rs +++ b/lib/src/config/config.rs @@ -358,7 +358,7 @@ impl StorageConfig { )) })? .to_string(); - MdbxDatabase::new_with_max_readers( + MdbxDatabase::new_with_max_readers_and_metrics( db_path, db_config.size, db_config.max_dbs, diff --git a/lib/src/extras/metrics_server.rs b/lib/src/extras/metrics_server.rs index 203433b98e..5bd918be34 100644 --- a/lib/src/extras/metrics_server.rs +++ b/lib/src/extras/metrics_server.rs @@ -4,7 +4,7 @@ use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_consensus::ConsensusProxy; #[cfg(feature = "nimiq-mempool")] use nimiq_mempool::mempool::Mempool; -pub use nimiq_metrics_server::NimiqTaskMonitor; +pub use nimiq_metrics_server::{install_metrics, MetricsCollector, NimiqTaskMonitor}; use nimiq_network_interface::network::Network; pub fn start_metrics_server( @@ -14,6 +14,7 @@ pub fn start_metrics_server( consensus_proxy: ConsensusProxy, network: Arc, task_monitors: &[NimiqTaskMonitor], + collector: MetricsCollector, ) { #[cfg(not(feature = "nimiq-mempool"))] let mempool = None; @@ -24,5 +25,6 @@ pub fn start_metrics_server( consensus_proxy, network, task_monitors, + collector, ); } diff --git a/metrics-server/Cargo.toml b/metrics-server/Cargo.toml index bc018466a1..e1ccb13d43 100644 --- a/metrics-server/Cargo.toml +++ b/metrics-server/Cargo.toml @@ -26,6 +26,7 @@ http-body-util = { version = "0.1" } hyper = { version = "1.3" } hyper-util = { version = "0.1", features = ["server-auto", "tokio"] } log = { workspace = true } +metrics = "0.23" parking_lot = "0.12" prometheus-client = "0.22.2" tokio = { version = "1.38", features = [ @@ -36,6 +37,7 @@ tokio = { version = "1.38", features = [ ] } tokio-metrics = "0.3" +nimiq-database = { workspace = true } nimiq-blockchain = { workspace = true, features = ["metrics"] } nimiq-blockchain-interface = { workspace = true } nimiq-blockchain-proxy = { workspace = true, features = ["full"] } diff --git a/metrics-server/src/database.rs b/metrics-server/src/database.rs new 
file mode 100644 index 0000000000..4d03c5efb6 --- /dev/null +++ b/metrics-server/src/database.rs @@ -0,0 +1,12 @@ +use prometheus_client::registry::Registry; + +use crate::metrics::MetricsCollector; + +pub struct DatabaseMetrics {} + +impl DatabaseMetrics { + pub fn register(registry: &mut Registry, collector: MetricsCollector) { + let sub_registry = registry.sub_registry_with_prefix("database"); + sub_registry.register_collector(Box::new(collector)); + } +} diff --git a/metrics-server/src/lib.rs b/metrics-server/src/lib.rs index 56aecafdb7..044cf979c4 100644 --- a/metrics-server/src/lib.rs +++ b/metrics-server/src/lib.rs @@ -1,5 +1,7 @@ use std::{fmt::Debug, net::SocketAddr, sync::Arc}; +use ::metrics::set_global_recorder; +use database::DatabaseMetrics; use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_consensus::ConsensusProxy; use nimiq_mempool::mempool::Mempool; @@ -14,6 +16,7 @@ use prometheus_client::{ use tokio_metrics::RuntimeMonitor; use tokio_metrics::TaskMonitor; +pub use crate::metrics::MetricsCollector; #[cfg(tokio_unstable)] use crate::tokio_runtime::TokioRuntimeMetrics; use crate::{ @@ -23,7 +26,9 @@ use crate::{ mod chain; mod consensus; +mod database; mod mempool; +mod metrics; mod network; mod server; #[cfg(tokio_unstable)] @@ -78,6 +83,14 @@ impl Debug for NumericClosureMetric { } } +/// To be called at the beginning of the program to install the metrics collector. +/// This is currently only used for database metrics. 
+pub fn install_metrics() -> MetricsCollector { + let collector = MetricsCollector::default(); + set_global_recorder(collector.clone()).unwrap(); + collector +} + pub fn start_metrics_server( addr: SocketAddr, blockchain_proxy: BlockchainProxy, @@ -85,6 +98,7 @@ pub fn start_metrics_server( consensus_proxy: ConsensusProxy, network: Arc, task_monitors: &[NimiqTaskMonitor], + collector: MetricsCollector, ) { let mut registry = Registry::default(); let nimiq_registry = registry.sub_registry_with_prefix("nimiq"); @@ -92,6 +106,7 @@ BlockMetrics::register(nimiq_registry, blockchain_proxy); ConsensusMetrics::register(nimiq_registry, consensus_proxy); NetworkMetrics::register(nimiq_registry, network); + DatabaseMetrics::register(nimiq_registry, collector); if let Some(mempool) = mempool { MempoolMetrics::register(nimiq_registry, mempool); diff --git a/metrics-server/src/metrics.rs new file mode 100644 index 0000000000..c189896baa --- /dev/null +++ b/metrics-server/src/metrics.rs @@ -0,0 +1,266 @@ +use std::{ + collections::HashMap, + fmt, + sync::{atomic::Ordering, Arc}, +}; + +use metrics::{ + atomics::AtomicU64, Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, + Label, Metadata, Recorder, SharedString, Unit, +}; +use parking_lot::RwLock; +use prometheus_client::{ + collector::Collector, + encoding::{DescriptorEncoder, EncodeMetric, MetricEncoder}, + metrics::MetricType, + registry::Unit as PrometheusUnit, +}; + +const HIST_BUCKETS: [f64; 11] = [ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, +]; + +#[derive(Debug, Default)] +struct Descriptor { + help: String, + unit: Option<PrometheusUnit>, +} + +impl Descriptor { + fn new(help: String, unit: Option<Unit>) -> Self { + Self { + help, + unit: unit.map(convert_unit_to_prometheus), + } + } +} + +#[derive(Debug)] +enum Metric { + Counter(Arc<MetricsCounter>), + Gauge(Arc<MetricsGauge>), + Histogram(Arc<MetricsHistogram>), +} + +impl EncodeMetric for Metric { + fn encode(&self,
MetricEncoder) -> Result<(), fmt::Error> { + match self { + Metric::Counter(counter) => counter.inner.encode(encoder), + Metric::Gauge(gauge) => gauge.inner.encode(encoder), + Metric::Histogram(hist) => hist.inner.encode(encoder), + } + } + + fn metric_type(&self) -> MetricType { + match self { + Metric::Counter(_) => MetricType::Counter, + Metric::Gauge(_) => MetricType::Gauge, + Metric::Histogram(_) => MetricType::Histogram, + } + } +} + +/// This module provides compatibility with the `metrics` crate. +/// It registers as a `Collector` with the `prometheus_client` crate +/// and implements the `Recorder` trait of the `metrics` crate. +#[derive(Debug, Default)] +pub struct MetricsCollector { + metrics: Arc<RwLock<HashMap<KeyName, Vec<(Vec<Label>, Metric)>>>>, + descriptors: Arc<RwLock<HashMap<KeyName, Descriptor>>>, +} + +impl Clone for MetricsCollector { + fn clone(&self) -> Self { + Self { + metrics: Arc::clone(&self.metrics), + descriptors: Arc::clone(&self.descriptors), + } + } +} + +impl Collector for MetricsCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), fmt::Error> { + for (key_name, metrics) in self.metrics.read().iter() { + // Find descriptor for the metric. + let descriptors = self.descriptors.read(); + let (help, unit) = descriptors + .get(key_name) + .map(|d| (d.help.as_str(), d.unit.as_ref())) + .unwrap_or_else(|| ("", None)); + + // Gather statistics about the metrics. + if metrics.is_empty() { + continue; + } + let metric_type = metrics[0].1.metric_type(); + // If there is more than one entry, this is always true. + let has_labels = !metrics[0].0.is_empty(); + + // Encode descriptor and metric. + let mut descriptor_encoder = + encoder.encode_descriptor(key_name.as_str(), help, unit, metric_type)?; + + // Encode metrics for this key. + // If labels are present, encode the metric as a family.
+ if has_labels { + for (labels, metric) in metrics { + let metric_encoder = descriptor_encoder.encode_family(labels)?; + metric.encode(metric_encoder)?; + } + } else { + let metric = &metrics[0].1; + metric.encode(descriptor_encoder)?; + } + } + Ok(()) + } +} + +impl MetricsCollector { + fn register(&self, key: &Key, metric: Metric) { + let (key_name, labels) = key.clone().into_parts(); + let labels = convert_labels_to_prometheus(labels); + + let mut metrics = self.metrics.write(); + let entry = metrics.entry(key_name).or_default(); + + // Make sure that all metrics for a key have the same type + // and that labels are set on duplicate entries. + assert!( + entry.is_empty() + || (entry[0].1.metric_type().as_str() == metric.metric_type().as_str() + && !entry[0].0.is_empty() + && !labels.is_empty()), + "Registering a metric with a different type or missing labels: `{:?}`", + key + ); + entry.push((labels, metric)); + } + + fn describe(&self, key: &KeyName, unit: Option<Unit>, description: SharedString) { + assert!( + self.descriptors + .write() + .insert(key.clone(), Descriptor::new(description.into_owned(), unit)) + .is_none(), + "Registering a duplicate metric descriptor: `{:?}`", + key + ); + } +} + +impl Recorder for MetricsCollector { + fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) { + self.describe(&key, unit, description) + } + + fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) { + self.describe(&key, unit, description) + } + + fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) { + self.describe(&key, unit, description) + } + + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter { + let counter = Arc::new(MetricsCounter::default()); + self.register(key, Metric::Counter(Arc::clone(&counter))); + Counter::from_arc(counter) + } + + fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge { + let gauge =
Arc::new(MetricsGauge::default()); + self.register(key, Metric::Gauge(Arc::clone(&gauge))); + Gauge::from_arc(gauge) + } + + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram { + let hist = Arc::new(MetricsHistogram::default()); + self.register(key, Metric::Histogram(Arc::clone(&hist))); + Histogram::from_arc(hist) + } +} + +fn convert_labels_to_prometheus(labels: Vec