diff --git a/crates/derive/src/stages/channel/channel_assembler.rs b/crates/derive/src/stages/channel/channel_assembler.rs index a4564ef60..b727baa3a 100644 --- a/crates/derive/src/stages/channel/channel_assembler.rs +++ b/crates/derive/src/stages/channel/channel_assembler.rs @@ -6,11 +6,12 @@ use crate::{ prelude::{OriginProvider, PipelineError}, }; use alloc::{boxed::Box, sync::Arc}; -use alloy_primitives::Bytes; +use alloy_primitives::{hex, Bytes}; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Channel}; +use tracing::{debug, error, info, warn}; /// The [ChannelAssembler] stage is responsible for assembling the [Frame]s from the [FrameQueue] /// stage into a raw compressed [Channel]. @@ -70,14 +71,23 @@ where let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; // Time out the channel if it has timed out. - if self.channel.is_some() && self.is_timed_out()? { - #[cfg(feature = "metrics")] - { - let open_block_number = - self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default(); - crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64); + if let Some(channel) = self.channel.as_ref() { + if self.is_timed_out()? { + #[cfg(feature = "metrics")] + { + let open_block_number = + self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default(); + crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64); + } + warn!( + target: "channel-assembler", + "Channel (ID: {}) timed out at L1 origin #{}, open block #{}. Discarding channel.", + hex::encode(channel.id()), + origin.number, + channel.open_block_number() + ); + self.channel = None; } - self.channel = None; } // Grab the next frame from the previous stage. @@ -85,13 +95,32 @@ where // Start a new channel if the frame number is 0. if next_frame.number == 0 { + info!( + target: "channel-assembler", + "Starting new channel (ID: {}) at L1 origin #{}", + hex::encode(next_frame.id), + origin.number + ); self.channel = Some(Channel::new(next_frame.id, origin)); } if let Some(channel) = self.channel.as_mut() { // Add the frame to the channel. If this fails, return NotEnoughData and discard the // frame. + debug!( + target: "channel-assembler", + "Adding frame #{} to channel (ID: {}) at L1 origin #{}", + next_frame.number, + hex::encode(channel.id()), + origin.number + ); if channel.add_frame(next_frame, origin).is_err() { + error!( + target: "channel-assembler", + "Failed to add frame to channel (ID: {}) at L1 origin #{}", + hex::encode(channel.id()), + origin.number + ); return Err(PipelineError::NotEnoughData.temp()); } @@ -100,6 +129,12 @@ where let channel_bytes = channel.frame_data().ok_or(PipelineError::ChannelNotFound.crit())?; + info!( + target: "channel-assembler", + "Channel (ID: {}) ready for decompression.", + hex::encode(channel.id()), + ); + // Reset the channel and return the compressed bytes. self.channel = None; return Ok(Some(channel_bytes)); @@ -148,14 +183,20 @@ mod test { use crate::{ prelude::PipelineError, stages::{frame_queue::tests::new_test_frames, ChannelReaderProvider}, - test_utils::TestNextFrameProvider, + test_utils::{CollectingLayer, TestNextFrameProvider, TraceStorage}, }; use alloc::sync::Arc; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; + use tracing::Level; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::test] async fn test_assembler_channel_timeout() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + let frames = new_test_frames(2); let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); let cfg = Arc::new(RollupConfig::default()); @@ -178,6 +219,18 @@ mod test { assert!(assembler.is_timed_out().unwrap()); assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp()); assert!(assembler.channel.is_none()); + + // Assert that the info log was emitted. + let info_logs = trace_store.get_by_level(Level::INFO); + assert_eq!(info_logs.len(), 1); + let info_str = "Starting new channel"; + assert!(info_logs[0].contains(info_str)); + + // Assert that the warning log was emitted. + let warning_logs = trace_store.get_by_level(Level::WARN); + assert_eq!(warning_logs.len(), 1); + let warn_str = "timed out at L1 origin"; + assert!(warning_logs[0].contains(warn_str)); } #[tokio::test] @@ -196,6 +249,10 @@ mod test { #[tokio::test] async fn test_assembler_already_built() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + let frames = new_test_frames(2); let mock = TestNextFrameProvider::new(frames.clone().into_iter().rev().map(Ok).collect()); let cfg = Arc::new(RollupConfig::default()); @@ -217,5 +274,11 @@ mod test { // Send in the second frame again. This should return the channel bytes. assert!(assembler.next_data().await.unwrap().is_some()); assert!(assembler.channel.is_none()); + + // Assert that the error log was emitted. + let error_logs = trace_store.get_by_level(Level::ERROR); + assert_eq!(error_logs.len(), 1); + let error_str = "Failed to add frame to channel"; + assert!(error_logs[0].contains(error_str)); } }