Skip to content

Commit

Permalink
chore(derive): Add tracing to ChannelAssembler (#701)
Browse files Browse the repository at this point in the history
* chore(derive): Add tracing to `ChannelAssembler`

* add log assertions
  • Loading branch information
clabby authored Oct 17, 2024
1 parent 2e3c6d5 commit e884f6e
Showing 1 changed file with 72 additions and 9 deletions.
81 changes: 72 additions & 9 deletions crates/derive/src/stages/channel/channel_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use crate::{
prelude::{OriginProvider, PipelineError},
};
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Bytes;
use alloy_primitives::{hex, Bytes};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, Channel};
use tracing::{debug, error, info, warn};

/// The [ChannelAssembler] stage is responsible for assembling the [Frame]s from the [FrameQueue]
/// stage into a raw compressed [Channel].
Expand Down Expand Up @@ -70,28 +71,56 @@ where
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;

// Time out the channel if it has timed out.
if self.channel.is_some() && self.is_timed_out()? {
#[cfg(feature = "metrics")]
{
let open_block_number =
self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default();
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64);
if let Some(channel) = self.channel.as_ref() {
if self.is_timed_out()? {
#[cfg(feature = "metrics")]
{
let open_block_number =
self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default();
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64);
}
warn!(
target: "channel-assembler",
"Channel (ID: {}) timed out at L1 origin #{}, open block #{}. Discarding channel.",
hex::encode(channel.id()),
origin.number,
channel.open_block_number()
);
self.channel = None;
}
self.channel = None;
}

// Grab the next frame from the previous stage.
let next_frame = self.prev.next_frame().await?;

// Start a new channel if the frame number is 0.
if next_frame.number == 0 {
info!(
target: "channel-assembler",
"Starting new channel (ID: {}) at L1 origin #{}",
hex::encode(next_frame.id),
origin.number
);
self.channel = Some(Channel::new(next_frame.id, origin));
}

if let Some(channel) = self.channel.as_mut() {
// Add the frame to the channel. If this fails, return NotEnoughData and discard the
// frame.
debug!(
target: "channel-assembler",
"Adding frame #{} to channel (ID: {}) at L1 origin #{}",
next_frame.number,
hex::encode(channel.id()),
origin.number
);
if channel.add_frame(next_frame, origin).is_err() {
error!(
target: "channel-assembler",
"Failed to add frame to channel (ID: {}) at L1 origin #{}",
hex::encode(channel.id()),
origin.number
);
return Err(PipelineError::NotEnoughData.temp());
}

Expand All @@ -100,6 +129,12 @@ where
let channel_bytes =
channel.frame_data().ok_or(PipelineError::ChannelNotFound.crit())?;

info!(
target: "channel-assembler",
"Channel (ID: {}) ready for decompression.",
hex::encode(channel.id()),
);

// Reset the channel and return the compressed bytes.
self.channel = None;
return Ok(Some(channel_bytes));
Expand Down Expand Up @@ -148,14 +183,20 @@ mod test {
use crate::{
prelude::PipelineError,
stages::{frame_queue::tests::new_test_frames, ChannelReaderProvider},
test_utils::TestNextFrameProvider,
test_utils::{CollectingLayer, TestNextFrameProvider, TraceStorage},
};
use alloc::sync::Arc;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;
use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::test]
async fn test_assembler_channel_timeout() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let frames = new_test_frames(2);
let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect());
let cfg = Arc::new(RollupConfig::default());
Expand All @@ -178,6 +219,18 @@ mod test {
assert!(assembler.is_timed_out().unwrap());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_none());

// Assert that the info log was emitted.
let info_logs = trace_store.get_by_level(Level::INFO);
assert_eq!(info_logs.len(), 1);
let info_str = "Starting new channel";
assert!(info_logs[0].contains(info_str));

// Assert that the warning log was emitted.
let warning_logs = trace_store.get_by_level(Level::WARN);
assert_eq!(warning_logs.len(), 1);
let warn_str = "timed out at L1 origin";
assert!(warning_logs[0].contains(warn_str));
}

#[tokio::test]
Expand All @@ -196,6 +249,10 @@ mod test {

#[tokio::test]
async fn test_assembler_already_built() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let frames = new_test_frames(2);
let mock = TestNextFrameProvider::new(frames.clone().into_iter().rev().map(Ok).collect());
let cfg = Arc::new(RollupConfig::default());
Expand All @@ -217,5 +274,11 @@ mod test {
// Send in the second frame again. This should return the channel bytes.
assert!(assembler.next_data().await.unwrap().is_some());
assert!(assembler.channel.is_none());

// Assert that the error log was emitted.
let error_logs = trace_store.get_by_level(Level::ERROR);
assert_eq!(error_logs.len(), 1);
let error_str = "Failed to add frame to channel";
assert!(error_logs[0].contains(error_str));
}
}

0 comments on commit e884f6e

Please sign in to comment.