Skip to content

Commit

Permalink
feat: increase granularity (#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell authored Jul 10, 2024
1 parent 4eb20aa commit 3b48fcb
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 21 deletions.
14 changes: 11 additions & 3 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy_consensus::{Header, Sealed};
use anyhow::{anyhow, Result};
use core::fmt::Debug;
use kona_derive::{
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder},
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
Expand Down Expand Up @@ -136,8 +136,16 @@ impl DerivationDriver {
let mut attributes = None;
while attributes.is_none() {
match self.pipeline.step(self.l2_safe_head).await {
Ok(_) => info!(target: "client_derivation_driver", "Stepped derivation pipeline"),
Err(e) => {
StepResult::PreparedAttributes => {
info!(target: "client_derivation_driver", "Stepped derivation pipeline")
}
StepResult::AdvancedOrigin => {
info!(target: "client_derivation_driver", "Advanced origin")
}
StepResult::OriginAdvanceErr(e) => {
warn!(target: "client_derivation_driver", "Failed to advance origin: {:?}", e)
}
StepResult::StepFailed(e) => {
warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e)
}
}
Expand Down
11 changes: 11 additions & 0 deletions crates/derive/src/online/alloy_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ impl AlloyChainProvider {
let inner = ReqwestProvider::new_http(url);
Self::new(inner)
}

/// Returns the chain ID.
pub async fn chain_id(&mut self) -> Result<u64> {
let chain_id: TransportResult<alloc::string::String> =
self.inner.raw_request("eth_chainId".into(), ()).await;
let chain_id = match chain_id {
Ok(s) => alloc::string::String::from(s.trim_start_matches("0x")),
Err(e) => return Err(anyhow!(e)),
};
u64::from_str_radix(&chain_id, 16).map_err(|e| anyhow!(e))
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/online/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub use crate::{
pipeline::{DerivationPipeline, PipelineBuilder},
sources::EthereumDataSource,
stages::StatefulAttributesBuilder,
traits::{ChainProvider, L2ChainProvider, OriginProvider, Pipeline},
traits::{ChainProvider, L2ChainProvider, OriginProvider, Pipeline, StepResult},
types::{BlockInfo, RollupConfig},
};

Expand Down
14 changes: 8 additions & 6 deletions crates/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use super::{
L2ChainProvider, NextAttributes, OriginAdvancer, OriginProvider, Pipeline, ResettableStage,
StageError,
StageError, StepResult,
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use anyhow::bail;
Expand Down Expand Up @@ -97,22 +97,24 @@ where
/// An error is expected when the underlying source closes.
/// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the
/// derivation process.
async fn step(&mut self, cursor: L2BlockInfo) -> anyhow::Result<()> {
async fn step(&mut self, cursor: L2BlockInfo) -> StepResult {
match self.attributes.next_attributes(cursor).await {
Ok(a) => {
trace!(target: "pipeline", "Prepared L2 attributes: {:?}", a);
self.prepared.push_back(a);
return Ok(());
StepResult::PreparedAttributes
}
Err(StageError::Eof) => {
trace!(target: "pipeline", "Pipeline advancing origin");
self.attributes.advance_origin().await.map_err(|e| anyhow::anyhow!(e))?;
if let Err(e) = self.attributes.advance_origin().await {
return StepResult::OriginAdvanceErr(e);
}
StepResult::AdvancedOrigin
}
Err(err) => {
warn!(target: "pipeline", "Attributes queue step failed: {:?}", err);
bail!(err);
StepResult::StepFailed(err)
}
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/derive/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/// Re-export trait arguments.
pub use crate::traits::{
ChainProvider, DataAvailabilityProvider, L2ChainProvider, NextAttributes, OriginAdvancer,
OriginProvider, Pipeline, PreviousStage, ResetProvider, ResettableStage,
OriginProvider, Pipeline, PreviousStage, ResetProvider, ResettableStage, StepResult,
};

/// Re-export stage types that are needed as inputs.
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! pipeline.
mod pipeline;
pub use pipeline::Pipeline;
pub use pipeline::{Pipeline, StepResult};

mod attributes;
pub use attributes::NextAttributes;
Expand Down
16 changes: 15 additions & 1 deletion crates/derive/src/traits/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
//! Defines the interface for the core derivation pipeline.
use super::OriginProvider;
use crate::types::StageError;
use alloc::boxed::Box;
use async_trait::async_trait;
use core::iter::Iterator;
use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo};

/// A pipeline error.
#[derive(Debug)]
pub enum StepResult {
/// Attributes were successfully prepared.
PreparedAttributes,
/// Origin was advanced.
AdvancedOrigin,
/// Origin advance failed.
OriginAdvanceErr(StageError),
/// Step failed.
StepFailed(StageError),
}

/// This trait defines the interface for interacting with the derivation pipeline.
#[async_trait]
pub trait Pipeline: OriginProvider + Iterator<Item = L2AttributesWithParent> {
Expand All @@ -16,5 +30,5 @@ pub trait Pipeline: OriginProvider + Iterator<Item = L2AttributesWithParent> {
async fn reset(&mut self, origin: BlockInfo) -> anyhow::Result<()>;

/// Attempts to progress the pipeline.
async fn step(&mut self, cursor: L2BlockInfo) -> anyhow::Result<()>;
async fn step(&mut self, cursor: L2BlockInfo) -> StepResult;
}
27 changes: 19 additions & 8 deletions examples/trusted-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ async fn sync(cli: cli::Cli) -> Result<()> {
AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), Arc::new(Default::default()));
let l2_chain_id =
l2_provider.chain_id().await.expect("Failed to fetch chain ID from L2 provider");
metrics::CHAIN_ID.inc_by(l2_chain_id);
let cfg = RollupConfig::from_l2_chain_id(l2_chain_id)
.expect("Failed to fetch rollup config from L2 chain ID");
let cfg = Arc::new(cfg);
metrics::GENESIS_L2_BLOCK.inc_by(cfg.genesis.l2.number);

// Construct the pipeline
let mut l1_provider = AlloyChainProvider::new_http(l1_rpc_url);
let l1_chain_id = l1_provider.chain_id().await?;
metrics::CONSENSUS_CHAIN_ID.inc_by(l1_chain_id);

let mut start =
cli.start_l2_block.filter(|n| *n >= cfg.genesis.l2.number).unwrap_or(cfg.genesis.l2.number);
Expand All @@ -61,8 +65,9 @@ async fn sync(cli: cli::Cli) -> Result<()> {
start = l2_provider.latest_block_number().await?.saturating_sub(blocks);
info!(target: LOG_TARGET, "Starting {} blocks from tip at L2 block number: {}", blocks, start);
}
metrics::START_L2_BLOCK.inc_by(start);
println!("Starting from L2 block number: {}", metrics::START_L2_BLOCK.get());

println!("Starting from L2 block number: {}", start);
let mut l2_provider = AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), cfg.clone());
let attributes =
StatefulAttributesBuilder::new(cfg.clone(), l2_provider.clone(), l1_provider.clone());
Expand Down Expand Up @@ -107,22 +112,28 @@ async fn sync(cli: cli::Cli) -> Result<()> {
}
}
match pipeline.step(cursor).await {
Ok(_) => {
StepResult::PreparedAttributes => {
metrics::PIPELINE_STEPS.with_label_values(&["success"]).inc();
info!(target: "loop", "Stepped derivation pipeline");
info!(target: "loop", "Prepared attributes");
}
Err(e) => {
StepResult::AdvancedOrigin => {
metrics::PIPELINE_STEPS.with_label_values(&["origin_advance"]).inc();
info!(target: "loop", "Advanced origin");
}
StepResult::OriginAdvanceErr(e) => {
metrics::PIPELINE_STEPS.with_label_values(&["origin_advance_failure"]).inc();
error!(target: "loop", "Error advancing origin: {:?}", e);
}
StepResult::StepFailed(e) => {
metrics::PIPELINE_STEPS.with_label_values(&["failure"]).inc();
debug!(target: "loop", "Error stepping derivation pipeline: {:?}", e);
error!(target: "loop", "Error stepping derivation pipeline: {:?}", e);
}
}

// Peek at the next prepared attributes and validate them.
if let Some(attributes) = pipeline.peek() {
match validator.validate(attributes).await {
Ok(true) => {
info!(target: LOG_TARGET, "Validated payload attributes");
}
Ok(true) => info!(target: LOG_TARGET, "Validated payload attributes"),
Ok(false) => {
error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash);
metrics::FAILED_PAYLOAD_DERIVATION.inc();
Expand Down
16 changes: 16 additions & 0 deletions examples/trusted-sync/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ use lazy_static::lazy_static;
use prometheus::{register_gauge_vec, register_int_counter};

lazy_static! {
/// Tracks the starting L2 block number.
pub static ref START_L2_BLOCK: IntCounter =
register_int_counter!("trusted_sync_start_l2_block", "Starting L2 block number").expect("Failed to register start L2 block metric");

/// Tracks the genesis L2 block number.
pub static ref GENESIS_L2_BLOCK: IntCounter =
register_int_counter!("trusted_sync_genesis_l2_block", "Genesis L2 block number").expect("Failed to register genesis L2 block metric");

/// Tracks the Chain ID currently being synced.
pub static ref CHAIN_ID: IntCounter =
register_int_counter!("trusted_sync_chain_id", "Chain ID").expect("Failed to register chain ID metric");

/// Tracks the Chain ID for the consensus layer.
pub static ref CONSENSUS_CHAIN_ID: IntCounter =
register_int_counter!("trusted_sync_consensus_chain_id", "Consensus Chain ID").expect("Failed to register consensus chain ID metric");

/// Tracks the number of failed payload derivations.
pub static ref FAILED_PAYLOAD_DERIVATION: IntCounter =
register_int_counter!("trusted_sync_failed_payload_derivation", "Number of failed payload derivations")
Expand Down

0 comments on commit 3b48fcb

Please sign in to comment.