diff --git a/crates/derive/src/macros.rs b/crates/derive/src/macros.rs index ed350d041..a6ce974e1 100644 --- a/crates/derive/src/macros.rs +++ b/crates/derive/src/macros.rs @@ -54,4 +54,8 @@ macro_rules! set { #[cfg(feature = "metrics")] $crate::metrics::$metric.set($value); }; + ($metric:ident, $value:expr, $labels:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.with_label_values($labels).set($value as f64); + }; } diff --git a/crates/derive/src/metrics.rs b/crates/derive/src/metrics.rs index bbfcf7c2f..589642f19 100644 --- a/crates/derive/src/metrics.rs +++ b/crates/derive/src/metrics.rs @@ -16,6 +16,12 @@ const RESPONSE_TIME_CUSTOM_BUCKETS: &[f64; 18] = &[ const FRAME_COUNT_BUCKETS: &[f64; 10] = &[1.0, 2.0, 3.0, 5.0, 8.0, 10.0, 12.0, 15.0, 18.0, 20.0]; lazy_static! { + /// Tracks stage resets. + pub static ref STAGE_RESETS: GaugeVec = { + let opts = opts!("kona_derive_stage_resets", "Number of times various stages are reset"); + register_gauge_vec!(opts, &["stage"]).expect("Stage reset metric failed to register") + }; + /// Tracks the L1 origin for the L1 Traversal Stage. pub static ref ORIGIN_GAUGE: IntGauge = register_int_gauge!( "kona_derive_origin_gauge", diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index ca719edb3..30d43fcca 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -75,12 +75,16 @@ where - /// Resets the pipelien by calling the [`ResettableStage::reset`] method. + /// Resets the pipeline by calling the [`ResettableStage::reset`] method. /// This will bubble down the stages all the way to the `L1Traversal` stage. 
- async fn reset(&mut self, block_info: BlockInfo) -> anyhow::Result<()> { + async fn reset( + &mut self, + l2_block_info: BlockInfo, + l1_block_info: BlockInfo, + ) -> anyhow::Result<()> { let system_config = self .l2_chain_provider - .system_config_by_number(block_info.number, Arc::clone(&self.rollup_config)) + .system_config_by_number(l2_block_info.number, Arc::clone(&self.rollup_config)) .await?; - match self.attributes.reset(block_info, &system_config).await { + match self.attributes.reset(l1_block_info, &system_config).await { Ok(()) => trace!(target: "pipeline", "Stages reset"), Err(StageError::Eof) => trace!(target: "pipeline", "Stages reset with EOF"), Err(err) => { diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 921c4e4b3..6c79c31f3 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -67,6 +67,7 @@ where { /// Create a new [AttributesQueue] stage. pub fn new(cfg: Arc, prev: P, builder: AB) -> Self { + crate::set!(STAGE_RESETS, 0, &["attributes-queue"]); Self { cfg, prev, is_last_in_span: false, batch: None, builder } } @@ -209,10 +210,10 @@ where system_config: &SystemConfig, ) -> StageResult<()> { self.prev.reset(block_info, system_config).await?; - info!(target: "attributes-queue", "resetting attributes queue"); self.batch = None; self.is_last_in_span = false; - Err(StageError::Eof) + crate::inc!(STAGE_RESETS, &["attributes-queue"]); + Ok(()) } } diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 4000af67d..19299e67a 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -80,6 +80,7 @@ where { /// Creates a new [BatchQueue] stage. 
pub fn new(cfg: Arc, prev: P, fetcher: BF) -> Self { + crate::set!(STAGE_RESETS, 0, &["batch-queue"]); Self { cfg, prev, @@ -442,9 +443,6 @@ where { async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> { self.prev.reset(base, system_config).await?; - // Copy over the Origin from the next stage. - // It is set in the engine queue (two stages away) - // such that the L2 Safe Head origin is the progress. self.origin = Some(base); self.batches.clear(); // Include the new origin as an origin to build on. @@ -453,7 +451,8 @@ where self.l1_blocks.clear(); self.l1_blocks.push(base); self.next_spans.clear(); - Err(StageError::Eof) + crate::inc!(STAGE_RESETS, &["batch-queue"]); + Ok(()) } } diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 21d989a58..11177445b 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -55,6 +55,7 @@ where { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { + crate::set!(STAGE_RESETS, 0, &["channel-bank"]); Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } @@ -258,7 +259,8 @@ where self.prev.reset(block_info, system_config).await?; self.channels.clear(); self.channel_queue = VecDeque::with_capacity(10); - Err(StageError::Eof) + crate::inc!(STAGE_RESETS, &["channel-bank"]); + Ok(()) } } diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index c7213159d..3f4e1b181 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -53,6 +53,7 @@ where { /// Create a new [ChannelReader] stage. 
pub fn new(prev: P, cfg: Arc) -> Self { + crate::set!(STAGE_RESETS, 0, &["channel-reader"]); Self { prev, next_batch: None, cfg: cfg.clone() } } @@ -129,6 +130,7 @@ where async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { self.prev.reset(base, cfg).await?; self.next_channel(); + crate::inc!(STAGE_RESETS, &["channel-reader"]); Ok(()) } } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 830698b04..3a37ab83b 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -47,6 +47,7 @@ where /// /// [L1Retrieval]: crate::stages::L1Retrieval pub fn new(prev: P) -> Self { + crate::set!(STAGE_RESETS, 0, &["frame-queue"]); Self { prev, queue: VecDeque::new() } } } @@ -127,7 +128,8 @@ where ) -> StageResult<()> { self.prev.reset(block_info, system_config).await?; self.queue = VecDeque::default(); - Err(StageError::Eof) + crate::inc!(STAGE_RESETS, &["frame-queue"]); + Ok(()) } } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index c549f5f33..233db5944 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -60,6 +60,7 @@ where /// /// [L1Traversal]: crate::stages::L1Traversal pub fn new(prev: P, provider: DAP) -> Self { + crate::set!(STAGE_RESETS, 0, &["l1-retrieval"]); Self { prev, provider, data: None } } } @@ -134,6 +135,7 @@ where async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { self.prev.reset(base, cfg).await?; self.data = Some(self.provider.open_data(&base).await?); + crate::inc!(STAGE_RESETS, &["l1-retrieval"]); Ok(()) } } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index d79a62a3b..43b387c6a 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -50,6 +50,7 @@ impl L1RetrievalProvider for L1Traversal { impl 
L1Traversal { /// Creates a new [L1Traversal] instance. pub fn new(data_source: F, cfg: Arc) -> Self { + crate::set!(STAGE_RESETS, 0, &["l1-traversal"]); Self { block: Some(BlockInfo::default()), data_source, @@ -129,7 +130,8 @@ impl ResettableStage for L1Traversal { self.block = Some(base); self.done = false; self.system_config = cfg.clone(); - Err(StageError::Eof) + crate::inc!(STAGE_RESETS, &["l1-traversal"]); + Ok(()) } } diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index 3ed7f699d..bda924ad9 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -27,7 +27,7 @@ pub trait Pipeline: OriginProvider + Iterator { fn peek(&self) -> Option<&L2AttributesWithParent>; /// Resets the pipeline on the next [Pipeline::step] call. - async fn reset(&mut self, origin: BlockInfo) -> anyhow::Result<()>; + async fn reset(&mut self, l2_block_info: BlockInfo, origin: BlockInfo) -> anyhow::Result<()>; /// Attempts to progress the pipeline. async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index f8668b725..4dc7876dd 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -87,13 +87,15 @@ async fn sync(cli: cli::Cli) -> Result<()> { let validator = validation::OnlineValidator::new_http(l2_rpc_url.clone(), &cfg); let genesis_l2_block_number = cfg.genesis.l2.number; let mut pipeline = - new_online_pipeline(cfg, l1_provider, dap, l2_provider.clone(), attributes, tip); + new_online_pipeline(cfg, l1_provider.clone(), dap, l2_provider.clone(), attributes, tip); // Reset metrics so they can be queried. 
metrics::FAILED_PAYLOAD_DERIVATION.reset(); metrics::DRIFT_WALKBACK.set(0); metrics::DRIFT_WALKBACK_TIMESTAMP.set(0); metrics::DERIVED_ATTRIBUTES_COUNT.reset(); + metrics::FAST_FORWARD_BLOCK.set(0); + metrics::FAST_FORWARD_TIMESTAMP.set(0); // Continuously step on the pipeline and validate payloads. let mut advance_cursor_flag = false; @@ -101,7 +103,6 @@ async fn sync(cli: cli::Cli) -> Result<()> { // Update the reference l2 head. match l2_provider.latest_block_number().await { Ok(latest) => { - let prev = metrics::REFERENCE_L2_HEAD.get(); metrics::REFERENCE_L2_HEAD.set(latest as i64); let timestamp = match std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -115,9 +116,7 @@ async fn sync(cli: cli::Cli) -> Result<()> { }; // Update the timestamp - if latest as i64 > prev { - metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(timestamp as i64); - } + metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(timestamp as i64); // Don't check drift if we're within 10 blocks of origin. if cursor.block_info.number - genesis_l2_block_number <= 10 { @@ -130,37 +129,49 @@ async fn sync(cli: cli::Cli) -> Result<()> { - // If walkback isn't enabled, jump to 10 blocks less than the reference l2 + // If walkback isn't enabled, jump to 100 blocks less than the reference l2 // head. 
if drift > cli.drift_threshold as i64 && !cli.enable_reorg_walkback { - cursor = if let Ok(c) = - l2_provider.l2_block_info_by_number(latest - 10).await - { - c + metrics::FAST_FORWARD_BLOCK.set(cursor.block_info.number as i64); + metrics::FAST_FORWARD_TIMESTAMP.set(timestamp as i64); + if let Ok(c) = l2_provider.l2_block_info_by_number(latest.saturating_sub(100)).await { + let l1_block_info = l1_provider + .block_info_by_number(c.l1_origin.number) + .await + .expect("Failed to fetch L1 block info for fast forward"); + info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info); + if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await { + error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); + continue; + } + cursor = c; + advance_cursor_flag = false; } else { - error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", latest - 10); + error!(target: LOG_TARGET, "Failed to get block info by number: {}", latest.saturating_sub(100)); continue; - }; - advance_cursor_flag = false; - if let Err(e) = pipeline.reset(cursor.block_info).await { - error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); } } else if drift > cli.drift_threshold as i64 && timestamp as i64 > metrics::DRIFT_WALKBACK_TIMESTAMP.get() + 300 { metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64); + metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64); warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift); - cursor = if let Ok(c) = - l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await + if let Ok(c) = l2_provider + .l2_block_info_by_number(cursor.block_info.number.saturating_sub(100)) + .await { - c + let l1_block_info = l1_provider + .block_info_by_number(c.l1_origin.number) + .await + .expect("Failed to fetch L1 block info for walkback"); + info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info); + if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await { + 
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); + continue; + } + cursor = c; + advance_cursor_flag = false; } else { - error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", cursor.block_info.number - 10); + error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", cursor.block_info.number.saturating_sub(100)); continue; - }; - advance_cursor_flag = false; - if let Err(e) = pipeline.reset(cursor.block_info).await { - error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); } - - metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64); } } } diff --git a/examples/trusted-sync/src/metrics.rs b/examples/trusted-sync/src/metrics.rs index 1cbb157cb..822985ced 100644 --- a/examples/trusted-sync/src/metrics.rs +++ b/examples/trusted-sync/src/metrics.rs @@ -52,6 +52,14 @@ lazy_static! { pub static ref DRIFT_WALKBACK_TIMESTAMP: IntGauge = register_int_gauge!("trusted_sync_drift_walkback_timestamp", "Timestamp of the last drift walkback").expect("Failed to register drift walkback timestamp metric"); + /// Tracks the block number when a fast forward last happened. + pub static ref FAST_FORWARD_BLOCK: IntGauge = + register_int_gauge!("trusted_sync_fast_forward_block", "Latest fast forward block").expect("Failed to register fast forward metric"); + + /// Tracks the timestamp of the last fast forward. + pub static ref FAST_FORWARD_TIMESTAMP: IntGauge = + register_int_gauge!("trusted_sync_fast_forward_timestamp", "Timestamp of the latest fast forward block").expect("Failed to register fast forward timestamp metric"); + /// Tracks the latest reference l2 safe head update. pub static ref LATEST_REF_SAFE_HEAD_UPDATE: IntGauge = register_int_gauge!( "trusted_sync_latest_ref_safe_head_update",