From d99bc32f2183ae5a841aa79bcf84568f18c65a31 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Wed, 11 Dec 2024 13:55:55 -0500 Subject: [PATCH] Opentelemetry implementation (#761) --- benchmark/Cargo.toml | 5 +++++ benchmark/src/create.rs | 5 +++++ benchmark/src/main.rs | 34 ++++++++++++++++++++++++++++++++++ firewood/Cargo.toml | 1 + firewood/src/db.rs | 9 +++++++++ firewood/src/manager.rs | 1 + firewood/src/v2/propose.rs | 2 -- storage/Cargo.toml | 1 + storage/src/nodestore.rs | 5 +++++ storage/src/trie_hash.rs | 7 +------ 10 files changed, 62 insertions(+), 8 deletions(-) diff --git a/benchmark/Cargo.toml b/benchmark/Cargo.toml index 29a129561..d6253b76b 100644 --- a/benchmark/Cargo.toml +++ b/benchmark/Cargo.toml @@ -18,6 +18,11 @@ tikv-jemallocator = "0.6.0" env_logger = "0.11.5" zipf = "7.0.1" log = "0.4.20" +fastrace = { version = "0.7.4", features = ["enable"] } +fastrace-opentelemetry = { version = "0.8.0" } +opentelemetry-otlp = "0.27.0" +opentelemetry = "0.27.0" +opentelemetry_sdk = "0.27.1" [features] logger = ["firewood/logger"] diff --git a/benchmark/src/create.rs b/benchmark/src/create.rs index f7f32e289..bcc13546b 100644 --- a/benchmark/src/create.rs +++ b/benchmark/src/create.rs @@ -4,6 +4,8 @@ use std::error::Error; use std::time::Instant; +use fastrace::prelude::SpanContext; +use fastrace::{func_path, Span}; use firewood::db::Db; use firewood::v2::api::{Db as _, Proposal as _}; use log::info; @@ -21,6 +23,9 @@ impl TestRunner for Create { let start = Instant::now(); for key in 0..args.number_of_batches { + let root = Span::root(func_path!(), SpanContext::random()); + let _guard = root.set_local_parent(); + let batch = Self::generate_inserts(key * keys, args.batch_size).collect(); let proposal = db.propose(batch).await.expect("proposal should succeed"); diff --git a/benchmark/src/main.rs b/benchmark/src/main.rs index a4f2a9c45..39c712cb9 100644 --- a/benchmark/src/main.rs +++ b/benchmark/src/main.rs @@ -12,11 +12,13 @@ // use clap::{Parser, Subcommand}; +use fastrace_opentelemetry::OpenTelemetryReporter; use firewood::logger::trace; use log::LevelFilter; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::MetricKindMask; use sha2::{Digest, Sha256}; +use std::borrow::Cow; use std::error::Error; use std::net::{Ipv6Addr, SocketAddr}; use std::num::NonZeroUsize; @@ -26,6 +28,15 @@ use std::time::Duration; use firewood::db::{BatchOp, Db, DbConfig}; use firewood::manager::RevisionManagerConfig; +use fastrace::collector::Config; + +use opentelemetry::trace::SpanKind; +use opentelemetry::InstrumentationScope; +use opentelemetry::KeyValue; +use opentelemetry_otlp::SpanExporter; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::Resource; + #[derive(Parser, Debug)] struct Args { #[arg(short, long, default_value_t = 10000)] @@ -130,6 +141,27 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { + let reporter = OpenTelemetryReporter::new( + SpanExporter::builder() + .with_tonic() + .with_endpoint("http://127.0.0.1:4317".to_string()) + .with_protocol(opentelemetry_otlp::Protocol::Grpc) + .with_timeout(Duration::from_secs( + opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, + )) + .build() + .expect("initialize oltp exporter"), + SpanKind::Server, + Cow::Owned(Resource::new([KeyValue::new( + "service.name", + "avalabs.firewood.benchmark", + )])), + InstrumentationScope::builder("firewood") + .with_version(env!("CARGO_PKG_VERSION")) + .build(), + ); + fastrace::set_reporter(reporter, Config::default()); + let args = Args::parse(); if args.test_name == TestName::Single && args.batch_size > 1000 { @@ -204,5 +236,7 @@ async fn main() -> Result<(), Box> { println!("{}", prometheus_handle.render()); } + fastrace::flush(); + Ok(()) } diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index 2392d519b..58f715516 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -33,6 +33,7 @@ test-case = "3.3.1" integer-encoding = "4.0.0" io-uring = {version = "0.7", optional = true } smallvec = "1.6.1" +fastrace = { version = "0.7.4" } [features] default = [] diff --git a/firewood/src/db.rs b/firewood/src/db.rs index d862ecb01..ad435fca1 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -148,6 +148,7 @@ where Ok(self.manager.read().await.all_hashes()) } + #[fastrace::trace(short_name = true)] async fn propose<'p, K: KeyType, V: ValueType>( &'p self, batch: api::Batch, @@ -158,6 +159,7 @@ where let parent = self.manager.read().await.current_revision(); let proposal = NodeStore::new(parent)?; let mut merkle = Merkle::from(proposal); + let span = fastrace::Span::enter_with_local_parent("merkleops"); for op in batch { match op { BatchOp::Put { key, value } => { @@ -168,9 +170,15 @@ where } } } + + drop(span); + let span = fastrace::Span::enter_with_local_parent("freeze"); + let nodestore = merkle.into_inner(); let immutable: Arc, FileBacked>> = Arc::new(nodestore.into()); + + drop(span); self.manager.write().await.add_proposal(immutable.clone()); self.metrics.proposals.increment(1); @@ -266,6 +274,7 @@ impl api::DbView for Proposal<'_> { impl<'a> api::Proposal for Proposal<'a> { type Proposal = Proposal<'a>; + #[fastrace::trace(short_name = true)] async fn propose( self: Arc, batch: api::Batch, diff --git a/firewood/src/manager.rs b/firewood/src/manager.rs index d5ba09ab7..e85123af4 100644 --- a/firewood/src/manager.rs +++ b/firewood/src/manager.rs @@ -135,6 +135,7 @@ impl RevisionManager { /// This write can be delayed, but would mean that recovery will not roll forward to this revision. /// 8. Proposal Cleanup. /// Any other proposals that have this proposal as a parent should be reparented to the committed version. + #[fastrace::trace(short_name = true)] pub fn commit(&mut self, proposal: ProposedRevision) -> Result<(), RevisionManagerError> { // 1. Commit check let current_revision = self.current_revision(); diff --git a/firewood/src/v2/propose.rs b/firewood/src/v2/propose.rs index 835ca61d0..a8dc0f216 100644 --- a/firewood/src/v2/propose.rs +++ b/firewood/src/v2/propose.rs @@ -166,8 +166,6 @@ impl api::Proposal for Proposal { } async fn commit(self: Arc) -> Result<(), api::Error> { - // TODO: commit should modify the db; this will only work for - // emptydb at the moment match &self.base { ProposalBase::Proposal(base) => base.clone().commit().await, ProposalBase::View(_) => Ok(()), diff --git a/storage/Cargo.toml b/storage/Cargo.toml index adb8e161d..e390de49d 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -21,6 +21,7 @@ log = { version = "0.4.20", optional = true } bytemuck = "1.7.0" bytemuck_derive = "1.7.0" bitfield = "0.17.0" +fastrace = { version = "0.7.4" } [dev-dependencies] rand = "0.8.5" diff --git a/storage/src/nodestore.rs b/storage/src/nodestore.rs index 34e3ba6d4..ef56b7893 100644 --- a/storage/src/nodestore.rs +++ b/storage/src/nodestore.rs @@ -6,6 +6,7 @@ use arc_swap::access::DynAccess; use arc_swap::ArcSwap; use bincode::{DefaultOptions, Options as _}; use bytemuck_derive::{AnyBitPattern, NoUninit}; +use fastrace::local::LocalSpan; use metrics::counter; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -218,6 +219,8 @@ impl NodeStore { let addr = addr.get() + 1; // skip the length byte + let _span = LocalSpan::enter_with_local_parent("read_and_deserialize"); + let area_stream = self.storage.stream_from(addr)?; let node = Node::from_reader(area_stream)?; Ok(node.into()) @@ -925,6 +928,7 @@ impl NodeStore { impl NodeStore, S> { /// Persist the freelist from this proposal to storage. + #[fastrace::trace(short_name = true)] pub fn flush_freelist(&self) -> Result<(), Error> { // Write the free lists to storage let free_list_bytes = bytemuck::bytes_of(&self.header.free_lists); @@ -934,6 +938,7 @@ impl NodeStore, S> { } /// Persist all the nodes of a proposal to storage. + #[fastrace::trace(short_name = true)] pub fn flush_nodes(&self) -> Result<(), Error> { for (addr, (area_size_index, node)) in self.kind.new.iter() { let mut stored_area_bytes = Vec::new(); diff --git a/storage/src/trie_hash.rs b/storage/src/trie_hash.rs index 0f48000a6..569812dc6 100644 --- a/storage/src/trie_hash.rs +++ b/storage/src/trie_hash.rs @@ -51,14 +51,9 @@ impl From> for TrieHash { impl TrieHash { /// Return the length of a TrieHash - const fn len() -> usize { + pub(crate) const fn len() -> usize { std::mem::size_of::() } - - /// Returns true iff each element in this hash is 0. - pub fn is_empty(&self) -> bool { - *self == TrieHash::default() - } } impl Serialize for TrieHash {