From 28c8c7c7e4c1fb372434e1fd4e4649c6058164f3 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 25 Jun 2024 10:52:51 -0400 Subject: [PATCH] feat(derive): Stage Level Metrics (#309) * feat(derive): stage metrics * feat: pull out feature flags into macros * fix: merge * fix: use macros --- crates/derive/Cargo.toml | 9 +- crates/derive/src/lib.rs | 7 +- crates/derive/src/macros.rs | 55 ++++++++++ crates/derive/src/metrics.rs | 54 ++++++++++ crates/derive/src/online/alloy_providers.rs | 100 +++++++++--------- crates/derive/src/online/beacon_client.rs | 34 +++--- crates/derive/src/online/blob_provider.rs | 11 +- crates/derive/src/online/metrics.rs | 18 ---- crates/derive/src/online/mod.rs | 2 - crates/derive/src/stages/attributes_queue.rs | 18 +++- crates/derive/src/stages/batch_queue.rs | 54 +++++++--- crates/derive/src/stages/channel_bank.rs | 18 +++- crates/derive/src/stages/channel_reader.rs | 3 + crates/derive/src/stages/frame_queue.rs | 2 + crates/derive/src/stages/l1_traversal.rs | 1 + .../derive/src/types/batch/span_batch/raw.rs | 1 - crates/derive/src/types/channel.rs | 2 - crates/derive/src/types/frame.rs | 2 - examples/trusted-sync/Cargo.toml | 2 +- 19 files changed, 268 insertions(+), 125 deletions(-) create mode 100644 crates/derive/src/macros.rs create mode 100644 crates/derive/src/metrics.rs delete mode 100644 crates/derive/src/online/metrics.rs diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index 4c90225eb..581a8c819 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -39,8 +39,6 @@ alloc-no-stdlib = "2.0.4" serde = { version = "1.0.203", default-features = false, features = ["derive"], optional = true } # `online` feature dependencies -lazy_static = { version = "1.5.0", optional = true } -prometheus = { version = "0.13.4", features = ["process"], optional = true } c-kzg = { version = "1.0.2", default-features = false, optional = true } sha2 = { version = "0.10.8", default-features = false, optional = true } alloy-transport = { version = "0.1", default-features = false, optional = true } @@ -50,6 +48,10 @@ alloy-rpc-types = { version = "0.1", default-features = false, optional = true } serde_json = { version = "1.0.94", default-features = false, optional = true } reqwest = { version = "0.12.4", default-features = false, optional = true } +# `metrics` feature dependencies +lazy_static = { version = "1.5.0", optional = true } +prometheus = { version = "0.13.4", features = ["process"], optional = true } + # `test-utils` feature dependencies alloy-node-bindings = { version = "0.1", default-features = false, optional = true } tracing-subscriber = { version = "0.3.18", optional = true } @@ -73,6 +75,7 @@ serde = [ "op-alloy-consensus/serde" ] k256 = ["alloy-primitives/k256", "alloy-consensus/k256", "op-alloy-consensus/k256"] +metrics = ["dep:prometheus", "dep:lazy_static"] online = [ "dep:serde_json", "dep:revm", @@ -83,8 +86,6 @@ online = [ "dep:alloy-transport", "dep:alloy-transport-http", "dep:reqwest", - "dep:prometheus", - "dep:lazy_static", "alloy-provider/reqwest", "alloy-rpc-client/reqwest", "alloy-transport-http/reqwest", diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 2f7a66e5d..1e5c76c91 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -2,10 +2,12 @@ #![warn(missing_debug_implementations, missing_docs, unreachable_pub, rustdoc::all)] #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#![no_std] +#![cfg_attr(not(any(test, feature = "metrics")), no_std)] extern crate alloc; +mod macros; + mod params; pub use params::{ ChannelID, CHANNEL_ID_LENGTH, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC, @@ -22,3 +24,6 @@ pub mod types; #[cfg(feature = "online")] pub mod online; + +#[cfg(feature = "metrics")] +pub mod metrics; diff --git a/crates/derive/src/macros.rs b/crates/derive/src/macros.rs new file mode 100644 index 000000000..1a0cb0295 --- /dev/null +++ b/crates/derive/src/macros.rs @@ -0,0 +1,55 @@ +//! Macros for use across derive. + +/// Starts the timer with a label value. +#[macro_export] +macro_rules! timer { + (START, $metric:ident, $labels:expr, $timer:ident) => { + #[cfg(feature = "metrics")] + let $timer = $crate::metrics::$metric.with_label_values($labels).start_timer(); + #[cfg(not(feature = "metrics"))] + let $timer = (); + }; + (DISCARD, $timer:ident) => { + #[cfg(feature = "metrics")] + $timer.stop_and_discard(); + }; + (STOP, $timer:ident) => { + #[cfg(feature = "metrics")] + $timer.stop_and_record(); + }; +} + +/// Increments a metric with a label value. +#[macro_export] +macro_rules! inc_gauge { + ($metric:ident, $label:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.with_label_values(&[$label]).inc(); + }; + ($metric:ident, $value:expr, $label:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.with_label_values(&[$label]).add($value); + }; +} + +/// Observes a metric with a label value. +#[macro_export] +macro_rules! observe_histogram { + ($metric:ident, $value:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.observe($value); + }; + ($metric:ident, $value:expr, $label:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.with_label_values(&[$label]).observe($value); + }; +} + +/// Sets a metric value. +#[macro_export] +macro_rules! metrics_set { + ($metric:ident, $value:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.set($value); + }; +} diff --git a/crates/derive/src/metrics.rs b/crates/derive/src/metrics.rs new file mode 100644 index 000000000..cae8988d9 --- /dev/null +++ b/crates/derive/src/metrics.rs @@ -0,0 +1,54 @@ +//! Metrics for derivation pipeline stages. + +use alloc::{boxed::Box, string::String}; +use lazy_static::lazy_static; +use prometheus::{ + self, opts, register_gauge_vec, register_histogram, register_histogram_vec, register_int_gauge, + GaugeVec, Histogram, HistogramVec, IntGauge, +}; + +const RESPONSE_TIME_CUSTOM_BUCKETS: &[f64; 18] = &[ + 0.00001, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.008, 0.01, 0.02, 0.05, 0.08, + 0.1, 0.2, 0.5, 0.8, 1.0, +]; + +lazy_static! { + /// Tracks the L1 origin for the L1 Traversal Stage. + pub static ref ORIGIN_GAUGE: IntGauge = register_int_gauge!( + "origin_gauge", + "Tracks the L1 origin for the L1 Traversal Stage" + ).expect("Origin Gauge failed to register"); + + /// Tracks the time taken for provider methods. + pub static ref PROVIDER_RESPONSE_TIME: HistogramVec = register_histogram_vec!( + "provider_response_time_seconds", + "Provider response times", + &["provider", "method"], + RESPONSE_TIME_CUSTOM_BUCKETS.to_vec() + ) + .expect("Failed to register histogram vec"); + + /// Tracks the time taken for stage advance methods. + pub static ref STAGE_ADVANCE_RESPONSE_TIME: HistogramVec = register_histogram_vec!( + "stage_advance_response_time_seconds", + "Stage advance response times", + &["stage"], + RESPONSE_TIME_CUSTOM_BUCKETS.to_vec() + ).expect("Failed to register histogram vec"); + + /// Tracks the number of derived frames. + pub static ref DERIVED_FRAMES_COUNT: GaugeVec = { + let opts = opts!("derived_frames_count", "Number of derived frames"); + register_gauge_vec!(opts, &["status"]).expect("Derived Frames Count failed to register") + }; + + /// Tracks the number of channel timeouts. + pub static ref CHANNEL_TIMEOUTS: Histogram = { + let channel_timeout_buckets: [f64; 100] = core::array::from_fn(|i| (i * 10) as f64); + register_histogram!( + "channel_timeouts", + "Channel timeouts", + channel_timeout_buckets.to_vec() + ).expect("Failed to register histogram vec") + }; +} diff --git a/crates/derive/src/online/alloy_providers.rs b/crates/derive/src/online/alloy_providers.rs index 8183d2729..3371dc22b 100644 --- a/crates/derive/src/online/alloy_providers.rs +++ b/crates/derive/src/online/alloy_providers.rs @@ -65,11 +65,8 @@ impl AlloyChainProvider { #[async_trait] impl ChainProvider for AlloyChainProvider { async fn header_by_hash(&mut self, hash: B256) -> Result
{ - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["chain_provider", "header_by_hash"]) - .start_timer(); + crate::timer!(START, PROVIDER_RESPONSE_TIME, &["chain_provider", "header_by_hash"], timer); if let Some(header) = self.header_by_hash_cache.get(&hash) { - timer.observe_duration(); return Ok(header.clone()); } @@ -78,29 +75,30 @@ impl ChainProvider for AlloyChainProvider { let raw_header: Bytes = match raw_header.map_err(|e| anyhow!(e)) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; match Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e)) { Ok(header) => { self.header_by_hash_cache.put(hash, header.clone()); - timer.observe_duration(); Ok(header) } Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); Err(e) } } } async fn block_info_by_number(&mut self, number: u64) -> Result { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["chain_provider", "block_info_by_number"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["chain_provider", "block_info_by_number"], + timer + ); if let Some(block_info) = self.block_info_by_number_cache.get(&number) { - timer.observe_duration(); return Ok(*block_info); } @@ -109,14 +107,14 @@ impl ChainProvider for AlloyChainProvider { let raw_header: Bytes = match raw_header.map_err(|e| anyhow!(e)) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; let header = match Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e)) { Ok(h) => h, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; @@ -128,16 +126,17 @@ impl ChainProvider for AlloyChainProvider { timestamp: header.timestamp, }; self.block_info_by_number_cache.put(number, block_info); - timer.observe_duration(); Ok(block_info) } async fn receipts_by_hash(&mut self, hash: B256) -> Result> { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["chain_provider", "receipts_by_hash"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["chain_provider", "receipts_by_hash"], + timer + ); if let Some(receipts) = self.receipts_by_hash_cache.get(&hash) { - timer.observe_duration(); return Ok(receipts.clone()); } @@ -146,7 +145,7 @@ impl ChainProvider for AlloyChainProvider { let raw_receipts: Vec = match raw_receipts.map_err(|e| anyhow!(e)) { Ok(r) => r, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; @@ -167,12 +166,11 @@ impl ChainProvider for AlloyChainProvider { { Ok(r) => r, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; self.receipts_by_hash_cache.put(hash, receipts.clone()); - timer.observe_duration(); Ok(receipts) } @@ -180,12 +178,14 @@ impl ChainProvider for AlloyChainProvider { &mut self, hash: B256, ) -> Result<(BlockInfo, Vec)> { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["chain_provider", "block_info_and_transactions_by_hash"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["chain_provider", "block_info_and_transactions_by_hash"], + timer + ); if let Some(block_info_and_txs) = self.block_info_and_transactions_by_hash_cache.get(&hash) { - timer.observe_duration(); return Ok(block_info_and_txs.clone()); } @@ -194,14 +194,14 @@ impl ChainProvider for AlloyChainProvider { let raw_block: Bytes = match raw_block.map_err(|e| anyhow!(e)) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; let block = match Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e)) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; @@ -213,7 +213,6 @@ impl ChainProvider for AlloyChainProvider { timestamp: block.header.timestamp, }; self.block_info_and_transactions_by_hash_cache.put(hash, (block_info, block.body.clone())); - timer.observe_duration(); Ok((block_info, block.body)) } } @@ -271,39 +270,42 @@ impl AlloyL2ChainProvider { #[async_trait] impl L2ChainProvider for AlloyL2ChainProvider { async fn l2_block_info_by_number(&mut self, number: u64) -> Result { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["l2_chain_provider", "l2_block_info_by_number"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["l2_chain_provider", "l2_block_info_by_number"], + timer + ); if let Some(l2_block_info) = self.l2_block_info_by_number_cache.get(&number) { - timer.observe_duration(); return Ok(*l2_block_info); } let payload = match self.payload_by_number(number).await { Ok(p) => p, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; let l2_block_info = match payload.to_l2_block_ref(self.rollup_config.as_ref()) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; self.l2_block_info_by_number_cache.put(number, l2_block_info); - timer.observe_duration(); Ok(l2_block_info) } async fn payload_by_number(&mut self, number: u64) -> Result { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["l2_chain_provider", "payload_by_number"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["l2_chain_provider", "payload_by_number"], + timer + ); if let Some(payload) = self.payload_by_number_cache.get(&number) { - timer.observe_duration(); return Ok(payload.clone()); } @@ -312,21 +314,20 @@ impl L2ChainProvider for AlloyL2ChainProvider { let raw_block: Bytes = match raw_block.map_err(|e| anyhow!(e)) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; let block = match OpBlock::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e)) { Ok(b) => b, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; let payload_envelope: L2ExecutionPayloadEnvelope = block.into(); self.payload_by_number_cache.put(number, payload_envelope.clone()); - timer.observe_duration(); Ok(payload_envelope) } @@ -335,30 +336,31 @@ impl L2ChainProvider for AlloyL2ChainProvider { number: u64, rollup_config: Arc, ) -> Result { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["l2_chain_provider", "system_config_by_number"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["l2_chain_provider", "system_config_by_number"], + timer + ); if let Some(system_config) = self.system_config_by_number_cache.get(&number) { - timer.observe_duration(); return Ok(system_config.clone()); } let envelope = match self.payload_by_number(number).await { Ok(e) => e, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; let sys_config = match envelope.to_system_config(&rollup_config) { Ok(s) => s, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; self.system_config_by_number_cache.put(number, sys_config.clone()); - timer.observe_duration(); Ok(sys_config) } } diff --git a/crates/derive/src/online/beacon_client.rs b/crates/derive/src/online/beacon_client.rs index 4020e63ec..8aa98ed9f 100644 --- a/crates/derive/src/online/beacon_client.rs +++ b/crates/derive/src/online/beacon_client.rs @@ -56,9 +56,7 @@ impl OnlineBeaconClient { #[async_trait] impl BeaconClient for OnlineBeaconClient { async fn config_spec(&self) -> Result { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["beacon_client", "config_spec"]) - .start_timer(); + crate::timer!(START, PROVIDER_RESPONSE_TIME, &["beacon_client", "config_spec"], timer); let first = match self .inner .get(format!("{}/{}", self.base, SPEC_METHOD)) @@ -68,19 +66,15 @@ impl BeaconClient for OnlineBeaconClient { { Ok(response) => response, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; - let res = first.json::().await.map_err(|e| anyhow!(e)); - timer.observe_duration(); - res + first.json::().await.map_err(|e| anyhow!(e)) } async fn beacon_genesis(&self) -> Result { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["beacon_client", "beacon_genesis"]) - .start_timer(); + crate::timer!(START, PROVIDER_RESPONSE_TIME, &["beacon_client", "beacon_genesis"], timer); let first = match self .inner .get(format!("{}/{}", self.base, GENESIS_METHOD)) @@ -90,13 +84,11 @@ impl BeaconClient for OnlineBeaconClient { { Ok(response) => response, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; - let res = first.json::().await.map_err(|e| anyhow!(e)); - timer.observe_duration(); - res + first.json::().await.map_err(|e| anyhow!(e)) } async fn beacon_blob_side_cars( @@ -104,9 +96,12 @@ impl BeaconClient for OnlineBeaconClient { slot: u64, hashes: &[IndexedBlobHash], ) -> Result> { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["beacon_client", "beacon_blob_side_cars"]) - .start_timer(); + crate::timer!( + START, + PROVIDER_RESPONSE_TIME, + &["beacon_client", "beacon_blob_side_cars"], + timer + ); let raw_response = match self .inner .get(format!("{}/{}/{}", self.base, SIDECARS_METHOD_PREFIX, slot)) @@ -116,7 +111,7 @@ impl BeaconClient for OnlineBeaconClient { { Ok(response) => response, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; @@ -124,7 +119,7 @@ impl BeaconClient for OnlineBeaconClient { match raw_response.json::().await.map_err(|e| anyhow!(e)) { Ok(response) => response, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; @@ -140,7 +135,6 @@ impl BeaconClient for OnlineBeaconClient { } }); - timer.observe_duration(); Ok(sidecars) } } diff --git a/crates/derive/src/online/blob_provider.rs b/crates/derive/src/online/blob_provider.rs index 74f3a9d4c..57368b2c8 100644 --- a/crates/derive/src/online/blob_provider.rs +++ b/crates/derive/src/online/blob_provider.rs @@ -128,13 +128,11 @@ where block_ref: &BlockInfo, blob_hashes: &[IndexedBlobHash], ) -> Result, BlobProviderError> { - let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME - .with_label_values(&["blob_provider", "get_blobs"]) - .start_timer(); + crate::timer!(START, PROVIDER_RESPONSE_TIME, &["blob_provider", "get_blobs"], timer); // Fetches the genesis timestamp and slot interval from the // [BeaconGenesis] and [ConfigSpec] if not previously loaded. if let Err(e) = self.load_configs().await { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } @@ -142,7 +140,7 @@ where let sidecars = match self.fetch_filtered_sidecars(block_ref, blob_hashes).await { Ok(sidecars) => sidecars, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(e); } }; @@ -162,12 +160,11 @@ where { Ok(blobs) => blobs, Err(e) => { - timer.observe_duration(); + crate::timer!(DISCARD, timer); return Err(BlobProviderError::Custom(e)); } }; - timer.observe_duration(); Ok(blobs) } } diff --git a/crates/derive/src/online/metrics.rs b/crates/derive/src/online/metrics.rs deleted file mode 100644 index 77ded2c09..000000000 --- a/crates/derive/src/online/metrics.rs +++ /dev/null @@ -1,18 +0,0 @@ -//! Metrics for the online derivation pipeline. - -use alloc::boxed::Box; -use lazy_static::lazy_static; -use prometheus::{self, register_histogram_vec, HistogramVec}; - -const RESPONSE_TIME_CUSTOM_BUCKETS: &[f64; 14] = - &[0.0005, 0.001, 0.002, 0.005, 0.008, 0.01, 0.02, 0.05, 0.08, 0.1, 0.2, 0.5, 0.8, 1.0]; - -lazy_static! { - pub static ref PROVIDER_RESPONSE_TIME: HistogramVec = register_histogram_vec!( - "provider_response_time_seconds", - "Provider response times", - &["provider", "method"], - RESPONSE_TIME_CUSTOM_BUCKETS.to_vec() - ) - .expect("Failed to register histogram vec"); -} diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index e3d7369c4..e622ab162 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -9,8 +9,6 @@ pub use crate::{ types::{BlockInfo, RollupConfig}, }; -mod metrics; - mod pipeline; pub use pipeline::{ new_online_pipeline, OnlineAttributesBuilder, OnlineAttributesQueue, OnlineDataProvider, diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 3f5775169..921c4e4b3 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -85,11 +85,23 @@ where &mut self, parent: L2BlockInfo, ) -> StageResult { - // Load the batch. - let batch = self.load_batch(parent).await?; + crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["attributes_queue"], timer); + let batch = match self.load_batch(parent).await { + Ok(batch) => batch, + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(e); + } + }; // Construct the payload attributes from the loaded batch. - let attributes = self.create_next_attributes(batch, parent).await?; + let attributes = match self.create_next_attributes(batch, parent).await { + Ok(attributes) => attributes, + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(e); + } + }; let populated_attributes = L2AttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span }; diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index e54d3cbd1..70570cce8 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -264,10 +264,12 @@ where /// Returns the next valid batch upon the given safe head. /// Also returns the boolean that indicates if the batch is the last block in the batch. async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult { + crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["batch_queue"], timer); if !self.next_spans.is_empty() { // There are cached singular batches derived from the span batch. // Check if the next cached batch matches the given parent block. if self.next_spans[0].timestamp == parent.block_info.timestamp + self.cfg.block_time { + crate::timer!(DISCARD, timer); return self .pop_next_batch(parent) .ok_or(anyhow!("failed to pop next batch from span batch").into()); @@ -312,7 +314,13 @@ where if self.origin != self.prev.origin() { self.origin = self.prev.origin(); if !origin_behind { - let origin = self.origin.as_ref().ok_or_else(|| anyhow!("missing origin"))?; + let origin = match self.origin.as_ref().ok_or_else(|| anyhow!("missing origin")) { + Ok(o) => o, + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(StageError::Custom(e)); + } + }; self.l1_blocks.push(*origin); } else { // This is to handle the special case of startup. @@ -335,12 +343,16 @@ where } } Err(StageError::Eof) => out_of_data = true, - Err(e) => return Err(e), + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(e); + } } // Skip adding the data unless up to date with the origin, // but still fully empty the previous stages. if origin_behind { + crate::timer!(DISCARD, timer); if out_of_data { return Err(StageError::Eof); } @@ -350,15 +362,18 @@ where // Attempt to derive more batches. let batch = match self.derive_next_batch(out_of_data, parent).await { Ok(b) => b, - Err(e) => match e { - StageError::Eof => { - if out_of_data { - return Err(StageError::Eof); + Err(e) => { + crate::timer!(DISCARD, timer); + match e { + StageError::Eof => { + if out_of_data { + return Err(StageError::Eof); + } + return Err(StageError::NotEnoughData); } - return Err(StageError::NotEnoughData); + _ => return Err(e), } - _ => return Err(e), - }, + } }; // If the next batch is derived from the span batch, it's the last batch of the span. @@ -366,15 +381,28 @@ where match batch { Batch::Single(sb) => Ok(sb), Batch::Span(sb) => { - let batches = sb.get_singular_batches(&self.l1_blocks, parent).map_err(|e| { + let batches = match sb.get_singular_batches(&self.l1_blocks, parent).map_err(|e| { StageError::Custom(anyhow!( "Could not get singular batches from span batch: {e}" )) - })?; + }) { + Ok(b) => b, + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(e); + } + }; self.next_spans = batches; - let nb = self + let nb = match self .pop_next_batch(parent) - .ok_or_else(|| anyhow!("failed to pop next batch from span batch"))?; + .ok_or_else(|| anyhow!("failed to pop next batch from span batch")) + { + Ok(b) => b, + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(StageError::Custom(e)); + } + }; Ok(nb) } } diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index e9f3b072f..0782c9983 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -119,6 +119,10 @@ where let origin = self.origin().ok_or(StageError::MissingOrigin)?; if channel.open_block_number() + self.cfg.channel_timeout < origin.number { warn!(target: "channel-bank", "Channel {:?} timed out", first); + crate::observe_histogram!( + CHANNEL_TIMEOUTS, + (origin.number - channel.open_block_number()) as f64 + ); self.channels.remove(&first); self.channel_queue.pop_front(); return Ok(None); @@ -179,19 +183,29 @@ where P: ChannelBankProvider + PreviousStage + Send + Debug, { async fn next_data(&mut self) -> StageResult> { + crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_bank"], timer); match self.read() { Err(StageError::Eof) => { // continue - we will attempt to load data into the channel bank } Err(e) => { + crate::timer!(DISCARD, timer); return Err(anyhow!("Error fetching next data from channel bank: {:?}", e).into()); } data => return data, }; // Load the data into the channel bank - let frame = self.prev.next_frame().await?; - self.ingest_frame(frame)?; + let frame = match self.prev.next_frame().await { + Ok(f) => f, + Err(e) => { + crate::timer!(DISCARD, timer); + return Err(e); + } + }; + let res = self.ingest_frame(frame); + crate::timer!(DISCARD, timer); + res?; Err(StageError::NotEnoughData) } } diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index cd41c0985..07b8ec96e 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -88,9 +88,11 @@ where P: ChannelReaderProvider + PreviousStage + Send + Debug, { async fn next_batch(&mut self) -> StageResult { + crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer); if let Err(e) = self.set_batch_reader().await { warn!(target: "channel-reader", "Failed to set batch reader: {:?}", e); self.next_channel(); + crate::timer!(DISCARD, timer); return Err(e); } match self @@ -103,6 +105,7 @@ where Ok(batch) => Ok(batch), Err(e) => { self.next_channel(); + crate::timer!(DISCARD, timer); Err(e) } } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 708f721fa..be4263ed9 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -71,8 +71,10 @@ where match self.prev.next_data().await { Ok(data) => { if let Ok(frames) = into_frames(Ok(data)) { + crate::inc_gauge!(DERIVED_FRAMES_COUNT, frames.len() as f64, "success"); self.queue.extend(frames); } else { + crate::inc_gauge!(DERIVED_FRAMES_COUNT, "failed"); // There may be more frames in the queue for the // pipeline to advance, so don't return an error here. error!(target: "frame-queue", "Failed to parse frames from data."); diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 18677b3dc..d4b850256 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -104,6 +104,7 @@ impl OriginAdvancer for L1Traversal { return Err(StageError::SystemConfigUpdate(e)); } + crate::metrics_set!(ORIGIN_GAUGE, next_l1_origin.number as i64); self.block = Some(next_l1_origin); self.done = false; Ok(()) diff --git a/crates/derive/src/types/batch/span_batch/raw.rs b/crates/derive/src/types/batch/span_batch/raw.rs index 997c6065f..8d9762e0b 100644 --- a/crates/derive/src/types/batch/span_batch/raw.rs +++ b/crates/derive/src/types/batch/span_batch/raw.rs @@ -139,7 +139,6 @@ impl RawSpanBatch { #[cfg(test)] mod test { - extern crate std; use super::{RawSpanBatch, RollupConfig, SpanBatch, SpanBatchElement}; use alloc::{vec, vec::Vec}; use alloy_primitives::FixedBytes; diff --git a/crates/derive/src/types/channel.rs b/crates/derive/src/types/channel.rs index 0f719f52e..f731b7a3c 100644 --- a/crates/derive/src/types/channel.rs +++ b/crates/derive/src/types/channel.rs @@ -154,8 +154,6 @@ mod test { vec::Vec, }; - extern crate std; - struct FrameValidityTestCase { name: String, frames: Vec, diff --git a/crates/derive/src/types/frame.rs b/crates/derive/src/types/frame.rs index 281d4bcd8..24f4ed683 100644 --- a/crates/derive/src/types/frame.rs +++ b/crates/derive/src/types/frame.rs @@ -106,8 +106,6 @@ impl Frame { #[cfg(test)] mod test { - extern crate std; - use super::*; #[test] diff --git a/examples/trusted-sync/Cargo.toml b/examples/trusted-sync/Cargo.toml index 3e91e167f..d8babdbf8 100644 --- a/examples/trusted-sync/Cargo.toml +++ b/examples/trusted-sync/Cargo.toml @@ -14,7 +14,7 @@ homepage.workspace = true anyhow.workspace = true tracing.workspace = true alloy-primitives = { workspace = true, features = ["serde"] } -kona-derive = { path = "../../crates/derive", version = "0.0.2", features = ["serde", "k256", "online"] } +kona-derive = { path = "../../crates/derive", version = "0.0.2", features = ["serde", "k256", "online", "metrics"] } # Custom dependencies lazy_static = "1.5.0"