Skip to content

Commit

Permalink
fix(derive): Pipeline Reset (#383)
Browse files Browse the repository at this point in the history
* disable walkback by default

* parameterize drift threshold

* fix(derive): pipeline resets

* fix(examples): remove unused variable

* reset pipeline with l1 block info

* feat: set stage resets to 0 at construction

* fix: cursor advancing

* fix: sys config fetch
  • Loading branch information
refcell authored Jul 19, 2024
1 parent 64177b3 commit 00e6bc8
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 36 deletions.
4 changes: 4 additions & 0 deletions crates/derive/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ macro_rules! set {
#[cfg(feature = "metrics")]
$crate::metrics::$metric.set($value);
};
($metric:ident, $value:expr, $labels:expr) => {
#[cfg(feature = "metrics")]
$crate::metrics::$metric.with_label_values($labels).set($value as f64);
};
}
6 changes: 6 additions & 0 deletions crates/derive/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const RESPONSE_TIME_CUSTOM_BUCKETS: &[f64; 18] = &[
const FRAME_COUNT_BUCKETS: &[f64; 10] = &[1.0, 2.0, 3.0, 5.0, 8.0, 10.0, 12.0, 15.0, 18.0, 20.0];

lazy_static! {
/// Tracks stage resets.
///
/// A gauge vector registered under the name `kona_derive_stage_resets` with a
/// single `stage` label, so each stage reports its own reset count.
/// Registration failure panics via `expect`.
pub static ref STAGE_RESETS: GaugeVec = {
let opts = opts!("kona_derive_stage_resets", "Number of times various stages are reset");
register_gauge_vec!(opts, &["stage"]).expect("Stage reset metric failed to register")
};

/// Tracks the L1 origin for the L1 Traversal Stage.
pub static ref ORIGIN_GAUGE: IntGauge = register_int_gauge!(
"kona_derive_origin_gauge",
Expand Down
10 changes: 7 additions & 3 deletions crates/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ where

/// Resets the pipeline by calling the [`ResettableStage::reset`] method.
/// This will bubble down the stages all the way to the `L1Traversal` stage.
async fn reset(&mut self, block_info: BlockInfo) -> anyhow::Result<()> {
async fn reset(
&mut self,
l2_block_info: BlockInfo,
l1_block_info: BlockInfo,
) -> anyhow::Result<()> {
let system_config = self
.l2_chain_provider
.system_config_by_number(block_info.number, Arc::clone(&self.rollup_config))
.system_config_by_number(l2_block_info.number, Arc::clone(&self.rollup_config))
.await?;
match self.attributes.reset(block_info, &system_config).await {
match self.attributes.reset(l1_block_info, &system_config).await {
Ok(()) => trace!(target: "pipeline", "Stages reset"),
Err(StageError::Eof) => trace!(target: "pipeline", "Stages reset with EOF"),
Err(err) => {
Expand Down
5 changes: 3 additions & 2 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ where
{
/// Create a new [AttributesQueue] stage.
///
/// The stage starts with no pending batch and `is_last_in_span` cleared.
pub fn new(cfg: Arc<RollupConfig>, prev: P, builder: AB) -> Self {
// Set the `attributes-queue` series of the stage-reset gauge to 0 at construction.
// NOTE(review): presumably a no-op unless the `metrics` feature is enabled — confirm
// against the `set!` macro definition.
crate::set!(STAGE_RESETS, 0, &["attributes-queue"]);
Self { cfg, prev, is_last_in_span: false, batch: None, builder }
}

Expand Down Expand Up @@ -209,10 +210,10 @@ where
system_config: &SystemConfig,
) -> StageResult<()> {
self.prev.reset(block_info, system_config).await?;
info!(target: "attributes-queue", "resetting attributes queue");
self.batch = None;
self.is_last_in_span = false;
Err(StageError::Eof)
crate::inc!(STAGE_RESETS, &["attributes-queue"]);
Ok(())
}
}

Expand Down
7 changes: 3 additions & 4 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ where
{
/// Creates a new [BatchQueue] stage.
pub fn new(cfg: Arc<RollupConfig>, prev: P, fetcher: BF) -> Self {
crate::set!(STAGE_RESETS, 0, &["batch-queue"]);
Self {
cfg,
prev,
Expand Down Expand Up @@ -442,9 +443,6 @@ where
{
async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> {
self.prev.reset(base, system_config).await?;
// Copy over the Origin from the next stage.
// It is set in the engine queue (two stages away)
// such that the L2 Safe Head origin is the progress.
self.origin = Some(base);
self.batches.clear();
// Include the new origin as an origin to build on.
Expand All @@ -453,7 +451,8 @@ where
self.l1_blocks.clear();
self.l1_blocks.push(base);
self.next_spans.clear();
Err(StageError::Eof)
crate::inc!(STAGE_RESETS, &["batch-queue"]);
Ok(())
}
}

Expand Down
4 changes: 3 additions & 1 deletion crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ where
{
/// Create a new [ChannelBank] stage.
///
/// The bank starts with an empty channel map and an empty channel queue.
pub fn new(cfg: Arc<RollupConfig>, prev: P) -> Self {
// Set the `channel-bank` series of the stage-reset gauge to 0 at construction.
// NOTE(review): presumably a no-op unless the `metrics` feature is enabled — confirm
// against the `set!` macro definition.
crate::set!(STAGE_RESETS, 0, &["channel-bank"]);
Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
}

Expand Down Expand Up @@ -258,7 +259,8 @@ where
self.prev.reset(block_info, system_config).await?;
self.channels.clear();
self.channel_queue = VecDeque::with_capacity(10);
Err(StageError::Eof)
crate::inc!(STAGE_RESETS, &["channel-bank"]);
Ok(())
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ where
{
/// Create a new [ChannelReader] stage.
///
/// The reader starts with no batch loaded (`next_batch` is `None`).
pub fn new(prev: P, cfg: Arc<RollupConfig>) -> Self {
// Set the `channel-reader` series of the stage-reset gauge to 0 at construction.
// NOTE(review): presumably a no-op unless the `metrics` feature is enabled — confirm
// against the `set!` macro definition.
crate::set!(STAGE_RESETS, 0, &["channel-reader"]);
Self { prev, next_batch: None, cfg: cfg.clone() }
}

Expand Down Expand Up @@ -129,6 +130,7 @@ where
/// Resets this stage after first resetting the previous stage.
///
/// Any error returned by the previous stage's `reset` is propagated to the
/// caller before this stage's own state is touched.
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> {
// Reset upstream first; `?` returns early on failure.
self.prev.reset(base, cfg).await?;
// NOTE(review): `next_channel` is defined outside this view — presumably it discards
// the batch reader currently in progress; confirm.
self.next_channel();
// Record the reset under the `channel-reader` label.
crate::inc!(STAGE_RESETS, &["channel-reader"]);
Ok(())
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ where
///
/// [L1Retrieval]: crate::stages::L1Retrieval
pub fn new(prev: P) -> Self {
// Set the `frame-queue` series of the stage-reset gauge to 0 at construction.
// NOTE(review): presumably a no-op unless the `metrics` feature is enabled — confirm
// against the `set!` macro definition.
crate::set!(STAGE_RESETS, 0, &["frame-queue"]);
// Start with an empty frame queue.
Self { prev, queue: VecDeque::new() }
}
}
Expand Down Expand Up @@ -127,7 +128,8 @@ where
) -> StageResult<()> {
self.prev.reset(block_info, system_config).await?;
self.queue = VecDeque::default();
Err(StageError::Eof)
crate::inc!(STAGE_RESETS, &["frame-queue"]);
Ok(())
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ where
///
/// [L1Traversal]: crate::stages::L1Traversal
pub fn new(prev: P, provider: DAP) -> Self {
// Set the `l1-retrieval` series of the stage-reset gauge to 0 at construction.
// NOTE(review): presumably a no-op unless the `metrics` feature is enabled — confirm
// against the `set!` macro definition.
crate::set!(STAGE_RESETS, 0, &["l1-retrieval"]);
// No data is opened until it is explicitly set later.
Self { prev, provider, data: None }
}
}
Expand Down Expand Up @@ -134,6 +135,7 @@ where
/// Resets this stage after first resetting the previous stage.
///
/// Errors from the previous stage's `reset` or from the provider's
/// `open_data` call are propagated to the caller.
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> {
// Reset upstream first; `?` returns early on failure.
self.prev.reset(base, cfg).await?;
// Open data for the `base` block and store it as the stage's current data.
self.data = Some(self.provider.open_data(&base).await?);
// Record the reset under the `l1-retrieval` label.
crate::inc!(STAGE_RESETS, &["l1-retrieval"]);
Ok(())
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl<F: ChainProvider + Send> L1RetrievalProvider for L1Traversal<F> {
impl<F: ChainProvider> L1Traversal<F> {
/// Creates a new [L1Traversal] instance.
pub fn new(data_source: F, cfg: Arc<RollupConfig>) -> Self {
crate::set!(STAGE_RESETS, 0, &["l1-traversal"]);
Self {
block: Some(BlockInfo::default()),
data_source,
Expand Down Expand Up @@ -129,7 +130,8 @@ impl<F: ChainProvider + Send> ResettableStage for L1Traversal<F> {
self.block = Some(base);
self.done = false;
self.system_config = cfg.clone();
Err(StageError::Eof)
crate::inc!(STAGE_RESETS, &["l1-traversal"]);
Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub trait Pipeline: OriginProvider + Iterator<Item = L2AttributesWithParent> {
fn peek(&self) -> Option<&L2AttributesWithParent>;

/// Resets the pipeline on the next [Pipeline::step] call.
async fn reset(&mut self, origin: BlockInfo) -> anyhow::Result<()>;
async fn reset(&mut self, l2_block_info: BlockInfo, origin: BlockInfo) -> anyhow::Result<()>;

/// Attempts to progress the pipeline.
async fn step(&mut self, cursor: L2BlockInfo) -> StepResult;
Expand Down
57 changes: 34 additions & 23 deletions examples/trusted-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,22 @@ async fn sync(cli: cli::Cli) -> Result<()> {
let validator = validation::OnlineValidator::new_http(l2_rpc_url.clone(), &cfg);
let genesis_l2_block_number = cfg.genesis.l2.number;
let mut pipeline =
new_online_pipeline(cfg, l1_provider, dap, l2_provider.clone(), attributes, tip);
new_online_pipeline(cfg, l1_provider.clone(), dap, l2_provider.clone(), attributes, tip);

// Reset metrics so they can be queried.
metrics::FAILED_PAYLOAD_DERIVATION.reset();
metrics::DRIFT_WALKBACK.set(0);
metrics::DRIFT_WALKBACK_TIMESTAMP.set(0);
metrics::DERIVED_ATTRIBUTES_COUNT.reset();
metrics::FAST_FORWARD_BLOCK.set(0);
metrics::FAST_FORWARD_TIMESTAMP.set(0);

// Continuously step on the pipeline and validate payloads.
let mut advance_cursor_flag = false;
loop {
// Update the reference l2 head.
match l2_provider.latest_block_number().await {
Ok(latest) => {
let prev = metrics::REFERENCE_L2_HEAD.get();
metrics::REFERENCE_L2_HEAD.set(latest as i64);
let timestamp = match std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
Expand All @@ -115,9 +116,7 @@ async fn sync(cli: cli::Cli) -> Result<()> {
};

// Update the timestamp
if latest as i64 > prev {
metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(timestamp as i64);
}
metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(timestamp as i64);

// Don't check drift if we're within 10 blocks of origin.
if cursor.block_info.number - genesis_l2_block_number <= 10 {
Expand All @@ -130,37 +129,49 @@ async fn sync(cli: cli::Cli) -> Result<()> {
// If walkback isn't enabled, jump to 10 blocks less than the reference l2
// head.
if drift > cli.drift_threshold as i64 && !cli.enable_reorg_walkback {
cursor = if let Ok(c) =
l2_provider.l2_block_info_by_number(latest - 10).await
{
c
metrics::FAST_FORWARD_BLOCK.set(cursor.block_info.number as i64);
metrics::FAST_FORWARD_TIMESTAMP.set(timestamp as i64);
if let Ok(c) = l2_provider.l2_block_info_by_number(latest - 100).await {
let l1_block_info = l1_provider
.block_info_by_number(c.l1_origin.number)
.await
.expect("Failed to fetch L1 block info for fast forward");
info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info);
if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
continue;
}
cursor = c;
advance_cursor_flag = false;
} else {
error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", latest - 10);
error!(target: LOG_TARGET, "Failed to get block info by number: {}", latest - 100);
continue;
};
advance_cursor_flag = false;
if let Err(e) = pipeline.reset(cursor.block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
}
} else if drift > cli.drift_threshold as i64 &&
timestamp as i64 > metrics::DRIFT_WALKBACK_TIMESTAMP.get() + 300
{
metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64);
metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64);
warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift);
cursor = if let Ok(c) =
l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await
if let Ok(c) = l2_provider
.l2_block_info_by_number(cursor.block_info.number - 100)
.await
{
c
let l1_block_info = l1_provider
.block_info_by_number(c.l1_origin.number)
.await
.expect("Failed to fetch L1 block info for fast forward");
info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info);
if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
continue;
}
cursor = c;
advance_cursor_flag = false;
} else {
error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", cursor.block_info.number - 10);
continue;
};
advance_cursor_flag = false;
if let Err(e) = pipeline.reset(cursor.block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
}

metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64);
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions examples/trusted-sync/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ lazy_static! {
pub static ref DRIFT_WALKBACK_TIMESTAMP: IntGauge =
register_int_gauge!("trusted_sync_drift_walkback_timestamp", "Timestamp of the last drift walkback").expect("Failed to register drift walkback timestamp metric");

/// Tracks the block number when a fast forward last happened.
pub static ref FAST_FORWARD_BLOCK: IntGauge =
register_int_gauge!("trusted_sync_fast_forward_block", "Latest fast forward block").expect("Failed to register fast forward metric");

/// Tracks the timestamp of the last fast forward.
pub static ref FAST_FORWARD_TIMESTAMP: IntGauge =
register_int_gauge!("trusted_sync_fast_forward_timestamp", "Timestamp of the latest fast forward block").expect("Failed to register fast forward timestamp metric");

/// Tracks the latest reference l2 safe head update.
pub static ref LATEST_REF_SAFE_HEAD_UPDATE: IntGauge = register_int_gauge!(
"trusted_sync_latest_ref_safe_head_update",
Expand Down

0 comments on commit 00e6bc8

Please sign in to comment.