clean up trusted sync loop (#318)
refcell authored Jun 26, 2024
1 parent 105d032 · commit 1017e32
Showing 4 changed files with 39 additions and 24 deletions.
4 changes: 2 additions & 2 deletions crates/derive/src/stages/channel_reader.rs
@@ -12,7 +12,7 @@ use alloy_rlp::Decodable;
use async_trait::async_trait;
use core::fmt::Debug;
use miniz_oxide::inflate::decompress_to_vec_zlib;
use tracing::{error, warn};
use tracing::{debug, error, warn};

/// ZLIB Deflate Compression Method.
pub(crate) const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
@@ -90,7 +90,7 @@ where
async fn next_batch(&mut self) -> StageResult<Batch> {
crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer);
if let Err(e) = self.set_batch_reader().await {
warn!(target: "channel-reader", "Failed to set batch reader: {:?}", e);
debug!(target: "channel-reader", "Failed to set batch reader: {:?}", e);
self.next_channel();
crate::timer!(DISCARD, timer);
return Err(e);
4 changes: 2 additions & 2 deletions crates/derive/src/stages/frame_queue.rs
@@ -10,7 +10,7 @@ use alloy_primitives::Bytes;
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
use tracing::{error, trace, warn};
use tracing::{debug, error, trace};

/// Provides data frames for the [FrameQueue] stage.
#[async_trait]
@@ -81,7 +81,7 @@ where
}
}
Err(e) => {
warn!(target: "frame-queue", "Failed to retrieve data: {:?}", e);
debug!(target: "frame-queue", "Failed to retrieve data: {:?}", e);
return Err(e); // Bubble up potential EOF error without wrapping.
}
}
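Both derive-stage changes above demote a per-iteration failure log from warn! to debug!, presumably because these paths fire routinely while the pipeline waits for more data. A minimal sketch, assuming a tracing-subscriber EnvFilter setup (the filter string and the binary below are illustrative, not part of this commit), of how the demoted events can still be surfaced when needed:

use tracing::{debug, warn};
use tracing_subscriber::EnvFilter;

fn main() {
    // Keep the default level at `info`, but opt back into `debug` events for
    // the two derive-stage targets touched by this commit.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("info,channel-reader=debug,frame-queue=debug"))
        .init();

    warn!(target: "frame-queue", "visible at the default `info` level");
    debug!(target: "frame-queue", "visible only because this target is raised to `debug`");
}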
52 changes: 32 additions & 20 deletions examples/trusted-sync/src/main.rs
@@ -2,7 +2,7 @@ use anyhow::Result;
use clap::Parser;
use kona_derive::online::*;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};

mod cli;
mod metrics;
@@ -68,39 +68,51 @@ async fn sync(cli: cli::Cli) -> Result<()> {
new_online_pipeline(cfg, l1_provider, dap, l2_provider.clone(), attributes, tip);

// Continuously step on the pipeline and validate payloads.
let mut advance_cursor_flag = false;
loop {
info!(target: LOG_TARGET, "Validated payload attributes number {}", metrics::DERIVED_ATTRIBUTES_COUNT.get());
info!(target: LOG_TARGET, "Pending l2 safe head num: {}", cursor.block_info.number);
match pipeline.step(cursor).await {
Ok(_) => info!(target: "loop", "Stepped derivation pipeline"),
Err(e) => warn!(target: "loop", "Error stepping derivation pipeline: {:?}", e),
}

if let Some(attributes) = pipeline.next_attributes() {
if !validator.validate(&attributes).await {
error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash);
return Ok(());
}
metrics::DERIVED_ATTRIBUTES_COUNT.inc();
if advance_cursor_flag {
match l2_provider.l2_block_info_by_number(cursor.block_info.number + 1).await {
Ok(bi) => {
cursor = bi;
metrics::SAFE_L2_HEAD.inc();
advance_cursor_flag = false;
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to fetch next pending l2 safe head: {}, err: {:?}", cursor.block_info.number + 1, e);
// We don't need to step on the pipeline if we failed to fetch the next pending
// l2 safe head.
continue;
}
}
println!(
"Validated Payload Attributes {} [L2 Block Num: {}] [L2 Timestamp: {}] [L1 Origin Block Num: {}]",
metrics::DERIVED_ATTRIBUTES_COUNT.get(),
attributes.parent.block_info.number + 1,
attributes.attributes.timestamp,
pipeline.origin().unwrap().number,
);
info!(target: LOG_TARGET, "attributes: {:#?}", attributes);
}
match pipeline.step(cursor).await {
Ok(_) => info!(target: "loop", "Stepped derivation pipeline"),
Err(e) => debug!(target: "loop", "Error stepping derivation pipeline: {:?}", e),
}

let attributes = if let Some(attributes) = pipeline.next_attributes() {
attributes
} else {
debug!(target: LOG_TARGET, "No attributes to validate");
continue;
};

if !validator.validate(&attributes).await {
error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash);
metrics::FAILED_PAYLOAD_DERIVATION.inc();
}
// If we validated payload attributes, we should advance the cursor.
advance_cursor_flag = true;
metrics::DERIVED_ATTRIBUTES_COUNT.inc();
println!(
"Validated Payload Attributes {} [L2 Block Num: {}] [L2 Timestamp: {}] [L1 Origin Block Num: {}]",
metrics::DERIVED_ATTRIBUTES_COUNT.get(),
attributes.parent.block_info.number + 1,
attributes.attributes.timestamp,
pipeline.origin().unwrap().number,
);
debug!(target: LOG_TARGET, "attributes: {:#?}", attributes);
}
}
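Since the hunk above shows both the previous loop body and its reworked replacement, here is a minimal, self-contained sketch of the control flow the new code follows: step the pipeline, pull the next attributes or skip the iteration, count a validation failure instead of returning early, then bump the derived-attributes counter and report. The Pipeline, Attributes, and validate stand-ins are placeholders for illustration, not the kona_derive or trusted-sync APIs.

#[derive(Debug)]
struct Attributes {
    l2_block: u64,
}

struct Pipeline {
    queued: Vec<Attributes>,
}

impl Pipeline {
    fn step(&mut self) -> Result<(), &'static str> {
        // A real pipeline may legitimately report "not enough data yet" here,
        // which is why the commit demotes this path to a debug-level log.
        Ok(())
    }

    fn next_attributes(&mut self) -> Option<Attributes> {
        self.queued.pop()
    }
}

fn validate(attributes: &Attributes) -> bool {
    // Stand-in validation rule for the sketch.
    attributes.l2_block % 2 == 0
}

fn main() {
    let mut pipeline =
        Pipeline { queued: vec![Attributes { l2_block: 3 }, Attributes { l2_block: 2 }] };
    let (mut derived, mut failed) = (0u64, 0u64);

    for _ in 0..4 {
        if let Err(e) = pipeline.step() {
            eprintln!("error stepping derivation pipeline: {e}");
        }

        // Mirrors the new `let attributes = if let Some(..) .. else { continue }`:
        // with nothing to validate, start the next iteration.
        let attributes = if let Some(attributes) = pipeline.next_attributes() {
            attributes
        } else {
            continue;
        };

        if !validate(&attributes) {
            // The cleaned-up loop records the failure and keeps going rather
            // than returning early.
            failed += 1;
        }
        derived += 1;
        println!(
            "Validated Payload Attributes {derived} [L2 Block Num: {}] (failed: {failed})",
            attributes.l2_block
        );
    }
}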
3 changes: 3 additions & 0 deletions examples/trusted-sync/src/metrics.rs
@@ -8,6 +8,9 @@ use lazy_static::lazy_static;
use prometheus::register_int_counter;

lazy_static! {
pub static ref FAILED_PAYLOAD_DERIVATION: IntCounter =
register_int_counter!("failed_payload_derivation", "Number of failed payload derivations")
.unwrap();
pub static ref DERIVED_ATTRIBUTES_COUNT: IntCounter = register_int_counter!(
"derived_attributes_count",
"Number of total payload attributes derived"
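For context on the new FAILED_PAYLOAD_DERIVATION metric, a short standalone sketch of how a lazy_static prometheus IntCounter of this kind is registered, incremented, and exported; the counter name and the text-encoder export below are illustrative and not part of this commit:

use lazy_static::lazy_static;
use prometheus::{register_int_counter, Encoder, IntCounter, TextEncoder};

lazy_static! {
    // Illustrative counter, registered the same way as FAILED_PAYLOAD_DERIVATION.
    static ref EXAMPLE_FAILURES: IntCounter =
        register_int_counter!("example_failures", "Number of example failures").unwrap();
}

fn main() {
    // Incremented at the call site, e.g. when payload validation fails.
    EXAMPLE_FAILURES.inc();

    // Scrape-style export of everything in the default registry.
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&prometheus::gather(), &mut buffer).unwrap();
    println!("{}", String::from_utf8(buffer).unwrap());
}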
