Skip to content

Commit

Permalink
feat(derive): pipeline builder
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 19, 2024
1 parent b299e06 commit f75eb5e
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 47 deletions.
86 changes: 86 additions & 0 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! Contains a concrete implementation of the [DerivationPipeline].
use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
},
traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider},
types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult},
};
use alloc::sync::Arc;
use core::fmt::Debug;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<N: NextAttributes + Debug> {
/// The attributes queue to retrieve the next attributes.
pub attributes: N,
/// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes.
pub cursor: L2BlockInfo,
}

impl<N: NextAttributes + Debug + Send> DerivationPipeline<N> {
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: N, cursor: L2BlockInfo) -> Self {
Self { attributes, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Get the next attributes from the pipeline.
pub async fn next(&mut self) -> StageResult<L2AttributesWithParent> {
self.attributes.next_attributes(self.cursor).await
}
}

impl<P, DAP, F, B> DerivationPipeline<KonaAttributes<P, DAP, F, B>>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Creates a new instance of the [DerivationPipeline] from the given attributes.
pub fn new_online_pipeline(
attributes: KonaAttributes<P, DAP, F, B>,
cursor: L2BlockInfo,
) -> Self {
Self::new(attributes, cursor)
}
}

/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type.
pub type KonaDerivationPipeline<P, DAP, F, B> = DerivationPipeline<KonaAttributes<P, DAP, F, B>>;

/// [KonaAttributes] is a concrete [NextAttributes] type.
pub type KonaAttributes<P, DAP, F, B> = AttributesQueue<
BatchQueue<ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<P>>>>>, F>,
B,
>;

/// Creates a new [KonaAttributes] instance.
pub fn new_online_pipeline<P, DAP, F, B>(
rollup_config: Arc<RollupConfig>,
chain_provider: P,
dap_source: DAP,
fetcher: F,
builder: B,
) -> KonaAttributes<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
}
27 changes: 9 additions & 18 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@

extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::ChainProvider;
use types::RollupConfig;
/// Prelude exports common types and traits.
pub mod prelude {
pub use super::{builder::DerivationPipeline, params::*};
// pub use super::traits::prelude::*;
// pub use super::types::prelude::*;
// pub use super::stages::prelude::*;
// pub use super::sources::prelude::*;
}

mod params;
pub use params::{
Expand All @@ -19,6 +23,7 @@ pub use params::{
MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS,
};

pub mod builder;
pub mod sources;
pub mod stages;
pub mod traits;
Expand All @@ -28,17 +33,3 @@ pub mod types;
mod online;
#[cfg(feature = "online")]
pub use online::prelude::*;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P>(_rollup_config: Arc<RollupConfig>, _chain_provider: P) -> Self
where
P: ChainProvider + Clone + Debug + Send,
{
unimplemented!("TODO: High-level pipeline composition helper.")
}
}
22 changes: 22 additions & 0 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub trait AttributesProvider {
fn is_last_in_span(&self) -> bool;
}

/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [L2PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
Expand Down Expand Up @@ -139,6 +147,20 @@ where
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<L2AttributesWithParent> {
self.next_attributes(parent).await
}
}

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
Expand Down
72 changes: 51 additions & 21 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ where
// We always update the origin of this stage if it's not the same so
// after the update code runs, this is consistent.
let origin_behind =
self.origin.map_or(true, |origin| origin.number < parent.l1_origin.number);
self.prev.origin().map_or(true, |origin| origin.number < parent.l1_origin.number);

// Advance the origin if needed.
// The entire pipeline has the same origin.
Expand Down Expand Up @@ -334,6 +334,7 @@ where
}

// Attempt to derive more batches.
assert!(self.l1_blocks.is_empty());
let batch = match self.derive_next_batch(out_of_data, parent).await {
Ok(b) => b,
Err(e) => match e {
Expand Down Expand Up @@ -408,12 +409,17 @@ where
mod tests {
use super::*;
use crate::{
stages::{channel_reader::BatchReader, test_utils::MockBatchQueueProvider},
stages::{
channel_reader::BatchReader,
test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage},
},
traits::test_utils::MockBlockFetcher,
types::BatchType,
types::{BatchType, BlockID},
};
use alloc::vec;
use miniz_oxide::deflate::compress_to_vec_zlib;
use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn new_batch_reader() -> BatchReader {
let raw_data = include_bytes!("../../testdata/raw_batch.hex");
Expand Down Expand Up @@ -448,24 +454,48 @@ mod tests {
assert!(bq.is_last_in_span());
}

// TODO(refcell): The batch reader here loops forever.
// Maybe the cursor isn't being used?
// UPDATE: the batch data is not valid
// #[tokio::test]
// async fn test_next_batch_succeeds() {
// let mut reader = new_batch_reader();
// let mut batch_vec: Vec<StageResult<Batch>> = vec![];
// while let Some(batch) = reader.next_batch() {
// batch_vec.push(Ok(batch));
// }
// let mock = MockBatchQueueProvider::new(batch_vec);
// let telemetry = TestTelemetry::new();
// let fetcher = MockBlockFetcher::default();
// let mut bq = BatchQueue::new(RollupConfig::default(), mock, telemetry, fetcher);
// let res = bq.next_batch(L2BlockInfo::default()).await.unwrap();
// assert_eq!(res, SingleBatch::default());
// assert!(bq.is_last_in_span());
// }
#[tokio::test]
async fn test_next_batch_origin_behind() {
let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<StageResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let parent = L2BlockInfo {
l1_origin: BlockID { number: 10, ..Default::default() },
..Default::default()
};
let res = bq.next_batch(parent).await.unwrap_err();
assert_eq!(res, StageError::NotEnoughData);
}

// TODO: fix
#[tokio::test]
async fn test_next_batch_succeeds() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<StageResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err();
let logs = trace_store.get_by_level(Level::WARN);
let str = "Deriving next batch for epoch: 1";
assert_eq!(logs[0], str);
}

#[tokio::test]
async fn test_batch_queue_empty_bytes() {
Expand Down
2 changes: 0 additions & 2 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ where
let first = self.channel_queue[0];
let channel = self.channels.get(&first).ok_or(StageError::ChannelNotFound)?;
let origin = self.origin().ok_or(StageError::MissingOrigin)?;

// Remove all timed out channels from the front of the `channel_queue`.
if channel.open_block_number() + self.cfg.channel_timeout < origin.number {
warn!("Channel {:?} timed out", first);
self.channels.remove(&first);
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ impl BatchReader {
}

// Decompress and RLP decode the batch data, before finally decoding the batch itself.
let mut decompressed_reader = self.decompressed.as_slice();
let mut decompressed_reader = self.decompressed.as_slice()[self.cursor..].as_ref();
let batch = Batch::decode(&mut decompressed_reader, cfg).ok()?;

// Advance the cursor on the reader.
self.cursor += self.decompressed.len() - decompressed_reader.len();
self.cursor = self.decompressed.len() - decompressed_reader.len();

Some(batch)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider};

mod attributes_queue;
pub use attributes_queue::{
AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder,
SystemConfigL2Fetcher,
AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes,
StatefulAttributesBuilder, SystemConfigL2Fetcher,
};

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use async_trait::async_trait;
#[derive(Debug, Default)]
pub struct MockBatchQueueProvider {
/// The origin of the L1 block.
origin: Option<BlockInfo>,
pub origin: Option<BlockInfo>,
/// A list of batches to return.
batches: Vec<StageResult<Batch>>,
pub batches: Vec<StageResult<Batch>>,
}

impl MockBatchQueueProvider {
Expand Down

0 comments on commit f75eb5e

Please sign in to comment.