Skip to content

Commit

Permalink
feat(derive): Pipeline Builder (#127)
Browse files Browse the repository at this point in the history
* feat(derive): span batch validation

* feat(derive): span batch validity unit tests

* feat(derive): span batch unit tests with acceptance test

* fix(derive): unit tests

* fix(derive): add more unit tests

* feat(derive): span batch validity unit tests for txs

* feat(derive): pipeline builder

* fix(derive): so close :sadge

* fix(derive): ugly refactor

* fix(derive): pipeline construction and trait abstractions

* fix(derive): nit fixes

* fix(derive): temp manual deposit type check
  • Loading branch information
refcell authored Apr 27, 2024
1 parent 2caf119 commit 52036db
Show file tree
Hide file tree
Showing 21 changed files with 751 additions and 137 deletions.
1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ online = [
"dep:alloy-provider",
"dep:alloy-transport-http",
"dep:reqwest",
"alloy-provider/reqwest",
"alloy-consensus/serde",
"c-kzg/serde",
"revm-primitives/serde",
Expand Down
121 changes: 121 additions & 0 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//! Contains a concrete implementation of the [DerivationPipeline].
use crate::{
stages::NextAttributes,
traits::{OriginAdvancer, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig,
},
};
use alloc::{boxed::Box, collections::VecDeque};
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides the [BlockInfo] and [SystemConfig] for the stack to reset the stages.
#[async_trait]
pub trait ResetProvider {
/// Returns the current [BlockInfo] for the pipeline to reset.
async fn block_info(&self) -> BlockInfo;

/// Returns the current [SystemConfig] for the pipeline to reset.
async fn system_config(&self) -> SystemConfig;
}

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> {
/// A handle to the next attributes.
pub attributes: S,
/// Reset provider for the pipeline.
pub reset: R,
/// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer.
pub prepared: VecDeque<L2AttributesWithParent>,
/// A flag to tell the pipeline to reset.
pub needs_reset: bool,
/// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes.
pub cursor: L2BlockInfo,
}

impl<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> DerivationPipeline<S, R>
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Returns the next [L2AttributesWithParent] from the pipeline.
pub fn next_attributes(&mut self) -> Option<L2AttributesWithParent> {
self.prepared.pop_front()
}

/// Flags the pipeline to reset on the next [DerivationPipeline::step] call.
pub fn reset(&mut self) {
self.needs_reset = true;
}

/// Resets the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
}

/// Attempts to progress the pipeline.
/// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data.
/// Any other error is critical and the derivation pipeline should be reset.
/// An error is expected when the underlying source closes.
/// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the
/// derivation process.
pub async fn step(&mut self) -> StageResult<()> {
tracing::info!("DerivationPipeline::step");

// Reset the pipeline if needed.
if self.needs_reset {
let block_info = self.reset.block_info().await;
let system_config = self.reset.system_config().await;
self.reset_pipe(block_info, &system_config).await?;
self.needs_reset = false;
}

match self.attributes.next_attributes(self.cursor).await {
Ok(a) => {
tracing::info!("attributes queue stage step returned l2 attributes");
tracing::info!("prepared L2 attributes: {:?}", a);
self.prepared.push_back(a);
return Ok(());
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
self.attributes.advance_origin().await?;
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
tracing::error!("attributes queue stage failed: {:?}", err);
return Err(err);
}
}

Ok(())
}
}
27 changes: 7 additions & 20 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@

extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::ChainProvider;
use types::RollupConfig;

mod params;
pub use params::{
ChannelID, CHANNEL_ID_LENGTH, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC,
Expand All @@ -19,6 +14,9 @@ pub use params::{
MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS,
};

pub mod builder;
pub use builder::DerivationPipeline;

pub mod sources;
pub mod stages;
pub mod traits;
Expand All @@ -27,18 +25,7 @@ pub mod types;
#[cfg(feature = "online")]
mod online;
#[cfg(feature = "online")]
pub use online::prelude::*;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P>(_rollup_config: Arc<RollupConfig>, _chain_provider: P) -> Self
where
P: ChainProvider + Clone + Debug + Send,
{
unimplemented!("TODO: High-level pipeline composition helper.")
}
}
pub use online::{
new_online_stack, AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient,
OnlineBlobProvider, SimpleSlotDerivation,
};
38 changes: 32 additions & 6 deletions crates/derive/src/online/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,37 @@
//! Contains "online" implementations for providers.
/// Prelude for online providers.
pub(crate) mod prelude {
pub use super::{
AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient,
OnlineBlobProvider, SimpleSlotDerivation,
};
use crate::{
stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal, NextAttributes, StatefulAttributesBuilder,
},
traits::{DataAvailabilityProvider, ResettableStage},
types::RollupConfig,
};

use alloc::sync::Arc;
use alloy_provider::ReqwestProvider;
use core::fmt::Debug;

/// Creates a new [OnlineStageStack].
#[cfg(feature = "online")]
pub fn new_online_stack(
rollup_config: Arc<RollupConfig>,
chain_provider: AlloyChainProvider<ReqwestProvider>,
dap_source: impl DataAvailabilityProvider + Send + Sync + Debug,
fetcher: AlloyL2ChainProvider<ReqwestProvider>,
builder: StatefulAttributesBuilder<
AlloyChainProvider<ReqwestProvider>,
AlloyL2ChainProvider<ReqwestProvider>,
>,
) -> impl NextAttributes + ResettableStage + Debug + Send {
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
}

#[cfg(test)]
Expand Down
60 changes: 54 additions & 6 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.
use crate::{
traits::{OriginProvider, ResettableStage},
traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -30,6 +30,14 @@ pub trait AttributesProvider {
fn is_last_in_span(&self) -> bool;
}

/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [L2PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
Expand All @@ -45,7 +53,7 @@ pub trait AttributesProvider {
#[derive(Debug)]
pub struct AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
Expand All @@ -62,7 +70,7 @@ where

impl<P, AB> AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
Expand Down Expand Up @@ -139,9 +147,44 @@ where
}
}

impl<P, AB> PreviousStage for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
fn previous(&self) -> Option<Box<&dyn PreviousStage>> {
Some(Box::new(&self.prev))
}
}

#[async_trait]
impl<P, AB> OriginAdvancer for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn advance_origin(&mut self) -> StageResult<()> {
self.prev.advance_origin().await
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<L2AttributesWithParent> {
self.next_attributes(parent).await
}
}

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
Expand All @@ -152,10 +195,15 @@ where
#[async_trait]
impl<P, AB> ResettableStage for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Send + Debug,
P: AttributesProvider + PreviousStage + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
async fn reset(
&mut self,
block_info: BlockInfo,
system_config: &SystemConfig,
) -> StageResult<()> {
self.prev.reset(block_info, system_config).await?;
info!("resetting attributes queue");
self.batch = None;
self.is_last_in_span = false;
Expand Down
Loading

0 comments on commit 52036db

Please sign in to comment.