diff --git a/crates/derive/src/macros.rs b/crates/derive/src/macros.rs index 401d1e407..88fbda092 100644 --- a/crates/derive/src/macros.rs +++ b/crates/derive/src/macros.rs @@ -19,27 +19,27 @@ macro_rules! timer { /// Increments a metric with a label value. #[macro_export] -macro_rules! inc_gauge { - ($metric:ident, $label:expr) => { +macro_rules! inc { + ($metric:ident, $labels:expr) => { #[cfg(feature = "metrics")] - $crate::metrics::$metric.with_label_values(&[$label]).inc(); + $crate::metrics::$metric.with_label_values($labels).inc(); }; - ($metric:ident, $value:expr, $label:expr) => { + ($metric:ident, $value:expr, $labels:expr) => { #[cfg(feature = "metrics")] - $crate::metrics::$metric.with_label_values(&[$label]).add($value); + $crate::metrics::$metric.with_label_values($labels).add($value); }; } /// Observes a metric with a label value. #[macro_export] -macro_rules! observe_histogram { +macro_rules! observe { ($metric:ident, $value:expr) => { #[cfg(feature = "metrics")] $crate::metrics::$metric.observe($value); }; - ($metric:ident, $value:expr, $label:expr) => { + ($metric:ident, $value:expr, $labels:expr) => { #[cfg(feature = "metrics")] - $crate::metrics::$metric.with_label_values(&[$label]).observe($value); + $crate::metrics::$metric.with_label_values($label).observe($value); }; } diff --git a/crates/derive/src/metrics.rs b/crates/derive/src/metrics.rs index cae8988d9..9cfceba77 100644 --- a/crates/derive/src/metrics.rs +++ b/crates/derive/src/metrics.rs @@ -3,8 +3,9 @@ use alloc::{boxed::Box, string::String}; use lazy_static::lazy_static; use prometheus::{ - self, opts, register_gauge_vec, register_histogram, register_histogram_vec, register_int_gauge, - GaugeVec, Histogram, HistogramVec, IntGauge, + self, opts, register_counter_vec, register_gauge_vec, register_histogram, + register_histogram_vec, register_int_gauge, CounterVec, GaugeVec, Histogram, HistogramVec, + IntGauge, }; const RESPONSE_TIME_CUSTOM_BUCKETS: &[f64; 18] = &[ @@ -19,6 +20,20 @@ lazy_static! { "Tracks the L1 origin for the L1 Traversal Stage" ).expect("Origin Gauge failed to register"); + /// Tracks the number of provider method calls. + pub static ref PROVIDER_CALLS: CounterVec = register_counter_vec!( + "provider_calls", + "Number of provider method calls", + &["provider", "method"] + ).expect("Provider Calls failed to register"); + + /// Tracks the number of errors in provider methods. + pub static ref PROVIDER_ERRORS: CounterVec = register_counter_vec!( + "provider_errors", + "Number of provider errors", + &["provider", "method", "error"] + ).expect("Provider Errors failed to register"); + /// Tracks the time taken for provider methods. pub static ref PROVIDER_RESPONSE_TIME: HistogramVec = register_histogram_vec!( "provider_response_time_seconds", diff --git a/crates/derive/src/online/alloy_providers.rs b/crates/derive/src/online/alloy_providers.rs index 3371dc22b..1d8383b09 100644 --- a/crates/derive/src/online/alloy_providers.rs +++ b/crates/derive/src/online/alloy_providers.rs @@ -65,6 +65,7 @@ impl AlloyChainProvider { #[async_trait] impl ChainProvider for AlloyChainProvider { async fn header_by_hash(&mut self, hash: B256) -> Result
{ + crate::inc!(PROVIDER_CALLS, &["chain_provider", "header_by_hash"]); crate::timer!(START, PROVIDER_RESPONSE_TIME, &["chain_provider", "header_by_hash"], timer); if let Some(header) = self.header_by_hash_cache.get(&hash) { return Ok(header.clone()); @@ -76,6 +77,10 @@ impl ChainProvider for AlloyChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["chain_provider", "header_by_hash", "debug_getRawHeader"] + ); return Err(e); } }; @@ -86,12 +91,14 @@ impl ChainProvider for AlloyChainProvider { } Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["chain_provider", "header_by_hash", "decode"]); Err(e) } } } async fn block_info_by_number(&mut self, number: u64) -> Result { + crate::inc!(PROVIDER_CALLS, &["chain_provider", "block_info_by_number"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -108,6 +115,10 @@ impl ChainProvider for AlloyChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["chain_provider", "block_info_by_number", "debug_getRawHeader"] + ); return Err(e); } }; @@ -115,6 +126,7 @@ impl ChainProvider for AlloyChainProvider { Ok(h) => h, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["chain_provider", "block_info_by_number", "decode"]); return Err(e); } }; @@ -130,6 +142,7 @@ impl ChainProvider for AlloyChainProvider { } async fn receipts_by_hash(&mut self, hash: B256) -> Result> { + crate::inc!(PROVIDER_CALLS, &["chain_provider", "receipts_by_hash"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -146,6 +159,10 @@ impl ChainProvider for AlloyChainProvider { Ok(r) => r, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["chain_provider", "receipts_by_hash", "debug_getRawReceipts"] + ); return Err(e); } }; @@ -167,6 +184,7 @@ impl ChainProvider for AlloyChainProvider { Ok(r) => r, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["chain_provider", "receipts_by_hash", "decode"]); return Err(e); } }; @@ -178,6 +196,7 @@ impl ChainProvider for AlloyChainProvider { &mut self, hash: B256, ) -> Result<(BlockInfo, Vec)> { + crate::inc!(PROVIDER_CALLS, &["chain_provider", "block_info_and_transactions_by_hash"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -195,6 +214,10 @@ impl ChainProvider for AlloyChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["chain_provider", "block_info_and_transactions_by_hash", "debug_getRawBlock"] + ); return Err(e); } }; @@ -202,6 +225,10 @@ impl ChainProvider for AlloyChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["chain_provider", "block_info_and_transactions_by_hash", "decode"] + ); return Err(e); } }; @@ -270,6 +297,7 @@ impl AlloyL2ChainProvider { #[async_trait] impl L2ChainProvider for AlloyL2ChainProvider { async fn l2_block_info_by_number(&mut self, number: u64) -> Result { + crate::inc!(PROVIDER_CALLS, &["l2_chain_provider", "l2_block_info_by_number"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -284,6 +312,10 @@ impl L2ChainProvider for AlloyL2ChainProvider { Ok(p) => p, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["l2_chain_provider", "l2_block_info_by_number", "payload_by_number"] + ); return Err(e); } }; @@ -291,6 +323,10 @@ impl L2ChainProvider for AlloyL2ChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["l2_chain_provider", "l2_block_info_by_number", "to_l2_block_ref"] + ); return Err(e); } }; @@ -299,6 +335,7 @@ impl L2ChainProvider for AlloyL2ChainProvider { } async fn payload_by_number(&mut self, number: u64) -> Result { + crate::inc!(PROVIDER_CALLS, &["l2_chain_provider", "payload_by_number"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -315,6 +352,10 @@ impl L2ChainProvider for AlloyL2ChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["l2_chain_provider", "payload_by_number", "debug_getRawBlock"] + ); return Err(e); } }; @@ -322,6 +363,7 @@ impl L2ChainProvider for AlloyL2ChainProvider { Ok(b) => b, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["l2_chain_provider", "payload_by_number", "decode"]); return Err(e); } }; @@ -336,6 +378,7 @@ impl L2ChainProvider for AlloyL2ChainProvider { number: u64, rollup_config: Arc, ) -> Result { + crate::inc!(PROVIDER_CALLS, &["l2_chain_provider", "system_config_by_number"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -350,6 +393,10 @@ impl L2ChainProvider for AlloyL2ChainProvider { Ok(e) => e, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["l2_chain_provider", "system_config_by_number", "payload_by_number"] + ); return Err(e); } }; @@ -357,6 +404,10 @@ impl L2ChainProvider for AlloyL2ChainProvider { Ok(s) => s, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["l2_chain_provider", "system_config_by_number", "to_system_config"] + ); return Err(e); } }; diff --git a/crates/derive/src/online/beacon_client.rs b/crates/derive/src/online/beacon_client.rs index 8aa98ed9f..407fb275d 100644 --- a/crates/derive/src/online/beacon_client.rs +++ b/crates/derive/src/online/beacon_client.rs @@ -56,6 +56,7 @@ impl OnlineBeaconClient { #[async_trait] impl BeaconClient for OnlineBeaconClient { async fn config_spec(&self) -> Result { + crate::inc!(PROVIDER_CALLS, &["beacon_client", "config_spec"]); crate::timer!(START, PROVIDER_RESPONSE_TIME, &["beacon_client", "config_spec"], timer); let first = match self .inner @@ -67,13 +68,22 @@ impl BeaconClient for OnlineBeaconClient { Ok(response) => response, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["beacon_client", "config_spec", "request"]); return Err(e); } }; - first.json::().await.map_err(|e| anyhow!(e)) + match first.json::().await.map_err(|e| anyhow!(e)) { + Ok(response) => Ok(response), + Err(e) => { + crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["beacon_client", "config_spec", "decode"]); + Err(e) + } + } } async fn beacon_genesis(&self) -> Result { + crate::inc!(PROVIDER_CALLS, &["beacon_client", "beacon_genesis"]); crate::timer!(START, PROVIDER_RESPONSE_TIME, &["beacon_client", "beacon_genesis"], timer); let first = match self .inner @@ -85,10 +95,18 @@ impl BeaconClient for OnlineBeaconClient { Ok(response) => response, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["beacon_client", "beacon_genesis", "request"]); return Err(e); } }; - first.json::().await.map_err(|e| anyhow!(e)) + match first.json::().await.map_err(|e| anyhow!(e)) { + Ok(response) => Ok(response), + Err(e) => { + crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["beacon_client", "beacon_genesis", "decode"]); + Err(e) + } + } } async fn beacon_blob_side_cars( @@ -96,6 +114,7 @@ impl BeaconClient for OnlineBeaconClient { slot: u64, hashes: &[IndexedBlobHash], ) -> Result> { + crate::inc!(PROVIDER_CALLS, &["beacon_client", "beacon_blob_side_cars"]); crate::timer!( START, PROVIDER_RESPONSE_TIME, @@ -112,17 +131,25 @@ impl BeaconClient for OnlineBeaconClient { Ok(response) => response, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["beacon_client", "beacon_blob_side_cars", "request"] + ); + return Err(e); + } + }; + let raw_response = match raw_response + .json::() + .await + .map_err(|e| anyhow!(e)) + { + Ok(response) => response, + Err(e) => { + crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["beacon_client", "beacon_blob_side_cars", "decode"]); return Err(e); } }; - let raw_response = - match raw_response.json::().await.map_err(|e| anyhow!(e)) { - Ok(response) => response, - Err(e) => { - crate::timer!(DISCARD, timer); - return Err(e); - } - }; let mut sidecars = Vec::with_capacity(hashes.len()); diff --git a/crates/derive/src/online/blob_provider.rs b/crates/derive/src/online/blob_provider.rs index 57368b2c8..eb725edcf 100644 --- a/crates/derive/src/online/blob_provider.rs +++ b/crates/derive/src/online/blob_provider.rs @@ -128,11 +128,13 @@ where block_ref: &BlockInfo, blob_hashes: &[IndexedBlobHash], ) -> Result, BlobProviderError> { + crate::inc!(PROVIDER_CALLS, &["blob_provider", "get_blobs"]); crate::timer!(START, PROVIDER_RESPONSE_TIME, &["blob_provider", "get_blobs"], timer); // Fetches the genesis timestamp and slot interval from the // [BeaconGenesis] and [ConfigSpec] if not previously loaded. if let Err(e) = self.load_configs().await { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["blob_provider", "get_blobs", "load_configs"]); return Err(e); } @@ -141,6 +143,10 @@ where Ok(sidecars) => sidecars, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!( + PROVIDER_ERRORS, + &["blob_provider", "get_blobs", "fetch_filtered_sidecars"] + ); return Err(e); } }; @@ -161,6 +167,7 @@ where Ok(blobs) => blobs, Err(e) => { crate::timer!(DISCARD, timer); + crate::inc!(PROVIDER_ERRORS, &["blob_provider", "get_blobs", "verify_blob"]); return Err(BlobProviderError::Custom(e)); } }; diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 0782c9983..801d8bc74 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -119,10 +119,7 @@ where let origin = self.origin().ok_or(StageError::MissingOrigin)?; if channel.open_block_number() + self.cfg.channel_timeout < origin.number { warn!(target: "channel-bank", "Channel {:?} timed out", first); - crate::observe_histogram!( - CHANNEL_TIMEOUTS, - (origin.number - channel.open_block_number()) as f64 - ); + crate::observe!(CHANNEL_TIMEOUTS, (origin.number - channel.open_block_number()) as f64); self.channels.remove(&first); self.channel_queue.pop_front(); return Ok(None); diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index efcf987e1..830698b04 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -71,10 +71,10 @@ where match self.prev.next_data().await { Ok(data) => { if let Ok(frames) = into_frames(Ok(data)) { - crate::inc_gauge!(DERIVED_FRAMES_COUNT, frames.len() as f64, "success"); + crate::inc!(DERIVED_FRAMES_COUNT, frames.len() as f64, &["success"]); self.queue.extend(frames); } else { - crate::inc_gauge!(DERIVED_FRAMES_COUNT, "failed"); + crate::inc!(DERIVED_FRAMES_COUNT, &["failed"]); // There may be more frames in the queue for the // pipeline to advance, so don't return an error here. error!(target: "frame-queue", "Failed to parse frames from data.");