Skip to content

Commit

Permalink
Opentelemetry implementation (#761)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuris authored Dec 11, 2024
1 parent 29d0d3d commit d99bc32
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 8 deletions.
5 changes: 5 additions & 0 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ tikv-jemallocator = "0.6.0"
env_logger = "0.11.5"
zipf = "7.0.1"
log = "0.4.20"
fastrace = { version = "0.7.4", features = ["enable"] }
fastrace-opentelemetry = { version = "0.8.0" }
opentelemetry-otlp = "0.27.0"
opentelemetry = "0.27.0"
opentelemetry_sdk = "0.27.1"

[features]
logger = ["firewood/logger"]
5 changes: 5 additions & 0 deletions benchmark/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use std::error::Error;
use std::time::Instant;

use fastrace::prelude::SpanContext;
use fastrace::{func_path, Span};
use firewood::db::Db;
use firewood::v2::api::{Db as _, Proposal as _};
use log::info;
Expand All @@ -21,6 +23,9 @@ impl TestRunner for Create {
let start = Instant::now();

for key in 0..args.number_of_batches {
let root = Span::root(func_path!(), SpanContext::random());
let _guard = root.set_local_parent();

let batch = Self::generate_inserts(key * keys, args.batch_size).collect();

let proposal = db.propose(batch).await.expect("proposal should succeed");
Expand Down
34 changes: 34 additions & 0 deletions benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
//

use clap::{Parser, Subcommand};
use fastrace_opentelemetry::OpenTelemetryReporter;
use firewood::logger::trace;
use log::LevelFilter;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
use sha2::{Digest, Sha256};
use std::borrow::Cow;
use std::error::Error;
use std::net::{Ipv6Addr, SocketAddr};
use std::num::NonZeroUsize;
Expand All @@ -26,6 +28,15 @@ use std::time::Duration;
use firewood::db::{BatchOp, Db, DbConfig};
use firewood::manager::RevisionManagerConfig;

use fastrace::collector::Config;

use opentelemetry::trace::SpanKind;
use opentelemetry::InstrumentationScope;
use opentelemetry::KeyValue;
use opentelemetry_otlp::SpanExporter;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource;

#[derive(Parser, Debug)]
struct Args {
#[arg(short, long, default_value_t = 10000)]
Expand Down Expand Up @@ -130,6 +141,27 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let reporter = OpenTelemetryReporter::new(
SpanExporter::builder()
.with_tonic()
.with_endpoint("http://127.0.0.1:4317".to_string())
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_timeout(Duration::from_secs(
opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
))
.build()
.expect("initialize oltp exporter"),
SpanKind::Server,
Cow::Owned(Resource::new([KeyValue::new(
"service.name",
"avalabs.firewood.benchmark",
)])),
InstrumentationScope::builder("firewood")
.with_version(env!("CARGO_PKG_VERSION"))
.build(),
);
fastrace::set_reporter(reporter, Config::default());

let args = Args::parse();

if args.test_name == TestName::Single && args.batch_size > 1000 {
Expand Down Expand Up @@ -204,5 +236,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("{}", prometheus_handle.render());
}

fastrace::flush();

Ok(())
}
1 change: 1 addition & 0 deletions firewood/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ test-case = "3.3.1"
integer-encoding = "4.0.0"
io-uring = {version = "0.7", optional = true }
smallvec = "1.6.1"
fastrace = { version = "0.7.4" }

[features]
default = []
Expand Down
9 changes: 9 additions & 0 deletions firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ where
Ok(self.manager.read().await.all_hashes())
}

#[fastrace::trace(short_name = true)]
async fn propose<'p, K: KeyType, V: ValueType>(
&'p self,
batch: api::Batch<K, V>,
Expand All @@ -158,6 +159,7 @@ where
let parent = self.manager.read().await.current_revision();
let proposal = NodeStore::new(parent)?;
let mut merkle = Merkle::from(proposal);
let span = fastrace::Span::enter_with_local_parent("merkleops");
for op in batch {
match op {
BatchOp::Put { key, value } => {
Expand All @@ -168,9 +170,15 @@ where
}
}
}

drop(span);
let span = fastrace::Span::enter_with_local_parent("freeze");

let nodestore = merkle.into_inner();
let immutable: Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>> =
Arc::new(nodestore.into());

drop(span);
self.manager.write().await.add_proposal(immutable.clone());

self.metrics.proposals.increment(1);
Expand Down Expand Up @@ -266,6 +274,7 @@ impl api::DbView for Proposal<'_> {
impl<'a> api::Proposal for Proposal<'a> {
type Proposal = Proposal<'a>;

#[fastrace::trace(short_name = true)]
async fn propose<K: KeyType, V: ValueType>(
self: Arc<Self>,
batch: api::Batch<K, V>,
Expand Down
1 change: 1 addition & 0 deletions firewood/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl RevisionManager {
/// This write can be delayed, but would mean that recovery will not roll forward to this revision.
/// 8. Proposal Cleanup.
/// Any other proposals that have this proposal as a parent should be reparented to the committed version.
#[fastrace::trace(short_name = true)]
pub fn commit(&mut self, proposal: ProposedRevision) -> Result<(), RevisionManagerError> {
// 1. Commit check
let current_revision = self.current_revision();
Expand Down
2 changes: 0 additions & 2 deletions firewood/src/v2/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ impl<T: api::DbView + Send + Sync> api::Proposal for Proposal<T> {
}

async fn commit(self: Arc<Self>) -> Result<(), api::Error> {
// TODO: commit should modify the db; this will only work for
// emptydb at the moment
match &self.base {
ProposalBase::Proposal(base) => base.clone().commit().await,
ProposalBase::View(_) => Ok(()),
Expand Down
1 change: 1 addition & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ log = { version = "0.4.20", optional = true }
bytemuck = "1.7.0"
bytemuck_derive = "1.7.0"
bitfield = "0.17.0"
fastrace = { version = "0.7.4" }

[dev-dependencies]
rand = "0.8.5"
Expand Down
5 changes: 5 additions & 0 deletions storage/src/nodestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arc_swap::access::DynAccess;
use arc_swap::ArcSwap;
use bincode::{DefaultOptions, Options as _};
use bytemuck_derive::{AnyBitPattern, NoUninit};
use fastrace::local::LocalSpan;
use metrics::counter;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -218,6 +219,8 @@ impl<T: ReadInMemoryNode, S: ReadableStorage> NodeStore<T, S> {

let addr = addr.get() + 1; // skip the length byte

let _span = LocalSpan::enter_with_local_parent("read_and_deserialize");

let area_stream = self.storage.stream_from(addr)?;
let node = Node::from_reader(area_stream)?;
Ok(node.into())
Expand Down Expand Up @@ -925,6 +928,7 @@ impl<T, S: WritableStorage> NodeStore<T, S> {

impl<S: WritableStorage> NodeStore<Arc<ImmutableProposal>, S> {
/// Persist the freelist from this proposal to storage.
#[fastrace::trace(short_name = true)]
pub fn flush_freelist(&self) -> Result<(), Error> {
// Write the free lists to storage
let free_list_bytes = bytemuck::bytes_of(&self.header.free_lists);
Expand All @@ -934,6 +938,7 @@ impl<S: WritableStorage> NodeStore<Arc<ImmutableProposal>, S> {
}

/// Persist all the nodes of a proposal to storage.
#[fastrace::trace(short_name = true)]
pub fn flush_nodes(&self) -> Result<(), Error> {
for (addr, (area_size_index, node)) in self.kind.new.iter() {
let mut stored_area_bytes = Vec::new();
Expand Down
7 changes: 1 addition & 6 deletions storage/src/trie_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,9 @@ impl From<GenericArray<u8, typenum::U32>> for TrieHash {

impl TrieHash {
/// Return the length of a TrieHash
const fn len() -> usize {
pub(crate) const fn len() -> usize {
std::mem::size_of::<TrieHash>()
}

/// Returns true iff each element in this hash is 0.
pub fn is_empty(&self) -> bool {
*self == TrieHash::default()
}
}

impl Serialize for TrieHash {
Expand Down

0 comments on commit d99bc32

Please sign in to comment.