Skip to content

Commit

Permalink
refactor: test updated for routes
Browse files Browse the repository at this point in the history
  • Loading branch information
Mohiiit committed Jan 9, 2025
1 parent 6b65a12 commit b317831
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 69 deletions.
62 changes: 27 additions & 35 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use types::{ExternalId, JobItemUpdates};
use uuid::Uuid;

use crate::config::Config;
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY};
use crate::jobs::constants::{
JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY,
JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY,
};
#[double]
use crate::jobs::job_handler_factory::factory;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
Expand Down Expand Up @@ -639,22 +642,19 @@ pub async fn retry_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {

// Increment the process retry counter
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?;

tracing::debug!(
job_id = ?id,
retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?,
job_id = ?id,
retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?,
"Incrementing process retry attempt counter"
);

// Update job status and metadata to PendingRetry before processing
config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::PendingRetry)
.update_metadata(metadata)
.build()
&job,
JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(),
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -842,7 +842,7 @@ pub fn increment_key_in_metadata(
/// # Notes
/// * Returns 0 if the key doesn't exist in the metadata
/// * Wraps parsing errors with additional context
fn get_u64_from_metadata(metadata: &HashMap<String, String>, key: &str) -> color_eyre::Result<u64> {
pub fn get_u64_from_metadata(metadata: &HashMap<String, String>, key: &str) -> color_eyre::Result<u64> {
metadata
.get(key)
.unwrap_or(&"0".to_string())
Expand All @@ -864,34 +864,29 @@ fn get_u64_from_metadata(metadata: &HashMap<String, String>, key: &str) -> color
#[tracing::instrument(skip(config), fields(category = "general"), ret, err)]
pub async fn queue_job_for_processing(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let job = get_job(id, config.clone()).await?;

// Increment the process retry counter
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?;

tracing::debug!(
job_id = ?id,
retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?,
job_id = ?id,
retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?,
"Incrementing process retry attempt counter"
);

// Update job status and metadata to indicate it's queued for processing
config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::PendingRetry)
.update_metadata(metadata)
.build()
&job,
JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(),
)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

// Add to process queue
add_job_to_process_queue(id, &job.job_type, config)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

add_job_to_process_queue(id, &job.job_type, config).await.map_err(|e| JobError::Other(OtherError(e)))?;

Ok(())
}

Expand All @@ -911,29 +906,26 @@ pub async fn queue_job_for_processing(id: Uuid, config: Arc<Config>) -> Result<(
pub async fn queue_job_for_verification(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let job = get_job(id, config.clone()).await?;
let job_handler = factory::get_job_handler(&job.job_type).await;

// Reset verification attempts and increment retry counter
let mut metadata = job.metadata.clone();
metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string());

// Increment the retry counter using the existing helper function
metadata = increment_key_in_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?;

tracing::debug!(
job_id = ?id,
retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?,
job_id = ?id,
retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?,
"Incrementing verification retry attempt counter"
);

// Update job status and metadata
config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::PendingVerification)
.update_metadata(metadata)
.build(),
JobItemUpdates::new().update_status(JobStatus::PendingVerification).update_metadata(metadata).build(),
)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
Expand All @@ -947,7 +939,7 @@ pub async fn queue_job_for_verification(id: Uuid, config: Arc<Config>) -> Result
)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

Ok(())
}

Expand Down
6 changes: 4 additions & 2 deletions crates/orchestrator/src/routes/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use uuid::Uuid;
use super::error::JobRouteError;
use super::types::{ApiResponse, JobId, JobRouteResult};
use crate::config::Config;
use crate::jobs::{retry_job, queue_job_for_processing, queue_job_for_verification};
use crate::jobs::{queue_job_for_processing, queue_job_for_verification, retry_job};
use crate::metrics::ORCHESTRATOR_METRICS;

/// Handles HTTP requests to process a job.
Expand Down Expand Up @@ -43,7 +43,9 @@ async fn handle_process_job_request(
match queue_job_for_processing(job_id, config).await {
Ok(_) => {
info!("Job queued for processing successfully");
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]);
ORCHESTRATOR_METRICS
.successful_job_operations
.add(1.0, &[KeyValue::new("operation_type", "queue_process")]);
Ok(Json(ApiResponse::success()).into_response())
}
Err(e) => {
Expand Down
92 changes: 60 additions & 32 deletions crates/orchestrator/src/tests/server/job_routes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use core::panic;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use chrono::{SubsecRound as _, Utc};
use hyper::{Body, Request};
Expand All @@ -13,9 +15,13 @@ use utils::env_utils::get_env_var_or_panic;
use uuid::Uuid;

use crate::config::Config;
use crate::jobs::constants::{
JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY,
JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY,
};
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{Job, MockJob};
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::jobs::{get_u64_from_metadata, Job, MockJob};
use crate::queue::init_consumers;
use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType};
use crate::tests::config::{ConfigType, TestConfigBuilder};
Expand Down Expand Up @@ -46,23 +52,12 @@ async fn setup_trigger() -> (SocketAddr, Arc<Config>) {
#[rstest]
async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc<Config>)) {
let (addr, config) = setup_trigger.await;

let job_type = JobType::DataSubmission;

let job_item = build_job_item(job_type.clone(), JobStatus::Created, 1);
let mut job_handler = MockJob::new();

job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string()));

config.database().create_job(job_item.clone()).await.unwrap();
let job_id = job_item.clone().id;

job_handler.expect_verification_polling_delay_seconds().return_const(1u64);

let job_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(job_handler));
let ctx = mock_factory::get_job_handler_context();
ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler));

let client = hyper::Client::new();
let response = client
.request(
Expand All @@ -71,12 +66,23 @@ async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc<Conf
.await
.unwrap();

// assertions
// Verify response status
assert_eq!(response.status(), 200);

// Verify job was added to process queue
let queue_message = config.queue().consume_message_from_queue(job_type.process_queue_name()).await.unwrap();
let message_payload: JobQueueMessage = queue_message.payload_serde_json().unwrap().unwrap();
assert_eq!(message_payload.id, job_id);

// Verify job status and metadata
if let Some(job_fetched) = config.database().get_job_by_id(job_id).await.unwrap() {
assert_eq!(response.status(), 200);
assert_eq!(job_fetched.id, job_item.id);
assert_eq!(job_fetched.version, 2);
assert_eq!(job_fetched.status, JobStatus::PendingVerification);
assert_eq!(job_fetched.status, JobStatus::PendingRetry);

// Verify process attempt counter was incremented
let process_attempts =
get_u64_from_metadata(&job_fetched.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY).unwrap();
assert_eq!(process_attempts, 1);
} else {
panic!("Could not get job from database")
}
Expand All @@ -86,38 +92,57 @@ async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc<Conf
#[rstest]
async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc<Config>)) {
let (addr, config) = setup_trigger.await;

let job_type = JobType::DataSubmission;

let job_item = build_job_item(job_type.clone(), JobStatus::PendingVerification, 1);
let mut job_handler = MockJob::new();
// Create a simple job without initial metadata
let mut job_item = build_job_item(job_type.clone(), JobStatus::PendingVerification, 1);

job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Verified));
// Initialize metadata with verification counters
let mut metadata = HashMap::new();
metadata.insert(JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY.to_string(), "0".to_string());
metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "10".to_string());
job_item.metadata = metadata;

config.database().create_job(job_item.clone()).await.unwrap();
let job_id = job_item.clone().id;

// Set up mock job handler
let mut job_handler = MockJob::new();
job_handler.expect_verification_polling_delay_seconds().return_const(1u64);

let job_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(job_handler));

let ctx = mock_factory::get_job_handler_context();
ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler));
ctx.expect().with(eq(job_type.clone())).times(1).returning(move |_| Arc::clone(&job_handler));

let client = hyper::Client::new();
let response = client
.request(Request::builder().uri(format!("http://{}/jobs/{}/verify", addr, job_id)).body(Body::empty()).unwrap())
.await
.unwrap();

// assertions
if let Some(job_fetched) = config.database().get_job_by_id(job_id).await.unwrap() {
assert_eq!(response.status(), 200);
assert_eq!(job_fetched.id, job_item.id);
assert_eq!(job_fetched.version, 1);
assert_eq!(job_fetched.status, JobStatus::Completed);
} else {
panic!("Could not get job from database")
}
assert_eq!(response.status(), 200);

// Use longer sleep duration as seen in other tests
tokio::time::sleep(Duration::from_secs(2)).await;

// Verify job was added to verification queue
let queue_message = config.queue().consume_message_from_queue(job_type.verify_queue_name()).await.unwrap();
let message_payload: JobQueueMessage = queue_message.payload_serde_json().unwrap().unwrap();
assert_eq!(message_payload.id, job_id);

// Verify job status and metadata
let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database");
assert_eq!(job_fetched.id, job_item.id);
assert_eq!(job_fetched.status, JobStatus::PendingVerification);

// Verify verification attempt was reset
let verify_attempts = get_u64_from_metadata(&job_fetched.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY).unwrap();
assert_eq!(verify_attempts, 0);

// Verify retry attempt was incremented
let retry_attempts =
get_u64_from_metadata(&job_fetched.metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY).unwrap();
assert_eq!(retry_attempts, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -146,6 +171,9 @@ async fn test_trigger_retry_job_when_failed(#[future] setup_trigger: (SocketAddr
// Verify job status changed to PendingRetry
let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database");
assert_eq!(job_fetched.id, job_item.id);
let process_attempts =
get_u64_from_metadata(&job_fetched.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY).unwrap();
assert_eq!(process_attempts, 1);
assert_eq!(job_fetched.status, JobStatus::PendingRetry);
}

Expand Down

0 comments on commit b317831

Please sign in to comment.