From 17bc238e70d6a5d980ba9354ab0c75505c614130 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 4 Jun 2024 13:35:27 -0600 Subject: [PATCH] feat: refactor the pipeline builder --- crates/derive/src/lib.rs | 4 +- crates/derive/src/pipeline/builder.rs | 67 ++++++++++++++ .../src/{builder.rs => pipeline/core.rs} | 92 ++++++++++--------- crates/derive/src/pipeline/mod.rs | 13 +++ crates/derive/src/stages/attributes_queue.rs | 10 +- crates/derive/src/stages/mod.rs | 3 +- crates/derive/src/traits/attributes.rs | 14 +++ crates/derive/src/traits/mod.rs | 6 ++ crates/derive/src/traits/pipeline.rs | 22 +++++ 9 files changed, 172 insertions(+), 59 deletions(-) create mode 100644 crates/derive/src/pipeline/builder.rs rename crates/derive/src/{builder.rs => pipeline/core.rs} (75%) create mode 100644 crates/derive/src/pipeline/mod.rs create mode 100644 crates/derive/src/traits/attributes.rs create mode 100644 crates/derive/src/traits/pipeline.rs diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 2a861af60..217076d40 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -13,9 +13,7 @@ pub use params::{ MAX_RLP_BYTES_PER_CHANNEL, MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS, }; -pub mod builder; -pub use builder::DerivationPipeline; - +pub mod pipeline; pub mod sources; pub mod stages; pub mod traits; diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs new file mode 100644 index 000000000..70d426ae0 --- /dev/null +++ b/crates/derive/src/pipeline/builder.rs @@ -0,0 +1,67 @@ +//! Contains the `PipelineBuilder` object that is used to build a `DerivationPipeline`. + +use super::{DerivationPipeline, NextAttributes, OriginAdvancer, ResetProvider, ResettableStage}; +use alloc::collections::VecDeque; +use core::fmt::Debug; +use kona_primitives::L2BlockInfo; + +/// The PipelineBuilder constructs a [DerivationPipeline]. +#[derive(Debug)] +pub struct PipelineBuilder +where + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, +{ + attributes: Option, + reset: Option, + start_cursor: Option, +} + +impl PipelineBuilder +where + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, +{ + /// Sets the attributes for the pipeline. + pub fn attributes(mut self, attributes: S) -> Self { + self.attributes = Some(attributes); + self + } + + /// Sets the reset provider for the pipeline. + pub fn reset(mut self, reset: R) -> Self { + self.reset = Some(reset); + self + } + + /// Sets the start cursor for the pipeline. + pub fn start_cursor(mut self, cursor: L2BlockInfo) -> Self { + self.start_cursor = Some(cursor); + self + } + + /// Builds the pipeline. + pub fn build(self) -> DerivationPipeline { + self.into() + } +} + +impl From> for DerivationPipeline +where + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, +{ + fn from(builder: PipelineBuilder) -> Self { + let attributes = builder.attributes.expect("attributes must be set"); + let reset = builder.reset.expect("reset must be set"); + let start_cursor = builder.start_cursor.expect("start_cursor must be set"); + + DerivationPipeline { + attributes, + reset, + prepared: VecDeque::new(), + needs_reset: false, + cursor: start_cursor, + } + } +} diff --git a/crates/derive/src/builder.rs b/crates/derive/src/pipeline/core.rs similarity index 75% rename from crates/derive/src/builder.rs rename to crates/derive/src/pipeline/core.rs index 31cb28e7d..82f5099ef 100644 --- a/crates/derive/src/builder.rs +++ b/crates/derive/src/pipeline/core.rs @@ -1,10 +1,6 @@ -//! Contains a concrete implementation of the [DerivationPipeline]. +//! Contains the core derivation pipeline. -use crate::{ - stages::NextAttributes, - traits::{OriginAdvancer, ResettableStage}, - types::{StageError, StageResult}, -}; +use super::{NextAttributes, OriginAdvancer, Pipeline, ResettableStage, StageError, StageResult}; use alloc::{boxed::Box, collections::VecDeque}; use async_trait::async_trait; use core::fmt::Debug; @@ -15,7 +11,6 @@ use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo, SystemConf pub trait ResetProvider { /// Returns the current [BlockInfo] for the pipeline to reset. async fn block_info(&self) -> BlockInfo; - /// Returns the current [SystemConfig] for the pipeline to reset. async fn system_config(&self) -> SystemConfig; } @@ -38,46 +33,25 @@ pub struct DerivationPipeline< pub cursor: L2BlockInfo, } -impl< - S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, - R: ResetProvider + Send, - > DerivationPipeline +#[async_trait] +impl Pipeline for DerivationPipeline +where + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, { - /// Creates a new instance of the [DerivationPipeline]. - pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self { - Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor } - } - - /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. - pub fn set_cursor(&mut self, cursor: L2BlockInfo) { - self.cursor = cursor; + fn reset(&mut self) { + self.needs_reset = true; } - /// Returns the next [L2AttributesWithParent] from the pipeline. - pub fn next_attributes(&mut self) -> Option { + /// Pops the next prepared [L2AttributesWithParent] from the pipeline. + fn pop(&mut self) -> Option { self.prepared.pop_front() } - /// Flags the pipeline to reset on the next [DerivationPipeline::step] call. - pub fn reset(&mut self) { - self.needs_reset = true; - } - - /// Resets the pipeline. - async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { - match self.attributes.reset(bi, sc).await { - Ok(()) => { - tracing::info!("Stages reset"); - } - Err(StageError::Eof) => { - tracing::info!("Stages reset with EOF"); - } - Err(err) => { - tracing::error!("Stages reset failed: {:?}", err); - return Err(err); - } - } - Ok(()) + /// Updates the L2 Safe Head cursor of the pipeline. + /// The cursor is used to fetch the next attributes. + fn update_cursor(&mut self, cursor: L2BlockInfo) { + self.cursor = cursor; } /// Attempts to progress the pipeline. @@ -86,14 +60,14 @@ impl< /// An error is expected when the underlying source closes. /// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the /// derivation process. - pub async fn step(&mut self) -> StageResult<()> { + async fn step(&mut self) -> anyhow::Result<()> { tracing::info!("DerivationPipeline::step"); // Reset the pipeline if needed. if self.needs_reset { let block_info = self.reset.block_info().await; let system_config = self.reset.system_config().await; - self.reset_pipe(block_info, &system_config).await?; + self.reset_pipe(block_info, &system_config).await.map_err(|e| anyhow::anyhow!(e))?; self.needs_reset = false; } @@ -106,15 +80,43 @@ impl< } Err(StageError::Eof) => { tracing::info!("attributes queue stage complete"); - self.attributes.advance_origin().await?; + self.attributes.advance_origin().await.map_err(|e| anyhow::anyhow!(e))?; } // TODO: match on the EngineELSyncing error here and log Err(err) => { tracing::error!("attributes queue stage failed: {:?}", err); - return Err(err); + return Err(anyhow::anyhow!(err)); } } Ok(()) } } + +impl DerivationPipeline +where + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, +{ + /// Creates a new instance of the [DerivationPipeline]. + pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self { + Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor } + } + + /// Internal helper to reset the pipeline. + async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { + match self.attributes.reset(bi, sc).await { + Ok(()) => { + tracing::info!("Stages reset"); + } + Err(StageError::Eof) => { + tracing::info!("Stages reset with EOF"); + } + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + return Err(err); + } + } + Ok(()) + } +} diff --git a/crates/derive/src/pipeline/mod.rs b/crates/derive/src/pipeline/mod.rs new file mode 100644 index 000000000..f13321188 --- /dev/null +++ b/crates/derive/src/pipeline/mod.rs @@ -0,0 +1,13 @@ +//! Module containing the derivation pipeline. + +/// Re-export trait arguments. +pub use crate::traits::{NextAttributes, OriginAdvancer, Pipeline, ResettableStage}; + +/// Re-export commonly used types. +pub use crate::types::{StageError, StageResult}; + +mod builder; +pub use builder::PipelineBuilder; + +mod core; +pub use core::{DerivationPipeline, ResetProvider}; diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index c3f4e87b4..4415354a4 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,7 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, + traits::{NextAttributes, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -30,14 +30,6 @@ pub trait AttributesProvider { fn is_last_in_span(&self) -> bool; } -/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage. -#[async_trait] -pub trait NextAttributes { - /// Returns the next [L2AttributesWithParent] from the current batch. - async fn next_attributes(&mut self, parent: L2BlockInfo) - -> StageResult; -} - /// [AttributesQueue] accepts batches from the [BatchQueue] stage /// and transforms them into [L2PayloadAttributes]. The outputted payload /// attributes cannot be buffered because each batch->attributes transformation diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 58329e03b..6a66c5785 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -33,8 +33,7 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider}; mod attributes_queue; pub use attributes_queue::{ - AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes, - StatefulAttributesBuilder, + AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder, }; #[cfg(any(test, feature = "test-utils"))] diff --git a/crates/derive/src/traits/attributes.rs b/crates/derive/src/traits/attributes.rs new file mode 100644 index 000000000..ce5cefd6e --- /dev/null +++ b/crates/derive/src/traits/attributes.rs @@ -0,0 +1,14 @@ +//! Contains traits for working with payload attributes and their providers. + +use crate::types::{L2AttributesWithParent, L2BlockInfo, StageResult}; +use alloc::boxed::Box; +use async_trait::async_trait; + +/// [NextAttributes] defines the interface for pulling attributes from +/// the top level `AttributesQueue` stage of the pipeline. +#[async_trait] +pub trait NextAttributes { + /// Returns the next [L2AttributesWithParent] from the current batch. + async fn next_attributes(&mut self, parent: L2BlockInfo) + -> StageResult; +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 89f5ff924..71922d2b4 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -1,6 +1,12 @@ //! This module contains all of the traits describing functionality of portions of the derivation //! pipeline. +mod pipeline; +pub use pipeline::Pipeline; + +mod attributes; +pub use attributes::NextAttributes; + mod data_sources; pub use data_sources::*; diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs new file mode 100644 index 000000000..c2e986e2f --- /dev/null +++ b/crates/derive/src/traits/pipeline.rs @@ -0,0 +1,22 @@ +//! Defines the interface for the core derivation pipeline. + +use alloc::boxed::Box; +use async_trait::async_trait; +use kona_primitives::{L2AttributesWithParent, L2BlockInfo}; + +/// This trait defines the interface for interacting with the derivation pipeline. +#[async_trait] +pub trait Pipeline { + /// Resets the pipeline on the next [Pipeline::step] call. + fn reset(&mut self); + + /// Attempts to progress the pipeline. + async fn step(&mut self) -> anyhow::Result<()>; + + /// Pops the next prepared [L2AttributesWithParent] from the pipeline. + fn pop(&mut self) -> Option; + + /// Updates the L2 Safe Head cursor of the pipeline. + /// This is used when fetching the next attributes. + fn update_cursor(&mut self, cursor: L2BlockInfo); +}