chore(process_tracker): use const instead of String for `business_status` (#4849)

Co-authored-by: SanchithHegde <[email protected]>
pixincreate and SanchithHegde authored Jun 14, 2024
1 parent 6582729 commit 40dfad8
Showing 14 changed files with 86 additions and 30 deletions.
43 changes: 40 additions & 3 deletions crates/diesel_models/src/process_tracker.rs
@@ -76,8 +76,6 @@ impl ProcessTrackerNew {
where
T: Serialize + std::fmt::Debug,
{
const BUSINESS_STATUS_PENDING: &str = "Pending";

let current_time = common_utils::date_time::now();
Ok(Self {
id: process_tracker_id.into(),
@@ -91,7 +89,7 @@ impl ProcessTrackerNew {
.encode_to_value()
.change_context(errors::DatabaseError::Others)
.attach_printable("Failed to serialize process tracker tracking data")?,
business_status: String::from(BUSINESS_STATUS_PENDING),
business_status: String::from(business_status::PENDING),
status: storage_enums::ProcessTrackerStatus::New,
event: vec![],
created_at: current_time,
@@ -227,3 +225,42 @@ mod tests {
assert_eq!(enum_format, ProcessTrackerRunner::PaymentsSyncWorkflow);
}
}

pub mod business_status {
/// Indicates that an irrecoverable error occurred during the workflow execution.
pub const GLOBAL_FAILURE: &str = "GLOBAL_FAILURE";

/// Task successfully completed by consumer.
/// A task that reaches this status should not be retried (rescheduled for execution) later.
pub const COMPLETED_BY_PT: &str = "COMPLETED_BY_PT";

/// An error occurred during the workflow execution which prevents further execution and
/// retries.
/// A task that reaches this status should not be retried (rescheduled for execution) later.
pub const FAILURE: &str = "FAILURE";

/// The resource associated with the task was removed, due to which further retries can/should
/// not be done.
pub const REVOKED: &str = "Revoked";

/// The task was executed for the maximum possible number of times without a successful outcome.
/// A task that reaches this status should not be retried (rescheduled for execution) later.
pub const RETRIES_EXCEEDED: &str = "RETRIES_EXCEEDED";

/// The outgoing webhook was successfully delivered in the initial attempt.
/// Further retries of the task are not required.
pub const INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL: &str = "INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL";

/// Indicates that an error occurred during the workflow execution.
/// This status is typically set by the workflow error handler.
/// A task that reaches this status should not be retried (rescheduled for execution) later.
pub const GLOBAL_ERROR: &str = "GLOBAL_ERROR";

/// The resource associated with the task has been significantly modified since the task was
/// created, due to which further retries of the current task are not required.
/// A task that reaches this status should not be retried (rescheduled for execution) later.
pub const RESOURCE_STATUS_MISMATCH: &str = "RESOURCE_STATUS_MISMATCH";

/// Business status set for newly created tasks.
pub const PENDING: &str = "Pending";
}
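
The new `business_status` module above is the heart of this commit: every business status string now lives behind a `&'static str` constant instead of being retyped at each call site, so a typo fails at compile time rather than silently writing an unexpected value. A minimal, self-contained sketch of the resulting pattern follows — the struct and function below are simplified stand-ins for illustration, not the real `diesel_models` items:

/// Simplified stand-ins, for illustration only; not the actual crate items.
mod business_status {
    pub const PENDING: &str = "Pending";
    pub const COMPLETED_BY_PT: &str = "COMPLETED_BY_PT";
}

struct ProcessTracker {
    business_status: String,
}

/// Callers hand over a `&'static str` constant; the owned `String` is built
/// only where the column value is actually stored.
fn finish_process_with_business_status(pt: &mut ProcessTracker, business_status: &'static str) {
    pt.business_status = String::from(business_status);
}

fn main() {
    let mut pt = ProcessTracker {
        business_status: String::from(business_status::PENDING),
    };
    finish_process_with_business_status(&mut pt, business_status::COMPLETED_BY_PT);
    assert_eq!(pt.business_status, business_status::COMPLETED_BY_PT);
}
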
9 changes: 6 additions & 3 deletions crates/router/src/bin/scheduler.rs
@@ -4,7 +4,7 @@ use std::{collections::HashMap, str::FromStr, sync::Arc};
use actix_web::{dev::Server, web, Scope};
use api_models::health_check::SchedulerHealthCheckResponse;
use common_utils::ext_traits::{OptionExt, StringExt};
use diesel_models::process_tracker as storage;
use diesel_models::process_tracker::{self as storage, business_status};
use error_stack::ResultExt;
use router::{
configs::settings::{CmdLineConf, Settings},
@@ -329,10 +329,13 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
let status = state
.get_db()
.as_scheduler()
.finish_process_with_business_status(process, "GLOBAL_FAILURE".to_string())
.finish_process_with_business_status(
process,
business_status::GLOBAL_FAILURE,
)
.await;
if let Err(err) = status {
logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE");
logger::error!(%err, "Failed while performing database operation: {}", business_status::GLOBAL_FAILURE);
}
}
},
6 changes: 4 additions & 2 deletions crates/router/src/core/api_keys.rs
@@ -381,7 +381,9 @@ pub async fn update_api_key_expiry_task(
retry_count: Some(0),
schedule_time,
tracking_data: Some(updated_api_key_expiry_workflow_model),
business_status: Some("Pending".to_string()),
business_status: Some(String::from(
diesel_models::process_tracker::business_status::PENDING,
)),
status: Some(storage_enums::ProcessTrackerStatus::New),
updated_at: Some(current_time),
};
@@ -450,7 +452,7 @@ pub async fn revoke_api_key_expiry_task(
let task_ids = vec![task_id];
let updated_process_tracker_data = storage::ProcessTrackerUpdate::StatusUpdate {
status: storage_enums::ProcessTrackerStatus::Finish,
business_status: Some("Revoked".to_string()),
business_status: Some(String::from(diesel_models::business_status::REVOKED)),
};

store
7 changes: 5 additions & 2 deletions crates/router/src/core/payment_methods/vault.rs
@@ -1178,7 +1178,7 @@ pub async fn start_tokenize_data_workflow(
db.as_scheduler()
.finish_process_with_business_status(
tokenize_tracker.clone(),
"COMPLETED_BY_PT".to_string(),
diesel_models::process_tracker::business_status::COMPLETED_BY_PT,
)
.await?;
}
@@ -1241,7 +1241,10 @@ pub async fn retry_delete_tokenize(
}
None => db
.as_scheduler()
.finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string())
.finish_process_with_business_status(
pt,
diesel_models::process_tracker::business_status::RETRIES_EXCEEDED,
)
.await
.map_err(Into::into),
}
5 changes: 3 additions & 2 deletions crates/router/src/core/refunds.rs
@@ -9,6 +9,7 @@ use common_utils::{
ext_traits::{AsyncExt, ValueExt},
types::MinorUnit,
};
use diesel_models::process_tracker::business_status;
use error_stack::{report, ResultExt};
use masking::PeekInterface;
use router_env::{instrument, tracing};
@@ -1028,7 +1029,7 @@ pub async fn sync_refund_with_gateway_workflow(
.as_scheduler()
.finish_process_with_business_status(
refund_tracker.clone(),
"COMPLETED_BY_PT".to_string(),
business_status::COMPLETED_BY_PT,
)
.await?
}
@@ -1193,7 +1194,7 @@ pub async fn trigger_refund_execute_workflow(
db.as_scheduler()
.finish_process_with_business_status(
refund_tracker.clone(),
"COMPLETED_BY_PT".to_string(),
business_status::COMPLETED_BY_PT,
)
.await?;
}
7 changes: 4 additions & 3 deletions crates/router/src/core/webhooks/outgoing.rs
@@ -3,6 +3,7 @@ use api_models::{
webhooks,
};
use common_utils::{ext_traits::Encode, request::RequestContent};
use diesel_models::process_tracker::business_status;
use error_stack::{report, ResultExt};
use masking::{ExposeInterface, Mask, PeekInterface, Secret};
use router_env::{
@@ -231,7 +232,7 @@ async fn trigger_webhook_to_merchant(
state
.store
.as_scheduler()
.finish_process_with_business_status(process_tracker, "FAILURE".into())
.finish_process_with_business_status(process_tracker, business_status::FAILURE)
.await
.change_context(
errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed,
@@ -304,7 +305,7 @@ async fn trigger_webhook_to_merchant(
state.clone(),
&business_profile.merchant_id,
process_tracker,
"INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL",
business_status::INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL,
)
.await?;
} else {
@@ -769,7 +770,7 @@ async fn success_response_handler(
Some(process_tracker) => state
.store
.as_scheduler()
.finish_process_with_business_status(process_tracker, business_status.into())
.finish_process_with_business_status(process_tracker, business_status)
.await
.change_context(
errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed,
2 changes: 1 addition & 1 deletion crates/router/src/db/kafka_store.rs
@@ -1821,7 +1821,7 @@ impl ProcessTrackerInterface for KafkaStore {
async fn finish_process_with_business_status(
&self,
this: storage::ProcessTracker,
business_status: String,
business_status: &'static str,
) -> CustomResult<(), errors::StorageError> {
self.diesel_store
.finish_process_with_business_status(this, business_status)
3 changes: 2 additions & 1 deletion crates/router/src/types/storage.rs
@@ -40,7 +40,8 @@ pub mod user_role;
use std::collections::HashMap;

pub use diesel_models::{
ProcessTracker, ProcessTrackerNew, ProcessTrackerRunner, ProcessTrackerUpdate,
process_tracker::business_status, ProcessTracker, ProcessTrackerNew, ProcessTrackerRunner,
ProcessTrackerUpdate,
};
pub use hyperswitch_domain_models::payments::{
payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate},
6 changes: 4 additions & 2 deletions crates/router/src/workflows/api_key_expiry.rs
@@ -1,5 +1,7 @@
use common_utils::{errors::ValidationError, ext_traits::ValueExt};
use diesel_models::{enums as storage_enums, ApiKeyExpiryTrackingData};
use diesel_models::{
enums as storage_enums, process_tracker::business_status, ApiKeyExpiryTrackingData,
};
use router_env::logger;
use scheduler::{workflows::ProcessTrackerWorkflow, SchedulerSessionState};

@@ -96,7 +98,7 @@ impl ProcessTrackerWorkflow<SessionState> for ApiKeyExpiryWorkflow {
state
.get_db()
.as_scheduler()
.finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string())
.finish_process_with_business_status(process, business_status::COMPLETED_BY_PT)
.await?
}
// If tasks are remaining that has to be scheduled
5 changes: 3 additions & 2 deletions crates/router/src/workflows/outgoing_webhook_retry.rs
@@ -6,6 +6,7 @@ use api_models::{
webhooks::{OutgoingWebhook, OutgoingWebhookContent},
};
use common_utils::ext_traits::{StringExt, ValueExt};
use diesel_models::process_tracker::business_status;
use error_stack::ResultExt;
use masking::PeekInterface;
use router_env::tracing::{self, instrument};
@@ -197,7 +198,7 @@ impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow {
db.as_scheduler()
.finish_process_with_business_status(
process.clone(),
"RESOURCE_STATUS_MISMATCH".to_string(),
business_status::RESOURCE_STATUS_MISMATCH,
)
.await?;
}
@@ -309,7 +310,7 @@ pub(crate) async fn retry_webhook_delivery_task(
}
None => {
db.as_scheduler()
.finish_process_with_business_status(process, "RETRIES_EXCEEDED".to_string())
.finish_process_with_business_status(process, business_status::RETRIES_EXCEEDED)
.await
}
}
5 changes: 3 additions & 2 deletions crates/router/src/workflows/payment_sync.rs
@@ -1,4 +1,5 @@
use common_utils::ext_traits::{OptionExt, StringExt, ValueExt};
use diesel_models::process_tracker::business_status;
use error_stack::ResultExt;
use router_env::logger;
use scheduler::{
@@ -89,7 +90,7 @@ impl ProcessTrackerWorkflow<SessionState> for PaymentsSyncWorkflow {
state
.store
.as_scheduler()
.finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string())
.finish_process_with_business_status(process, business_status::COMPLETED_BY_PT)
.await?
}
_ => {
@@ -269,7 +270,7 @@ pub async fn retry_sync_task(
}
None => {
db.as_scheduler()
.finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string())
.finish_process_with_business_status(pt, business_status::RETRIES_EXCEEDED)
.await?;
Ok(true)
}
4 changes: 2 additions & 2 deletions crates/scheduler/src/consumer.rs
@@ -30,7 +30,7 @@ use crate::{

// Valid consumer business statuses
pub fn valid_business_statuses() -> Vec<&'static str> {
vec!["Pending"]
vec![storage::business_status::PENDING]
}

#[instrument(skip_all)]
@@ -262,7 +262,7 @@ pub async fn consumer_error_handler(
vec![process.id],
storage::ProcessTrackerUpdate::StatusUpdate {
status: enums::ProcessTrackerStatus::Finish,
business_status: Some("GLOBAL_ERROR".to_string()),
business_status: Some(String::from(storage::business_status::GLOBAL_ERROR)),
},
)
.await
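
With the `"Pending"` literal replaced by `storage::business_status::PENDING`, the consumer's notion of a runnable task is now tied to the same constants the producers write. A hedged sketch of how such a filter might look — `Task` and `pick_runnable` below are illustrative placeholders, not the scheduler's actual types:

// Illustrative placeholders only; not the real scheduler types.
pub mod business_status {
    pub const PENDING: &str = "Pending";
}

pub fn valid_business_statuses() -> Vec<&'static str> {
    vec![business_status::PENDING]
}

struct Task {
    business_status: String,
}

/// Keep only tasks whose stored status matches one of the shared constants.
fn pick_runnable(tasks: Vec<Task>) -> Vec<Task> {
    let valid = valid_business_statuses();
    tasks
        .into_iter()
        .filter(|task| valid.contains(&task.business_status.as_str()))
        .collect()
}

fn main() {
    let tasks = vec![
        Task { business_status: String::from(business_status::PENDING) },
        Task { business_status: String::from("COMPLETED_BY_PT") },
    ];
    assert_eq!(pick_runnable(tasks).len(), 1);
}
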
6 changes: 5 additions & 1 deletion crates/scheduler/src/consumer/workflows.rs
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use common_utils::errors::CustomResult;
pub use diesel_models::process_tracker as storage;
use diesel_models::process_tracker::business_status;
use router_env::logger;

use crate::{errors, SchedulerSessionState};
@@ -45,7 +46,10 @@ pub trait ProcessTrackerWorkflows<T>: Send + Sync {
let status = app_state
.get_db()
.as_scheduler()
.finish_process_with_business_status(process, "GLOBAL_FAILURE".to_string())
.finish_process_with_business_status(
process,
business_status::GLOBAL_FAILURE,
)
.await;
if let Err(error) = status {
logger::error!(?error, "Failed to update process business status");
8 changes: 4 additions & 4 deletions crates/scheduler/src/db/process_tracker.rs
@@ -52,7 +52,7 @@ pub trait ProcessTrackerInterface: Send + Sync + 'static {
async fn finish_process_with_business_status(
&self,
this: storage::ProcessTracker,
business_status: String,
business_status: &'static str,
) -> CustomResult<(), errors::StorageError>;

async fn find_processes_by_time_status(
@@ -166,13 +166,13 @@ impl ProcessTrackerInterface for Store {
async fn finish_process_with_business_status(
&self,
this: storage::ProcessTracker,
business_status: String,
business_status: &'static str,
) -> CustomResult<(), errors::StorageError> {
self.update_process(
this,
storage::ProcessTrackerUpdate::StatusUpdate {
status: storage_enums::ProcessTrackerStatus::Finish,
business_status: Some(business_status),
business_status: Some(String::from(business_status)),
},
)
.await
@@ -284,7 +284,7 @@ impl ProcessTrackerInterface for MockDb {
async fn finish_process_with_business_status(
&self,
_this: storage::ProcessTracker,
_business_status: String,
_business_status: &'static str,
) -> CustomResult<(), errors::StorageError> {
// [#172]: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
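
The interface change in this last file is the core of the commit: `finish_process_with_business_status` now takes `&'static str` across the trait, the `Store` implementation, and the `MockDb` stub, and converts to an owned `String` only when building the status update. A compilable sketch of that shape under simplified types; `async_trait` and a `tokio` entry point are assumed here because both already appear elsewhere in the repository:

use async_trait::async_trait;

/// Simplified stand-in for the scheduler's storage error type.
#[derive(Debug)]
struct StorageError;

#[async_trait]
trait ProcessTrackerInterface: Send + Sync {
    async fn finish_process_with_business_status(
        &self,
        business_status: &'static str,
    ) -> Result<(), StorageError>;
}

struct InMemoryStore {
    last_status: std::sync::Mutex<Option<String>>,
}

#[async_trait]
impl ProcessTrackerInterface for InMemoryStore {
    async fn finish_process_with_business_status(
        &self,
        business_status: &'static str,
    ) -> Result<(), StorageError> {
        // The owned `String` is built only where the update is persisted.
        *self.last_status.lock().map_err(|_| StorageError)? = Some(String::from(business_status));
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), StorageError> {
    let store = InMemoryStore { last_status: std::sync::Mutex::new(None) };
    store
        .finish_process_with_business_status("COMPLETED_BY_PT")
        .await?;
    assert_eq!(
        store.last_status.lock().map_err(|_| StorageError)?.as_deref(),
        Some("COMPLETED_BY_PT")
    );
    Ok(())
}
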
