diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index dd8506a03..d3d8eb8f9 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -10,7 +10,7 @@ use alloy_consensus::{Header, Sealed}; use anyhow::{anyhow, Result}; use core::fmt::Debug; use kona_derive::{ - pipeline::{DerivationPipeline, Pipeline, PipelineBuilder}, + pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, @@ -136,8 +136,16 @@ impl DerivationDriver { let mut attributes = None; while attributes.is_none() { match self.pipeline.step(self.l2_safe_head).await { - Ok(_) => info!(target: "client_derivation_driver", "Stepped derivation pipeline"), - Err(e) => { + StepResult::PreparedAttributes => { + info!(target: "client_derivation_driver", "Stepped derivation pipeline") + } + StepResult::AdvancedOrigin => { + info!(target: "client_derivation_driver", "Advanced origin") + } + StepResult::OriginAdvanceErr(e) => { + warn!(target: "client_derivation_driver", "Failed to advance origin: {:?}", e) + } + StepResult::StepFailed(e) => { warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e) } } diff --git a/crates/derive/src/online/alloy_providers.rs b/crates/derive/src/online/alloy_providers.rs index 5f25f392e..fe914ed22 100644 --- a/crates/derive/src/online/alloy_providers.rs +++ b/crates/derive/src/online/alloy_providers.rs @@ -60,6 +60,17 @@ impl AlloyChainProvider { let inner = ReqwestProvider::new_http(url); Self::new(inner) } + + /// Returns the chain ID. + pub async fn chain_id(&mut self) -> Result { + let chain_id: TransportResult = + self.inner.raw_request("eth_chainId".into(), ()).await; + let chain_id = match chain_id { + Ok(s) => alloc::string::String::from(s.trim_start_matches("0x")), + Err(e) => return Err(anyhow!(e)), + }; + u64::from_str_radix(&chain_id, 16).map_err(|e| anyhow!(e)) + } } #[async_trait] diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index e622ab162..a550b4f15 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -5,7 +5,7 @@ pub use crate::{ pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::StatefulAttributesBuilder, - traits::{ChainProvider, L2ChainProvider, OriginProvider, Pipeline}, + traits::{ChainProvider, L2ChainProvider, OriginProvider, Pipeline, StepResult}, types::{BlockInfo, RollupConfig}, }; diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 9e22dae48..ca719edb3 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -2,7 +2,7 @@ use super::{ L2ChainProvider, NextAttributes, OriginAdvancer, OriginProvider, Pipeline, ResettableStage, - StageError, + StageError, StepResult, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use anyhow::bail; @@ -97,22 +97,24 @@ where /// An error is expected when the underlying source closes. /// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the /// derivation process. - async fn step(&mut self, cursor: L2BlockInfo) -> anyhow::Result<()> { + async fn step(&mut self, cursor: L2BlockInfo) -> StepResult { match self.attributes.next_attributes(cursor).await { Ok(a) => { trace!(target: "pipeline", "Prepared L2 attributes: {:?}", a); self.prepared.push_back(a); - return Ok(()); + StepResult::PreparedAttributes } Err(StageError::Eof) => { trace!(target: "pipeline", "Pipeline advancing origin"); - self.attributes.advance_origin().await.map_err(|e| anyhow::anyhow!(e))?; + if let Err(e) = self.attributes.advance_origin().await { + return StepResult::OriginAdvanceErr(e); + } + StepResult::AdvancedOrigin } Err(err) => { warn!(target: "pipeline", "Attributes queue step failed: {:?}", err); - bail!(err); + StepResult::StepFailed(err) } } - Ok(()) } } diff --git a/crates/derive/src/pipeline/mod.rs b/crates/derive/src/pipeline/mod.rs index 3d42373f7..71a168ba0 100644 --- a/crates/derive/src/pipeline/mod.rs +++ b/crates/derive/src/pipeline/mod.rs @@ -3,7 +3,7 @@ /// Re-export trait arguments. pub use crate::traits::{ ChainProvider, DataAvailabilityProvider, L2ChainProvider, NextAttributes, OriginAdvancer, - OriginProvider, Pipeline, PreviousStage, ResetProvider, ResettableStage, + OriginProvider, Pipeline, PreviousStage, ResetProvider, ResettableStage, StepResult, }; /// Re-export stage types that are needed as inputs. diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 0b2463ec1..7ac17d7b9 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod pipeline; -pub use pipeline::Pipeline; +pub use pipeline::{Pipeline, StepResult}; mod attributes; pub use attributes::NextAttributes; diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index d7672e6e4..3ed7f699d 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -1,11 +1,25 @@ //! Defines the interface for the core derivation pipeline. use super::OriginProvider; +use crate::types::StageError; use alloc::boxed::Box; use async_trait::async_trait; use core::iter::Iterator; use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo}; +/// A pipeline error. +#[derive(Debug)] +pub enum StepResult { + /// Attributes were successfully prepared. + PreparedAttributes, + /// Origin was advanced. + AdvancedOrigin, + /// Origin advance failed. + OriginAdvanceErr(StageError), + /// Step failed. + StepFailed(StageError), +} + /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline: OriginProvider + Iterator { @@ -16,5 +30,5 @@ pub trait Pipeline: OriginProvider + Iterator { async fn reset(&mut self, origin: BlockInfo) -> anyhow::Result<()>; /// Attempts to progress the pipeline. - async fn step(&mut self, cursor: L2BlockInfo) -> anyhow::Result<()>; + async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; } diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index 00f533f61..79180ba32 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -45,12 +45,16 @@ async fn sync(cli: cli::Cli) -> Result<()> { AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), Arc::new(Default::default())); let l2_chain_id = l2_provider.chain_id().await.expect("Failed to fetch chain ID from L2 provider"); + metrics::CHAIN_ID.inc_by(l2_chain_id); let cfg = RollupConfig::from_l2_chain_id(l2_chain_id) .expect("Failed to fetch rollup config from L2 chain ID"); let cfg = Arc::new(cfg); + metrics::GENESIS_L2_BLOCK.inc_by(cfg.genesis.l2.number); // Construct the pipeline let mut l1_provider = AlloyChainProvider::new_http(l1_rpc_url); + let l1_chain_id = l1_provider.chain_id().await?; + metrics::CONSENSUS_CHAIN_ID.inc_by(l1_chain_id); let mut start = cli.start_l2_block.filter(|n| *n >= cfg.genesis.l2.number).unwrap_or(cfg.genesis.l2.number); @@ -61,8 +65,9 @@ async fn sync(cli: cli::Cli) -> Result<()> { start = l2_provider.latest_block_number().await?.saturating_sub(blocks); info!(target: LOG_TARGET, "Starting {} blocks from tip at L2 block number: {}", blocks, start); } + metrics::START_L2_BLOCK.inc_by(start); + println!("Starting from L2 block number: {}", metrics::START_L2_BLOCK.get()); - println!("Starting from L2 block number: {}", start); let mut l2_provider = AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), cfg.clone()); let attributes = StatefulAttributesBuilder::new(cfg.clone(), l2_provider.clone(), l1_provider.clone()); @@ -107,22 +112,28 @@ async fn sync(cli: cli::Cli) -> Result<()> { } } match pipeline.step(cursor).await { - Ok(_) => { + StepResult::PreparedAttributes => { metrics::PIPELINE_STEPS.with_label_values(&["success"]).inc(); - info!(target: "loop", "Stepped derivation pipeline"); + info!(target: "loop", "Prepared attributes"); } - Err(e) => { + StepResult::AdvancedOrigin => { + metrics::PIPELINE_STEPS.with_label_values(&["origin_advance"]).inc(); + info!(target: "loop", "Advanced origin"); + } + StepResult::OriginAdvanceErr(e) => { + metrics::PIPELINE_STEPS.with_label_values(&["origin_advance_failure"]).inc(); + error!(target: "loop", "Error advancing origin: {:?}", e); + } + StepResult::StepFailed(e) => { metrics::PIPELINE_STEPS.with_label_values(&["failure"]).inc(); - debug!(target: "loop", "Error stepping derivation pipeline: {:?}", e); + error!(target: "loop", "Error stepping derivation pipeline: {:?}", e); } } // Peek at the next prepared attributes and validate them. if let Some(attributes) = pipeline.peek() { match validator.validate(attributes).await { - Ok(true) => { - info!(target: LOG_TARGET, "Validated payload attributes"); - } + Ok(true) => info!(target: LOG_TARGET, "Validated payload attributes"), Ok(false) => { error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash); metrics::FAILED_PAYLOAD_DERIVATION.inc(); diff --git a/examples/trusted-sync/src/metrics.rs b/examples/trusted-sync/src/metrics.rs index ac4741d2a..24cbc9050 100644 --- a/examples/trusted-sync/src/metrics.rs +++ b/examples/trusted-sync/src/metrics.rs @@ -8,6 +8,22 @@ use lazy_static::lazy_static; use prometheus::{register_gauge_vec, register_int_counter}; lazy_static! { + /// Tracks the starting L2 block number. + pub static ref START_L2_BLOCK: IntCounter = + register_int_counter!("trusted_sync_start_l2_block", "Starting L2 block number").expect("Failed to register start L2 block metric"); + + /// Tracks the genesis L2 block number. + pub static ref GENESIS_L2_BLOCK: IntCounter = + register_int_counter!("trusted_sync_genesis_l2_block", "Genesis L2 block number").expect("Failed to register genesis L2 block metric"); + + /// Tracks the Chain ID currently being synced. + pub static ref CHAIN_ID: IntCounter = + register_int_counter!("trusted_sync_chain_id", "Chain ID").expect("Failed to register chain ID metric"); + + /// Tracks the Chain ID for the consensus layer. + pub static ref CONSENSUS_CHAIN_ID: IntCounter = + register_int_counter!("trusted_sync_consensus_chain_id", "Consensus Chain ID").expect("Failed to register consensus chain ID metric"); + /// Tracks the number of failed payload derivations. pub static ref FAILED_PAYLOAD_DERIVATION: IntCounter = register_int_counter!("trusted_sync_failed_payload_derivation", "Number of failed payload derivations")