Skip to content

Commit

Permalink
feat(derive): Batch Queue Logging (#86)
Browse files Browse the repository at this point in the history
* feat(derive): batch queue testing

* fix(derive): lints

* fix(derive): merge conflicts

* fix(derive): bad upstream sync

* fix(derive): batch queue logging
  • Loading branch information
refcell authored Apr 5, 2024
1 parent f9041df commit 8f5c7ee
Showing 1 changed file with 57 additions and 21 deletions.
78 changes: 57 additions & 21 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides batches for the [BatchQueue] stage.
/// Provides [Batch]es for the [BatchQueue] stage.
#[async_trait]
pub trait BatchQueueProvider {
/// Returns the next batch in the [ChannelReader] stage, if the stage is not complete.
/// Returns the next [Batch] in the [ChannelReader] stage, if the stage is not complete.
/// This function can only be called once while the stage is in progress, and will return
/// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
/// complete and the batch has been consumed, an [StageError::Eof] error is returned.
Expand Down Expand Up @@ -117,8 +117,10 @@ where

// Get the epoch
let epoch = self.l1_blocks[0];
// TODO: log that the next batch is being derived.
// TODO: metrice the time it takes to derive the next batch.
self.telemetry.write(
Bytes::from(alloc::format!("Deriving next batch for epoch: {}", epoch.number)),
LogLevel::Info,
);

// Note: epoch origin can now be one block ahead of the L2 Safe Head
// This is in the case where we auto generate all batches in an epoch & advance the epoch
Expand Down Expand Up @@ -149,9 +151,14 @@ where
remaining.push(batch.clone());
}
BatchValidity::Drop => {
// TODO: Log the drop reason with WARN level.
// batch.log_context(self.log).warn("Dropping batch", "parent", parent.id(),
// "parent_time", parent.info.time);
self.telemetry.write(
Bytes::from(alloc::format!(
"Dropping batch: {:?}, parent: {}",
batch.batch,
parent.block_info
)),
LogLevel::Warning,
);
continue;
}
BatchValidity::Accept => {
Expand All @@ -171,7 +178,10 @@ where
self.batches = remaining;

if let Some(nb) = next_batch {
// TODO: log that the next batch is found.
self.telemetry.write(
Bytes::from(alloc::format!("Next batch found: {:?}", nb.batch)),
LogLevel::Info,
);
return Ok(nb.batch);
}

Expand All @@ -182,15 +192,22 @@ where
expiry_epoch < parent.l1_origin.number;
let first_of_epoch = epoch.number == parent.l1_origin.number + 1;

// TODO: Log the empty batch generation.

// If the sequencer window did not expire,
// there is still room to receive batches for the current epoch.
// No need to force-create empty batch(es) towards the next epoch yet.
if !force_empty_batches {
return Err(StageError::Eof);
}

self.telemetry.write(
Bytes::from(alloc::format!(
"Generating empty batches. Epoch: {}, Parent: {}",
epoch.number,
parent.l1_origin.number
)),
LogLevel::Info,
);

// The next L1 block is needed to proceed towards the next epoch.
if self.l1_blocks.len() < 2 {
return Err(StageError::Eof);
Expand All @@ -202,7 +219,10 @@ where
// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always
// generate a batch to ensure that we at least have one batch per epoch.
if next_timestamp < next_epoch.timestamp || first_of_epoch {
// TODO: log next batch generation.
self.telemetry.write(
Bytes::from(alloc::format!("Generating empty batch for epoch: {}", epoch.number)),
LogLevel::Info,
);
return Ok(Batch::Single(SingleBatch {
parent_hash: parent.block_info.hash,
epoch_num: epoch.number,
Expand All @@ -214,17 +234,24 @@ where

// At this point we have auto generated every batch for the current epoch
// that we can, so we can advance to the next epoch.
// TODO: log that the epoch is advanced.
// bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp,
// "next_epoch_time", nextEpoch.Time)
self.telemetry.write(
Bytes::from(alloc::format!(
"Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}",
next_epoch.number,
next_timestamp,
next_epoch.timestamp
)),
LogLevel::Info,
);
self.l1_blocks.remove(0);
Err(StageError::Eof)
}

/// Adds a batch to the queue.
pub fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
if self.l1_blocks.is_empty() {
// TODO: log that the batch cannot be added without an origin
self.telemetry
.write(Bytes::from("Cannot add batch without an origin"), LogLevel::Error);
panic!("Cannot add batch without an origin");
}
let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?;
Expand Down Expand Up @@ -259,9 +286,14 @@ where
// Parent block does not match the next batch.
// Means the previously returned batch is invalid.
// Drop cached batches and find another batch.
self.telemetry.write(
Bytes::from(alloc::format!(
"Parent block does not match the next batch. Dropping {} cached batches.",
self.next_spans.len()
)),
LogLevel::Warning,
);
self.next_spans.clear();
// TODO: log that the provided parent block does not match the next batch.
// TODO: metrice the internal batch drop.
}

// If the epoch is advanced, update the l1 blocks.
Expand All @@ -273,8 +305,7 @@ where
for (i, block) in self.l1_blocks.iter().enumerate() {
if parent.l1_origin.number == block.number {
self.l1_blocks.drain(0..i);
self.telemetry
.write(Bytes::from("Advancing internal L1 blocks"), LogLevel::Info);
self.telemetry.write(Bytes::from("Adancing epoch"), LogLevel::Info);
break;
}
}
Expand Down Expand Up @@ -303,7 +334,10 @@ where
// reset is called, the origin behind is false.
self.l1_blocks.clear();
}
// TODO: log batch queue origin advancement.
self.telemetry.write(
Bytes::from(alloc::format!("Batch queue advanced origin: {:?}", self.origin)),
LogLevel::Info,
);
}

// Load more data into the batch queue.
Expand All @@ -313,7 +347,8 @@ where
if !origin_behind {
self.add_batch(b, parent).ok();
} else {
// TODO: metrice when the batch is dropped because the origin is behind.
self.telemetry
.write(Bytes::from("[Batch Dropped]: Origin is behind"), LogLevel::Warning);
}
}
Err(StageError::Eof) => out_of_data = true,
Expand Down Expand Up @@ -445,6 +480,7 @@ mod tests {

// TODO(refcell): The batch reader here loops forever.
// Maybe the cursor isn't being used?
// UPDATE: the batch data is not valid
// #[tokio::test]
// async fn test_next_batch_succeeds() {
// let mut reader = new_batch_reader();
Expand Down

0 comments on commit 8f5c7ee

Please sign in to comment.