Skip to content

Commit

Permalink
Add metrics to the database
Browse files Browse the repository at this point in the history
  • Loading branch information
paberr committed Jun 28, 2024
1 parent 788e140 commit f20c68b
Show file tree
Hide file tree
Showing 19 changed files with 1,192 additions and 98 deletions.
42 changes: 38 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use log::info;
use nimiq::prover::prover_main;
pub use nimiq::{
client::Client,
config::{command_line::CommandLine, config::ClientConfig, config_file::ConfigFile},
Expand All @@ -13,6 +12,7 @@ pub use nimiq::{
signal_handling::initialize_signal_handler,
},
};
use nimiq::{extras::metrics_server::install_metrics, prover::prover_main};

async fn main_inner() -> Result<(), Error> {
// Keep for potential future reactivation
Expand All @@ -39,7 +39,7 @@ async fn main_inner() -> Result<(), Error> {
// Initialize panic hook.
initialize_panic_reporting();

// Initialize signal handler
// Initialize signal handler.
initialize_signal_handler();

// Early return in case of a proving process.
Expand All @@ -63,6 +63,12 @@ async fn main_inner() -> Result<(), Error> {
let metrics_config = config.metrics_server.clone();
let metrics_enabled = metrics_config.is_some();

// Initialize database logging.
let mut metrics_collector = None;
if metrics_enabled {
metrics_collector = Some(install_metrics());
}

// Create client from config.
let mut client: Client = Client::from_config(config).await?;

Expand Down Expand Up @@ -128,6 +134,7 @@ async fn main_inner() -> Result<(), Error> {
client.consensus_proxy(),
client.network(),
&nimiq_task_metric,
metrics_collector.unwrap(),
)
}

Expand Down
7 changes: 6 additions & 1 deletion database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ workspace = true

[dependencies]
bitflags = "2.6"
libmdbx = "0.5.0"
libmdbx = { git = "https://github.com/paberr/libmdbx-rs" }
log = { workspace = true }
tempfile = "3"
thiserror = "1.0"
metrics = "0.23"
parking_lot = "0.12"
rustc-hash = "1.1"
strum = "0.26"
strum_macros = "0.26"

nimiq-database-value = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bitflags::bitflags;

mod error;
pub mod mdbx;
mod metrics;
/// Database implementation that can handle volatile and persistent storage.
pub mod proxy;
/// Abstraction for methods related to the database.
Expand Down
44 changes: 39 additions & 5 deletions database/src/mdbx/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use std::{borrow::Cow, marker::PhantomData};
use std::{borrow::Cow, marker::PhantomData, sync::Arc};

use libmdbx::{TransactionKind, WriteFlags, RO, RW};
use nimiq_database_value::{AsDatabaseBytes, FromDatabaseValue};

use super::{DbKvPair, IntoIter};
use crate::traits::{ReadCursor, WriteCursor};
use crate::{
metrics::{DatabaseEnvMetrics, Operation},
traits::{ReadCursor, WriteCursor},
};

/// A cursor for navigating the entries within a table.
/// Wraps the libmdbx cursor so that we only expose our own methods.
pub struct MdbxCursor<'txn, K: TransactionKind> {
cursor: libmdbx::Cursor<'txn, K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
table_name: String,
}
/// Instantiation of the `MdbxCursor` for read transactions.
pub type MdbxReadCursor<'txn> = MdbxCursor<'txn, RO>;
Expand All @@ -20,8 +25,33 @@ impl<'txn, Kind> MdbxCursor<'txn, Kind>
where
Kind: TransactionKind,
{
pub(crate) fn new(cursor: libmdbx::Cursor<'txn, Kind>) -> Self {
MdbxCursor { cursor }
pub(crate) fn new(
table_name: &str,
cursor: libmdbx::Cursor<'txn, Kind>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> Self {
MdbxCursor {
table_name: table_name.to_string(),
cursor,
metrics,
}
}

/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
/// size.
///
/// Otherwise, just execute the closure.
fn execute_with_operation_metric<R>(
&mut self,
operation: Operation,
value_size: Option<usize>,
f: impl FnOnce(&mut Self) -> R,
) -> R {
if let Some(metrics) = self.metrics.as_ref().cloned() {
metrics.record_operation(&self.table_name.clone(), operation, value_size, || f(self))
} else {
f(self)
}
}
}

Expand Down Expand Up @@ -246,14 +276,18 @@ where
{
fn clone(&self) -> Self {
Self {
table_name: self.table_name.clone(),
cursor: self.cursor.clone(),
metrics: self.metrics.as_ref().cloned(),
}
}
}

impl<'txn> WriteCursor<'txn> for MdbxWriteCursor<'txn> {
fn remove(&mut self) {
self.cursor.del(WriteFlags::empty()).unwrap();
self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |cursor| {
cursor.cursor.del(WriteFlags::empty()).unwrap();
});
}

fn append<K, V>(&mut self, key: &K, value: &V)
Expand Down
93 changes: 51 additions & 42 deletions database/src/mdbx/database.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
use std::{borrow::Cow, fs, path::Path, sync::Arc};
use std::{borrow::Cow, fmt, fs, path::Path, sync::Arc};

use libmdbx::NoWriteMap;
use log::info;

use super::{MdbxReadTransaction, MdbxWriteTransaction};
use crate::{traits::Database, DatabaseProxy, Error, TableFlags};
use crate::{metrics::DatabaseEnvMetrics, traits::Database, DatabaseProxy, Error, TableFlags};

pub(super) type DbKvPair<'a> = (Cow<'a, [u8]>, Cow<'a, [u8]>);

/// Wrapper around the mdbx database handle.
/// A database can hold multiple tables.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct MdbxDatabase {
pub(super) db: Arc<libmdbx::Database<NoWriteMap>>,
pub(super) metrics: Option<Arc<DatabaseEnvMetrics>>,
}

impl fmt::Debug for MdbxDatabase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MdbxDatabase")
.field("db", &self.db)
.finish()
}
}

impl Database for MdbxDatabase {
Expand Down Expand Up @@ -49,15 +58,22 @@ impl Database for MdbxDatabase {
txn.create_table(Some(&name), table_flags).unwrap();
txn.commit().unwrap();

if let Some(ref metrics) = self.metrics {
metrics.register_table(&name);
}

MdbxTable { name }
}

fn read_transaction(&self) -> Self::ReadTransaction<'_> {
MdbxReadTransaction::new(self.db.begin_ro_txn().unwrap())
MdbxReadTransaction::new(self.db.begin_ro_txn().unwrap(), self.metrics.clone())
}

fn write_transaction(&self) -> Self::WriteTransaction<'_> {
MdbxWriteTransaction::new(self.db.begin_rw_txn().unwrap())
MdbxWriteTransaction::new(
self.db.begin_rw_txn().unwrap(),
self.metrics.as_ref().cloned(),
)
}
}

Expand Down Expand Up @@ -91,6 +107,30 @@ impl MdbxDatabase {
)?))
}

#[allow(clippy::new_ret_no_self)]
pub fn new_with_metrics<P: AsRef<Path>>(
path: P,
size: usize,
max_tables: u32,
) -> Result<DatabaseProxy, Error> {
let mut db = MdbxDatabase::new_mdbx_database(path.as_ref(), size, max_tables, None)?;
db.with_metrics();
Ok(DatabaseProxy::Persistent(db))
}

#[allow(clippy::new_ret_no_self)]
pub fn new_with_max_readers_and_metrics<P: AsRef<Path>>(
path: P,
size: usize,
max_tables: u32,
max_readers: u32,
) -> Result<DatabaseProxy, Error> {
let mut db =
MdbxDatabase::new_mdbx_database(path.as_ref(), size, max_tables, Some(max_readers))?;
db.with_metrics();
Ok(DatabaseProxy::Persistent(db))
}

pub(crate) fn new_mdbx_database(
path: &Path,
size: usize,
Expand Down Expand Up @@ -119,47 +159,16 @@ impl MdbxDatabase {
let cur_mapsize = info.map_size();
info!(cur_mapsize, "MDBX memory map size");

let mdbx = MdbxDatabase { db: Arc::new(db) };
if mdbx.need_resize(0) {
info!("MDBX memory needs to be resized.");
}
let mdbx = MdbxDatabase {
db: Arc::new(db),
metrics: None,
};

Ok(mdbx)
}

pub fn need_resize(&self, threshold_size: usize) -> bool {
let info = self.db.info().unwrap();
let stat = self.db.stat().unwrap();

let size_used = (stat.page_size() as usize) * (info.last_pgno() + 1);

if threshold_size > 0 && info.map_size() - size_used < threshold_size {
info!(
size_used,
threshold_size,
map_size = info.map_size(),
space_remaining = info.map_size() - size_used,
"DB settings (threshold-based)"
);
return true;
}

// Resize is currently not supported. So don't let the resize happen
// if a specific percentage is reached.
let resize_percent: f64 = 1_f64;

if (size_used as f64) / (info.map_size() as f64) > resize_percent {
info!(
map_size = info.map_size(),
size_used,
space_remaining = info.map_size() - size_used,
percent_used = (size_used as f64) / (info.map_size() as f64),
"DB resize (percent-based)"
);
return true;
}

false
pub(crate) fn with_metrics(&mut self) {
self.metrics = Some(DatabaseEnvMetrics::new().into());
}
}

Expand Down
Loading

0 comments on commit f20c68b

Please sign in to comment.