Skip to content

Commit

Permalink
chore(derive): add targets to stage logs (#310)
Browse files — browse the repository at this point in the history
  • Loading branch information
refcell authored Jun 25, 2024
1 parent c024853 commit 7f1a77c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 24 deletions.
3 changes: 2 additions & 1 deletion crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ where
attributes.transactions.extend(batch.transactions);

info!(
target: "attributes-queue",
"generated attributes in payload queue: txs={}, timestamp={}",
tx_count, batch.timestamp
);
Expand Down Expand Up @@ -196,7 +197,7 @@ where
system_config: &SystemConfig,
) -> StageResult<()> {
self.prev.reset(block_info, system_config).await?;
info!("resetting attributes queue");
info!(target: "attributes-queue", "resetting attributes queue");
self.batch = None;
self.is_last_in_span = false;
Err(StageError::Eof)
Expand Down
19 changes: 11 additions & 8 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ where

// Get the epoch
let epoch = self.l1_blocks[0];
info!("Deriving next batch for epoch: {}", epoch.number);
info!(target: "batch-queue", "Deriving next batch for epoch: {}", epoch.number);

// Note: epoch origin can now be one block ahead of the L2 Safe Head
// This is in the case where we auto generate all batches in an epoch & advance the epoch
Expand Down Expand Up @@ -151,7 +151,7 @@ where
remaining.push(batch.clone());
}
BatchValidity::Drop => {
warn!("Dropping batch: {:?}, parent: {}", batch.batch, parent.block_info);
warn!(target: "batch-queue", "Dropping batch: {:?}, parent: {}", batch.batch, parent.block_info);
continue;
}
BatchValidity::Accept => {
Expand All @@ -171,7 +171,7 @@ where
self.batches = remaining;

if let Some(nb) = next_batch {
info!("Next batch found: {:?}", nb.batch);
info!(target: "batch-queue", "Next batch found: {:?}", nb.batch);
return Ok(nb.batch);
}

Expand All @@ -190,6 +190,7 @@ where
}

info!(
target: "batch-queue",
"Generating empty batches for epoch: {} | parent: {}",
epoch.number, parent.l1_origin.number
);
Expand All @@ -205,7 +206,7 @@ where
// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always
// generate a batch to ensure that we at least have one batch per epoch.
if next_timestamp < next_epoch.timestamp || first_of_epoch {
info!("Generating empty batch for epoch: {}", epoch.number);
info!(target: "batch-queue", "Generating empty batch for epoch: {}", epoch.number);
return Ok(Batch::Single(SingleBatch {
parent_hash: parent.block_info.hash,
epoch_num: epoch.number,
Expand All @@ -218,6 +219,7 @@ where
// At this point we have auto generated every batch for the current epoch
// that we can, so we can advance to the next epoch.
info!(
target: "batch-queue",
"Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}",
next_epoch.number, next_timestamp, next_epoch.timestamp
);
Expand All @@ -228,7 +230,7 @@ where
/// Adds a batch to the queue.
pub async fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
if self.l1_blocks.is_empty() {
error!("Cannot add batch without an origin");
error!(target: "batch-queue", "Cannot add batch without an origin");
panic!("Cannot add batch without an origin");
}
let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?;
Expand Down Expand Up @@ -274,6 +276,7 @@ where
// Means the previously returned batch is invalid.
// Drop cached batches and find another batch.
warn!(
target: "batch-queue",
"Parent block does not match the next batch. Dropping {} cached batches.",
self.next_spans.len()
);
Expand All @@ -289,7 +292,7 @@ where
for (i, block) in self.l1_blocks.iter().enumerate() {
if parent.l1_origin.number == block.number {
self.l1_blocks.drain(0..i);
info!("Advancing epoch");
info!(target: "batch-queue", "Advancing epoch");
break;
}
}
Expand Down Expand Up @@ -318,7 +321,7 @@ where
// reset is called, the origin behind is false.
self.l1_blocks.clear();
}
info!("Advancing batch queue origin: {:?}", self.origin);
info!(target: "batch-queue", "Advancing batch queue origin: {:?}", self.origin);
}

// Load more data into the batch queue.
Expand All @@ -328,7 +331,7 @@ where
if !origin_behind {
self.add_batch(b, parent).await.ok();
} else {
warn!("Dropping batch: Origin is behind");
warn!(target: "batch-queue", "Dropping batch: Origin is behind");
}
}
Err(StageError::Eof) => out_of_data = true,
Expand Down
10 changes: 5 additions & 5 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
use hashbrown::HashMap;
use tracing::{debug, warn};
use tracing::{trace, warn};

/// Provides frames for the [ChannelBank] stage.
#[async_trait]
Expand Down Expand Up @@ -88,14 +88,14 @@ where

// Check if the channel is not timed out. If it has, ignore the frame.
if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number {
warn!("Channel {:?} timed out", frame.id);
warn!(target: "channel-bank", "Channel {:?} timed out", frame.id);
return Ok(());
}

// Ingest the frame. If it fails, ignore the frame.
let frame_id = frame.id;
if current_channel.add_frame(frame, origin).is_err() {
warn!("Failed to add frame to channel: {:?}", frame_id);
warn!(target: "channel-bank", "Failed to add frame to channel: {:?}", frame_id);
return Ok(());
}

Expand All @@ -108,7 +108,7 @@ where
pub fn read(&mut self) -> StageResult<Option<Bytes>> {
// Bail if there are no channels to read from.
if self.channel_queue.is_empty() {
debug!("No channels to read from");
trace!(target: "channel-bank", "No channels to read from");
return Err(StageError::Eof);
}

Expand All @@ -118,7 +118,7 @@ where
let channel = self.channels.get(&first).ok_or(StageError::ChannelNotFound)?;
let origin = self.origin().ok_or(StageError::MissingOrigin)?;
if channel.open_block_number() + self.cfg.channel_timeout < origin.number {
warn!("Channel {:?} timed out", first);
warn!(target: "channel-bank", "Channel {:?} timed out", first);
self.channels.remove(&first);
self.channel_queue.pop_front();
return Ok(None);
Expand Down
10 changes: 5 additions & 5 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use alloy_rlp::Decodable;
use async_trait::async_trait;
use core::fmt::Debug;
use miniz_oxide::inflate::decompress_to_vec_zlib;
use tracing::warn;
use tracing::{error, warn};

/// ZLIB Deflate Compression Method.
pub(crate) const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
Expand Down Expand Up @@ -89,7 +89,7 @@ where
{
async fn next_batch(&mut self) -> StageResult<Batch> {
if let Err(e) = self.set_batch_reader().await {
warn!("Failed to set batch reader: {:?}", e);
warn!(target: "channel-reader", "Failed to set batch reader: {:?}", e);
self.next_channel();
return Err(e);
}
Expand Down Expand Up @@ -161,7 +161,7 @@ impl BatchReader {
if let Some(data) = self.data.take() {
// Peek at the data to determine the compression type.
if data.is_empty() {
tracing::warn!(target: "batch-reader", "Data is too short to determine compression type, skipping batch");
warn!(target: "batch-reader", "Data is too short to determine compression type, skipping batch");
return None;
}
let compression_type = data[0];
Expand All @@ -173,7 +173,7 @@ impl BatchReader {
brotli_used = true;
self.decompressed = decompress_brotli(&data[1..]).ok()?;
} else {
tracing::error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type);
error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type);
return None;
}
}
Expand All @@ -185,7 +185,7 @@ impl BatchReader {

// Confirm that brotli decompression was performed *after* the Fjord hardfork.
if brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
tracing::warn!(target: "batch-reader", "Brotli compression used before Fjord hardfork, skipping batch");
warn!(target: "batch-reader", "Brotli compression used before Fjord hardfork, skipping batch");
return None;
}

Expand Down
8 changes: 4 additions & 4 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy_primitives::Bytes;
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
use tracing::{debug, error, warn};
use tracing::{error, trace, warn};

/// Provides data frames for the [FrameQueue] stage.
#[async_trait]
Expand Down Expand Up @@ -75,19 +75,19 @@ where
} else {
// There may be more frames in the queue for the
// pipeline to advance, so don't return an error here.
error!("Failed to parse frames from data.");
error!(target: "frame-queue", "Failed to parse frames from data.");
}
}
Err(e) => {
warn!("Failed to retrieve data: {:?}", e);
warn!(target: "frame-queue", "Failed to retrieve data: {:?}", e);
return Err(e); // Bubble up potential EOF error without wrapping.
}
}
}

// If we did not add more frames but still have more data, retry this function.
if self.queue.is_empty() {
debug!("Queue is empty after fetching data. Retrying next_frame.");
trace!(target: "frame-queue", "Queue is empty after fetching data. Retrying next_frame.");
return Err(StageError::NotEnoughData);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<F: ChainProvider + Send> OriginAdvancer for L1Traversal<F> {
let block = match self.block {
Some(block) => block,
None => {
warn!("L1Traversal: No block to advance to");
warn!(target: "l1-traversal", "Missing current block, can't advance origin with no reference.");
return Err(StageError::Eof);
}
};
Expand Down

0 comments on commit 7f1a77c

Please sign in to comment.