diff --git a/Cargo.lock b/Cargo.lock index 62ece0cd7..a189b0925 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1727,7 +1727,9 @@ dependencies = [ "alloy-provider", "alloy-rlp", "alloy-rpc-client", + "alloy-rpc-types", "alloy-sol-types", + "alloy-transport", "alloy-transport-http", "anyhow", "async-trait", diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index c5dd38f9e..7dbf7dc3f 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -39,12 +39,15 @@ c-kzg = { version = "1.0.2", default-features = false, optional = true } sha2 = { version = "0.10.8", default-features = false, optional = true } alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183", optional = true} alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183", optional = true } +alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183", default-features = false, optional = true } +serde_json = { version = "1.0.94", default-features = false, optional = true } reqwest = { version = "0.12.4", default-features = false, optional = true } # `test-utils` feature dependencies alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183", default-features = false, optional = true } tracing-subscriber = { version = "0.3.18", optional = true } alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183", default-features = false, optional = true } +alloy-transport = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183", default-features = false, optional = true } [dev-dependencies] tokio = { version = "1.38", features = ["full"] } @@ -65,10 +68,13 @@ serde = [ ] k256 = ["alloy-primitives/k256", "alloy-consensus/k256", "op-alloy-consensus/k256"] online = [ + "dep:serde_json", "dep:revm", "dep:c-kzg", "dep:sha2", "dep:alloy-provider", + "dep:alloy-rpc-types", + "dep:alloy-transport", "dep:alloy-transport-http", "dep:reqwest", "alloy-provider/reqwest", diff --git a/crates/derive/USAGE.md b/crates/derive/USAGE.md new file mode 100644 index 000000000..b8f7d172a --- /dev/null +++ b/crates/derive/USAGE.md @@ -0,0 +1,46 @@ +## Usage + +```rust +use alloc::sync::Arc; +use kona_derive::online::*; +use kona_derive::pipeline::*; +use kona_primitives::{BlockInfo, L2BlockInfo, RollupConfig}; + +// Creates a new chain provider using the `L1_RPC_URL` environment variable. +let l1_rpc_url = std::env::var("L1_RPC_URL").expect("L1_RPC_URL must be set"); +let chain_provider = AlloyChainProvider::new_http(l1_rpc_url.parse().unwrap()); + +// Creates a new l2 chain provider using the `L2_RPC_URL` environment variable. +let l2_rpc_url = std::env::var("L2_RPC_URL").expect("L2_RPC_URL must be set"); +let l2_chain_provider = AlloyL2ChainProvider::new_http(l2_rpc_url.parse().unwrap()); + +// TODO(refcell): replace this will a rollup config +// fetched from the superchain-registry via network id. +let rollup_config = Arc::new(RollupConfig::default()); + +// Create the beacon client used to fetch blob data. +let beacon_url = std::env::var("BEACON_URL").expect("BEACON_URL must be set"); +let beacon_client = OnlineBeaconClient::new_http(beacon_url.parse().unwrap()); + +// Build the online blob provider. +let blob_provider = OnlineBlobProvider::<_, SimpleSlotDerivation>::new(true, beacon_client, None, None); + +// Build the ethereum data source +let dap_source = EthereumDataSource::new(chain_provider.clone(), blob_provider, &rollup_config); + +// The payload attributes builder that is stateful. +let attributes_builder = StatefulAttributesBuilder::new(rollup_config.clone(), l2_chain_provider.clone(), chain_provider.clone()); + +// Build the pipeline. +let pipeline = PipelineBuilder::new() + .rollup_config(rollup_config) + .dap_source(dap_source) + .l2_chain_provider(l2_chain_provider) + .chain_provider(chain_provider) + .builder(attributes_builder) + .build(); + +// The pipeline should be at the default state. +assert_eq!(pipeline.tip, BlockInfo::default()); +assert_eq!(pipeline.cursor, L2BlockInfo::default()); +``` diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index fdff54421..55aa774b0 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -48,6 +48,9 @@ where #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; +mod validation; +pub use validation::{OnlineValidator, Validator}; + mod beacon_client; pub use beacon_client::{BeaconClient, OnlineBeaconClient}; diff --git a/crates/derive/src/online/validation.rs b/crates/derive/src/online/validation.rs new file mode 100644 index 000000000..fc3e2c926 --- /dev/null +++ b/crates/derive/src/online/validation.rs @@ -0,0 +1,94 @@ +//! Contains logic to validate derivation pipeline outputs. + +use crate::types::{L2AttributesWithParent, L2PayloadAttributes, RawTransaction}; +use alloc::{boxed::Box, vec, vec::Vec}; +use alloy_provider::{Provider, ReqwestProvider}; +use alloy_rpc_types::{Block, BlockNumberOrTag, Header}; +use alloy_transport::TransportResult; +use anyhow::Result; +use async_trait::async_trait; +use tracing::warn; + +/// Validator +/// +/// The validator trait describes the interface for validating the derivation outputs. +#[async_trait] +pub trait Validator { + /// Validates the given [`L2AttributesWithParent`]. + async fn validate(&self, attributes: &L2AttributesWithParent) -> bool; +} + +/// OnlineValidator +/// +/// Validates the [`L2AttributesWithParent`] by fetching the associated L2 block from +/// a trusted L2 RPC and constructing the L2 Attributes from the block. +#[derive(Debug, Clone)] +pub struct OnlineValidator { + /// The L2 provider. + provider: ReqwestProvider, +} + +impl OnlineValidator { + /// Creates a new `OnlineValidator`. + pub fn new(provider: ReqwestProvider) -> Self { + Self { provider } + } + + /// Creates a new [OnlineValidator] from the provided [reqwest::Url]. + pub fn new_http(url: reqwest::Url) -> Self { + let inner = ReqwestProvider::new_http(url); + Self::new(inner) + } + + /// Fetches a block [Header] and a list of raw RLP encoded transactions from the L2 provider. + /// + /// This method needs to fetch the non-hydrated block and then + /// fetch the raw transactions using the `debug_*` namespace. + pub(crate) async fn get_block( + &self, + tag: BlockNumberOrTag, + ) -> Result<(Header, Vec)> { + // Don't hydrate the block so we only get a list of transaction hashes. + let block: TransportResult = + self.provider.raw_request("eth_getBlockByNumber".into(), (tag, false)).await; + let block = block.map_err(|e| anyhow::anyhow!(e))?; + // For each transaction hash, fetch the raw transaction RLP. + let mut txs = vec![]; + for tx in block.transactions.hashes() { + let tx: TransportResult = + self.provider.raw_request("debug_getRawTransaction".into(), tx).await; + if let Ok(tx) = tx { + txs.push(tx); + } else { + warn!("Failed to fetch transaction: {:?}", tx); + } + } + Ok((block.header, txs)) + } + + /// Gets the payload for the specified [BlockNumberOrTag]. + pub(crate) async fn get_payload(&self, tag: BlockNumberOrTag) -> Result { + let (header, transactions) = self.get_block(tag).await?; + Ok(L2PayloadAttributes { + timestamp: header.timestamp, + prev_randao: header.mix_hash.unwrap_or_default(), + fee_recipient: header.miner, + // Withdrawals on optimism are always empty + withdrawals: Default::default(), + parent_beacon_block_root: Some(header.parent_hash), + transactions, + no_tx_pool: true, + gas_limit: Some(header.gas_limit as u64), + }) + } +} + +#[async_trait] +impl Validator for OnlineValidator { + async fn validate(&self, attributes: &L2AttributesWithParent) -> bool { + let expected = attributes.parent.block_info.number + 1; + let tag = BlockNumberOrTag::from(expected); + let payload = self.get_payload(tag).await.unwrap(); + attributes.attributes == payload + } +} diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 372e0c149..bea16b472 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -1,15 +1,14 @@ //! Contains the `PipelineBuilder` object that is used to build a `DerivationPipeline`. use super::{ - AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, - L2ChainProvider, ResetProvider, + AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, L2ChainProvider, }; use crate::stages::{ AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }; use alloc::{collections::VecDeque, sync::Arc}; use core::fmt::Debug; -use kona_primitives::{L2BlockInfo, RollupConfig}; +use kona_primitives::{BlockInfo, L2BlockInfo, RollupConfig}; type L1TraversalStage

= L1Traversal

; type L1RetrievalStage = L1Retrieval>; @@ -20,78 +19,13 @@ type BatchQueueStage = BatchQueue, T>; type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. -#[cfg_attr( - feature = "online", - doc = " -## Usage - -```rust -use alloc::sync::Arc; -use alloy_provider::ReqwestProvider; -use alloy_rpc_client::RpcClient; -use alloy_transport_http::Http; -use kona_derive::{ - online::{ - AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider, - SimpleSlotDerivation, - }, - pipeline::*, - }; -use kona_primitives::L2BlockInfo; -use reqwest::Client; - - // Creates a new chain provider using the `L1_RPC_URL` environment variable. - let l1_rpc_url = std::env::var(\"L1_RPC_URL\").expect(\"L1_RPC_URL must be set\"); - let l1_rpc_url = l1_rpc_url.parse().unwrap(); - let http = Http::::new(l1_rpc_url); - let chain_provider = AlloyChainProvider::new(ReqwestProvider::new(RpcClient::new(http, true))); - - // Creates a new l2 chain provider using the `L2_RPC_URL` environment variable. - let l2_rpc_url = std::env::var(\"L2_RPC_URL\").expect(\"L2_RPC_URL must be set\"); - let l2_rpc_url = l2_rpc_url.parse().unwrap(); - let http = Http::::new(l2_rpc_url); - let l2_chain_provider = - AlloyL2ChainProvider::new(ReqwestProvider::new(RpcClient::new(http, true))); - - // TODO(refcell): replace this will a rollup config - // fetched from the superchain-registry via network id. - let rollup_config = Arc::new(RollupConfig::default()); - - // Create the beacon client used to fetch blob data. - let beacon_url = std::env::var(\"BEACON_URL\").expect(\"BEACON_URL must be set\"); - let beacon_url = beacon_url.parse().unwrap(); - let http = Http::::new(beacon_url); - let beacon_client = OnlineBeaconClient::new(ReqwestProvider::new(RpcClient::new(http, true))); - - // Build the online blob provider. - let blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> = - OnlineBlobProvider::new(true, beacon_client, None, None); - - // Build the ethereum data source - let dap_source = EthereumDataSource::new(chain_provider.clone(), blob_provider, &rollup_config); - - let builder = PipelineBuilder::new(); - let pipeline = builder - .rollup_config(rollup_config) - .dap_source(dap_source) - .l2_chain_provider(l2_chain_provider) - .chain_provider(chain_provider) - .builder(OnlineAttributesBuilder::new()) - .reset(ResetProvider::new()) - .start_cursor(L2BlockInfo::default()) - .build(); - - assert_eq!(pipeline.needs_reset, false); -``` -" -)] +#[cfg_attr(feature = "online", doc = include_str!("../../USAGE.md"))] #[derive(Debug)] -pub struct PipelineBuilder +pub struct PipelineBuilder where - R: ResetProvider + Send + Debug, B: AttributesBuilder + Send + Debug, P: ChainProvider + Send + Sync + Debug, - T: L2ChainProvider + Send + Sync + Debug, + T: L2ChainProvider + Clone + Send + Sync + Debug, D: DataAvailabilityProvider + Send + Sync + Debug, { l2_chain_provider: Option, @@ -99,16 +33,15 @@ where chain_provider: Option

, builder: Option, rollup_config: Option>, - reset: Option, start_cursor: Option, + tip: Option, } -impl Default for PipelineBuilder +impl Default for PipelineBuilder where - R: ResetProvider + Send + Debug, B: AttributesBuilder + Send + Debug, P: ChainProvider + Send + Sync + Debug, - T: L2ChainProvider + Send + Sync + Debug, + T: L2ChainProvider + Clone + Send + Sync + Debug, D: DataAvailabilityProvider + Send + Sync + Debug, { fn default() -> Self { @@ -117,19 +50,18 @@ where dap_source: None, chain_provider: None, builder: None, + tip: None, rollup_config: None, - reset: None, start_cursor: None, } } } -impl PipelineBuilder +impl PipelineBuilder where - R: ResetProvider + Send + Debug, B: AttributesBuilder + Send + Debug, P: ChainProvider + Send + Sync + Debug, - T: L2ChainProvider + Send + Sync + Debug, + T: L2ChainProvider + Clone + Send + Sync + Debug, D: DataAvailabilityProvider + Send + Sync + Debug, { /// Creates a new pipeline builder. @@ -143,6 +75,18 @@ where self } + /// Sets the tip for the pipeline. + pub fn tip(mut self, tip: BlockInfo) -> Self { + self.tip = Some(tip); + self + } + + /// Sets the start cursor for the pipeline. + pub fn start_cursor(mut self, cursor: L2BlockInfo) -> Self { + self.start_cursor = Some(cursor); + self + } + /// Sets the data availability provider for the pipeline. pub fn dap_source(mut self, dap_source: D) -> Self { self.dap_source = Some(dap_source); @@ -167,40 +111,26 @@ where self } - /// Sets the reset provider for the pipeline. - pub fn reset(mut self, reset: R) -> Self { - self.reset = Some(reset); - self - } - - /// Sets the start cursor for the pipeline. - pub fn start_cursor(mut self, cursor: L2BlockInfo) -> Self { - self.start_cursor = Some(cursor); - self - } - /// Builds the pipeline. - pub fn build(self) -> DerivationPipeline, R> { + pub fn build(self) -> DerivationPipeline, T> { self.into() } } -impl From> - for DerivationPipeline, R> +impl From> + for DerivationPipeline, T> where - R: ResetProvider + Send + Debug, B: AttributesBuilder + Send + Debug, P: ChainProvider + Send + Sync + Debug, - T: L2ChainProvider + Send + Sync + Debug, + T: L2ChainProvider + Clone + Send + Sync + Debug, D: DataAvailabilityProvider + Send + Sync + Debug, { - fn from(builder: PipelineBuilder) -> Self { + fn from(builder: PipelineBuilder) -> Self { // Extract the builder fields. let rollup_config = builder.rollup_config.expect("rollup_config must be set"); let chain_provider = builder.chain_provider.expect("chain_provider must be set"); let l2_chain_provider = builder.l2_chain_provider.expect("chain_provider must be set"); let dap_source = builder.dap_source.expect("dap_source must be set"); - let reset = builder.reset.expect("reset must be set"); let attributes_builder = builder.builder.expect("builder must be set"); // Compose the stage stack. @@ -209,16 +139,18 @@ where let frame_queue = FrameQueue::new(l1_retrieval); let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config)); - let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider); + let batch_queue = + BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider.clone()); let attributes = AttributesQueue::new(*rollup_config, batch_queue, attributes_builder); // Create the pipeline. DerivationPipeline { attributes, - reset, + tip: builder.tip.unwrap_or_default(), prepared: VecDeque::new(), - needs_reset: false, cursor: builder.start_cursor.unwrap_or_default(), + rollup_config, + l2_chain_provider, } } } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 582e221e2..9bf777447 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -1,42 +1,65 @@ //! Contains the core derivation pipeline. use super::{ - NextAttributes, OriginAdvancer, Pipeline, ResetProvider, ResettableStage, StageError, - StageResult, + L2ChainProvider, NextAttributes, OriginAdvancer, Pipeline, ResettableStage, StageError, }; -use alloc::{boxed::Box, collections::VecDeque}; +use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; -use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo, SystemConfig}; +use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo, RollupConfig}; /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] -pub struct DerivationPipeline< +pub struct DerivationPipeline +where S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, - R: ResetProvider + Send, -> { + P: L2ChainProvider + Send + Sync + Debug, +{ /// A handle to the next attributes. pub attributes: S, /// Reset provider for the pipeline. - pub reset: R, /// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer. pub prepared: VecDeque, - /// A flag to tell the pipeline to reset. - pub needs_reset: bool, /// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes. pub cursor: L2BlockInfo, + /// L1 Origin Tip + pub tip: BlockInfo, + /// The rollup config. + pub rollup_config: Arc, + /// The L2 Chain Provider used to fetch the system config on reset. + pub l2_chain_provider: P, } -#[async_trait] -impl Pipeline for DerivationPipeline +impl DerivationPipeline where S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, - R: ResetProvider + Send, + P: L2ChainProvider + Send + Sync + Debug, { - fn reset(&mut self) { - self.needs_reset = true; + /// Creates a new instance of the [DerivationPipeline]. + pub fn new( + attributes: S, + tip: BlockInfo, + cursor: L2BlockInfo, + rollup_config: Arc, + l2_chain_provider: P, + ) -> Self { + Self { + attributes, + prepared: VecDeque::new(), + rollup_config, + tip, + cursor, + l2_chain_provider, + } } +} +#[async_trait] +impl Pipeline for DerivationPipeline +where + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + P: L2ChainProvider + Send + Sync + Debug, +{ /// Pops the next prepared [L2AttributesWithParent] from the pipeline. fn pop(&mut self) -> Option { self.prepared.pop_front() @@ -48,6 +71,30 @@ where self.cursor = cursor; } + /// Sets the L1 Origin of the pipeline. + fn set_origin(&mut self, origin: BlockInfo) { + self.tip = origin; + } + + /// Resets the pipelien by calling the [`ResettableStage::reset`] method. + /// This will bubble down the stages all the way to the `L1Traversal` stage. + async fn reset(&mut self, block_info: BlockInfo) -> anyhow::Result<()> { + self.tip = block_info; + let system_config = self + .l2_chain_provider + .system_config_by_number(self.tip.number, Arc::clone(&self.rollup_config)) + .await?; + match self.attributes.reset(self.tip, &system_config).await { + Ok(()) => tracing::info!("Stages reset"), + Err(StageError::Eof) => tracing::info!("Stages reset with EOF"), + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + anyhow::bail!(err); + } + } + Ok(()) + } + /// Attempts to progress the pipeline. /// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data. /// Any other error is critical and the derivation pipeline should be reset. @@ -55,16 +102,6 @@ where /// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the /// derivation process. async fn step(&mut self) -> anyhow::Result<()> { - tracing::info!("DerivationPipeline::step"); - - // Reset the pipeline if needed. - if self.needs_reset { - let block_info = self.reset.block_info().await; - let system_config = self.reset.system_config().await; - self.reset_pipe(block_info, &system_config).await.map_err(|e| anyhow::anyhow!(e))?; - self.needs_reset = false; - } - match self.attributes.next_attributes(self.cursor).await { Ok(a) => { tracing::info!("attributes queue stage step returned l2 attributes"); @@ -78,39 +115,10 @@ where } // TODO: match on the EngineELSyncing error here and log Err(err) => { - tracing::error!("attributes queue stage failed: {:?}", err); + tracing::error!("attributes queue step failed: {:?}", err); return Err(anyhow::anyhow!(err)); } } - - Ok(()) - } -} - -impl DerivationPipeline -where - S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, - R: ResetProvider + Send, -{ - /// Creates a new instance of the [DerivationPipeline]. - pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self { - Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor } - } - - /// Internal helper to reset the pipeline. - async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { - match self.attributes.reset(bi, sc).await { - Ok(()) => { - tracing::info!("Stages reset"); - } - Err(StageError::Eof) => { - tracing::info!("Stages reset with EOF"); - } - Err(err) => { - tracing::error!("Stages reset failed: {:?}", err); - return Err(err); - } - } Ok(()) } } diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index c2e986e2f..03bb5f589 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -2,13 +2,16 @@ use alloc::boxed::Box; use async_trait::async_trait; -use kona_primitives::{L2AttributesWithParent, L2BlockInfo}; +use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo}; /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline { /// Resets the pipeline on the next [Pipeline::step] call. - fn reset(&mut self); + async fn reset(&mut self, origin: BlockInfo) -> anyhow::Result<()>; + + /// Sets the L1 Origin of the pipeline. + fn set_origin(&mut self, origin: BlockInfo); /// Attempts to progress the pipeline. async fn step(&mut self) -> anyhow::Result<()>;