From a83f0cfbd3704343b74c288a6a7ff889aaf9e098 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 15 Oct 2024 18:25:52 -0400 Subject: [PATCH] feat(derive): signal receiver logic (#696) --- Cargo.lock | 89 ++++++------- bin/client/src/l1/driver.rs | 48 +++++-- crates/derive/src/pipeline/core.rs | 123 +++++++++--------- crates/derive/src/pipeline/mod.rs | 2 +- crates/derive/src/stages/attributes_queue.rs | 66 ++++------ crates/derive/src/stages/batch_queue.rs | 78 +++++------ crates/derive/src/stages/batch_stream.rs | 46 ++----- .../src/stages/channel/channel_assembler.rs | 26 ++-- .../derive/src/stages/channel/channel_bank.rs | 31 ++--- .../src/stages/channel/channel_provider.rs | 10 +- .../src/stages/channel/channel_reader.rs | 56 ++++---- crates/derive/src/stages/frame_queue.rs | 30 ++--- crates/derive/src/stages/l1_retrieval.rs | 74 ++++++++--- crates/derive/src/stages/l1_traversal.rs | 62 +++++++-- crates/derive/src/stages/multiplexed.rs | 33 +++-- .../derive/src/test_utils/attributes_queue.rs | 21 ++- crates/derive/src/test_utils/batch_queue.rs | 21 ++- crates/derive/src/test_utils/batch_stream.rs | 21 ++- crates/derive/src/test_utils/channel_bank.rs | 7 +- .../derive/src/test_utils/channel_reader.rs | 7 +- crates/derive/src/test_utils/frame_queue.rs | 7 +- crates/derive/src/test_utils/pipeline.rs | 17 +-- crates/derive/src/traits/mod.rs | 4 +- crates/derive/src/traits/pipeline.rs | 108 +++++++++++++-- crates/derive/src/traits/stages.rs | 18 +-- 25 files changed, 549 insertions(+), 456 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3487a95b8..0c8554d98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,10 +53,11 @@ checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "alloy-chains" -version = "0.1.36" +version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94c225801d42099570d0674701dddd4142f0ef715282aeb5985042e2ec962df7" +checksum = "156bfc5dcd52ef9a5f33381701fa03310317e14c65093a9430d3e3557b08dcd3" dependencies = [ + "alloy-primitives", "num_enum", "strum", ] @@ -196,9 +197,9 @@ dependencies = [ [[package]] name = "alloy-primitives" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ecb848c43f6b06ae3de2e4a67496cbbabd78ae87db0f1248934f15d76192c6a" +checksum = "38f35429a652765189c1c5092870d8360ee7b7769b09b06d89ebaefd34676446" dependencies = [ "alloy-rlp", "arbitrary", @@ -377,9 +378,9 @@ dependencies = [ [[package]] name = "alloy-sol-macro" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "661c516eb1fa3294cc7f2fb8955b3b609d639c282ac81a4eedb14d3046db503a" +checksum = "3b2395336745358cc47207442127c47c63801a7065ecc0aa928da844f8bb5576" dependencies = [ "alloy-sol-macro-expander", "alloy-sol-macro-input", @@ -391,9 +392,9 @@ dependencies = [ [[package]] name = "alloy-sol-macro-expander" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecbabb8fc3d75a0c2cea5215be22e7a267e3efde835b0f2a8922f5e3f5d47683" +checksum = "9ed5047c9a241df94327879c2b0729155b58b941eae7805a7ada2e19436e6b39" dependencies = [ "alloy-sol-macro-input", "const-hex", @@ -409,9 +410,9 @@ dependencies = [ [[package]] name = "alloy-sol-macro-input" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16517f2af03064485150d89746b8ffdcdbc9b6eeb3d536fb66efd7c2846fbc75" +checksum = "5dee02a81f529c415082235129f0df8b8e60aa1601b9c9298ffe54d75f57210b" dependencies = [ "const-hex", "dunce", @@ -424,9 +425,9 @@ dependencies = [ [[package]] name = "alloy-sol-types" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e448d879903624863f608c552d10efb0e0905ddbee98b0049412799911eb062" +checksum = "c2841af22d99e2c0f82a78fe107b6481be3dd20b89bfb067290092794734343a" dependencies = [ "alloy-primitives", "alloy-sol-macro", @@ -913,9 +914,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" [[package]] name = "byteorder" @@ -966,9 +967,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.28" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -1785,9 +1786,9 @@ checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -1982,9 +1983,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -2842,9 +2843,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.13" +version = "2.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9" +checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442" dependencies = [ "memchr", "thiserror 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3254,9 +3255,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "rend" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31c1f1959e4db12c985c0283656be0925f1539549db1e47c4bd0b8b599e1ef7" +checksum = "a35e8a6bf28cd121053a66aa2e6a2e3eaffad4a60012179f0e864aa5ffeff215" dependencies = [ "bytecheck", ] @@ -3572,9 +3573,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" @@ -3589,9 +3590,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "rusty-fork" @@ -4007,9 +4008,9 @@ dependencies = [ [[package]] name = "syn-solidity" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20e7b52ad118b2153644eea95c6fc740b6c1555b2344fdab763fc9de4075f665" +checksum = "ebfc1bfd06acc78f16d8fd3ef846bc222ee7002468d10a7dce8d703d6eab89a3" dependencies = [ "paste", "proc-macro2", @@ -4496,9 +4497,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -4507,9 +4508,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -4522,9 +4523,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.43" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -4534,9 +4535,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4544,9 +4545,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -4557,15 +4558,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "web-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index b54a78ffb..73c5242a6 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -12,14 +12,14 @@ use anyhow::{anyhow, Result}; use core::fmt::Debug; use kona_derive::{ attributes::StatefulAttributesBuilder, - errors::PipelineErrorKind, + errors::{PipelineErrorKind, ResetError}, pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, - traits::{BlobProvider, OriginProvider, Signal}, + traits::{ActivationSignal, BlobProvider, OriginProvider, ResetSignal, Signal, SignalReceiver}, }; use kona_executor::{KonaHandleRegister, StatelessL2BlockExecutor}; use kona_mpt::{TrieHinter, TrieProvider}; @@ -256,18 +256,38 @@ where // stages can make progress. match e { PipelineErrorKind::Temporary(_) => { /* continue */ } - PipelineErrorKind::Reset(_) => { - // Reset the pipeline to the initial L2 safe head and L1 origin, - // and try again. - self.pipeline - .signal(Signal::Reset { - l2_safe_head: self.l2_safe_head, - l1_origin: self - .pipeline - .origin() - .ok_or_else(|| anyhow!("Missing L1 origin"))?, - }) - .await?; + PipelineErrorKind::Reset(e) => { + if matches!(e, ResetError::HoloceneActivation) { + self.pipeline + .signal( + ActivationSignal { + l2_safe_head: self.l2_safe_head, + l1_origin: self + .pipeline + .origin() + .ok_or_else(|| anyhow!("Missing L1 origin"))?, + system_config: None, + } + .signal(), + ) + .await?; + } else { + // Reset the pipeline to the initial L2 safe head and L1 origin, + // and try again. + self.pipeline + .signal( + ResetSignal { + l2_safe_head: self.l2_safe_head, + l1_origin: self + .pipeline + .origin() + .ok_or_else(|| anyhow!("Missing L1 origin"))?, + system_config: None, + } + .signal(), + ) + .await?; + } } PipelineErrorKind::Critical(_) => return Err(e.into()), } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index c0aa094d2..702ab8322 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -2,11 +2,11 @@ use super::{ NextAttributes, OriginAdvancer, OriginProvider, Pipeline, PipelineError, PipelineResult, - ResettableStage, StepResult, + StepResult, }; use crate::{ errors::PipelineErrorKind, - traits::{FlushableStage, Signal}, + traits::{ActivationSignal, ResetSignal, Signal, SignalReceiver}, }; use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc}; use async_trait::async_trait; @@ -21,13 +21,7 @@ use tracing::{error, trace, warn}; #[derive(Debug)] pub struct DerivationPipeline where - S: NextAttributes - + ResettableStage - + FlushableStage - + OriginProvider - + OriginAdvancer - + Debug - + Send, + S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, { /// A handle to the next attributes. @@ -44,13 +38,7 @@ where impl DerivationPipeline where - S: NextAttributes - + ResettableStage - + FlushableStage - + OriginProvider - + OriginAdvancer - + Debug - + Send, + S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, { /// Creates a new instance of the [DerivationPipeline]. @@ -65,13 +53,7 @@ where impl OriginProvider for DerivationPipeline where - S: NextAttributes - + ResettableStage - + FlushableStage - + OriginProvider - + OriginAdvancer - + Debug - + Send, + S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send, P: L2ChainProvider + Send + Sync + Debug, { fn origin(&self) -> Option { @@ -81,14 +63,7 @@ where impl Iterator for DerivationPipeline where - S: NextAttributes - + ResettableStage - + FlushableStage - + OriginProvider - + OriginAdvancer - + Debug - + Send - + Sync, + S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, { type Item = OpAttributesWithParent; @@ -99,40 +74,28 @@ where } #[async_trait] -impl Pipeline for DerivationPipeline +impl SignalReceiver for DerivationPipeline where - S: NextAttributes - + ResettableStage - + FlushableStage - + OriginProvider - + OriginAdvancer - + Debug - + Send - + Sync, + S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, { - /// Peeks at the next prepared [OpAttributesWithParent] from the pipeline. - fn peek(&self) -> Option<&OpAttributesWithParent> { - self.prepared.front() - } - - /// Resets the pipeline by calling the [`ResettableStage::reset`] method. + /// Signals the pipeline by calling the [`SignalReceiver::signal`] method. /// - /// During a reset, each stage is recursively called from the top-level + /// During a [`Signal::Reset`], each stage is recursively called from the top-level /// [crate::stages::AttributesQueue] to the bottom [crate::stages::L1Traversal] /// with a head-recursion pattern. This effectively clears the internal state /// of each stage in the pipeline from bottom on up. /// - /// ### Parameters + /// [`Signal::Activation`] does a similar thing to the reset, with different + /// holocene-specific reset rules. /// - /// The `l2_block_info` is the new L2 cursor to step on. It is needed during - /// reset to fetch the system config at that block height. + /// ### Parameters /// - /// The `l1_block_info` is the new L1 origin set in the [crate::stages::L1Traversal] - /// stage. + /// The `signal` is contains the signal variant with any necessary parameters. async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { match signal { - Signal::Reset { l2_safe_head, l1_origin } => { + s @ Signal::Reset(ResetSignal { l2_safe_head, .. }) | + s @ Signal::Activation(ActivationSignal { l2_safe_head, .. }) => { let system_config = self .l2_chain_provider .system_config_by_number( @@ -141,7 +104,8 @@ where ) .await .map_err(|e| PipelineError::Provider(e.to_string()).temp())?; - match self.attributes.reset(l1_origin, &system_config).await { + s.with_system_config(system_config); + match self.attributes.signal(s).await { Ok(()) => trace!(target: "pipeline", "Stages reset"), Err(err) => { if let PipelineErrorKind::Temporary(PipelineError::Eof) = err { @@ -154,11 +118,23 @@ where } } Signal::FlushChannel => { - self.attributes.flush_channel().await?; + self.attributes.signal(signal).await?; } } Ok(()) } +} + +#[async_trait] +impl Pipeline for DerivationPipeline +where + S: NextAttributes + SignalReceiver + OriginProvider + OriginAdvancer + Debug + Send + Sync, + P: L2ChainProvider + Send + Sync + Debug, +{ + /// Peeks at the next prepared [OpAttributesWithParent] from the pipeline. + fn peek(&self) -> Option<&OpAttributesWithParent> { + self.prepared.front() + } /// Attempts to progress the pipeline. /// @@ -201,13 +177,13 @@ mod tests { use crate::{ pipeline::{DerivationPipeline, PipelineError, StepResult}, test_utils::*, - traits::{Pipeline, Signal}, + traits::{ActivationSignal, Pipeline, ResetSignal, Signal, SignalReceiver}, }; use alloc::sync::Arc; use alloy_rpc_types_engine::PayloadAttributes; use kona_providers::test_utils::TestL2ChainProvider; use op_alloy_genesis::{RollupConfig, SystemConfig}; - use op_alloy_protocol::{BlockInfo, L2BlockInfo}; + use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::{OpAttributesWithParent, OpPayloadAttributes}; fn default_test_payload_attributes() -> OpAttributesWithParent { @@ -290,6 +266,31 @@ mod tests { assert_eq!(result, StepResult::AdvancedOrigin); } + #[tokio::test] + async fn test_derivation_pipeline_signal_activation() { + let rollup_config = Arc::new(RollupConfig::default()); + let mut l2_chain_provider = TestL2ChainProvider::default(); + l2_chain_provider.system_configs.insert(0, SystemConfig::default()); + let attributes = TestNextAttributes::default(); + let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + + // Signal the pipeline to reset. + let result = pipeline.signal(ActivationSignal::default().signal()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_derivation_pipeline_flush_channel() { + let rollup_config = Arc::new(RollupConfig::default()); + let l2_chain_provider = TestL2ChainProvider::default(); + let attributes = TestNextAttributes::default(); + let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); + + // Signal the pipeline to reset. + let result = pipeline.signal(Signal::FlushChannel).await; + assert!(result.is_ok()); + } + #[tokio::test] async fn test_derivation_pipeline_signal_reset_missing_sys_config() { let rollup_config = Arc::new(RollupConfig::default()); @@ -298,9 +299,7 @@ mod tests { let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); // Signal the pipeline to reset. - let l2_safe_head = L2BlockInfo::default(); - let l1_origin = BlockInfo::default(); - let result = pipeline.signal(Signal::Reset { l2_safe_head, l1_origin }).await.unwrap_err(); + let result = pipeline.signal(ResetSignal::default().signal()).await.unwrap_err(); assert_eq!(result, PipelineError::Provider("System config not found".to_string()).temp()); } @@ -313,9 +312,7 @@ mod tests { let mut pipeline = DerivationPipeline::new(attributes, rollup_config, l2_chain_provider); // Signal the pipeline to reset. - let l2_safe_head = L2BlockInfo::default(); - let l1_origin = BlockInfo::default(); - let result = pipeline.signal(Signal::Reset { l2_safe_head, l1_origin }).await; + let result = pipeline.signal(ResetSignal::default().signal()).await; assert!(result.is_ok()); } } diff --git a/crates/derive/src/pipeline/mod.rs b/crates/derive/src/pipeline/mod.rs index 67232aa77..578461bdf 100644 --- a/crates/derive/src/pipeline/mod.rs +++ b/crates/derive/src/pipeline/mod.rs @@ -3,7 +3,7 @@ /// Re-export trait arguments. pub use crate::traits::{ AttributesBuilder, DataAvailabilityProvider, NextAttributes, OriginAdvancer, OriginProvider, - Pipeline, ResetProvider, ResettableStage, StepResult, + Pipeline, ResetProvider, Signal, SignalReceiver, StepResult, }; /// Re-export kona provider traits. diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 6e0e67619..0e4c94654 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -4,14 +4,13 @@ use crate::{ batch::SingleBatch, errors::{PipelineError, PipelineResult, ResetError}, traits::{ - AttributesBuilder, FlushableStage, NextAttributes, OriginAdvancer, OriginProvider, - ResettableStage, + AttributesBuilder, NextAttributes, OriginAdvancer, OriginProvider, Signal, SignalReceiver, }, }; use alloc::{boxed::Box, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::{OpAttributesWithParent, OpPayloadAttributes}; use tracing::info; @@ -44,7 +43,7 @@ pub trait AttributesProvider { #[derive(Debug)] pub struct AttributesQueue where - P: AttributesProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. @@ -61,7 +60,7 @@ where impl AttributesQueue where - P: AttributesProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. @@ -155,7 +154,7 @@ where #[async_trait] impl OriginAdvancer for AttributesQueue where - P: AttributesProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send, AB: AttributesBuilder + Debug + Send, { async fn advance_origin(&mut self) -> PipelineResult<()> { @@ -166,7 +165,7 @@ where #[async_trait] impl NextAttributes for AttributesQueue where - P: AttributesProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send, AB: AttributesBuilder + Debug + Send, { async fn next_attributes( @@ -179,7 +178,7 @@ where impl OriginProvider for AttributesQueue where - P: AttributesProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, AB: AttributesBuilder + Debug, { fn origin(&self) -> Option { @@ -188,48 +187,35 @@ where } #[async_trait] -impl ResettableStage for AttributesQueue +impl SignalReceiver for AttributesQueue where - P: AttributesProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, AB: AttributesBuilder + Send + Debug, { - async fn reset( - &mut self, - block_info: BlockInfo, - system_config: &SystemConfig, - ) -> PipelineResult<()> { - self.prev.reset(block_info, system_config).await?; - self.batch = None; - self.is_last_in_span = false; - crate::inc!(STAGE_RESETS, &["attributes-queue"]); + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + s @ Signal::Reset(_) | s @ Signal::Activation(_) => { + self.prev.signal(s).await?; + self.batch = None; + self.is_last_in_span = false; + crate::inc!(STAGE_RESETS, &["attributes-queue"]); + } + s @ Signal::FlushChannel => { + self.batch = None; + self.prev.signal(s).await?; + } + } Ok(()) } } -#[async_trait] -impl FlushableStage for AttributesQueue -where - P: AttributesProvider - + OriginAdvancer - + OriginProvider - + ResettableStage - + FlushableStage - + Debug - + Send, - AB: AttributesBuilder + Debug + Send, -{ - async fn flush_channel(&mut self) -> PipelineResult<()> { - self.batch = None; - self.prev.flush_channel().await - } -} - #[cfg(test)] mod tests { use super::*; use crate::{ errors::{BuilderError, PipelineErrorKind}, test_utils::{new_test_attributes_provider, TestAttributesBuilder, TestAttributesProvider}, + traits::ResetSignal, }; use alloc::{sync::Arc, vec, vec::Vec}; use alloy_primitives::{b256, Address, Bytes, B256}; @@ -267,7 +253,7 @@ mod tests { let mut attributes_queue = new_attributes_queue(None, None, vec![]); attributes_queue.batch = Some(SingleBatch::default()); assert!(!attributes_queue.prev.flushed); - attributes_queue.flush_channel().await.unwrap(); + attributes_queue.signal(Signal::FlushChannel).await.unwrap(); assert!(attributes_queue.prev.flushed); assert!(attributes_queue.batch.is_none()); } @@ -280,9 +266,7 @@ mod tests { let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); aq.batch = Some(SingleBatch::default()); assert!(!aq.prev.reset); - let block_info = BlockInfo::default(); - let system_config = SystemConfig::default(); - aq.reset(block_info, &system_config).await.unwrap(); + aq.signal(ResetSignal::default().signal()).await.unwrap(); assert!(aq.batch.is_none()); assert!(aq.prev.reset); } diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 158bdfc43..8b1baf3cd 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -4,13 +4,13 @@ use crate::{ batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch}, errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError}, stages::attributes_queue::AttributesProvider, - traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, ResetSignal, Signal, SignalReceiver}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::fmt::Debug; use kona_providers::L2ChainProvider; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use tracing::{error, info, warn}; @@ -51,7 +51,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -79,7 +79,7 @@ where impl BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -282,7 +282,7 @@ where #[async_trait] impl OriginAdvancer for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { @@ -293,7 +293,7 @@ where #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -450,7 +450,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option { @@ -459,45 +459,35 @@ where } #[async_trait] -impl ResettableStage for BatchQueue +impl SignalReceiver for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { - async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> PipelineResult<()> { - self.prev.reset(base, system_config).await?; - self.origin = Some(base); - self.batches.clear(); - // Include the new origin as an origin to build on. - // This is only for the initialization case. - // During normal resets we will later throw out this block. - self.l1_blocks.clear(); - self.l1_blocks.push(base); - self.next_spans.clear(); - crate::inc!(STAGE_RESETS, &["batch-queue"]); + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + s @ Signal::Reset(ResetSignal { l1_origin, .. }) => { + self.prev.signal(s).await?; + self.origin = Some(l1_origin); + self.batches.clear(); + // Include the new origin as an origin to build on. + // This is only for the initialization case. + // During normal resets we will later throw out this block. + self.l1_blocks.clear(); + self.l1_blocks.push(l1_origin); + self.next_spans.clear(); + crate::inc!(STAGE_RESETS, &["batch-queue"]); + } + s @ Signal::Activation(_) | s @ Signal::FlushChannel => { + self.prev.signal(s).await?; + self.batches.clear(); + self.next_spans.clear(); + } + } Ok(()) } } -#[async_trait] -impl FlushableStage for BatchQueue -where - P: BatchQueueProvider - + OriginAdvancer - + OriginProvider - + ResettableStage - + FlushableStage - + Send - + Debug, - BF: L2ChainProvider + Send + Debug, -{ - async fn flush_channel(&mut self) -> PipelineResult<()> { - self.batches.clear(); - self.next_spans.clear(); - self.prev.flush_channel().await - } -} - #[cfg(test)] mod tests { use super::*; @@ -553,13 +543,11 @@ mod tests { batch: Batch::Single(SingleBatch::default()), }); assert!(!bq.prev.reset); - let base = BlockInfo::default(); - let system_config = SystemConfig::default(); - bq.reset(base, &system_config).await.unwrap(); + bq.signal(ResetSignal::default().signal()).await.unwrap(); assert!(bq.prev.reset); - assert_eq!(bq.origin, Some(base)); + assert_eq!(bq.origin, Some(BlockInfo::default())); assert!(bq.batches.is_empty()); - assert_eq!(bq.l1_blocks, vec![base]); + assert_eq!(bq.l1_blocks, vec![BlockInfo::default()]); assert!(bq.next_spans.is_empty()); } @@ -575,7 +563,7 @@ mod tests { inclusion_block: BlockInfo::default(), batch: Batch::Single(SingleBatch::default()), }); - bq.flush_channel().await.unwrap(); + bq.signal(Signal::FlushChannel).await.unwrap(); assert!(bq.prev.flushed); assert!(bq.batches.is_empty()); assert!(!bq.l1_blocks.is_empty()); diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index a93b6c003..6d7bcca05 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -5,12 +5,12 @@ use crate::{ errors::{PipelineEncodingError, PipelineError, PipelineResult}, pipeline::L2ChainProvider, stages::BatchQueueProvider, - traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use tracing::{error, trace}; @@ -36,7 +36,7 @@ pub trait BatchStreamProvider { #[derive(Debug)] pub struct BatchStream where - P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { /// The previous stage in the derivation pipeline. @@ -54,7 +54,7 @@ where impl BatchStream where - P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { /// Create a new [BatchStream] stage. @@ -102,7 +102,7 @@ where #[async_trait] impl BatchQueueProvider for BatchStream where - P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { fn flush(&mut self) { @@ -181,7 +181,7 @@ where #[async_trait] impl OriginAdvancer for BatchStream where - P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { @@ -191,7 +191,7 @@ where impl OriginProvider for BatchStream where - P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option { @@ -200,13 +200,13 @@ where } #[async_trait] -impl ResettableStage for BatchStream +impl SignalReceiver for BatchStream where - P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send, BF: L2ChainProvider + Send + Debug, { - async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { - self.prev.reset(base, cfg).await?; + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.prev.signal(signal).await?; self.buffer.clear(); self.span.take(); crate::inc!(STAGE_RESETS, &["batch-span"]); @@ -214,31 +214,13 @@ where } } -#[async_trait] -impl FlushableStage for BatchStream -where - P: BatchStreamProvider - + OriginAdvancer - + OriginProvider - + ResettableStage - + FlushableStage - + Debug - + Send, - BF: L2ChainProvider + Debug + Send, -{ - async fn flush_channel(&mut self) -> PipelineResult<()> { - self.span.take(); - self.buffer.clear(); - self.prev.flush_channel().await - } -} - #[cfg(test)] mod test { use super::*; use crate::{ batch::{SingleBatch, SpanBatchElement}, test_utils::{CollectingLayer, TestBatchStreamProvider, TraceStorage}, + traits::ResetSignal, }; use kona_providers::test_utils::TestL2ChainProvider; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -265,7 +247,7 @@ mod test { stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.prev.reset); - stream.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + stream.signal(ResetSignal::default().signal()).await.unwrap(); assert!(stream.prev.reset); assert!(stream.buffer.is_empty()); assert!(stream.span.is_none()); @@ -279,7 +261,7 @@ mod test { stream.buffer.push_back(SingleBatch::default()); stream.span = Some(SpanBatch::default()); assert!(!stream.prev.flushed); - stream.flush_channel().await.unwrap(); + stream.signal(Signal::FlushChannel).await.unwrap(); assert!(stream.prev.flushed); assert!(stream.buffer.is_empty()); assert!(stream.span.is_none()); diff --git a/crates/derive/src/stages/channel/channel_assembler.rs b/crates/derive/src/stages/channel/channel_assembler.rs index d015743b0..a4564ef60 100644 --- a/crates/derive/src/stages/channel/channel_assembler.rs +++ b/crates/derive/src/stages/channel/channel_assembler.rs @@ -2,14 +2,14 @@ use super::{ChannelReaderProvider, NextFrameProvider}; use crate::{ - pipeline::{OriginAdvancer, PipelineResult, ResettableStage}, + pipeline::{OriginAdvancer, PipelineResult, Signal, SignalReceiver}, prelude::{OriginProvider, PipelineError}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Channel}; /// The [ChannelAssembler] stage is responsible for assembling the [Frame]s from the [FrameQueue] @@ -21,7 +21,7 @@ use op_alloy_protocol::{BlockInfo, Channel}; #[derive(Debug)] pub struct ChannelAssembler

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The rollup configuration. pub(crate) cfg: Arc, @@ -33,7 +33,7 @@ where impl

ChannelAssembler

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Creates a new [ChannelAssembler] stage with the given configuration and previous stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -64,7 +64,7 @@ where #[async_trait] impl

ChannelReaderProvider for ChannelAssembler

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn next_data(&mut self) -> PipelineResult> { let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; @@ -113,7 +113,7 @@ where #[async_trait] impl

OriginAdvancer for ChannelAssembler

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await @@ -122,7 +122,7 @@ where impl

OriginProvider for ChannelAssembler

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -130,16 +130,12 @@ where } #[async_trait] -impl

ResettableStage for ChannelAssembler

+impl

SignalReceiver for ChannelAssembler

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { - async fn reset( - &mut self, - block_info: BlockInfo, - system_config: &SystemConfig, - ) -> PipelineResult<()> { - self.prev.reset(block_info, system_config).await?; + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.prev.signal(signal).await?; self.channel = None; crate::inc!(STAGE_RESETS, &["channel-assembly"]); Ok(()) diff --git a/crates/derive/src/stages/channel/channel_bank.rs b/crates/derive/src/stages/channel/channel_bank.rs index 8606985dd..f2f4422fe 100644 --- a/crates/derive/src/stages/channel/channel_bank.rs +++ b/crates/derive/src/stages/channel/channel_bank.rs @@ -4,13 +4,13 @@ use super::NextFrameProvider; use crate::{ errors::{PipelineError, PipelineErrorKind, PipelineResult}, stages::ChannelReaderProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use alloy_primitives::{hex, map::HashMap, Bytes}; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Channel, ChannelId, Frame}; use tracing::{trace, warn}; @@ -34,7 +34,7 @@ pub(crate) const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000; #[derive(Debug)] pub struct ChannelBank

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The rollup configuration. pub(crate) cfg: Arc, @@ -48,7 +48,7 @@ where impl

ChannelBank

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -210,7 +210,7 @@ where #[async_trait] impl

OriginAdvancer for ChannelBank

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await @@ -220,7 +220,7 @@ where #[async_trait] impl

ChannelReaderProvider for ChannelBank

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn next_data(&mut self) -> PipelineResult> { crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_bank"], timer); @@ -251,7 +251,7 @@ where impl

OriginProvider for ChannelBank

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -259,16 +259,12 @@ where } #[async_trait] -impl

ResettableStage for ChannelBank

+impl

SignalReceiver for ChannelBank

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { - async fn reset( - &mut self, - block_info: BlockInfo, - system_config: &SystemConfig, - ) -> PipelineResult<()> { - self.prev.reset(block_info, system_config).await?; + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.prev.signal(signal).await?; self.channels.clear(); self.channel_queue = VecDeque::with_capacity(10); crate::inc!(STAGE_RESETS, &["channel-bank"]); @@ -282,6 +278,7 @@ mod tests { use crate::{ stages::frame_queue::tests::new_test_frames, test_utils::{CollectingLayer, TestNextFrameProvider, TraceStorage}, + traits::ResetSignal, }; use alloc::vec; use op_alloy_genesis::{BASE_MAINNET_CONFIG, OP_MAINNET_CONFIG}; @@ -457,10 +454,8 @@ mod tests { let mut channel_bank = ChannelBank::new(cfg, mock); channel_bank.channels.insert([0xFF; 16], Channel::default()); channel_bank.channel_queue.push_back([0xFF; 16]); - let block_info = BlockInfo::default(); - let system_config = SystemConfig::default(); assert!(!channel_bank.prev.reset); - channel_bank.reset(block_info, &system_config).await.unwrap(); + channel_bank.signal(ResetSignal::default().signal()).await.unwrap(); assert_eq!(channel_bank.channels.len(), 0); assert_eq!(channel_bank.channel_queue.len(), 0); assert!(channel_bank.prev.reset); diff --git a/crates/derive/src/stages/channel/channel_provider.rs b/crates/derive/src/stages/channel/channel_provider.rs index 47a6e844e..c51f68d4f 100644 --- a/crates/derive/src/stages/channel/channel_provider.rs +++ b/crates/derive/src/stages/channel/channel_provider.rs @@ -16,7 +16,7 @@ multiplexed_stage!( #[async_trait] impl

ChannelReaderProvider for ChannelProvider

where - P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn next_data(&mut self) -> PipelineResult> { match self.active_stage_mut()? { @@ -30,13 +30,13 @@ where mod test { use super::{ActiveStage, ChannelProvider}; use crate::{ - pipeline::ResettableStage, prelude::{OriginProvider, PipelineError}, stages::{frame_queue::tests::new_test_frames, ChannelReaderProvider}, test_utils::TestNextFrameProvider, + traits::{ResetSignal, SignalReceiver}, }; use alloc::sync::Arc; - use op_alloy_genesis::{RollupConfig, SystemConfig}; + use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; #[test] @@ -170,7 +170,7 @@ mod test { assert!(channel_bank.channel_queue.len() == 1); // Reset the channel provider. - channel_provider.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + channel_provider.signal(ResetSignal::default().signal()).await.unwrap(); // Ensure the channel queue is empty after reset. let Ok(ActiveStage::ChannelBank(channel_bank)) = channel_provider.active_stage_mut() else { @@ -200,7 +200,7 @@ mod test { assert!(channel_assembler.channel.is_some()); // Reset the channel provider. - channel_provider.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + channel_provider.signal(ResetSignal::default().signal()).await.unwrap(); // Ensure the channel assembler is empty after reset. let Ok(ActiveStage::ChannelAssembler(channel_assembler)) = diff --git a/crates/derive/src/stages/channel/channel_reader.rs b/crates/derive/src/stages/channel/channel_reader.rs index 50717b069..49cf630c7 100644 --- a/crates/derive/src/stages/channel/channel_reader.rs +++ b/crates/derive/src/stages/channel/channel_reader.rs @@ -4,7 +4,7 @@ use crate::{ batch::Batch, errors::{PipelineError, PipelineResult}, stages::{decompress_brotli, BatchStreamProvider}, - traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; use alloy_primitives::Bytes; @@ -13,7 +13,7 @@ use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; use op_alloy_genesis::{ - RollupConfig, SystemConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD, + RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD, }; use op_alloy_protocol::BlockInfo; use tracing::{debug, error, warn}; @@ -49,7 +49,7 @@ pub trait ChannelReaderProvider { #[derive(Debug)] pub struct ChannelReader

where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The previous stage of the derivation pipeline. prev: P, @@ -61,7 +61,7 @@ where impl

ChannelReader

where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { @@ -98,7 +98,7 @@ where #[async_trait] impl

OriginAdvancer for ChannelReader

where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await @@ -108,7 +108,7 @@ where #[async_trait] impl

BatchStreamProvider for ChannelReader

where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { /// This method is called by the BatchStream if an invalid span batch is found. /// In the case of an invalid span batch, the associated channel must be flushed. @@ -148,7 +148,7 @@ where impl

OriginProvider for ChannelReader

where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -156,27 +156,23 @@ where } #[async_trait] -impl

ResettableStage for ChannelReader

+impl

SignalReceiver for ChannelReader

where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send, { - async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { - self.prev.reset(base, cfg).await?; - self.next_channel(); - crate::inc!(STAGE_RESETS, &["channel-reader"]); - Ok(()) - } -} - -#[async_trait] -impl

FlushableStage for ChannelReader

-where - P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, -{ - async fn flush_channel(&mut self) -> PipelineResult<()> { - // Drop the current in-progress channel. - warn!(target: "channel-reader", "Flushed channel"); - self.next_batch = None; + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + Signal::FlushChannel => { + // Drop the current in-progress channel. + warn!(target: "channel-reader", "Flushed channel"); + self.next_batch = None; + } + s => { + self.prev.signal(s).await?; + self.next_channel(); + crate::inc!(STAGE_RESETS, &["channel-reader"]); + } + } Ok(()) } } @@ -278,7 +274,9 @@ impl BatchReader { #[cfg(test)] mod test { use super::*; - use crate::{errors::PipelineErrorKind, test_utils::TestChannelReaderProvider}; + use crate::{ + errors::PipelineErrorKind, test_utils::TestChannelReaderProvider, traits::ResetSignal, + }; use alloc::vec; fn new_compressed_batch_data() -> Bytes { @@ -297,7 +295,7 @@ mod test { new_compressed_batch_data(), MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, )); - reader.flush_channel().await.unwrap(); + reader.signal(Signal::FlushChannel).await.unwrap(); assert!(reader.next_batch.is_none()); } @@ -310,7 +308,7 @@ mod test { MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, )); assert!(!reader.prev.reset); - reader.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + reader.signal(ResetSignal::default().signal()).await.unwrap(); assert!(reader.next_batch.is_none()); assert!(reader.prev.reset); } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 3a66f63dd..23b358c73 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -3,13 +3,13 @@ use crate::{ errors::{PipelineError, PipelineResult}, stages::NextFrameProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Frame}; use tracing::{debug, error, trace}; @@ -32,7 +32,7 @@ pub trait FrameQueueProvider { #[derive(Debug)] pub struct FrameQueue

where - P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The previous stage in the pipeline. pub prev: P, @@ -44,7 +44,7 @@ where impl

FrameQueue

where - P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// @@ -148,7 +148,7 @@ where #[async_trait] impl

OriginAdvancer for FrameQueue

where - P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await @@ -158,7 +158,7 @@ where #[async_trait] impl

NextFrameProvider for FrameQueue

where - P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn next_frame(&mut self) -> PipelineResult { self.load_frames().await?; @@ -175,7 +175,7 @@ where impl

OriginProvider for FrameQueue

where - P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -183,16 +183,12 @@ where } #[async_trait] -impl

ResettableStage for FrameQueue

+impl

SignalReceiver for FrameQueue

where - P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { - async fn reset( - &mut self, - block_info: BlockInfo, - system_config: &SystemConfig, - ) -> PipelineResult<()> { - self.prev.reset(block_info, system_config).await?; + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.prev.signal(signal).await?; self.queue = VecDeque::default(); crate::inc!(STAGE_RESETS, &["frame-queue"]); Ok(()) @@ -202,7 +198,7 @@ where #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::test_utils::TestFrameQueueProvider; + use crate::{test_utils::TestFrameQueueProvider, traits::ResetSignal}; use alloc::{vec, vec::Vec}; use op_alloy_protocol::DERIVATION_VERSION_0; @@ -236,7 +232,7 @@ pub(crate) mod tests { let mock = TestFrameQueueProvider::new(vec![]); let mut frame_queue = FrameQueue::new(mock, Default::default()); assert!(!frame_queue.prev.reset); - frame_queue.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + frame_queue.signal(ResetSignal::default().signal()).await.unwrap(); assert_eq!(frame_queue.queue.len(), 0); assert!(frame_queue.prev.reset); } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 9148990ae..b909b2b95 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -4,13 +4,13 @@ use crate::{ errors::{PipelineError, PipelineErrorKind, PipelineResult}, stages::FrameQueueProvider, traits::{ - AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, ResettableStage, + ActivationSignal, AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, + ResetSignal, Signal, SignalReceiver, }, }; use alloc::boxed::Box; use alloy_primitives::Address; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::BlockInfo; /// Provides L1 blocks for the [L1Retrieval] stage. @@ -41,7 +41,7 @@ pub trait L1RetrievalProvider { pub struct L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginAdvancer + OriginProvider + ResettableStage, + P: L1RetrievalProvider + OriginAdvancer + OriginProvider + SignalReceiver, { /// The previous stage in the pipeline. pub prev: P, @@ -54,7 +54,7 @@ where impl L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginAdvancer + OriginProvider + ResettableStage, + P: L1RetrievalProvider + OriginAdvancer + OriginProvider + SignalReceiver, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] stage and given /// [DataAvailabilityProvider]. @@ -70,7 +70,7 @@ where impl OriginAdvancer for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginAdvancer + OriginProvider + ResettableStage + Send, + P: L1RetrievalProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await @@ -81,7 +81,7 @@ where impl FrameQueueProvider for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginAdvancer + OriginProvider + ResettableStage + Send, + P: L1RetrievalProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send, { type Item = DAP::Item; @@ -110,7 +110,7 @@ where impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginAdvancer + OriginProvider + ResettableStage, + P: L1RetrievalProvider + OriginAdvancer + OriginProvider + SignalReceiver, { fn origin(&self) -> Option { self.prev.origin() @@ -118,15 +118,21 @@ where } #[async_trait] -impl ResettableStage for L1Retrieval +impl SignalReceiver for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginAdvancer + OriginProvider + ResettableStage + Send, + P: L1RetrievalProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send, { - async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { - self.prev.reset(base, cfg).await?; - self.data = Some(self.provider.open_data(&base).await?); - crate::inc!(STAGE_RESETS, &["l1-retrieval"]); + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.prev.signal(signal).await?; + match signal { + Signal::Reset(ResetSignal { l1_origin, .. }) | + Signal::Activation(ActivationSignal { l1_origin, .. }) => { + self.data = Some(self.provider.open_data(&l1_origin).await?); + crate::inc!(STAGE_RESETS, &["l1-retrieval"]); + } + _ => {} + } Ok(()) } } @@ -142,14 +148,52 @@ mod tests { use alloy_primitives::Bytes; #[tokio::test] - async fn test_l1_retrieval_reset() { + async fn test_l1_retrieval_flush_channel() { + let traversal = new_populated_test_traversal(); + let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; + let mut retrieval = L1Retrieval::new(traversal, dap); + retrieval.prev.block = None; + assert!(retrieval.prev.block.is_none()); + retrieval.data = None; + retrieval.signal(Signal::FlushChannel).await.unwrap(); + assert!(retrieval.data.is_none()); + assert!(retrieval.prev.block.is_none()); + } + + #[tokio::test] + async fn test_l1_retrieval_activation_signal() { + let traversal = new_populated_test_traversal(); + let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; + let mut retrieval = L1Retrieval::new(traversal, dap); + retrieval.prev.block = None; + assert!(retrieval.prev.block.is_none()); + retrieval.data = None; + retrieval + .signal( + ActivationSignal { system_config: Some(Default::default()), ..Default::default() } + .signal(), + ) + .await + .unwrap(); + assert!(retrieval.data.is_some()); + assert_eq!(retrieval.prev.block, Some(BlockInfo::default())); + } + + #[tokio::test] + async fn test_l1_retrieval_reset_signal() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; let mut retrieval = L1Retrieval::new(traversal, dap); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); retrieval.data = None; - retrieval.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + retrieval + .signal( + ResetSignal { system_config: Some(Default::default()), ..Default::default() } + .signal(), + ) + .await + .unwrap(); assert!(retrieval.data.is_some()); assert_eq!(retrieval.prev.block, Some(BlockInfo::default())); } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index dfe2014ee..ffff372a6 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -3,7 +3,9 @@ use crate::{ errors::{PipelineError, PipelineResult, ResetError}, stages::L1RetrievalProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{ + ActivationSignal, OriginAdvancer, OriginProvider, ResetSignal, Signal, SignalReceiver, + }, }; use alloc::{boxed::Box, string::ToString, sync::Arc}; use alloy_primitives::Address; @@ -129,12 +131,19 @@ impl OriginProvider for L1Traversal { } #[async_trait] -impl ResettableStage for L1Traversal { - async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { - self.block = Some(base); - self.done = false; - self.system_config = *cfg; - crate::inc!(STAGE_RESETS, &["l1-traversal"]); +impl SignalReceiver for L1Traversal { + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + Signal::Reset(ResetSignal { l1_origin, system_config, .. }) | + Signal::Activation(ActivationSignal { l1_origin, system_config, .. }) => { + self.block = Some(l1_origin); + self.done = false; + self.system_config = system_config.expect("System config must be provided."); + crate::inc!(STAGE_RESETS, &["l1-traversal"]); + } + _ => {} + } + Ok(()) } } @@ -211,16 +220,47 @@ pub(crate) mod tests { } #[tokio::test] - async fn test_l1_traversal_reset() { + async fn test_l1_traversal_flush_channel() { + let blocks = vec![BlockInfo::default(), BlockInfo::default()]; + let receipts = new_receipts(); + let mut traversal = new_test_traversal(blocks, receipts); + assert!(traversal.advance_origin().await.is_ok()); + traversal.done = true; + assert!(traversal.signal(Signal::FlushChannel).await.is_ok()); + assert_eq!(traversal.origin(), Some(BlockInfo::default())); + assert!(traversal.done); + } + + #[tokio::test] + async fn test_l1_traversal_activation_signal() { + let blocks = vec![BlockInfo::default(), BlockInfo::default()]; + let receipts = new_receipts(); + let mut traversal = new_test_traversal(blocks, receipts); + assert!(traversal.advance_origin().await.is_ok()); + let cfg = SystemConfig::default(); + traversal.done = true; + assert!(traversal + .signal(ActivationSignal { system_config: Some(cfg), ..Default::default() }.signal()) + .await + .is_ok()); + assert_eq!(traversal.origin(), Some(BlockInfo::default())); + assert_eq!(traversal.system_config, cfg); + assert!(!traversal.done); + } + + #[tokio::test] + async fn test_l1_traversal_reset_signal() { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); assert!(traversal.advance_origin().await.is_ok()); - let base = BlockInfo::default(); let cfg = SystemConfig::default(); traversal.done = true; - assert!(traversal.reset(base, &cfg).await.is_ok()); - assert_eq!(traversal.origin(), Some(base)); + assert!(traversal + .signal(ResetSignal { system_config: Some(cfg), ..Default::default() }.signal()) + .await + .is_ok()); + assert_eq!(traversal.origin(), Some(BlockInfo::default())); assert_eq!(traversal.system_config, cfg); assert!(!traversal.done); } diff --git a/crates/derive/src/stages/multiplexed.rs b/crates/derive/src/stages/multiplexed.rs index a86c0523f..64527a7bf 100644 --- a/crates/derive/src/stages/multiplexed.rs +++ b/crates/derive/src/stages/multiplexed.rs @@ -14,7 +14,7 @@ pub enum MultiplexerError { /// depending on the active hardfork. /// /// By default, the stage struct generated by this macro: -/// - Implements [OriginAdvancer], [OriginProvider], and [ResettableStage]. +/// - Implements [OriginAdvancer], [OriginProvider], and [SignalReceiver]. /// - Contains an enum that represents the active stage, in the `stages` key. /// - Activates stages based on the conditions provided in the `stages` key. /// @@ -22,7 +22,7 @@ pub enum MultiplexerError { /// stage is dissolved and the ownership of the previous stage is transferred to the new stage. /// /// Stage requirements: -/// - The previous stage must implement [OriginAdvancer], [OriginProvider], [ResettableStage], and +/// - The previous stage must implement [OriginAdvancer], [OriginProvider], [SignalReceiver], and /// [Debug]. /// - The stages must implement an `into_prev` method that returns the owned previous stage. /// @@ -61,7 +61,7 @@ pub enum MultiplexerError { /// /// [OriginAdvancer]: crate::pipeline::OriginAdvancer /// [OriginProvider]: crate::pipeline::OriginProvider -/// [ResettableStage]: crate::pipeline::ResettableStage +/// [SignalReceiver]: crate::pipeline::SignalReceiver /// [Debug]: core::fmt::Debug macro_rules! multiplexed_stage { ( @@ -78,7 +78,7 @@ macro_rules! multiplexed_stage { default_stage: $last_stage_name:ident$(($($last_input_name:ident$(,)?)+))? ) => { use $crate::{ - pipeline::{OriginAdvancer, OriginProvider, ResettableStage, PipelineError, PipelineResult}, + pipeline::{OriginAdvancer, OriginProvider, SignalReceiver, Signal, PipelineError, PipelineResult}, stages::MultiplexerError }; use async_trait::async_trait; @@ -88,7 +88,7 @@ macro_rules! multiplexed_stage { #[derive(Debug)] enum ActiveStage where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { $($stage_name($stage_name

,),)* $last_stage_name($last_stage_name

), @@ -96,7 +96,7 @@ macro_rules! multiplexed_stage { impl ActiveStage where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Dissolves the active stage and returns the previous stage. pub(crate) fn into_prev(self) -> P { @@ -111,7 +111,7 @@ macro_rules! multiplexed_stage { #[derive(Debug)] pub struct $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The rollup configuration. cfg: alloc::sync::Arc, @@ -137,7 +137,7 @@ macro_rules! multiplexed_stage { impl $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// Creates a new instance of the provider. pub const fn new(cfg: alloc::sync::Arc, prev: P$( $(, $field_name: $field_type)+ )?) -> Self { @@ -206,7 +206,7 @@ macro_rules! multiplexed_stage { #[async_trait] impl OriginAdvancer for $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { match self.active_stage_mut()? { @@ -218,7 +218,7 @@ macro_rules! multiplexed_stage { impl OriginProvider for $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { fn origin(&self) -> Option { match self.active_stage_ref() { @@ -234,18 +234,17 @@ macro_rules! multiplexed_stage { } #[async_trait] - impl ResettableStage for $provider_name + impl SignalReceiver for $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { - async fn reset( + async fn signal( &mut self, - block_info: op_alloy_protocol::BlockInfo, - system_config: &op_alloy_genesis::SystemConfig, + signal: Signal, ) -> PipelineResult<()> { match self.active_stage_mut()? { - $(ActiveStage::$stage_name(stage) => stage.reset(block_info, system_config).await,)* - ActiveStage::$last_stage_name(stage) => stage.reset(block_info, system_config).await, + $(ActiveStage::$stage_name(stage) => stage.signal(signal).await,)* + ActiveStage::$last_stage_name(stage) => stage.signal(signal).await, } } } diff --git a/crates/derive/src/test_utils/attributes_queue.rs b/crates/derive/src/test_utils/attributes_queue.rs index 0b6ec2ea1..d6e140b35 100644 --- a/crates/derive/src/test_utils/attributes_queue.rs +++ b/crates/derive/src/test_utils/attributes_queue.rs @@ -4,12 +4,11 @@ use crate::{ batch::SingleBatch, errors::{BuilderError, PipelineError, PipelineErrorKind, PipelineResult}, stages::AttributesProvider, - traits::{AttributesBuilder, FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, + traits::{AttributesBuilder, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, string::ToString, vec::Vec}; use alloy_eips::BlockNumHash; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpPayloadAttributes; @@ -65,17 +64,13 @@ impl OriginAdvancer for TestAttributesProvider { } #[async_trait] -impl FlushableStage for TestAttributesProvider { - async fn flush_channel(&mut self) -> PipelineResult<()> { - self.flushed = true; - Ok(()) - } -} - -#[async_trait] -impl ResettableStage for TestAttributesProvider { - async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { - self.reset = true; +impl SignalReceiver for TestAttributesProvider { + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + Signal::FlushChannel => self.flushed = true, + Signal::Reset { .. } => self.reset = true, + _ => {} + } Ok(()) } } diff --git a/crates/derive/src/test_utils/batch_queue.rs b/crates/derive/src/test_utils/batch_queue.rs index c5d96b5ee..e0319c3a4 100644 --- a/crates/derive/src/test_utils/batch_queue.rs +++ b/crates/derive/src/test_utils/batch_queue.rs @@ -4,11 +4,10 @@ use crate::{ batch::Batch, errors::{PipelineError, PipelineResult}, stages::BatchQueueProvider, - traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; /// A mock provider for the [BatchQueue] stage. @@ -56,17 +55,13 @@ impl OriginAdvancer for TestBatchQueueProvider { } #[async_trait] -impl FlushableStage for TestBatchQueueProvider { - async fn flush_channel(&mut self) -> PipelineResult<()> { - self.flushed = true; - Ok(()) - } -} - -#[async_trait] -impl ResettableStage for TestBatchQueueProvider { - async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { - self.reset = true; +impl SignalReceiver for TestBatchQueueProvider { + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + Signal::Reset { .. } => self.reset = true, + Signal::FlushChannel => self.flushed = true, + _ => {} + } Ok(()) } } diff --git a/crates/derive/src/test_utils/batch_stream.rs b/crates/derive/src/test_utils/batch_stream.rs index cc7b91aa9..4e49b8900 100644 --- a/crates/derive/src/test_utils/batch_stream.rs +++ b/crates/derive/src/test_utils/batch_stream.rs @@ -6,11 +6,10 @@ use crate::{ batch::Batch, errors::{PipelineError, PipelineResult}, stages::BatchStreamProvider, - traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::BlockInfo; /// A mock provider for the [`BatchStream`] stage. @@ -41,14 +40,6 @@ impl OriginProvider for TestBatchStreamProvider { } } -#[async_trait] -impl FlushableStage for TestBatchStreamProvider { - async fn flush_channel(&mut self) -> PipelineResult<()> { - self.flushed = true; - Ok(()) - } -} - #[async_trait] impl BatchStreamProvider for TestBatchStreamProvider { fn flush(&mut self) {} @@ -66,9 +57,13 @@ impl OriginAdvancer for TestBatchStreamProvider { } #[async_trait] -impl ResettableStage for TestBatchStreamProvider { - async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { - self.reset = true; +impl SignalReceiver for TestBatchStreamProvider { + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + Signal::Reset { .. } => self.reset = true, + Signal::FlushChannel => self.flushed = true, + _ => {} + } Ok(()) } } diff --git a/crates/derive/src/test_utils/channel_bank.rs b/crates/derive/src/test_utils/channel_bank.rs index b8ab05ee0..26d694989 100644 --- a/crates/derive/src/test_utils/channel_bank.rs +++ b/crates/derive/src/test_utils/channel_bank.rs @@ -5,11 +5,10 @@ use crate::{ errors::{PipelineError, PipelineResult}, stages::NextFrameProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::{BlockInfo, Frame}; /// A mock [NextFrameProvider] for testing the [ChannelBank] stage. @@ -57,8 +56,8 @@ impl NextFrameProvider for TestNextFrameProvider { } #[async_trait] -impl ResettableStage for TestNextFrameProvider { - async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { +impl SignalReceiver for TestNextFrameProvider { + async fn signal(&mut self, _: Signal) -> PipelineResult<()> { self.reset = true; Ok(()) } diff --git a/crates/derive/src/test_utils/channel_reader.rs b/crates/derive/src/test_utils/channel_reader.rs index e8e047e60..043e83296 100644 --- a/crates/derive/src/test_utils/channel_reader.rs +++ b/crates/derive/src/test_utils/channel_reader.rs @@ -5,12 +5,11 @@ use crate::{ errors::{PipelineError, PipelineResult}, stages::ChannelReaderProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::BlockInfo; /// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage. @@ -54,8 +53,8 @@ impl ChannelReaderProvider for TestChannelReaderProvider { } #[async_trait] -impl ResettableStage for TestChannelReaderProvider { - async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { +impl SignalReceiver for TestChannelReaderProvider { + async fn signal(&mut self, _: Signal) -> PipelineResult<()> { self.reset = true; Ok(()) } diff --git a/crates/derive/src/test_utils/frame_queue.rs b/crates/derive/src/test_utils/frame_queue.rs index d7a79f3dd..db23a6a65 100644 --- a/crates/derive/src/test_utils/frame_queue.rs +++ b/crates/derive/src/test_utils/frame_queue.rs @@ -3,12 +3,11 @@ use crate::{ errors::{PipelineError, PipelineResult}, stages::FrameQueueProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::BlockInfo; /// A mock [FrameQueueProvider] for testing the [FrameQueue] stage. @@ -59,8 +58,8 @@ impl FrameQueueProvider for TestFrameQueueProvider { } #[async_trait] -impl ResettableStage for TestFrameQueueProvider { - async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { +impl SignalReceiver for TestFrameQueueProvider { + async fn signal(&mut self, _: Signal) -> PipelineResult<()> { self.reset = true; Ok(()) } diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs index 5e766c7d7..8705d58dd 100644 --- a/crates/derive/src/test_utils/pipeline.rs +++ b/crates/derive/src/test_utils/pipeline.rs @@ -2,7 +2,7 @@ //! as well as its stages and providers. use alloc::{boxed::Box, sync::Arc}; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; @@ -18,8 +18,7 @@ pub use crate::{ }, test_utils::TestAttributesBuilder, traits::{ - test_utils::TestDAP, FlushableStage, NextAttributes, OriginAdvancer, OriginProvider, - ResettableStage, + test_utils::TestDAP, NextAttributes, OriginAdvancer, OriginProvider, Signal, SignalReceiver, }, }; pub use kona_providers::test_utils::{TestChainProvider, TestL2ChainProvider}; @@ -32,17 +31,9 @@ pub struct TestNextAttributes { } #[async_trait::async_trait] -impl FlushableStage for TestNextAttributes { - /// Flushes the stage. - async fn flush_channel(&mut self) -> PipelineResult<()> { - Ok(()) - } -} - -#[async_trait::async_trait] -impl ResettableStage for TestNextAttributes { +impl SignalReceiver for TestNextAttributes { /// Resets the derivation stage to its initial state. - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> PipelineResult<()> { + async fn signal(&mut self, _: Signal) -> PipelineResult<()> { Ok(()) } } diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index eece8a732..b7e8fb518 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod pipeline; -pub use pipeline::{Pipeline, Signal, StepResult}; +pub use pipeline::{ActivationSignal, Pipeline, ResetSignal, Signal, StepResult}; mod attributes; pub use attributes::{AttributesBuilder, NextAttributes}; @@ -14,7 +14,7 @@ mod reset; pub use reset::ResetProvider; mod stages; -pub use stages::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}; +pub use stages::{OriginAdvancer, OriginProvider, SignalReceiver}; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index 78ac22556..3b1f28d55 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -1,10 +1,11 @@ //! Defines the interface for the core derivation pipeline. use super::OriginProvider; -use crate::errors::{PipelineErrorKind, PipelineResult}; +use crate::errors::PipelineErrorKind; use alloc::boxed::Box; use async_trait::async_trait; use core::iter::Iterator; +use op_alloy_genesis::SystemConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; @@ -26,25 +27,112 @@ pub enum StepResult { #[allow(clippy::large_enum_variant)] pub enum Signal { /// Reset the pipeline. - Reset { - /// The L2 safe head to reset to. - l2_safe_head: L2BlockInfo, - /// The L1 origin to reset to. - l1_origin: BlockInfo, - }, + Reset(ResetSignal), + /// Hardfork Activation. + Activation(ActivationSignal), /// Flush the currently active channel. FlushChannel, } +impl Signal { + /// Sets the [SystemConfig] for the signal. + pub const fn with_system_config(self, system_config: SystemConfig) -> Self { + match self { + Self::Reset(reset) => reset.with_system_config(system_config).signal(), + Self::Activation(activation) => activation.with_system_config(system_config).signal(), + Self::FlushChannel => Self::FlushChannel, + } + } +} + +/// A pipeline reset signal. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct ResetSignal { + /// The L2 safe head to reset to. + pub l2_safe_head: L2BlockInfo, + /// The L1 origin to reset to. + pub l1_origin: BlockInfo, + /// The optional [SystemConfig] to reset with. + pub system_config: Option, +} + +impl ResetSignal { + /// Creates a new [Signal::Reset] from the [ResetSignal]. + pub const fn signal(self) -> Signal { + Signal::Reset(self) + } + + /// Sets the [SystemConfig] for the signal. + pub const fn with_system_config(self, system_config: SystemConfig) -> Self { + Self { system_config: Some(system_config), ..self } + } +} + +/// A pipeline hardfork activation signal. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct ActivationSignal { + /// The L2 safe head to reset to. + pub l2_safe_head: L2BlockInfo, + /// The L1 origin to reset to. + pub l1_origin: BlockInfo, + /// The optional [SystemConfig] to reset with. + pub system_config: Option, +} + +impl ActivationSignal { + /// Creates a new [Signal::Activation] from the [ActivationSignal]. + pub const fn signal(self) -> Signal { + Signal::Activation(self) + } + + /// Sets the [SystemConfig] for the signal. + pub const fn with_system_config(self, system_config: SystemConfig) -> Self { + Self { system_config: Some(system_config), ..self } + } +} + /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline: OriginProvider + Iterator { /// Peeks at the next [OpAttributesWithParent] from the pipeline. fn peek(&self) -> Option<&OpAttributesWithParent>; - /// Resets the pipeline on the next [Pipeline::step] call. - async fn signal(&mut self, signal: Signal) -> PipelineResult<()>; - /// Attempts to progress the pipeline. async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reset_signal() { + let signal = ResetSignal::default(); + assert_eq!(signal.signal(), Signal::Reset(signal)); + } + + #[test] + fn test_activation_signal() { + let signal = ActivationSignal::default(); + assert_eq!(signal.signal(), Signal::Activation(signal)); + } + + #[test] + fn test_signal_with_system_config() { + let signal = ResetSignal::default(); + let system_config = SystemConfig::default(); + assert_eq!( + signal.with_system_config(system_config).signal(), + Signal::Reset(ResetSignal { system_config: Some(system_config), ..signal }) + ); + + let signal = ActivationSignal::default(); + let system_config = SystemConfig::default(); + assert_eq!( + signal.with_system_config(system_config).signal(), + Signal::Activation(ActivationSignal { system_config: Some(system_config), ..signal }) + ); + + assert_eq!(Signal::FlushChannel.with_system_config(system_config), Signal::FlushChannel); + } +} diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index fa783cf82..5bc9d8bc0 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -2,23 +2,15 @@ use alloc::boxed::Box; use async_trait::async_trait; -use op_alloy_genesis::SystemConfig; use op_alloy_protocol::BlockInfo; -use crate::errors::PipelineResult; +use crate::{errors::PipelineResult, traits::Signal}; -/// Describes the functionality fo a resettable stage within the derivation pipeline. +/// Providers a way for the pipeline to accept a signal from the driver. #[async_trait] -pub trait ResettableStage { - /// Resets the derivation stage to its initial state. - async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()>; -} - -/// Describes the functionality of a stage that can receive a channel flush signal. -#[async_trait] -pub trait FlushableStage { - /// Flushes the current channel. - async fn flush_channel(&mut self) -> PipelineResult<()>; +pub trait SignalReceiver { + /// Receives a signal from the driver. + async fn signal(&mut self, signal: Signal) -> PipelineResult<()>; } /// Provides a method for accessing the pipeline's current L1 origin.