diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index acb0c7a6..c057e6c4 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -22,7 +22,10 @@ use types::{ExternalId, JobItemUpdates}; use uuid::Uuid; use crate::config::Config; -use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY}; +use crate::jobs::constants::{ + JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, + JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, +}; #[double] use crate::jobs::job_handler_factory::factory; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; @@ -639,10 +642,10 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { // Increment the process retry counter let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; - + tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, "Incrementing process retry attempt counter" ); @@ -650,11 +653,8 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { config .database() .update_job( - &job, - JobItemUpdates::new() - .update_status(JobStatus::PendingRetry) - .update_metadata(metadata) - .build() + &job, + JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(), ) .await .map_err(|e| { @@ -842,7 +842,7 @@ pub fn increment_key_in_metadata( /// # Notes /// * Returns 0 if the key doesn't exist in the metadata /// * Wraps parsing errors with additional context -fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color_eyre::Result { +pub fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color_eyre::Result { metadata .get(key) .unwrap_or(&"0".to_string()) @@ -864,34 +864,29 @@ fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color #[tracing::instrument(skip(config), fields(category = "general"), ret, err)] pub async fn queue_job_for_processing(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; - + // Increment the process retry counter let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; - + tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, "Incrementing process retry attempt counter" ); - + // Update job status and metadata to indicate it's queued for processing config .database() .update_job( - &job, - JobItemUpdates::new() - .update_status(JobStatus::PendingRetry) - .update_metadata(metadata) - .build() + &job, + JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(), ) .await .map_err(|e| JobError::Other(OtherError(e)))?; // Add to process queue - add_job_to_process_queue(id, &job.job_type, config) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; - + add_job_to_process_queue(id, &job.job_type, config).await.map_err(|e| JobError::Other(OtherError(e)))?; + Ok(()) } @@ -911,29 +906,26 @@ pub async fn queue_job_for_processing(id: Uuid, config: Arc) -> Result<( pub async fn queue_job_for_verification(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; let job_handler = factory::get_job_handler(&job.job_type).await; - + // Reset verification attempts and increment retry counter let mut metadata = job.metadata.clone(); metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); - + // Increment the retry counter using the existing helper function metadata = increment_key_in_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?; - + tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?, + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?, "Incrementing verification retry attempt counter" ); - + // Update job status and metadata config .database() .update_job( &job, - JobItemUpdates::new() - .update_status(JobStatus::PendingVerification) - .update_metadata(metadata) - .build(), + JobItemUpdates::new().update_status(JobStatus::PendingVerification).update_metadata(metadata).build(), ) .await .map_err(|e| JobError::Other(OtherError(e)))?; @@ -947,7 +939,7 @@ pub async fn queue_job_for_verification(id: Uuid, config: Arc) -> Result ) .await .map_err(|e| JobError::Other(OtherError(e)))?; - + Ok(()) } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index ce51cbcd..6503b498 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use super::error::JobRouteError; use super::types::{ApiResponse, JobId, JobRouteResult}; use crate::config::Config; -use crate::jobs::{retry_job, queue_job_for_processing, queue_job_for_verification}; +use crate::jobs::{queue_job_for_processing, queue_job_for_verification, retry_job}; use crate::metrics::ORCHESTRATOR_METRICS; /// Handles HTTP requests to process a job. @@ -43,7 +43,9 @@ async fn handle_process_job_request( match queue_job_for_processing(job_id, config).await { Ok(_) => { info!("Job queued for processing successfully"); - ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]); + ORCHESTRATOR_METRICS + .successful_job_operations + .add(1.0, &[KeyValue::new("operation_type", "queue_process")]); Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index 2b513caf..8a64baf0 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -1,6 +1,8 @@ use core::panic; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use chrono::{SubsecRound as _, Utc}; use hyper::{Body, Request}; @@ -13,9 +15,13 @@ use utils::env_utils::get_env_var_or_panic; use uuid::Uuid; use crate::config::Config; +use crate::jobs::constants::{ + JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, + JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, +}; use crate::jobs::job_handler_factory::mock_factory; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; -use crate::jobs::{Job, MockJob}; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::jobs::{get_u64_from_metadata, Job, MockJob}; use crate::queue::init_consumers; use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType}; use crate::tests::config::{ConfigType, TestConfigBuilder}; @@ -46,23 +52,12 @@ async fn setup_trigger() -> (SocketAddr, Arc) { #[rstest] async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc)) { let (addr, config) = setup_trigger.await; - let job_type = JobType::DataSubmission; let job_item = build_job_item(job_type.clone(), JobStatus::Created, 1); - let mut job_handler = MockJob::new(); - - job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); - config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; - job_handler.expect_verification_polling_delay_seconds().return_const(1u64); - - let job_handler: Arc> = Arc::new(Box::new(job_handler)); - let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); - let client = hyper::Client::new(); let response = client .request( @@ -71,12 +66,23 @@ async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc)) { let (addr, config) = setup_trigger.await; - let job_type = JobType::DataSubmission; - let job_item = build_job_item(job_type.clone(), JobStatus::PendingVerification, 1); - let mut job_handler = MockJob::new(); + // Create a simple job without initial metadata + let mut job_item = build_job_item(job_type.clone(), JobStatus::PendingVerification, 1); - job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Verified)); + // Initialize metadata with verification counters + let mut metadata = HashMap::new(); + metadata.insert(JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "10".to_string()); + job_item.metadata = metadata; config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; + // Set up mock job handler + let mut job_handler = MockJob::new(); job_handler.expect_verification_polling_delay_seconds().return_const(1u64); - let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); + ctx.expect().with(eq(job_type.clone())).times(1).returning(move |_| Arc::clone(&job_handler)); let client = hyper::Client::new(); let response = client @@ -109,15 +120,29 @@ async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc