Skip to content

Commit

Permalink
fix(derive): Derive full SpanBatch in channel reader (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby authored Apr 13, 2024
1 parent 74a838a commit fe0d803
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 65 deletions.
21 changes: 13 additions & 8 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
SingleBatch, StageError, StageResult, SystemConfig,
},
};
use alloc::{boxed::Box, vec::Vec};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
Expand Down Expand Up @@ -45,7 +45,7 @@ where
BF: L2ChainProvider + Debug,
{
/// The rollup config.
cfg: RollupConfig,
cfg: Arc<RollupConfig>,
/// The previous stage of the derivation pipeline.
prev: P,
/// The l1 block ref
Expand Down Expand Up @@ -75,7 +75,7 @@ where
BF: L2ChainProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
pub fn new(cfg: RollupConfig, prev: P, fetcher: BF) -> Self {
pub fn new(cfg: Arc<RollupConfig>, prev: P, fetcher: BF) -> Self {
Self {
cfg,
prev,
Expand Down Expand Up @@ -343,7 +343,11 @@ where
match batch {
Batch::Single(sb) => Ok(sb),
Batch::Span(sb) => {
let batches = sb.get_singular_batches(&self.l1_blocks, parent);
let batches = sb.get_singular_batches(&self.l1_blocks, parent).map_err(|e| {
StageError::Custom(anyhow!(
"Could not get singular batches from span batch: {e}"
))
})?;
self.next_spans = batches;
let nb = self
.pop_next_batch(parent)
Expand Down Expand Up @@ -413,7 +417,7 @@ mod tests {
#[test]
fn test_derive_next_batch_missing_origin() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let cfg = RollupConfig::default();
let cfg = Arc::new(RollupConfig::default());
let mock = MockBatchQueueProvider::new(data);
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
Expand All @@ -425,10 +429,11 @@ mod tests {
#[tokio::test]
async fn test_next_batch_not_enough_data() {
let mut reader = new_batch_reader();
let batch = reader.next_batch().unwrap();
let cfg = Arc::new(RollupConfig::default());
let batch = reader.next_batch(cfg.as_ref()).unwrap();
let mock = MockBatchQueueProvider::new(vec![Ok(batch)]);
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(RollupConfig::default(), mock, fetcher);
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err();
assert_eq!(res, StageError::NotEnoughData);
assert!(bq.is_last_in_span());
Expand Down Expand Up @@ -456,7 +461,7 @@ mod tests {
#[tokio::test]
async fn test_batch_queue_empty_bytes() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let cfg = RollupConfig::default();
let cfg = Arc::new(RollupConfig::default());
let mock = MockBatchQueueProvider::new(data);
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
Expand Down
26 changes: 14 additions & 12 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
use crate::{
stages::BatchQueueProvider,
traits::OriginProvider,
types::{Batch, BlockInfo, StageError, StageResult},
types::{Batch, BlockInfo, RollupConfig, StageError, StageResult},
};

use alloc::{boxed::Box, vec::Vec};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_primitives::Bytes;
use async_trait::async_trait;
use core::fmt::Debug;
Expand All @@ -33,15 +33,17 @@ where
prev: P,
/// The batch reader.
next_batch: Option<BatchReader>,
/// The rollup coonfiguration.
cfg: Arc<RollupConfig>,
}

impl<P> ChannelReader<P>
where
P: ChannelReaderProvider + OriginProvider + Debug,
{
/// Create a new [ChannelReader] stage.
pub fn new(prev: P) -> Self {
Self { prev, next_batch: None }
pub fn new(prev: P, cfg: Arc<RollupConfig>) -> Self {
Self { prev, next_batch: None, cfg: cfg.clone() }
}

/// Creates the batch reader from available channel data.
Expand Down Expand Up @@ -75,7 +77,7 @@ where
.next_batch
.as_mut()
.expect("Cannot be None")
.next_batch()
.next_batch(self.cfg.as_ref())
.ok_or(StageError::NotEnoughData)
{
Ok(batch) => Ok(batch),
Expand Down Expand Up @@ -112,7 +114,7 @@ pub(crate) struct BatchReader {

impl BatchReader {
/// Pulls out the next batch from the reader.
pub(crate) fn next_batch(&mut self) -> Option<Batch> {
pub(crate) fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
// If the data is not already decompressed, decompress it.
if let Some(data) = self.data.take() {
let decompressed_data = decompress_to_vec_zlib(&data).ok()?;
Expand All @@ -121,7 +123,7 @@ impl BatchReader {

// Decompress and RLP decode the batch data, before finally decoding the batch itself.
let mut decompressed_reader = self.decompressed.as_slice();
let batch = Batch::decode(&mut decompressed_reader).ok()?;
let batch = Batch::decode(&mut decompressed_reader, cfg).ok()?;

// Advance the cursor on the reader.
self.cursor += self.decompressed.len() - decompressed_reader.len();
Expand Down Expand Up @@ -153,23 +155,23 @@ mod test {
#[tokio::test]
async fn test_next_batch_batch_reader_set_fails() {
let mock = MockChannelReaderProvider::new(vec![Err(StageError::Eof)]);
let mut reader = ChannelReader::new(mock);
let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default()));
assert_eq!(reader.next_batch().await, Err(StageError::Eof));
assert!(reader.next_batch.is_none());
}

#[tokio::test]
async fn test_next_batch_batch_reader_no_data() {
let mock = MockChannelReaderProvider::new(vec![Ok(None)]);
let mut reader = ChannelReader::new(mock);
let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default()));
assert_eq!(reader.next_batch().await, Err(StageError::NoChannel));
assert!(reader.next_batch.is_none());
}

#[tokio::test]
async fn test_next_batch_not_enough_data() {
let mock = MockChannelReaderProvider::new(vec![Ok(Some(Bytes::default()))]);
let mut reader = ChannelReader::new(mock);
let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default()));
assert_eq!(reader.next_batch().await, Err(StageError::NotEnoughData));
assert!(reader.next_batch.is_none());
}
Expand All @@ -178,7 +180,7 @@ mod test {
async fn test_next_batch_succeeds() {
let raw = new_compressed_batch_data();
let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]);
let mut reader = ChannelReader::new(mock);
let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default()));
let res = reader.next_batch().await.unwrap();
matches!(res, Batch::Span(_));
assert!(reader.next_batch.is_some());
Expand All @@ -192,7 +194,7 @@ mod test {

let compressed_raw_data = compress_to_vec_zlib(typed_data.as_slice(), 5);
let mut reader = BatchReader::from(compressed_raw_data);
reader.next_batch().unwrap();
reader.next_batch(&RollupConfig::default()).unwrap();

assert_eq!(reader.cursor, typed_data.len());
}
Expand Down
24 changes: 8 additions & 16 deletions crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use crate::{
traits::L2ChainProvider,
types::{BlockInfo, L2BlockInfo, RollupConfig},
};
use alloc::vec::Vec;
use alloy_rlp::{Buf, Decodable, Encodable};
use alloy_rlp::{Buf, Decodable};

mod batch_type;
pub use batch_type::BatchType;
Expand Down Expand Up @@ -66,7 +65,7 @@ pub enum Batch {
/// A single batch
Single(SingleBatch),
/// Span Batches
Span(RawSpanBatch),
Span(SpanBatch),
}

impl Batch {
Expand All @@ -78,19 +77,8 @@ impl Batch {
}
}

/// Attempts to encode a batch into a writer.
pub fn encode(&self, w: &mut Vec<u8>) -> Result<(), DecodeError> {
match self {
Self::Single(single_batch) => {
single_batch.encode(w);
Ok(())
}
Self::Span(span_batch) => span_batch.encode(w).map_err(DecodeError::SpanBatchError),
}
}

/// Attempts to decode a batch from a reader.
pub fn decode(r: &mut &[u8]) -> Result<Self, DecodeError> {
pub fn decode(r: &mut &[u8], cfg: &RollupConfig) -> Result<Self, DecodeError> {
if r.is_empty() {
return Err(DecodeError::EmptyBuffer);
}
Expand All @@ -105,7 +93,11 @@ impl Batch {
Ok(Batch::Single(single_batch))
}
BatchType::Span => {
let span_batch = RawSpanBatch::decode(r).map_err(DecodeError::SpanBatchError)?;
let mut raw_span_batch =
RawSpanBatch::decode(r).map_err(DecodeError::SpanBatchError)?;
let span_batch = raw_span_batch
.derive(cfg.block_time, cfg.genesis.timestamp, cfg.l2_chain_id)
.map_err(DecodeError::SpanBatchError)?;
Ok(Batch::Span(span_batch))
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/types/batch/span_batch/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct SpanBatch {

impl SpanBatch {
/// Returns the timestamp for the first batch in the span.
pub fn get_timestamp(&self) -> u64 {
pub fn timestamp(&self) -> u64 {
self.batches[0].timestamp
}

Expand Down Expand Up @@ -87,7 +87,7 @@ impl SpanBatch {
/// stage.
pub fn get_singular_batches(
&self,
l1_origins: Vec<BlockInfo>,
l1_origins: &[BlockInfo],
l2_safe_head: L2BlockInfo,
) -> Result<Vec<SingleBatch>, SpanBatchError> {
let mut single_batches = Vec::new();
Expand Down
30 changes: 3 additions & 27 deletions crates/derive/src/types/batch/span_batch/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
use alloc::vec::Vec;

use crate::{
traits::L2ChainProvider,
types::{
BatchType, BatchValidity, BlockInfo, L2BlockInfo, RawTransaction, RollupConfig,
SingleBatch, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix, SpanDecodingError,
},
use crate::types::{
BatchType, RawTransaction, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix,
SpanDecodingError,
};

use super::{SpanBatch, SpanBatchError};
Expand All @@ -32,27 +29,6 @@ impl RawSpanBatch {
self.prefix.rel_timestamp
}

/// Checks if the span batch is valid.
pub fn check_batch<BF: L2ChainProvider>(
&self,
_cfg: &RollupConfig,
_l1_blocks: &[BlockInfo],
_l2_safe_head: L2BlockInfo,
_inclusion_block: &BlockInfo,
_fetcher: &BF,
) -> BatchValidity {
unimplemented!()
}

/// Derives [SingleBatch]s from the span batch.
pub fn get_singular_batches(
&self,
_l1_blocks: &[BlockInfo],
_parent: L2BlockInfo,
) -> Vec<SingleBatch> {
unimplemented!()
}

/// Encodes the [RawSpanBatch] into a writer.
pub fn encode(&self, w: &mut Vec<u8>) -> Result<(), SpanBatchError> {
self.prefix.encode_prefix(w);
Expand Down

0 comments on commit fe0d803

Please sign in to comment.