From 1017e32538b28571da8a893c80b7039d8e458707 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 26 Jun 2024 10:36:58 -0400 Subject: [PATCH] clean up trusted sync loop (#318) --- crates/derive/src/stages/channel_reader.rs | 4 +- crates/derive/src/stages/frame_queue.rs | 4 +- examples/trusted-sync/src/main.rs | 52 +++++++++++++--------- examples/trusted-sync/src/metrics.rs | 3 ++ 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 07b8ec96e..225c8e023 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -12,7 +12,7 @@ use alloy_rlp::Decodable; use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; /// ZLIB Deflate Compression Method. pub(crate) const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8; @@ -90,7 +90,7 @@ where async fn next_batch(&mut self) -> StageResult { crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer); if let Err(e) = self.set_batch_reader().await { - warn!(target: "channel-reader", "Failed to set batch reader: {:?}", e); + debug!(target: "channel-reader", "Failed to set batch reader: {:?}", e); self.next_channel(); crate::timer!(DISCARD, timer); return Err(e); diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index be4263ed9..efcf987e1 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -10,7 +10,7 @@ use alloy_primitives::Bytes; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; -use tracing::{error, trace, warn}; +use tracing::{debug, error, trace}; /// Provides data frames for the [FrameQueue] stage. #[async_trait] @@ -81,7 +81,7 @@ where } } Err(e) => { - warn!(target: "frame-queue", "Failed to retrieve data: {:?}", e); + debug!(target: "frame-queue", "Failed to retrieve data: {:?}", e); return Err(e); // Bubble up potential EOF error without wrapping. } } diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index 53ff181d4..e897fe36f 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -2,7 +2,7 @@ use anyhow::Result; use clap::Parser; use kona_derive::online::*; use std::sync::Arc; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; mod cli; mod metrics; @@ -68,39 +68,51 @@ async fn sync(cli: cli::Cli) -> Result<()> { new_online_pipeline(cfg, l1_provider, dap, l2_provider.clone(), attributes, tip); // Continuously step on the pipeline and validate payloads. + let mut advance_cursor_flag = false; loop { info!(target: LOG_TARGET, "Validated payload attributes number {}", metrics::DERIVED_ATTRIBUTES_COUNT.get()); info!(target: LOG_TARGET, "Pending l2 safe head num: {}", cursor.block_info.number); - match pipeline.step(cursor).await { - Ok(_) => info!(target: "loop", "Stepped derivation pipeline"), - Err(e) => warn!(target: "loop", "Error stepping derivation pipeline: {:?}", e), - } - - if let Some(attributes) = pipeline.next_attributes() { - if !validator.validate(&attributes).await { - error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash); - return Ok(()); - } - metrics::DERIVED_ATTRIBUTES_COUNT.inc(); + if advance_cursor_flag { match l2_provider.l2_block_info_by_number(cursor.block_info.number + 1).await { Ok(bi) => { cursor = bi; metrics::SAFE_L2_HEAD.inc(); + advance_cursor_flag = false; } Err(e) => { error!(target: LOG_TARGET, "Failed to fetch next pending l2 safe head: {}, err: {:?}", cursor.block_info.number + 1, e); + // We don't need to step on the pipeline if we failed to fetch the next pending + // l2 safe head. + continue; } } - println!( - "Validated Payload Attributes {} [L2 Block Num: {}] [L2 Timestamp: {}] [L1 Origin Block Num: {}]", - metrics::DERIVED_ATTRIBUTES_COUNT.get(), - attributes.parent.block_info.number + 1, - attributes.attributes.timestamp, - pipeline.origin().unwrap().number, - ); - info!(target: LOG_TARGET, "attributes: {:#?}", attributes); + } + match pipeline.step(cursor).await { + Ok(_) => info!(target: "loop", "Stepped derivation pipeline"), + Err(e) => debug!(target: "loop", "Error stepping derivation pipeline: {:?}", e), + } + + let attributes = if let Some(attributes) = pipeline.next_attributes() { + attributes } else { debug!(target: LOG_TARGET, "No attributes to validate"); + continue; + }; + + if !validator.validate(&attributes).await { + error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash); + metrics::FAILED_PAYLOAD_DERIVATION.inc(); } + // If we validated payload attributes, we should advance the cursor. + advance_cursor_flag = true; + metrics::DERIVED_ATTRIBUTES_COUNT.inc(); + println!( + "Validated Payload Attributes {} [L2 Block Num: {}] [L2 Timestamp: {}] [L1 Origin Block Num: {}]", + metrics::DERIVED_ATTRIBUTES_COUNT.get(), + attributes.parent.block_info.number + 1, + attributes.attributes.timestamp, + pipeline.origin().unwrap().number, + ); + debug!(target: LOG_TARGET, "attributes: {:#?}", attributes); } } diff --git a/examples/trusted-sync/src/metrics.rs b/examples/trusted-sync/src/metrics.rs index 4f4a64fe1..b25370a5a 100644 --- a/examples/trusted-sync/src/metrics.rs +++ b/examples/trusted-sync/src/metrics.rs @@ -8,6 +8,9 @@ use lazy_static::lazy_static; use prometheus::register_int_counter; lazy_static! { + pub static ref FAILED_PAYLOAD_DERIVATION: IntCounter = + register_int_counter!("failed_payload_derivation", "Number of failed payload derivations") + .unwrap(); pub static ref DERIVED_ATTRIBUTES_COUNT: IntCounter = register_int_counter!( "derived_attributes_count", "Number of total payload attributes derived"