From 7f1a77cc514e9c456aa59b634c547d9d763b403d Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 25 Jun 2024 08:29:07 -0400 Subject: [PATCH] chore(derive): add targets to stage logs (#310) --- crates/derive/src/stages/attributes_queue.rs | 3 ++- crates/derive/src/stages/batch_queue.rs | 19 +++++++++++-------- crates/derive/src/stages/channel_bank.rs | 10 +++++----- crates/derive/src/stages/channel_reader.rs | 10 +++++----- crates/derive/src/stages/frame_queue.rs | 8 ++++---- crates/derive/src/stages/l1_traversal.rs | 2 +- 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index a1ae39474..3f5775169 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -131,6 +131,7 @@ where attributes.transactions.extend(batch.transactions); info!( + target: "attributes-queue", "generated attributes in payload queue: txs={}, timestamp={}", tx_count, batch.timestamp ); @@ -196,7 +197,7 @@ where system_config: &SystemConfig, ) -> StageResult<()> { self.prev.reset(block_info, system_config).await?; - info!("resetting attributes queue"); + info!(target: "attributes-queue", "resetting attributes queue"); self.batch = None; self.is_last_in_span = false; Err(StageError::Eof) diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 8d7f89ca6..e54d3cbd1 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -119,7 +119,7 @@ where // Get the epoch let epoch = self.l1_blocks[0]; - info!("Deriving next batch for epoch: {}", epoch.number); + info!(target: "batch-queue", "Deriving next batch for epoch: {}", epoch.number); // Note: epoch origin can now be one block ahead of the L2 Safe Head // This is in the case where we auto generate all batches in an epoch & advance the epoch @@ -151,7 +151,7 @@ where remaining.push(batch.clone()); } BatchValidity::Drop => { - warn!("Dropping batch: {:?}, parent: {}", batch.batch, parent.block_info); + warn!(target: "batch-queue", "Dropping batch: {:?}, parent: {}", batch.batch, parent.block_info); continue; } BatchValidity::Accept => { @@ -171,7 +171,7 @@ where self.batches = remaining; if let Some(nb) = next_batch { - info!("Next batch found: {:?}", nb.batch); + info!(target: "batch-queue", "Next batch found: {:?}", nb.batch); return Ok(nb.batch); } @@ -190,6 +190,7 @@ where } info!( + target: "batch-queue", "Generating empty batches for epoch: {} | parent: {}", epoch.number, parent.l1_origin.number ); @@ -205,7 +206,7 @@ where // to preserve that L2 time >= L1 time. If this is the first block of the epoch, always // generate a batch to ensure that we at least have one batch per epoch. if next_timestamp < next_epoch.timestamp || first_of_epoch { - info!("Generating empty batch for epoch: {}", epoch.number); + info!(target: "batch-queue", "Generating empty batch for epoch: {}", epoch.number); return Ok(Batch::Single(SingleBatch { parent_hash: parent.block_info.hash, epoch_num: epoch.number, @@ -218,6 +219,7 @@ where // At this point we have auto generated every batch for the current epoch // that we can, so we can advance to the next epoch. info!( + target: "batch-queue", "Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}", next_epoch.number, next_timestamp, next_epoch.timestamp ); @@ -228,7 +230,7 @@ where /// Adds a batch to the queue. pub async fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> { if self.l1_blocks.is_empty() { - error!("Cannot add batch without an origin"); + error!(target: "batch-queue", "Cannot add batch without an origin"); panic!("Cannot add batch without an origin"); } let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?; @@ -274,6 +276,7 @@ where // Means the previously returned batch is invalid. // Drop cached batches and find another batch. warn!( + target: "batch-queue", "Parent block does not match the next batch. Dropping {} cached batches.", self.next_spans.len() ); @@ -289,7 +292,7 @@ where for (i, block) in self.l1_blocks.iter().enumerate() { if parent.l1_origin.number == block.number { self.l1_blocks.drain(0..i); - info!("Advancing epoch"); + info!(target: "batch-queue", "Advancing epoch"); break; } } @@ -318,7 +321,7 @@ where // reset is called, the origin behind is false. self.l1_blocks.clear(); } - info!("Advancing batch queue origin: {:?}", self.origin); + info!(target: "batch-queue", "Advancing batch queue origin: {:?}", self.origin); } // Load more data into the batch queue. @@ -328,7 +331,7 @@ where if !origin_behind { self.add_batch(b, parent).await.ok(); } else { - warn!("Dropping batch: Origin is behind"); + warn!(target: "batch-queue", "Dropping batch: Origin is behind"); } } Err(StageError::Eof) => out_of_data = true, diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index a2c778c2d..e9f3b072f 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -12,7 +12,7 @@ use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; use hashbrown::HashMap; -use tracing::{debug, warn}; +use tracing::{trace, warn}; /// Provides frames for the [ChannelBank] stage. #[async_trait] @@ -88,14 +88,14 @@ where // Check if the channel is not timed out. If it has, ignore the frame. if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number { - warn!("Channel {:?} timed out", frame.id); + warn!(target: "channel-bank", "Channel {:?} timed out", frame.id); return Ok(()); } // Ingest the frame. If it fails, ignore the frame. let frame_id = frame.id; if current_channel.add_frame(frame, origin).is_err() { - warn!("Failed to add frame to channel: {:?}", frame_id); + warn!(target: "channel-bank", "Failed to add frame to channel: {:?}", frame_id); return Ok(()); } @@ -108,7 +108,7 @@ where pub fn read(&mut self) -> StageResult> { // Bail if there are no channels to read from. if self.channel_queue.is_empty() { - debug!("No channels to read from"); + trace!(target: "channel-bank", "No channels to read from"); return Err(StageError::Eof); } @@ -118,7 +118,7 @@ where let channel = self.channels.get(&first).ok_or(StageError::ChannelNotFound)?; let origin = self.origin().ok_or(StageError::MissingOrigin)?; if channel.open_block_number() + self.cfg.channel_timeout < origin.number { - warn!("Channel {:?} timed out", first); + warn!(target: "channel-bank", "Channel {:?} timed out", first); self.channels.remove(&first); self.channel_queue.pop_front(); return Ok(None); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 4dc1f0f30..cd41c0985 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -12,7 +12,7 @@ use alloy_rlp::Decodable; use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; -use tracing::warn; +use tracing::{error, warn}; /// ZLIB Deflate Compression Method. pub(crate) const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8; @@ -89,7 +89,7 @@ where { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { - warn!("Failed to set batch reader: {:?}", e); + warn!(target: "channel-reader", "Failed to set batch reader: {:?}", e); self.next_channel(); return Err(e); } @@ -161,7 +161,7 @@ impl BatchReader { if let Some(data) = self.data.take() { // Peek at the data to determine the compression type. if data.is_empty() { - tracing::warn!(target: "batch-reader", "Data is too short to determine compression type, skipping batch"); + warn!(target: "batch-reader", "Data is too short to determine compression type, skipping batch"); return None; } let compression_type = data[0]; @@ -173,7 +173,7 @@ impl BatchReader { brotli_used = true; self.decompressed = decompress_brotli(&data[1..]).ok()?; } else { - tracing::error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type); + error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type); return None; } } @@ -185,7 +185,7 @@ impl BatchReader { // Confirm that brotli decompression was performed *after* the Fjord hardfork. if brotli_used && !cfg.is_fjord_active(batch.timestamp()) { - tracing::warn!(target: "batch-reader", "Brotli compression used before Fjord hardfork, skipping batch"); + warn!(target: "batch-reader", "Brotli compression used before Fjord hardfork, skipping batch"); return None; } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index edb7e4418..708f721fa 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -10,7 +10,7 @@ use alloy_primitives::Bytes; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; -use tracing::{debug, error, warn}; +use tracing::{error, trace, warn}; /// Provides data frames for the [FrameQueue] stage. #[async_trait] @@ -75,11 +75,11 @@ where } else { // There may be more frames in the queue for the // pipeline to advance, so don't return an error here. - error!("Failed to parse frames from data."); + error!(target: "frame-queue", "Failed to parse frames from data."); } } Err(e) => { - warn!("Failed to retrieve data: {:?}", e); + warn!(target: "frame-queue", "Failed to retrieve data: {:?}", e); return Err(e); // Bubble up potential EOF error without wrapping. } } @@ -87,7 +87,7 @@ where // If we did not add more frames but still have more data, retry this function. if self.queue.is_empty() { - debug!("Queue is empty after fetching data. Retrying next_frame."); + trace!(target: "frame-queue", "Queue is empty after fetching data. Retrying next_frame."); return Err(StageError::NotEnoughData); } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 3a6102561..18677b3dc 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -76,7 +76,7 @@ impl OriginAdvancer for L1Traversal { let block = match self.block { Some(block) => block, None => { - warn!("L1Traversal: No block to advance to"); + warn!(target: "l1-traversal", "Missing current block, can't advance origin with no reference."); return Err(StageError::Eof); } };