From e34cb93c79c2d99f1a67c086f15bf6e9bc4d514b Mon Sep 17 00:00:00 2001 From: clabby Date: Thu, 3 Oct 2024 14:47:51 -0400 Subject: [PATCH] feat(derive): Holocene flush signal (#612) --- crates/derive/src/pipeline/core.rs | 51 +++++++++++++++++--- crates/derive/src/stages/attributes_queue.rs | 32 +++++++++--- crates/derive/src/stages/batch_queue.rs | 22 ++++++++- crates/derive/src/stages/batch_stream.rs | 22 ++++++++- crates/derive/src/stages/channel_reader.rs | 15 +++++- crates/derive/src/traits/attributes.rs | 3 +- crates/derive/src/traits/mod.rs | 2 +- crates/derive/src/traits/stages.rs | 7 +++ 8 files changed, 135 insertions(+), 19 deletions(-) diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 9dd489c60..7ec1ac29e 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -4,7 +4,10 @@ use super::{ NextAttributes, OriginAdvancer, OriginProvider, Pipeline, PipelineError, PipelineResult, ResettableStage, StepResult, }; -use crate::{errors::PipelineErrorKind, traits::Signal}; +use crate::{ + errors::PipelineErrorKind, + traits::{FlushableStage, Signal}, +}; use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; @@ -18,7 +21,13 @@ use tracing::{error, trace, warn}; #[derive(Debug)] pub struct DerivationPipeline where - S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send, + S: NextAttributes + + ResettableStage + + FlushableStage + + OriginProvider + + OriginAdvancer + + Debug + + Send, P: L2ChainProvider + Send + Sync + Debug, { /// A handle to the next attributes. @@ -35,7 +44,13 @@ where impl DerivationPipeline where - S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send, + S: NextAttributes + + ResettableStage + + FlushableStage + + OriginProvider + + OriginAdvancer + + Debug + + Send, P: L2ChainProvider + Send + Sync + Debug, { /// Creates a new instance of the [DerivationPipeline]. @@ -50,7 +65,13 @@ where impl OriginProvider for DerivationPipeline where - S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send, + S: NextAttributes + + ResettableStage + + FlushableStage + + OriginProvider + + OriginAdvancer + + Debug + + Send, P: L2ChainProvider + Send + Sync + Debug, { fn origin(&self) -> Option { @@ -60,7 +81,14 @@ where impl Iterator for DerivationPipeline where - S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send + Sync, + S: NextAttributes + + ResettableStage + + FlushableStage + + OriginProvider + + OriginAdvancer + + Debug + + Send + + Sync, P: L2ChainProvider + Send + Sync + Debug, { type Item = OptimismAttributesWithParent; @@ -73,7 +101,14 @@ where #[async_trait] impl Pipeline for DerivationPipeline where - S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send + Sync, + S: NextAttributes + + ResettableStage + + FlushableStage + + OriginProvider + + OriginAdvancer + + Debug + + Send + + Sync, P: L2ChainProvider + Send + Sync + Debug, { /// Peeks at the next prepared [OptimismAttributesWithParent] from the pipeline. @@ -118,7 +153,9 @@ where } } } - _ => unimplemented!("Signal not implemented"), + Signal::FlushChannel => { + self.attributes.flush_channel().await?; + } } Ok(()) } diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index b1d184bdc..7a414b325 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,5 +1,13 @@ //! Contains the logic for the `AttributesQueue` stage. +use crate::{ + batch::SingleBatch, + errors::{PipelineError, PipelineResult, ResetError}, + traits::{ + AttributesBuilder, FlushableStage, NextAttributes, OriginAdvancer, OriginProvider, + ResettableStage, + }, +}; use alloc::{boxed::Box, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; @@ -8,12 +16,6 @@ use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::{OptimismAttributesWithParent, OptimismPayloadAttributes}; use tracing::info; -use crate::{ - batch::SingleBatch, - errors::{PipelineError, PipelineResult, ResetError}, - traits::{AttributesBuilder, NextAttributes, OriginAdvancer, OriginProvider, ResettableStage}, -}; - /// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage. /// /// [BatchQueue]: crate::stages::BatchQueue @@ -207,6 +209,24 @@ where } } +#[async_trait] +impl FlushableStage for AttributesQueue +where + P: AttributesProvider + + OriginAdvancer + + OriginProvider + + ResettableStage + + FlushableStage + + Debug + + Send, + AB: AttributesBuilder + Debug + Send, +{ + async fn flush_channel(&mut self) -> PipelineResult<()> { + self.batch = None; + self.prev.flush_channel().await + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 489e7676f..64bd75aa3 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -4,7 +4,7 @@ use crate::{ batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch}, errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError}, stages::attributes_queue::AttributesProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; use async_trait::async_trait; @@ -467,6 +467,26 @@ where } } +#[async_trait] +impl FlushableStage for BatchQueue +where + P: BatchQueueProvider + + OriginAdvancer + + OriginProvider + + ResettableStage + + FlushableStage + + Send + + Debug, + BF: L2ChainProvider + Send + Debug, +{ + async fn flush_channel(&mut self) -> PipelineResult<()> { + self.batches.clear(); + self.l1_blocks.clear(); + self.next_spans.clear(); + self.prev.flush_channel().await + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index 0d8722151..4b6cb3509 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -5,7 +5,7 @@ use crate::{ errors::{PipelineEncodingError, PipelineError, PipelineResult}, pipeline::L2ChainProvider, stages::BatchQueueProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; @@ -195,12 +195,32 @@ where { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { self.prev.reset(base, cfg).await?; + self.buffer.clear(); self.span.take(); crate::inc!(STAGE_RESETS, &["batch-span"]); Ok(()) } } +#[async_trait] +impl FlushableStage for BatchStream +where + P: BatchStreamProvider + + OriginAdvancer + + OriginProvider + + ResettableStage + + FlushableStage + + Debug + + Send, + BF: L2ChainProvider + Debug + Send, +{ + async fn flush_channel(&mut self) -> PipelineResult<()> { + self.span.take(); + self.buffer.clear(); + self.prev.flush_channel().await + } +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 5d8b356e9..c847784c1 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -4,7 +4,7 @@ use crate::{ batch::Batch, errors::{PipelineError, PipelineResult}, stages::{decompress_brotli, BatchStreamProvider}, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; use alloy_primitives::Bytes; @@ -157,6 +157,19 @@ where } } +#[async_trait] +impl

FlushableStage for ChannelReader

+where + P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, +{ + async fn flush_channel(&mut self) -> PipelineResult<()> { + // Drop the current in-progress channel. + warn!(target: "channel-reader", "Flushed channel"); + self.next_batch = None; + Ok(()) + } +} + /// Batch Reader provides a function that iteratively consumes batches from the reader. /// The L1Inclusion block is also provided at creation time. /// Warning: the batch reader can read every batch-type. diff --git a/crates/derive/src/traits/attributes.rs b/crates/derive/src/traits/attributes.rs index 6e271850f..8dc4e5c86 100644 --- a/crates/derive/src/traits/attributes.rs +++ b/crates/derive/src/traits/attributes.rs @@ -1,13 +1,12 @@ //! Contains traits for working with payload attributes and their providers. +use crate::errors::PipelineResult; use alloc::boxed::Box; use alloy_eips::BlockNumHash; use async_trait::async_trait; use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::{OptimismAttributesWithParent, OptimismPayloadAttributes}; -use crate::errors::PipelineResult; - /// [NextAttributes] defines the interface for pulling attributes from /// the top level `AttributesQueue` stage of the pipeline. #[async_trait] diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index ebee59385..eece8a732 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -14,7 +14,7 @@ mod reset; pub use reset::ResetProvider; mod stages; -pub use stages::{OriginAdvancer, OriginProvider, ResettableStage}; +pub use stages::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage}; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index f32ce2117..fa783cf82 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -14,6 +14,13 @@ pub trait ResettableStage { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()>; } +/// Describes the functionality of a stage that can receive a channel flush signal. +#[async_trait] +pub trait FlushableStage { + /// Flushes the current channel. + async fn flush_channel(&mut self) -> PipelineResult<()>; +} + /// Provides a method for accessing the pipeline's current L1 origin. pub trait OriginProvider { /// Returns the optional L1 [BlockInfo] origin.