From b65865055b7d0f87f85c4a6a197acaf2aed2a053 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 17 Jul 2024 21:23:08 -0400 Subject: [PATCH] feat: set stage resets to 0 at construction --- crates/derive/src/macros.rs | 4 ++++ crates/derive/src/stages/attributes_queue.rs | 1 + crates/derive/src/stages/batch_queue.rs | 1 + crates/derive/src/stages/channel_bank.rs | 1 + crates/derive/src/stages/channel_reader.rs | 1 + crates/derive/src/stages/frame_queue.rs | 1 + crates/derive/src/stages/l1_retrieval.rs | 1 + crates/derive/src/stages/l1_traversal.rs | 1 + 8 files changed, 11 insertions(+) diff --git a/crates/derive/src/macros.rs b/crates/derive/src/macros.rs index ed350d041..a6ce974e1 100644 --- a/crates/derive/src/macros.rs +++ b/crates/derive/src/macros.rs @@ -54,4 +54,8 @@ macro_rules! set { #[cfg(feature = "metrics")] $crate::metrics::$metric.set($value); }; + ($metric:ident, $value:expr, $labels:expr) => { + #[cfg(feature = "metrics")] + $crate::metrics::$metric.with_label_values($labels).set($value as f64); + }; } diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 163927b7e..6c79c31f3 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -67,6 +67,7 @@ where { /// Create a new [AttributesQueue] stage. pub fn new(cfg: Arc, prev: P, builder: AB) -> Self { + crate::set!(STAGE_RESETS, 0, &["attributes-queue"]); Self { cfg, prev, is_last_in_span: false, batch: None, builder } } diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 3a0bfd124..654991984 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -80,6 +80,7 @@ where { /// Creates a new [BatchQueue] stage. pub fn new(cfg: Arc, prev: P, fetcher: BF) -> Self { + crate::set!(STAGE_RESETS, 0, &["batch-queue"]); Self { cfg, prev, diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 3305483af..11177445b 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -55,6 +55,7 @@ where { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { + crate::set!(STAGE_RESETS, 0, &["channel-bank"]); Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 6534c26fd..3f4e1b181 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -53,6 +53,7 @@ where { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { + crate::set!(STAGE_RESETS, 0, &["channel-reader"]); Self { prev, next_batch: None, cfg: cfg.clone() } } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index f38c2945b..3a37ab83b 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -47,6 +47,7 @@ where /// /// [L1Retrieval]: crate::stages::L1Retrieval pub fn new(prev: P) -> Self { + crate::set!(STAGE_RESETS, 0, &["frame-queue"]); Self { prev, queue: VecDeque::new() } } } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 1faf43ec5..233db5944 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -60,6 +60,7 @@ where /// /// [L1Traversal]: crate::stages::L1Traversal pub fn new(prev: P, provider: DAP) -> Self { + crate::set!(STAGE_RESETS, 0, &["l1-retrieval"]); Self { prev, provider, data: None } } } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index b98bd52e8..43b387c6a 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -50,6 +50,7 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. pub fn new(data_source: F, cfg: Arc) -> Self { + crate::set!(STAGE_RESETS, 0, &["l1-traversal"]); Self { block: Some(BlockInfo::default()), data_source,