Skip to content

Commit

Permalink
feat: refactor the pipeline builder
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Jun 4, 2024
1 parent 8cf8702 commit 17bc238
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 59 deletions.
4 changes: 1 addition & 3 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ pub use params::{
MAX_RLP_BYTES_PER_CHANNEL, MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS,
};

pub mod builder;
pub use builder::DerivationPipeline;

pub mod pipeline;
pub mod sources;
pub mod stages;
pub mod traits;
Expand Down
67 changes: 67 additions & 0 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Contains the `PipelineBuilder` object that is used to build a `DerivationPipeline`.
use super::{DerivationPipeline, NextAttributes, OriginAdvancer, ResetProvider, ResettableStage};
use alloc::collections::VecDeque;
use core::fmt::Debug;
use kona_primitives::L2BlockInfo;

/// The PipelineBuilder constructs a [DerivationPipeline].
#[derive(Debug)]
pub struct PipelineBuilder<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
attributes: Option<S>,
reset: Option<R>,
start_cursor: Option<L2BlockInfo>,
}

impl<S, R> PipelineBuilder<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
/// Sets the attributes for the pipeline.
pub fn attributes(mut self, attributes: S) -> Self {
self.attributes = Some(attributes);
self
}

/// Sets the reset provider for the pipeline.
pub fn reset(mut self, reset: R) -> Self {
self.reset = Some(reset);
self
}

/// Sets the start cursor for the pipeline.
pub fn start_cursor(mut self, cursor: L2BlockInfo) -> Self {
self.start_cursor = Some(cursor);
self
}

/// Builds the pipeline.
pub fn build(self) -> DerivationPipeline<S, R> {
self.into()
}
}

impl<S, R> From<PipelineBuilder<S, R>> for DerivationPipeline<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
fn from(builder: PipelineBuilder<S, R>) -> Self {
let attributes = builder.attributes.expect("attributes must be set");
let reset = builder.reset.expect("reset must be set");
let start_cursor = builder.start_cursor.expect("start_cursor must be set");

DerivationPipeline {
attributes,
reset,
prepared: VecDeque::new(),
needs_reset: false,
cursor: start_cursor,
}
}
}
92 changes: 47 additions & 45 deletions crates/derive/src/builder.rs → crates/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
//! Contains a concrete implementation of the [DerivationPipeline].
//! Contains the core derivation pipeline.
use crate::{
stages::NextAttributes,
traits::{OriginAdvancer, ResettableStage},
types::{StageError, StageResult},
};
use super::{NextAttributes, OriginAdvancer, Pipeline, ResettableStage, StageError, StageResult};
use alloc::{boxed::Box, collections::VecDeque};
use async_trait::async_trait;
use core::fmt::Debug;
Expand All @@ -15,7 +11,6 @@ use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo, SystemConf
pub trait ResetProvider {
/// Returns the current [BlockInfo] for the pipeline to reset.
async fn block_info(&self) -> BlockInfo;

/// Returns the current [SystemConfig] for the pipeline to reset.
async fn system_config(&self) -> SystemConfig;
}
Expand All @@ -38,46 +33,25 @@ pub struct DerivationPipeline<
pub cursor: L2BlockInfo,
}

impl<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> DerivationPipeline<S, R>
#[async_trait]
impl<S, R> Pipeline for DerivationPipeline<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
fn reset(&mut self) {
self.needs_reset = true;
}

/// Returns the next [L2AttributesWithParent] from the pipeline.
pub fn next_attributes(&mut self) -> Option<L2AttributesWithParent> {
/// Pops the next prepared [L2AttributesWithParent] from the pipeline.
fn pop(&mut self) -> Option<L2AttributesWithParent> {
self.prepared.pop_front()
}

/// Flags the pipeline to reset on the next [DerivationPipeline::step] call.
pub fn reset(&mut self) {
self.needs_reset = true;
}

/// Resets the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
/// Updates the L2 Safe Head cursor of the pipeline.
/// The cursor is used to fetch the next attributes.
fn update_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Attempts to progress the pipeline.
Expand All @@ -86,14 +60,14 @@ impl<
/// An error is expected when the underlying source closes.
/// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the
/// derivation process.
pub async fn step(&mut self) -> StageResult<()> {
async fn step(&mut self) -> anyhow::Result<()> {
tracing::info!("DerivationPipeline::step");

// Reset the pipeline if needed.
if self.needs_reset {
let block_info = self.reset.block_info().await;
let system_config = self.reset.system_config().await;
self.reset_pipe(block_info, &system_config).await?;
self.reset_pipe(block_info, &system_config).await.map_err(|e| anyhow::anyhow!(e))?;
self.needs_reset = false;
}

Expand All @@ -106,15 +80,43 @@ impl<
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
self.attributes.advance_origin().await?;
self.attributes.advance_origin().await.map_err(|e| anyhow::anyhow!(e))?;
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
tracing::error!("attributes queue stage failed: {:?}", err);
return Err(err);
return Err(anyhow::anyhow!(err));
}
}

Ok(())
}
}

impl<S, R> DerivationPipeline<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Internal helper to reset the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
}
}
13 changes: 13 additions & 0 deletions crates/derive/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Module containing the derivation pipeline.
/// Re-export trait arguments.
pub use crate::traits::{NextAttributes, OriginAdvancer, Pipeline, ResettableStage};

/// Re-export commonly used types.
pub use crate::types::{StageError, StageResult};

mod builder;
pub use builder::PipelineBuilder;

mod core;
pub use core::{DerivationPipeline, ResetProvider};
10 changes: 1 addition & 9 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.
use crate::{
traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
traits::{NextAttributes, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -30,14 +30,6 @@ pub trait AttributesProvider {
fn is_last_in_span(&self) -> bool;
}

/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [L2PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
Expand Down
3 changes: 1 addition & 2 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider};

mod attributes_queue;
pub use attributes_queue::{
AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes,
StatefulAttributesBuilder,
AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder,
};

#[cfg(any(test, feature = "test-utils"))]
Expand Down
14 changes: 14 additions & 0 deletions crates/derive/src/traits/attributes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Contains traits for working with payload attributes and their providers.
use crate::types::{L2AttributesWithParent, L2BlockInfo, StageResult};
use alloc::boxed::Box;
use async_trait::async_trait;

/// [NextAttributes] defines the interface for pulling attributes from
/// the top level `AttributesQueue` stage of the pipeline.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}
6 changes: 6 additions & 0 deletions crates/derive/src/traits/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! This module contains all of the traits describing functionality of portions of the derivation
//! pipeline.
mod pipeline;
pub use pipeline::Pipeline;

mod attributes;
pub use attributes::NextAttributes;

mod data_sources;
pub use data_sources::*;

Expand Down
22 changes: 22 additions & 0 deletions crates/derive/src/traits/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! Defines the interface for the core derivation pipeline.
use alloc::boxed::Box;
use async_trait::async_trait;
use kona_primitives::{L2AttributesWithParent, L2BlockInfo};

/// This trait defines the interface for interacting with the derivation pipeline.
#[async_trait]
pub trait Pipeline {
/// Resets the pipeline on the next [Pipeline::step] call.
fn reset(&mut self);

/// Attempts to progress the pipeline.
async fn step(&mut self) -> anyhow::Result<()>;

/// Pops the next prepared [L2AttributesWithParent] from the pipeline.
fn pop(&mut self) -> Option<L2AttributesWithParent>;

/// Updates the L2 Safe Head cursor of the pipeline.
/// This is used when fetching the next attributes.
fn update_cursor(&mut self, cursor: L2BlockInfo);
}

0 comments on commit 17bc238

Please sign in to comment.