diff --git a/crates/derive/src/batch/single_batch.rs b/crates/derive/src/batch/single_batch.rs index 79d07eaec..7857c6b6c 100644 --- a/crates/derive/src/batch/single_batch.rs +++ b/crates/derive/src/batch/single_batch.rs @@ -63,7 +63,11 @@ impl SingleBatch { } if self.timestamp < next_timestamp { warn!("dropping batch with old timestamp, min_timestamp: {next_timestamp}"); - return BatchValidity::Drop; + return if cfg.is_holocene_active(self.timestamp) { + BatchValidity::Past + } else { + BatchValidity::Drop + }; } // Dependent on the above timestamp check. @@ -233,6 +237,44 @@ mod tests { assert_eq!(validity, BatchValidity::Drop); } + #[test] + fn test_drop_old_batch() { + let single_batch = SingleBatch { + parent_hash: B256::ZERO, + epoch_num: 0xFF, + epoch_hash: B256::ZERO, + timestamp: 0x00, + transactions: vec![Bytes::from(hex!("00"))], + }; + + let cfg = RollupConfig { block_time: 2, ..Default::default() }; + let l1_blocks = vec![BlockInfo::default()]; + let l2_safe_head = L2BlockInfo::default(); + let inclusion_block = BlockInfo { number: 10, ..Default::default() }; + + let validity = single_batch.check_batch(&cfg, &l1_blocks, l2_safe_head, &inclusion_block); + assert_eq!(validity, BatchValidity::Drop); + } + + #[test] + fn test_drop_old_batch_holocene() { + let single_batch = SingleBatch { + parent_hash: B256::ZERO, + epoch_num: 0xFF, + epoch_hash: B256::ZERO, + timestamp: 0x00, + transactions: vec![Bytes::from(hex!("00"))], + }; + + let cfg = RollupConfig { holocene_time: Some(0), block_time: 2, ..Default::default() }; + let l1_blocks = vec![BlockInfo::default()]; + let l2_safe_head = L2BlockInfo::default(); + let inclusion_block = BlockInfo { number: 10, ..Default::default() }; + + let validity = single_batch.check_batch(&cfg, &l1_blocks, l2_safe_head, &inclusion_block); + assert_eq!(validity, BatchValidity::Past); + } + #[test] fn test_drop_prior_epoch_num() { let single_batch = SingleBatch { diff --git a/crates/derive/src/batch/span_batch/batch.rs b/crates/derive/src/batch/span_batch/batch.rs index 1d8231311..12b4e1c0f 100644 --- a/crates/derive/src/batch/span_batch/batch.rs +++ b/crates/derive/src/batch/span_batch/batch.rs @@ -442,7 +442,11 @@ impl SpanBatch { // Drop the batch if it has no new blocks after the safe head. if self.final_timestamp() < next_timestamp { warn!("span batch has no new blocks after safe head"); - return BatchValidity::Drop; + return if cfg.is_holocene_active(self.final_timestamp()) { + BatchValidity::Past + } else { + BatchValidity::Drop + } } BatchValidity::Accept diff --git a/crates/derive/src/batch/validity.rs b/crates/derive/src/batch/validity.rs index 963a3b9e6..665163375 100644 --- a/crates/derive/src/batch/validity.rs +++ b/crates/derive/src/batch/validity.rs @@ -15,6 +15,9 @@ pub enum BatchValidity { Undecided, /// The batch may be valid, but cannot be processed yet and should be checked again later Future, + /// Introduced in Holocene, a special variant of the Drop variant that signals not to flush + /// the active batch and channel, in the case of processing an old batch + Past, } impl BatchValidity { @@ -23,6 +26,11 @@ impl BatchValidity { matches!(self, Self::Drop) } + /// Returns if the batch is outdated. + pub const fn is_outdated(&self) -> bool { + matches!(self, Self::Past) + } + /// Returns if the batch is future. pub const fn is_future(&self) -> bool { matches!(self, Self::Future) diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index b0bb88e7a..33d1f8e57 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -75,6 +75,9 @@ pub enum PipelineError { /// [L1Retrieval]: crate::stages::L1Retrieval #[error("L1 Retrieval missing data")] MissingL1Data, + /// Invalid batch validity variant. + #[error("Invalid batch validity")] + InvalidBatchValidity, /// [SystemConfig] update error. /// /// [SystemConfig]: op_alloy_genesis::SystemConfig diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index cf59b0eb0..77ef532a6 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -183,6 +183,15 @@ where self.batches = remaining; return Err(PipelineError::Eof.temp()); } + BatchValidity::Past => { + if !self.cfg.is_holocene_active(origin.timestamp) { + error!(target: "batch-queue", "BatchValidity::Past is not allowed pre-holocene"); + return Err(PipelineError::InvalidBatchValidity.crit()); + } + + warn!(target: "batch-queue", "[HOLOCENE] Dropping outdated batch with parent: {}", parent.block_info.number); + continue; + } } } self.batches = remaining; @@ -261,6 +270,9 @@ where if drop { self.prev.flush(); return Ok(()); + } else if validity.is_outdated() { + // If the batch is outdated, we drop it without flushing the previous stage. + return Ok(()); } self.batches.push(data); Ok(()) @@ -673,6 +685,41 @@ mod tests { assert!(bq.batches.is_empty()); } + #[tokio::test] + async fn test_add_old_batch_drop_holocene() { + // Construct a single batch with BatchValidity::Past. + let cfg = + Arc::new(RollupConfig { holocene_time: Some(0), block_time: 2, ..Default::default() }); + assert!(cfg.is_holocene_active(0)); + let batch = SingleBatch { + parent_hash: B256::default(), + epoch_num: 0, + epoch_hash: B256::default(), + timestamp: 100, + transactions: Vec::new(), + }; + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 101, ..Default::default() }, + ..Default::default() + }; + + // Setup batch queue deps + let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; + let mut mock = TestBatchQueueProvider::new(batch_vec); + mock.origin = Some(BlockInfo::default()); + let fetcher = TestL2ChainProvider::default(); + + // Configure batch queue + let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); + bq.origin = Some(BlockInfo::default()); // Set the origin + bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks + bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq + + // Add the batch to the batch queue + bq.add_batch(Batch::Single(batch), parent).await.unwrap(); + assert!(bq.batches.is_empty()); + } + #[tokio::test] async fn test_derive_next_batch_missing_origin() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index 42e65323c..2d7b24fbc 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; -use tracing::trace; +use tracing::{error, trace}; /// Provides [Batch]es for the [BatchStream] stage. #[async_trait] @@ -153,6 +153,14 @@ where return Err(PipelineError::Eof.temp()); } + BatchValidity::Past => { + if !self.is_active()? { + error!(target: "batch-stream", "BatchValidity::Past is not allowed pre-holocene"); + return Err(PipelineError::InvalidBatchValidity.crit()); + } + + return Err(PipelineError::Eof.temp()); + } BatchValidity::Undecided | BatchValidity::Future => { return Err(PipelineError::NotEnoughData.temp()) }