From 15b321b41ae9d2a84853f7a6600d193dec9f30bf Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 27 Dec 2024 18:58:39 +0530 Subject: [PATCH] Fix/instrumentation (#200) * update: fixed block_state collection --- CHANGELOG.md | 2 +- crates/orchestrator/src/jobs/mod.rs | 21 ++++++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e8b0acf..141bd53d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,7 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Fixed -- refactor: instrumentation +- refactor: instrumentations - `is_worker_enabled` status check moved from `VerificationFailed` to `Failed` - refactor: static attributes for telemetry - refactor: aws setup for Event Bridge diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 181d64f4..e3e7dfde 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -297,7 +297,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> JobError::Other(OtherError(e)) })?; - let attributes = [ + let attributes = vec![ KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), KeyValue::new("operation_type", "process_job"), ]; @@ -305,8 +305,8 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block"); let duration = start.elapsed(); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); + register_block_gauge(&job, &attributes)?; Ok(()) } @@ -476,7 +476,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { let duration = start.elapsed(); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); + register_block_gauge(&job, &attributes)?; Ok(()) } @@ -497,6 +497,21 @@ pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), Job .await } +fn register_block_gauge(job: &JobItem, attributes: &[KeyValue]) -> Result<(), JobError> { + let block_number = if let JobType::StateTransition = job.job_type { + parse_string( + job.external_id + .unwrap_string() + .map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))?, + ) + } else { + parse_string(&job.internal_id) + }?; + + ORCHESTRATOR_METRICS.block_gauge.record(block_number, attributes); + Ok(()) +} + async fn move_job_to_failed(job: &JobItem, config: Arc, reason: String) -> Result<(), JobError> { if job.status == JobStatus::Completed { tracing::error!(job_id = ?job.id, job_status = ?job.status, "Invalid state exists on DL queue");