From cec252eaec57f8bc8f9c3b9ff3b5821732cced5e Mon Sep 17 00:00:00 2001 From: Chethan Date: Wed, 27 Dec 2023 20:47:07 +0530 Subject: [PATCH 01/23] feat: add deep health check --- crates/router/src/db.rs | 2 + crates/router/src/db/health_check.rs | 181 +++++++++++++++++++++++++++ crates/router/src/routes/app.rs | 5 +- crates/router/src/routes/health.rs | 46 +++++++ crates/storage_impl/src/errors.rs | 50 ++++++++ 5 files changed, 282 insertions(+), 2 deletions(-) create mode 100644 crates/router/src/db/health_check.rs diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 0cd4cb21881..5beace9cbb8 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -14,6 +14,7 @@ pub mod events; pub mod file; pub mod fraud_check; pub mod gsm; +pub mod health_check; mod kafka_store; pub mod locker_mock_up; pub mod mandate; @@ -103,6 +104,7 @@ pub trait StorageInterface: + user_role::UserRoleInterface + authorization::AuthorizationInterface + user::sample_data::BatchSampleDataInterface + + health_check::HealthCheckInterface + 'static { fn get_scheduler_db(&self) -> Box; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs new file mode 100644 index 00000000000..3cacc40d364 --- /dev/null +++ b/crates/router/src/db/health_check.rs @@ -0,0 +1,181 @@ +use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; +use diesel_models::ConfigNew; +use error_stack::ResultExt; +use router_env::logger; + +use super::{KafkaStore, MockDb, StorageInterface, Store}; +use crate::{ + connection, + core::errors::{self, CustomResult}, + routes, + services::api as services, +}; + +#[async_trait::async_trait] +pub trait HealthCheckInterface { + async fn health_check_db( + &self, + db: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckDBError>; + async fn health_check_redis( + &self, + db: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckRedisError>; + async fn health_check_locker( + &self, + state: &routes::AppState, + ) -> CustomResult; +} + +#[async_trait::async_trait] +impl HealthCheckInterface for Store { + async fn health_check_db( + &self, + db: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckDBError> { + let conn = connection::pg_connection_write(self) + .await + .change_context(errors::HealthCheckDBError::DBError)?; + + let _data = conn + .transaction_async(|conn| { + Box::pin(async move { + let query = + diesel::select(diesel::dsl::sql::("1 + 1")); + let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { + logger::error!(read_err=?err,"Error while reading element in the database"); + errors::HealthCheckDBError::DBReadError + })?; + + logger::debug!("Database read was successful"); + + db.insert_config(ConfigNew { + key: "test_key".to_string(), + config: "test_value".to_string(), + }) + .await + .map_err(|err| { + logger::error!(write_err=?err,"Error while writing to database"); + errors::HealthCheckDBError::DBWriteError + })?; + + logger::debug!("Database write was successful"); + + db.delete_config_by_key("test_key").await.map_err(|err| { + logger::error!(delete_err=?err,"Error while deleting element in the database"); + errors::HealthCheckDBError::DBDeleteError + })?; + + logger::debug!("Database delete was successful"); + + Ok::<_, errors::HealthCheckDBError>(()) + }) + }) + .await?; + + Ok(()) + } + + async fn health_check_redis( + &self, + db: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckRedisError> { + let redis_conn = db + .get_redis_conn() + 
.change_context(errors::HealthCheckRedisError::RedisConnectionError)?; + + redis_conn + .serialize_and_set_key_with_expiry("test_key", "test_value", 30) + .await + .change_context(errors::HealthCheckRedisError::SetFailed)?; + + logger::debug!("Redis set_key was successful"); + + redis_conn + .get_key("test_key") + .await + .change_context(errors::HealthCheckRedisError::GetFailed)?; + + logger::debug!("Redis get_key was successful"); + + redis_conn + .delete_key("test_key") + .await + .change_context(errors::HealthCheckRedisError::DeleteFailed)?; + + logger::debug!("Redis delete_key was successful"); + + Ok(()) + } + + async fn health_check_locker( + &self, + state: &routes::AppState, + ) -> CustomResult { + let locker = &state.conf.locker; + let mut status_code = 0; + if !locker.mock_locker { + let mut url = locker.host_rs.to_owned(); + url.push_str("/health"); + let request = services::Request::new(services::Method::Get, &url); + status_code = services::call_connector_api(state, request) + .await + .change_context(errors::HealthCheckLockerError::FailedToCallLocker)? + .map(|resp| resp.status_code) + .map_err(|err| err.status_code) + .unwrap_or_else(|code| code); + } + + logger::debug!("Locker call was successful"); + + Ok(status_code) + } +} + +#[async_trait::async_trait] +impl HealthCheckInterface for MockDb { + async fn health_check_db( + &self, + _: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckDBError> { + Ok(()) + } + + async fn health_check_redis( + &self, + _: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckRedisError> { + Ok(()) + } + + async fn health_check_locker( + &self, + _: &routes::AppState, + ) -> CustomResult { + Ok(0) + } +} + +#[async_trait::async_trait] +impl HealthCheckInterface for KafkaStore { + async fn health_check_db( + &self, + _: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckDBError> { + Ok(()) + } + + async fn health_check_redis( + &self, + _: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckRedisError> { + Ok(()) + } + + async fn health_check_locker( + &self, + _: &routes::AppState, + ) -> CustomResult { + Ok(0) + } +} diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 0357cedd443..6625a206be2 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -253,9 +253,10 @@ pub struct Health; impl Health { pub fn server(state: AppState) -> Scope { - web::scope("") + web::scope("health") .app_data(web::Data::new(state)) - .service(web::resource("/health").route(web::get().to(health))) + .service(web::resource("").route(web::get().to(health))) + .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 7c7f29bd181..b2b460c25ef 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,3 +1,5 @@ +use super::app; +use actix_web::web; use router_env::{instrument, logger, tracing}; use crate::routes::metrics; @@ -11,3 +13,47 @@ pub async fn health() -> impl actix_web::Responder { logger::info!("Health was called"); actix_web::HttpResponse::Ok().body("health is good") } + +#[instrument(skip_all)] +pub async fn deep_health_check(state: web::Data) -> impl actix_web::Responder { + metrics::HEALTH_METRIC.add(&metrics::CONTEXT, 1, &[]); + let db = &*state.store; + logger::info!("Deep health check was called"); + + logger::debug!("Database health check begin"); + + let db_status = match 
db.health_check_db(db).await { + Ok(_) => "Health is good".to_string(), + Err(err) => err.to_string(), + }; + logger::debug!("Database health check end"); + + logger::debug!("Redis health check begin"); + + let redis_status = match db.health_check_redis(db).await { + Ok(_) => "Health is good".to_string(), + Err(err) => err.to_string(), + }; + + logger::debug!("Redis health check end"); + + logger::debug!("Locker health check begin"); + + let locker_status = match db.health_check_locker(&state).await { + Ok(status_code) => { + let mut status_message = "Health is good".to_string(); + if status_code == 403 { + status_message = format!("{}; Key custodian locked", status_message); + } + status_message + } + Err(err) => err.to_string(), + }; + + logger::debug!("Locker health check end"); + + actix_web::HttpResponse::Ok().body(format!( + "Database - {}\nRedis - {}\nLocker - {}", + db_status, redis_status, locker_status + )) +} diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index f0cbebf78c5..50173bb1c73 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -376,3 +376,53 @@ pub enum ConnectorError { #[error("Missing 3DS redirection payload: {field_name}")] MissingConnectorRedirectionPayload { field_name: &'static str }, } + +#[derive(Debug, thiserror::Error)] +pub enum HealthCheckDBError { + #[error("Error while connecting to database")] + DBError, + #[error("Error while writing to database")] + DBWriteError, + #[error("Error while reading element in the database")] + DBReadError, + #[error("Error while deleting element in the database")] + DBDeleteError, + #[error("Unpredictable error occurred")] + UnknownError, + #[error("Error in database transaction")] + TransactionError, +} + +impl From for HealthCheckDBError { + fn from(error: diesel::result::Error) -> Self { + match error { + diesel::result::Error::DatabaseError(_, _) => Self::DBError, + + diesel::result::Error::RollbackErrorOnCommit { .. } + | diesel::result::Error::RollbackTransaction + | diesel::result::Error::AlreadyInTransaction + | diesel::result::Error::NotInTransaction + | diesel::result::Error::BrokenTransactionManager => Self::TransactionError, + + _ => Self::UnknownError, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum HealthCheckRedisError { + #[error("Failed to establish Redis connection")] + RedisConnectionError, + #[error("Failed to set key value in Redis")] + SetFailed, + #[error("Failed to get key value in Redis")] + GetFailed, + #[error("Failed to delete key value in Redis")] + DeleteFailed, +} + +#[derive(Debug, Clone, thiserror::Error)] +pub enum HealthCheckLockerError { + #[error("Failed to establish Locker connection")] + FailedToCallLocker, +} From ef5d8b360161ce27714439904f1bef475b105852 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Wed, 27 Dec 2023 15:29:38 +0000 Subject: [PATCH 02/23] chore: run formatter --- crates/router/src/routes/health.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index b2b460c25ef..e8c2d10672d 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,7 +1,7 @@ -use super::app; use actix_web::web; use router_env::{instrument, logger, tracing}; +use super::app; use crate::routes::metrics; /// . 
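
For orientation before the next patch, a minimal client-side sketch of exercising the routes wired up in PATCH 01 — GET /health and POST /health/deep_check, which at this stage still answers with a plain-text body (PATCH 03 switches the response to JSON and PATCH 12 changes the method to GET). The base URL, the port, and the reqwest/tokio dependencies are assumptions made for this sketch only; they are not part of the patch series.

// Illustrative only; not part of the patch series. Assumes the router is
// listening on 127.0.0.1:8080 and that the `reqwest` and `tokio` crates
// are available to the caller.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // Shallow health check (pre-existing route, remounted under the
    // "health" scope in PATCH 01): GET /health
    let health = client
        .get("http://127.0.0.1:8080/health")
        .send()
        .await?;
    println!("GET /health -> {}", health.status());

    // Deep health check added in PATCH 01: POST /health/deep_check,
    // returning a plain-text summary of database, Redis and locker status.
    let deep = client
        .post("http://127.0.0.1:8080/health/deep_check")
        .send()
        .await?;
    println!("POST /health/deep_check -> {}", deep.status());
    println!("{}", deep.text().await?);

    Ok(())
}
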
From 2e1d559cedb57bd6239ed31d7388542e1777efce Mon Sep 17 00:00:00 2001 From: Chethan Date: Thu, 28 Dec 2023 14:05:28 +0530 Subject: [PATCH 03/23] update response type to json and impl health check interface for kafka store --- crates/api_models/src/health_check.rs | 19 +++++++++++++++ crates/api_models/src/lib.rs | 1 + crates/router/src/db/health_check.rs | 24 ------------------- crates/router/src/db/kafka_store.rs | 26 +++++++++++++++++++++ crates/router/src/routes/health.rs | 33 +++++++++++++++++---------- 5 files changed, 67 insertions(+), 36 deletions(-) create mode 100644 crates/api_models/src/health_check.rs diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs new file mode 100644 index 00000000000..b62d59916ca --- /dev/null +++ b/crates/api_models/src/health_check.rs @@ -0,0 +1,19 @@ +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct HealthCheckResponse { + pub database: String, + pub redis: String, + pub locker: LockerHealthResponse, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct LockerHealthResponse { + pub status: String, + pub key_custodian_status: KeyCustodianStatus, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum KeyCustodianStatus { + Unavailable, + Locked, + Unlocked, +} diff --git a/crates/api_models/src/lib.rs b/crates/api_models/src/lib.rs index 935944cf74c..a2d36cd687e 100644 --- a/crates/api_models/src/lib.rs +++ b/crates/api_models/src/lib.rs @@ -15,6 +15,7 @@ pub mod ephemeral_key; pub mod errors; pub mod events; pub mod files; +pub mod health_check; pub mod gsm; pub mod locker_migration; pub mod mandates; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 3cacc40d364..e42f911d937 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -155,27 +155,3 @@ impl HealthCheckInterface for MockDb { Ok(0) } } - -#[async_trait::async_trait] -impl HealthCheckInterface for KafkaStore { - async fn health_check_db( - &self, - _: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckDBError> { - Ok(()) - } - - async fn health_check_redis( - &self, - _: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - Ok(()) - } - - async fn health_check_locker( - &self, - _: &routes::AppState, - ) -> CustomResult { - Ok(0) - } -} diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index db94c1bcbca..15fe2a01ae0 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -43,6 +43,7 @@ use crate::{ events::EventInterface, file::FileMetadataInterface, gsm::GsmInterface, + health_check::HealthCheckInterface, locker_mock_up::LockerMockUpInterface, mandate::MandateInterface, merchant_account::MerchantAccountInterface, @@ -57,6 +58,7 @@ use crate::{ routing_algorithm::RoutingAlgorithmInterface, MasterKeyInterface, StorageInterface, }, + routes, services::{authentication, kafka::KafkaProducer, Store}, types::{ domain, @@ -2131,3 +2133,27 @@ impl AuthorizationInterface for KafkaStore { .await } } + +#[async_trait::async_trait] +impl HealthCheckInterface for KafkaStore { + async fn health_check_db( + &self, + db: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckDBError> { + self.diesel_store.health_check_db(db).await + } + + async fn health_check_redis( + &self, + db: &dyn StorageInterface, + ) -> CustomResult<(), errors::HealthCheckRedisError> { + 
self.diesel_store.health_check_redis(db).await + } + + async fn health_check_locker( + &self, + state: &routes::AppState, + ) -> CustomResult { + self.diesel_store.health_check_locker(state).await + } +} diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index b2b460c25ef..7ce3d249f49 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,8 +1,9 @@ use super::app; use actix_web::web; +use api_models::health_check::{HealthCheckResponse, KeyCustodianStatus, LockerHealthResponse}; use router_env::{instrument, logger, tracing}; -use crate::routes::metrics; +use crate::{routes::metrics, services}; /// . // #[logger::instrument(skip_all, name = "name1", level = "warn", fields( key1 = "val1" ))] @@ -39,21 +40,29 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we logger::debug!("Locker health check begin"); - let locker_status = match db.health_check_locker(&state).await { + let (locker_status, key_custodian_status) = match db.health_check_locker(&state).await { Ok(status_code) => { - let mut status_message = "Health is good".to_string(); - if status_code == 403 { - status_message = format!("{}; Key custodian locked", status_message); - } - status_message + let status_message = "Health is good".to_string(); + let key_custodian_status = if status_code == 403 { + KeyCustodianStatus::Locked + } else { + KeyCustodianStatus::Unlocked + }; + (status_message, key_custodian_status) } - Err(err) => err.to_string(), + Err(err) => (err.to_string(), KeyCustodianStatus::Unavailable), }; logger::debug!("Locker health check end"); - actix_web::HttpResponse::Ok().body(format!( - "Database - {}\nRedis - {}\nLocker - {}", - db_status, redis_status, locker_status - )) + let response = HealthCheckResponse { + database: db_status, + redis: redis_status, + locker: LockerHealthResponse { + status: locker_status, + key_custodian_status, + }, + }; + + services::http_response_json(serde_json::to_string(&response).unwrap_or_default()) } From 7e75ed42a43dc9dfaa99fb6c673e02808210e553 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Thu, 28 Dec 2023 08:39:48 +0000 Subject: [PATCH 04/23] chore: run formatter --- crates/api_models/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/api_models/src/lib.rs b/crates/api_models/src/lib.rs index a2d36cd687e..459443747e3 100644 --- a/crates/api_models/src/lib.rs +++ b/crates/api_models/src/lib.rs @@ -15,8 +15,8 @@ pub mod ephemeral_key; pub mod errors; pub mod events; pub mod files; -pub mod health_check; pub mod gsm; +pub mod health_check; pub mod locker_migration; pub mod mandates; pub mod organization; From 5040182a593a4a2687ec2558707cfb0adb117671 Mon Sep 17 00:00:00 2001 From: Chethan Date: Fri, 29 Dec 2023 13:23:11 +0530 Subject: [PATCH 05/23] return 5xx in case at least one component health is down --- crates/router/src/routes/health.rs | 27 +++++++++++++++++++++------ crates/router/src/services/api.rs | 8 ++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 24bbcf95353..f86a7c0d9c9 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -18,13 +18,17 @@ pub async fn health() -> impl actix_web::Responder { pub async fn deep_health_check(state: web::Data) -> impl actix_web::Responder { metrics::HEALTH_METRIC.add(&metrics::CONTEXT, 1, &[]); let db = 
&*state.store; + let mut status_code = 200; logger::info!("Deep health check was called"); logger::debug!("Database health check begin"); let db_status = match db.health_check_db(db).await { Ok(_) => "Health is good".to_string(), - Err(err) => err.to_string(), + Err(err) => { + status_code = 500; + err.to_string() + } }; logger::debug!("Database health check end"); @@ -32,7 +36,10 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we let redis_status = match db.health_check_redis(db).await { Ok(_) => "Health is good".to_string(), - Err(err) => err.to_string(), + Err(err) => { + status_code = 500; + err.to_string() + } }; logger::debug!("Redis health check end"); @@ -49,19 +56,27 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we }; (status_message, key_custodian_status) } - Err(err) => (err.to_string(), KeyCustodianStatus::Unavailable), + Err(err) => { + status_code = 500; + (err.to_string(), KeyCustodianStatus::Unavailable) + } }; logger::debug!("Locker health check end"); - let response = HealthCheckResponse { + let response = serde_json::to_string(&HealthCheckResponse { database: db_status, redis: redis_status, locker: LockerHealthResponse { status: locker_status, key_custodian_status, }, - }; + }) + .unwrap_or_default(); - services::http_response_json(serde_json::to_string(&response).unwrap_or_default()) + if status_code == 200 { + services::http_response_json(response) + } else { + services::http_server_error_json_response(response) + } } diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs index 92fda578727..71687e02c12 100644 --- a/crates/router/src/services/api.rs +++ b/crates/router/src/services/api.rs @@ -1138,6 +1138,14 @@ pub fn http_response_json(response: T) -> HttpRe .body(response) } +pub fn http_server_error_json_response( + response: T, +) -> HttpResponse { + HttpResponse::InternalServerError() + .content_type(mime::APPLICATION_JSON) + .body(response) +} + pub fn http_response_json_with_headers( response: T, mut headers: Vec<(String, String)>, From 605c65b0b5dd16408158e73dc2c76cf94222d83d Mon Sep 17 00:00:00 2001 From: Chethan Date: Fri, 29 Dec 2023 13:29:51 +0530 Subject: [PATCH 06/23] change type names --- crates/api_models/src/health_check.rs | 2 +- crates/router/src/routes/health.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index b62d59916ca..52a46d45619 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -1,5 +1,5 @@ #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct HealthCheckResponse { +pub struct RouterHealthCheckResponse { pub database: String, pub redis: String, pub locker: LockerHealthResponse, diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index f86a7c0d9c9..15dd3acdb0d 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,5 +1,7 @@ use actix_web::web; -use api_models::health_check::{HealthCheckResponse, KeyCustodianStatus, LockerHealthResponse}; +use api_models::health_check::{ + KeyCustodianStatus, LockerHealthResponse, RouterHealthCheckResponse, +}; use router_env::{instrument, logger, tracing}; use super::app; @@ -64,7 +66,7 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we logger::debug!("Locker health check end"); - let response = serde_json::to_string(&HealthCheckResponse { + let response = 
serde_json::to_string(&RouterHealthCheckResponse { database: db_status, redis: redis_status, locker: LockerHealthResponse { From 0cdfb9d9c27710d5f6dbc2d6c19a7f1e2a0fd01d Mon Sep 17 00:00:00 2001 From: Chethan Date: Tue, 2 Jan 2024 15:43:49 +0530 Subject: [PATCH 07/23] remove key custodian status and refactor transaction code --- crates/api_models/src/health_check.rs | 15 +-------- crates/router/src/consts.rs | 2 ++ crates/router/src/db/health_check.rs | 39 +++++++++--------------- crates/router/src/db/kafka_store.rs | 5 ++- crates/router/src/routes/health.rs | 25 ++++----------- crates/router/src/services/api/client.rs | 2 ++ 6 files changed, 28 insertions(+), 60 deletions(-) diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 52a46d45619..d7bb120d017 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -2,18 +2,5 @@ pub struct RouterHealthCheckResponse { pub database: String, pub redis: String, - pub locker: LockerHealthResponse, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct LockerHealthResponse { - pub status: String, - pub key_custodian_status: KeyCustodianStatus, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum KeyCustodianStatus { - Unavailable, - Locked, - Unlocked, + pub locker: String, } diff --git a/crates/router/src/consts.rs b/crates/router/src/consts.rs index 4a2d2831d10..eff42c0cd7c 100644 --- a/crates/router/src/consts.rs +++ b/crates/router/src/consts.rs @@ -70,3 +70,5 @@ pub const EMAIL_TOKEN_TIME_IN_SECS: u64 = 60 * 60 * 24; // 1 day pub const VERIFY_CONNECTOR_ID_PREFIX: &str = "conn_verify"; #[cfg(feature = "olap")] pub const VERIFY_CONNECTOR_MERCHANT_ID: &str = "test_merchant"; + +pub const LOCKER_HEALTH_CALL_PATH: &str = "/health"; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 97e104fce9d..fe7ab11c7c4 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -9,14 +9,12 @@ use crate::{ core::errors::{self, CustomResult}, routes, services::api as services, + types::storage, }; #[async_trait::async_trait] pub trait HealthCheckInterface { - async fn health_check_db( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckDBError>; + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; async fn health_check_redis( &self, db: &dyn StorageInterface, @@ -24,15 +22,12 @@ pub trait HealthCheckInterface { async fn health_check_locker( &self, state: &routes::AppState, - ) -> CustomResult; + ) -> CustomResult<(), errors::HealthCheckLockerError>; } #[async_trait::async_trait] impl HealthCheckInterface for Store { - async fn health_check_db( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckDBError> { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { let conn = connection::pg_connection_write(self) .await .change_context(errors::HealthCheckDBError::DBError)?; @@ -49,19 +44,19 @@ impl HealthCheckInterface for Store { logger::debug!("Database read was successful"); - db.insert_config(ConfigNew { + let config = ConfigNew { key: "test_key".to_string(), config: "test_value".to_string(), - }) - .await - .map_err(|err| { + }; + + config.insert(&conn).await.map_err(|err| { logger::error!(write_err=?err,"Error while writing to database"); errors::HealthCheckDBError::DBWriteError })?; logger::debug!("Database write was successful"); - 
db.delete_config_by_key("test_key").await.map_err(|err| { + storage::Config::delete_by_key(&conn, "test_key").await.map_err(|err| { logger::error!(delete_err=?err,"Error while deleting element in the database"); errors::HealthCheckDBError::DBDeleteError })?; @@ -111,14 +106,13 @@ impl HealthCheckInterface for Store { async fn health_check_locker( &self, state: &routes::AppState, - ) -> CustomResult { + ) -> CustomResult<(), errors::HealthCheckLockerError> { let locker = &state.conf.locker; - let mut status_code = 0; if !locker.mock_locker { let mut url = locker.host_rs.to_owned(); url.push_str("/health"); let request = services::Request::new(services::Method::Get, &url); - status_code = services::call_connector_api(state, request) + services::call_connector_api(state, request) .await .change_context(errors::HealthCheckLockerError::FailedToCallLocker)? .map(|resp| resp.status_code) @@ -128,16 +122,13 @@ impl HealthCheckInterface for Store { logger::debug!("Locker call was successful"); - Ok(status_code) + Ok(()) } } #[async_trait::async_trait] impl HealthCheckInterface for MockDb { - async fn health_check_db( - &self, - _: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckDBError> { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { Ok(()) } @@ -151,7 +142,7 @@ impl HealthCheckInterface for MockDb { async fn health_check_locker( &self, _: &routes::AppState, - ) -> CustomResult { - Ok(0) + ) -> CustomResult<(), errors::HealthCheckLockerError> { + Ok(()) } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 15fe2a01ae0..7818f08706e 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -2138,9 +2138,8 @@ impl AuthorizationInterface for KafkaStore { impl HealthCheckInterface for KafkaStore { async fn health_check_db( &self, - db: &dyn StorageInterface, ) -> CustomResult<(), errors::HealthCheckDBError> { - self.diesel_store.health_check_db(db).await + self.diesel_store.health_check_db().await } async fn health_check_redis( @@ -2153,7 +2152,7 @@ impl HealthCheckInterface for KafkaStore { async fn health_check_locker( &self, state: &routes::AppState, - ) -> CustomResult { + ) -> CustomResult<(), errors::HealthCheckLockerError> { self.diesel_store.health_check_locker(state).await } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 15dd3acdb0d..f07b744f7f5 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,7 +1,5 @@ use actix_web::web; -use api_models::health_check::{ - KeyCustodianStatus, LockerHealthResponse, RouterHealthCheckResponse, -}; +use api_models::health_check::RouterHealthCheckResponse; use router_env::{instrument, logger, tracing}; use super::app; @@ -25,7 +23,7 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we logger::debug!("Database health check begin"); - let db_status = match db.health_check_db(db).await { + let db_status = match db.health_check_db().await { Ok(_) => "Health is good".to_string(), Err(err) => { status_code = 500; @@ -48,19 +46,11 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we logger::debug!("Locker health check begin"); - let (locker_status, key_custodian_status) = match db.health_check_locker(&state).await { - Ok(status_code) => { - let status_message = "Health is good".to_string(); - let key_custodian_status = if status_code == 403 { - KeyCustodianStatus::Locked - } else { - KeyCustodianStatus::Unlocked - }; 
- (status_message, key_custodian_status) - } + let locker_status = match db.health_check_locker(&state).await { + Ok(_) => "Health is good".to_string(), Err(err) => { status_code = 500; - (err.to_string(), KeyCustodianStatus::Unavailable) + err.to_string() } }; @@ -69,10 +59,7 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we let response = serde_json::to_string(&RouterHealthCheckResponse { database: db_status, redis: redis_status, - locker: LockerHealthResponse { - status: locker_status, - key_custodian_status, - }, + locker: locker_status, }) .unwrap_or_default(); diff --git a/crates/router/src/services/api/client.rs b/crates/router/src/services/api/client.rs index cc7353dcda6..fca85c41699 100644 --- a/crates/router/src/services/api/client.rs +++ b/crates/router/src/services/api/client.rs @@ -10,6 +10,7 @@ use router_env::tracing_actix_web::RequestId; use super::{request::Maskable, Request}; use crate::{ configs::settings::{Locker, Proxy}, + consts::LOCKER_HEALTH_CALL_PATH, core::{ errors::{ApiClientError, CustomResult}, payments, @@ -119,6 +120,7 @@ pub fn proxy_bypass_urls(locker: &Locker) -> Vec { format!("{locker_host_rs}/cards/add"), format!("{locker_host_rs}/cards/retrieve"), format!("{locker_host_rs}/cards/delete"), + format!("{locker_host_rs}{}", LOCKER_HEALTH_CALL_PATH), format!("{locker_host}/card/addCard"), format!("{locker_host}/card/getCard"), format!("{locker_host}/card/deleteCard"), From e3cf92c1432aaaf9089254d3535d782f7f9f1e4b Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Tue, 2 Jan 2024 10:14:28 +0000 Subject: [PATCH 08/23] chore: run formatter --- crates/router/src/db/kafka_store.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 7818f08706e..1184992a8f7 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -2136,9 +2136,7 @@ impl AuthorizationInterface for KafkaStore { #[async_trait::async_trait] impl HealthCheckInterface for KafkaStore { - async fn health_check_db( - &self, - ) -> CustomResult<(), errors::HealthCheckDBError> { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { self.diesel_store.health_check_db().await } From 14de5584b8c67ca95fe6bc384fe6063d6a89ea7f Mon Sep 17 00:00:00 2001 From: Chethan Date: Wed, 10 Jan 2024 15:35:28 +0530 Subject: [PATCH 09/23] feat: add deep health check for scheduler --- crates/api_models/src/health_check.rs | 6 ++ crates/router/src/bin/scheduler.rs | 93 ++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index d7bb120d017..3d97e72a0e0 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -4,3 +4,9 @@ pub struct RouterHealthCheckResponse { pub redis: String, pub locker: String, } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SchedulerHealthCheckResponse { + pub database: String, + pub redis: String, +} diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index b800ecb897e..0e7fef098a0 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -1,6 +1,8 @@ #![recursion_limit = "256"] use std::{str::FromStr, sync::Arc}; +use actix_web::{dev::Server, web, Scope}; +use api_models::health_check::SchedulerHealthCheckResponse; 
use common_utils::ext_traits::{OptionExt, StringExt}; use diesel_models::process_tracker as storage; use error_stack::ResultExt; @@ -11,6 +13,7 @@ use router::{ types::storage::ProcessTrackerExt, workflows, }; +use router_env::{instrument, tracing}; use scheduler::{ consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError, workflows::ProcessTrackerWorkflows, SchedulerAppState, @@ -37,7 +40,7 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { // channel for listening to redis disconnect events let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel(); let state = Box::pin(routes::AppState::new( - conf, + conf.clone(), redis_shutdown_signal_tx, api_client, )) @@ -68,6 +71,16 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { [router_env::service_name!()], ); + #[allow(clippy::expect_used)] + let web_server = Box::pin(start_web_server( + state.clone(), + scheduler_flow_str.to_string(), + )) + .await + .expect("Failed to create the server"); + + tokio::spawn(web_server); + logger::debug!(startup_config=?state.conf); start_scheduler(&state, scheduler_flow, (tx, rx)).await?; @@ -76,6 +89,84 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { Ok(()) } +pub async fn start_web_server( + state: routes::AppState, + service: String, +) -> errors::ApplicationResult { + let server = state.conf.server.clone(); + let web_server = actix_web::HttpServer::new(move || { + actix_web::App::new().service(Health::server(state.clone(), service.clone())) + }) + .bind((server.host.as_str(), server.port))? + .run(); + let _ = web_server.handle(); + + Ok(web_server) +} + +pub struct Health; + +impl Health { + pub fn server(state: routes::AppState, service: String) -> Scope { + web::scope("health") + .app_data(web::Data::new(state)) + .app_data(web::Data::new(service)) + .service(web::resource("").route(web::get().to(health))) + .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) + } +} + +#[instrument(skip_all)] +pub async fn health() -> impl actix_web::Responder { + logger::info!("Scheduler health was called"); + actix_web::HttpResponse::Ok().body("Scheduler health is good") +} + +#[instrument(skip_all)] +pub async fn deep_health_check( + state: web::Data, + service: web::Data, +) -> impl actix_web::Responder { + let db = &*state.store; + let mut status_code = 200; + logger::info!("{} deep health check was called", service.into_inner()); + + logger::debug!("Database health check begin"); + + let db_status = match db.health_check_db().await { + Ok(_) => "Health is good".to_string(), + Err(err) => { + status_code = 500; + err.to_string() + } + }; + logger::debug!("Database health check end"); + + logger::debug!("Redis health check begin"); + + let redis_status = match db.health_check_redis(db).await { + Ok(_) => "Health is good".to_string(), + Err(err) => { + status_code = 500; + err.to_string() + } + }; + + logger::debug!("Redis health check end"); + + let response = serde_json::to_string(&SchedulerHealthCheckResponse { + database: db_status, + redis: redis_status, + }) + .unwrap_or_default(); + + if status_code == 200 { + services::http_response_json(response) + } else { + services::http_server_error_json_response(response) + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[strum(serialize_all = "SCREAMING_SNAKE_CASE")] From e66bfb80b75570c9f7e9de48a124f93076ad3415 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Wed, 24 Jan 2024 13:07:42 +0530 
Subject: [PATCH 10/23] feat: add deep health check for analytics --- crates/analytics/src/clickhouse.rs | 13 +++++++++++++ crates/analytics/src/health_check.rs | 7 +++++++ crates/analytics/src/lib.rs | 1 + crates/analytics/src/sqlx.rs | 12 ++++++++++++ crates/api_models/src/health_check.rs | 1 + crates/router/src/db/health_check.rs | 27 +++++++++++++++++++++++++++ crates/router/src/db/kafka_store.rs | 6 ++++++ crates/router/src/routes/health.rs | 9 +++++++++ crates/storage_impl/src/errors.rs | 4 ++++ 9 files changed, 80 insertions(+) create mode 100644 crates/analytics/src/health_check.rs diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index f81c29c801c..bade1b178ab 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -7,6 +7,7 @@ use router_env::logger; use time::PrimitiveDateTime; use super::{ + health_check::HealthCheck, payments::{ distribution::PaymentDistributionRow, filters::FilterRow, metrics::PaymentMetricRow, }, @@ -93,6 +94,18 @@ impl ClickhouseClient { } } +#[async_trait::async_trait] +impl HealthCheck for ClickhouseClient { + async fn deep_health_check( + &self, + ) -> common_utils::errors::CustomResult<(), QueryExecutionError> { + self.execute_query("SELECT 1") + .await + .map(|_| ()) + .change_context(QueryExecutionError::DatabaseError) + } +} + #[async_trait::async_trait] impl AnalyticsDataSource for ClickhouseClient { type Row = serde_json::Value; diff --git a/crates/analytics/src/health_check.rs b/crates/analytics/src/health_check.rs new file mode 100644 index 00000000000..d9f95848723 --- /dev/null +++ b/crates/analytics/src/health_check.rs @@ -0,0 +1,7 @@ +use crate::types::QueryExecutionError; +use common_utils::errors::CustomResult; + +#[async_trait::async_trait] +pub trait HealthCheck { + async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError>; +} diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index 501bd58527c..a4e925519ce 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -8,6 +8,7 @@ pub mod refunds; pub mod api_event; pub mod connector_events; +pub mod health_check; pub mod outgoing_webhook_event; pub mod sdk_events; mod sqlx; diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index 7ab8a2aa4bc..562a3a1f64d 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -17,6 +17,7 @@ use storage_impl::config::Database; use time::PrimitiveDateTime; use super::{ + health_check::HealthCheck, query::{Aggregate, ToSql, Window}, types::{ AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, LoadRow, QueryExecutionError, @@ -164,6 +165,17 @@ impl AnalyticsDataSource for SqlxClient { .change_context(QueryExecutionError::RowExtractionFailure) } } +#[async_trait::async_trait] +impl HealthCheck for SqlxClient { + async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError> { + sqlx::query("SELECT 1") + .fetch_all(&self.pool) + .await + .map(|_| ()) + .into_report() + .change_context(QueryExecutionError::DatabaseError) + } +} impl<'a> FromRow<'a, PgRow> for super::refunds::metrics::RefundMetricRow { fn from_row(row: &'a PgRow) -> sqlx::Result { diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index d7bb120d017..529ada52db7 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -3,4 +3,5 @@ pub struct RouterHealthCheckResponse { pub database: String, pub redis: String, pub locker: String, + pub 
analytics: String, } diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 73bc2a4321d..44db5dfaed9 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -1,3 +1,4 @@ +use analytics::health_check::HealthCheck; use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use diesel_models::ConfigNew; use error_stack::ResultExt; @@ -24,6 +25,10 @@ pub trait HealthCheckInterface { &self, state: &routes::AppState, ) -> CustomResult<(), errors::HealthCheckLockerError>; + async fn health_check_analytics( + &self, + analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError>; } #[async_trait::async_trait] @@ -123,6 +128,22 @@ impl HealthCheckInterface for Store { Ok(()) } + async fn health_check_analytics( + &self, + analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError> { + match analytics { + analytics::AnalyticsProvider::Sqlx(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError), + analytics::AnalyticsProvider::Clickhouse(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError), + _ => Ok(()), + } + } } #[async_trait::async_trait] @@ -144,4 +165,10 @@ impl HealthCheckInterface for MockDb { ) -> CustomResult<(), errors::HealthCheckLockerError> { Ok(()) } + async fn health_check_analytics( + &self, + _analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError> { + Ok(()) + } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 8398c153156..73ab50ff13d 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -2188,4 +2188,10 @@ impl HealthCheckInterface for KafkaStore { ) -> CustomResult<(), errors::HealthCheckLockerError> { self.diesel_store.health_check_locker(state).await } + async fn health_check_analytics( + &self, + analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError> { + self.diesel_store.health_check_analytics(analytics).await + } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index f07b744f7f5..3f963c4ed95 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -54,12 +54,21 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we } }; + let analytics_status = match db.health_check_analytics(&state.pool).await { + Ok(_) => "Health is good".to_string(), + Err(err) => { + status_code = 500; + err.to_string() + } + }; + logger::debug!("Locker health check end"); let response = serde_json::to_string(&RouterHealthCheckResponse { database: db_status, redis: redis_status, locker: locker_status, + analytics: analytics_status, }) .unwrap_or_default(); diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index ac3a04e85b2..2adcdcf8d2e 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -394,6 +394,10 @@ pub enum HealthCheckDBError { UnknownError, #[error("Error in database transaction")] TransactionError, + #[error("Error while executing query in Sqlx Analytics")] + SqlxAnalyticsError, + #[error("Error while executing query in Clickhouse Analytics")] + ClickhouseAnalyticsError, } impl From for HealthCheckDBError { From b79ea4906b5432f9b7fe6144c8b658736896b16f Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" 
<148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Wed, 24 Jan 2024 07:41:37 +0000 Subject: [PATCH 11/23] chore: run formatter --- crates/analytics/src/health_check.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/analytics/src/health_check.rs b/crates/analytics/src/health_check.rs index d9f95848723..f566aecf10b 100644 --- a/crates/analytics/src/health_check.rs +++ b/crates/analytics/src/health_check.rs @@ -1,6 +1,7 @@ -use crate::types::QueryExecutionError; use common_utils::errors::CustomResult; +use crate::types::QueryExecutionError; + #[async_trait::async_trait] pub trait HealthCheck { async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError>; From e4f3b7b8add2e456a00305f7e2cdcebdcaacf190 Mon Sep 17 00:00:00 2001 From: Chethan Date: Thu, 25 Jan 2024 12:31:30 +0530 Subject: [PATCH 12/23] update the deep health check http method to get request --- crates/router/src/bin/scheduler.rs | 2 +- crates/router/src/routes/app.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 0e7fef098a0..352360692ad 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -112,7 +112,7 @@ impl Health { .app_data(web::Data::new(state)) .app_data(web::Data::new(service)) .service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) + .service(web::resource("/deep_check").route(web::get().to(deep_health_check))) } } diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index d3a43f0f490..f53e30d7c29 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -306,7 +306,7 @@ impl Health { web::scope("health") .app_data(web::Data::new(state)) .service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) + .service(web::resource("/deep_check").route(web::get().to(deep_health_check))) } } From 6b06075677b132928ddc5cb54ae5fccfea71b549 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Wed, 24 Jan 2024 15:19:39 +0530 Subject: [PATCH 13/23] refactor(health): refactor deep health check --- crates/api_models/src/health_check.rs | 10 +- crates/router/src/core.rs | 1 + crates/router/src/core/health_check.rs | 108 +++++++++++++++ crates/router/src/db.rs | 2 +- crates/router/src/db/health_check.rs | 174 +++++-------------------- crates/router/src/db/kafka_store.rs | 26 +--- crates/router/src/routes/app.rs | 2 +- crates/router/src/routes/health.rs | 100 +++++++------- crates/router/src/routes/lock_utils.rs | 2 + crates/router_env/src/logger/types.rs | 2 + 10 files changed, 215 insertions(+), 212 deletions(-) create mode 100644 crates/router/src/core/health_check.rs diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 529ada52db7..70c4fe96304 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -1,7 +1,9 @@ #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct RouterHealthCheckResponse { - pub database: String, - pub redis: String, - pub locker: String, - pub analytics: String, + pub database: bool, + pub redis: bool, + pub locker: bool, + pub analytics: bool, } + +impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {} diff --git a/crates/router/src/core.rs b/crates/router/src/core.rs index 5ae4b0be33d..9bdc493e078 
100644 --- a/crates/router/src/core.rs +++ b/crates/router/src/core.rs @@ -17,6 +17,7 @@ pub mod files; #[cfg(feature = "frm")] pub mod fraud_check; pub mod gsm; +pub mod health_check; pub mod locker_migration; pub mod mandate; pub mod metrics; diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs new file mode 100644 index 00000000000..d0ffe7ec3f3 --- /dev/null +++ b/crates/router/src/core/health_check.rs @@ -0,0 +1,108 @@ +use crate::routes::app; + +use analytics::health_check::HealthCheck; +use error_stack::ResultExt; +use router_env::logger; + +use crate::{ + consts::LOCKER_HEALTH_CALL_PATH, + core::errors::{self, CustomResult}, + services::api as services, +}; + +#[async_trait::async_trait] +pub trait HealthCheckInterface { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; + async fn health_check_redis(&self) -> CustomResult<(), errors::HealthCheckRedisError>; + async fn health_check_locker(&self) -> CustomResult<(), errors::HealthCheckLockerError>; + async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError>; +} + +#[async_trait::async_trait] +impl HealthCheckInterface for app::AppState { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { + let db = &*self.store; + db.health_check_db().await?; + Ok(()) + } + + async fn health_check_redis(&self) -> CustomResult<(), errors::HealthCheckRedisError> { + let db = &*self.store; + let redis_conn = db + .get_redis_conn() + .change_context(errors::HealthCheckRedisError::RedisConnectionError)?; + + redis_conn + .serialize_and_set_key_with_expiry("test_key", "test_value", 30) + .await + .change_context(errors::HealthCheckRedisError::SetFailed)?; + + logger::debug!("Redis set_key was successful"); + + redis_conn + .get_key("test_key") + .await + .change_context(errors::HealthCheckRedisError::GetFailed)?; + + logger::debug!("Redis get_key was successful"); + + redis_conn + .delete_key("test_key") + .await + .change_context(errors::HealthCheckRedisError::DeleteFailed)?; + + logger::debug!("Redis delete_key was successful"); + + Ok(()) + } + + async fn health_check_locker(&self) -> CustomResult<(), errors::HealthCheckLockerError> { + let locker = &self.conf.locker; + if !locker.mock_locker { + let mut url = locker.host_rs.to_owned(); + url.push_str(LOCKER_HEALTH_CALL_PATH); + let request = services::Request::new(services::Method::Get, &url); + services::call_connector_api(self, request) + .await + .change_context(errors::HealthCheckLockerError::FailedToCallLocker)? 
+ .ok(); + } + + logger::debug!("Locker call was successful"); + + Ok(()) + } + async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError> { + let analytics = &self.pool; + match analytics { + analytics::AnalyticsProvider::Sqlx(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError), + analytics::AnalyticsProvider::Clickhouse(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError), + analytics::AnalyticsProvider::CombinedCkh(sqlx_client, ckh_client) => { + sqlx_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError)?; + ckh_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError) + } + analytics::AnalyticsProvider::CombinedSqlx(sqlx_client, ckh_client) => { + sqlx_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError)?; + ckh_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError) + } + } + } +} diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index b9d346b7a71..54900177246 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -110,7 +110,7 @@ pub trait StorageInterface: + user_role::UserRoleInterface + authorization::AuthorizationInterface + user::sample_data::BatchSampleDataInterface - + health_check::HealthCheckInterface + + health_check::HealthCheckDbInterface + 'static { fn get_scheduler_db(&self) -> Box; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 44db5dfaed9..74556c93e43 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -1,174 +1,70 @@ -use analytics::health_check::HealthCheck; -use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; -use diesel_models::ConfigNew; +use async_bb8_diesel::AsyncConnection; use error_stack::ResultExt; -use router_env::logger; -use super::{MockDb, StorageInterface, Store}; +use super::{MockDb, Store}; use crate::{ connection, - consts::LOCKER_HEALTH_CALL_PATH, core::errors::{self, CustomResult}, - routes, - services::api as services, types::storage, }; +use diesel_models::ConfigNew; +use router_env::logger; + +use async_bb8_diesel::AsyncRunQueryDsl; #[async_trait::async_trait] -pub trait HealthCheckInterface { +pub trait HealthCheckDbInterface { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; - async fn health_check_redis( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError>; - async fn health_check_locker( - &self, - state: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError>; - async fn health_check_analytics( - &self, - analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError>; } #[async_trait::async_trait] -impl HealthCheckInterface for Store { +impl HealthCheckDbInterface for Store { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { let conn = connection::pg_connection_write(self) .await .change_context(errors::HealthCheckDBError::DBError)?; - let _data = conn - .transaction_async(|conn| { - Box::pin(async move { - let query = - diesel::select(diesel::dsl::sql::("1 + 1")); - let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { - logger::error!(read_err=?err,"Error while reading element in the 
database"); - errors::HealthCheckDBError::DBReadError - })?; - - logger::debug!("Database read was successful"); - - let config = ConfigNew { - key: "test_key".to_string(), - config: "test_value".to_string(), - }; - - config.insert(&conn).await.map_err(|err| { - logger::error!(write_err=?err,"Error while writing to database"); - errors::HealthCheckDBError::DBWriteError - })?; - - logger::debug!("Database write was successful"); - - storage::Config::delete_by_key(&conn, "test_key").await.map_err(|err| { - logger::error!(delete_err=?err,"Error while deleting element in the database"); - errors::HealthCheckDBError::DBDeleteError - })?; - - logger::debug!("Database delete was successful"); - - Ok::<_, errors::HealthCheckDBError>(()) - }) - }) - .await?; - - Ok(()) - } - - async fn health_check_redis( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - let redis_conn = db - .get_redis_conn() - .change_context(errors::HealthCheckRedisError::RedisConnectionError)?; - - redis_conn - .serialize_and_set_key_with_expiry("test_key", "test_value", 30) - .await - .change_context(errors::HealthCheckRedisError::SetFailed)?; - - logger::debug!("Redis set_key was successful"); + conn.transaction_async(|conn| async move { + let query = diesel::select(diesel::dsl::sql::("1 + 1")); + let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { + logger::error!(read_err=?err,"Error while reading element in the database"); + errors::HealthCheckDBError::DBReadError + })?; - redis_conn - .get_key("test_key") - .await - .change_context(errors::HealthCheckRedisError::GetFailed)?; + logger::debug!("Database read was successful"); - logger::debug!("Redis get_key was successful"); + let config = ConfigNew { + key: "test_key".to_string(), + config: "test_value".to_string(), + }; - redis_conn - .delete_key("test_key") - .await - .change_context(errors::HealthCheckRedisError::DeleteFailed)?; + config.insert(&conn).await.map_err(|err| { + logger::error!(write_err=?err,"Error while writing to database"); + errors::HealthCheckDBError::DBWriteError + })?; - logger::debug!("Redis delete_key was successful"); + logger::debug!("Database write was successful"); - Ok(()) - } - - async fn health_check_locker( - &self, - state: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError> { - let locker = &state.conf.locker; - if !locker.mock_locker { - let mut url = locker.host_rs.to_owned(); - url.push_str(LOCKER_HEALTH_CALL_PATH); - let request = services::Request::new(services::Method::Get, &url); - services::call_connector_api(state, request) + storage::Config::delete_by_key(&conn, "test_key") .await - .change_context(errors::HealthCheckLockerError::FailedToCallLocker)? 
- .ok(); - } + .map_err(|err| { + logger::error!(delete_err=?err,"Error while deleting element in the database"); + errors::HealthCheckDBError::DBDeleteError + })?; + + logger::debug!("Database delete was successful"); - logger::debug!("Locker call was successful"); + Ok::<_, errors::HealthCheckDBError>(()) + }) + .await?; Ok(()) } - async fn health_check_analytics( - &self, - analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError> { - match analytics { - analytics::AnalyticsProvider::Sqlx(client) => client - .deep_health_check() - .await - .change_context(errors::HealthCheckDBError::SqlxAnalyticsError), - analytics::AnalyticsProvider::Clickhouse(client) => client - .deep_health_check() - .await - .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError), - _ => Ok(()), - } - } } #[async_trait::async_trait] -impl HealthCheckInterface for MockDb { +impl HealthCheckDbInterface for MockDb { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { Ok(()) } - - async fn health_check_redis( - &self, - _: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - Ok(()) - } - - async fn health_check_locker( - &self, - _: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError> { - Ok(()) - } - async fn health_check_analytics( - &self, - _analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError> { - Ok(()) - } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 73ab50ff13d..d8dc115aba4 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -27,6 +27,7 @@ use super::{ user::{sample_data::BatchSampleDataInterface, UserInterface}, user_role::UserRoleInterface, }; + use crate::{ core::errors::{self, ProcessTrackerError}, db::{ @@ -43,7 +44,7 @@ use crate::{ events::EventInterface, file::FileMetadataInterface, gsm::GsmInterface, - health_check::HealthCheckInterface, + health_check::HealthCheckDbInterface, locker_mock_up::LockerMockUpInterface, mandate::MandateInterface, merchant_account::MerchantAccountInterface, @@ -58,7 +59,6 @@ use crate::{ routing_algorithm::RoutingAlgorithmInterface, MasterKeyInterface, StorageInterface, }, - routes, services::{authentication, kafka::KafkaProducer, Store}, types::{ domain, @@ -2170,28 +2170,8 @@ impl AuthorizationInterface for KafkaStore { } #[async_trait::async_trait] -impl HealthCheckInterface for KafkaStore { +impl HealthCheckDbInterface for KafkaStore { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { self.diesel_store.health_check_db().await } - - async fn health_check_redis( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - self.diesel_store.health_check_redis(db).await - } - - async fn health_check_locker( - &self, - state: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError> { - self.diesel_store.health_check_locker(state).await - } - async fn health_check_analytics( - &self, - analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError> { - self.diesel_store.health_check_analytics(analytics).await - } } diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 4345109a672..db664900619 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -260,7 +260,7 @@ impl Health { web::scope("health") .app_data(web::Data::new(state)) 
.service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) + .service(web::resource("/ready").route(web::get().to(deep_health_check))) } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 3f963c4ed95..2ea36919cb6 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,9 +1,16 @@ -use actix_web::web; +use actix_web::{web, HttpRequest}; use api_models::health_check::RouterHealthCheckResponse; -use router_env::{instrument, logger, tracing}; +use router_env::{instrument, logger, tracing, Flow}; + +use error_stack::ResultExt; use super::app; -use crate::{routes::metrics, services}; +use crate::{ + core::{api_locking, health_check::HealthCheckInterface}, + errors::{self, RouterResponse}, + routes::metrics, + services::{api, authentication as auth}, +}; /// . // #[logger::instrument(skip_all, name = "name1", level = "warn", fields( key1 = "val1" ))] #[instrument(skip_all)] @@ -14,67 +21,72 @@ pub async fn health() -> impl actix_web::Responder { actix_web::HttpResponse::Ok().body("health is good") } -#[instrument(skip_all)] -pub async fn deep_health_check(state: web::Data) -> impl actix_web::Responder { +#[instrument(skip_all, fields(flow = ?Flow::DeepHealthCheck))] +pub async fn deep_health_check( + state: web::Data, + request: HttpRequest, +) -> impl actix_web::Responder { metrics::HEALTH_METRIC.add(&metrics::CONTEXT, 1, &[]); - let db = &*state.store; - let mut status_code = 200; + + let flow = Flow::DeepHealthCheck; + + Box::pin(api::server_wrap( + flow, + state, + &request, + (), + |state, _, _| deep_health_check_func(state), + &auth::NoAuth, + api_locking::LockAction::NotApplicable, + )) + .await +} + +async fn deep_health_check_func(state: app::AppState) -> RouterResponse { logger::info!("Deep health check was called"); logger::debug!("Database health check begin"); - let db_status = match db.health_check_db().await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let db_status = state + .health_check_db() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; + logger::debug!("Database health check end"); logger::debug!("Redis health check begin"); - let redis_status = match db.health_check_redis(db).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let redis_status = state + .health_check_redis() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; logger::debug!("Redis health check end"); logger::debug!("Locker health check begin"); - let locker_status = match db.health_check_locker(&state).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let locker_status = state + .health_check_locker() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; - let analytics_status = match db.health_check_analytics(&state.pool).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let analytics_status = state + .health_check_analytics() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; logger::debug!("Locker health check end"); - let response = serde_json::to_string(&RouterHealthCheckResponse { + let response = RouterHealthCheckResponse { 
database: db_status, redis: redis_status, locker: locker_status, analytics: analytics_status, - }) - .unwrap_or_default(); - - if status_code == 200 { - services::http_response_json(response) - } else { - services::http_server_error_json_response(response) - } + }; + + Ok(api::ApplicationResponse::Json(response)) } diff --git a/crates/router/src/routes/lock_utils.rs b/crates/router/src/routes/lock_utils.rs index 1c967222dc7..a24c5dc6687 100644 --- a/crates/router/src/routes/lock_utils.rs +++ b/crates/router/src/routes/lock_utils.rs @@ -11,6 +11,7 @@ pub enum ApiIdentifier { Configs, Customers, Ephemeral, + Health, Mandates, PaymentMethods, PaymentMethodAuth, @@ -83,6 +84,7 @@ impl From for ApiIdentifier { Flow::EphemeralKeyCreate | Flow::EphemeralKeyDelete => Self::Ephemeral, + Flow::DeepHealthCheck => Self::Health, Flow::MandatesRetrieve | Flow::MandatesRevoke | Flow::MandatesList => Self::Mandates, Flow::PaymentMethodsCreate diff --git a/crates/router_env/src/logger/types.rs b/crates/router_env/src/logger/types.rs index ba323ebc5e3..dbbc3cdc277 100644 --- a/crates/router_env/src/logger/types.rs +++ b/crates/router_env/src/logger/types.rs @@ -54,6 +54,8 @@ pub enum Tag { /// API Flow #[derive(Debug, Display, Clone, PartialEq, Eq)] pub enum Flow { + /// Deep health Check + DeepHealthCheck, /// Merchants account create flow. MerchantsAccountCreate, /// Merchants account retrieve flow. From 1bcd77420e67594ffe44ce749af1e9e0c9e6fb88 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Mon, 29 Jan 2024 17:19:47 +0000 Subject: [PATCH 14/23] chore: run formatter --- crates/router/src/core/health_check.rs | 3 +-- crates/router/src/db/health_check.rs | 8 +++----- crates/router/src/db/kafka_store.rs | 1 - crates/router/src/routes/health.rs | 3 +-- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index d0ffe7ec3f3..e8215dd4bee 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,5 +1,3 @@ -use crate::routes::app; - use analytics::health_check::HealthCheck; use error_stack::ResultExt; use router_env::logger; @@ -7,6 +5,7 @@ use router_env::logger; use crate::{ consts::LOCKER_HEALTH_CALL_PATH, core::errors::{self, CustomResult}, + routes::app, services::api as services, }; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 74556c93e43..6ebc9dfff5a 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -1,5 +1,7 @@ -use async_bb8_diesel::AsyncConnection; +use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; +use diesel_models::ConfigNew; use error_stack::ResultExt; +use router_env::logger; use super::{MockDb, Store}; use crate::{ @@ -7,10 +9,6 @@ use crate::{ core::errors::{self, CustomResult}, types::storage, }; -use diesel_models::ConfigNew; -use router_env::logger; - -use async_bb8_diesel::AsyncRunQueryDsl; #[async_trait::async_trait] pub trait HealthCheckDbInterface { diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index d3b90f1026d..665a920bcad 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -27,7 +27,6 @@ use super::{ user::{sample_data::BatchSampleDataInterface, UserInterface}, user_role::UserRoleInterface, }; - use crate::{ core::errors::{self, ProcessTrackerError}, db::{ diff --git 
a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 2ea36919cb6..20b39a44401 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,8 +1,7 @@ use actix_web::{web, HttpRequest}; use api_models::health_check::RouterHealthCheckResponse; -use router_env::{instrument, logger, tracing, Flow}; - use error_stack::ResultExt; +use router_env::{instrument, logger, tracing, Flow}; use super::app; use crate::{ From 3e67fbf6429fb61a222199ea0d178a516730b9c6 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Tue, 30 Jan 2024 12:07:16 +0530 Subject: [PATCH 15/23] fix: fix health check error --- .../router/src/compatibility/stripe/errors.rs | 3 +- .../src/core/errors/api_error_response.rs | 5 +++ crates/router/src/core/errors/transformers.rs | 5 ++- crates/router/src/routes/health.rs | 34 +++++++++++++------ 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/crates/router/src/compatibility/stripe/errors.rs b/crates/router/src/compatibility/stripe/errors.rs index 63205ea68ca..759e968125f 100644 --- a/crates/router/src/compatibility/stripe/errors.rs +++ b/crates/router/src/compatibility/stripe/errors.rs @@ -468,7 +468,8 @@ impl From for StripeErrorCode { errors::ApiErrorResponse::MandateUpdateFailed | errors::ApiErrorResponse::MandateSerializationFailed | errors::ApiErrorResponse::MandateDeserializationFailed - | errors::ApiErrorResponse::InternalServerError => Self::InternalServerError, // not a stripe code + | errors::ApiErrorResponse::InternalServerError + | errors::ApiErrorResponse::HealthCheckError { .. } => Self::InternalServerError, // not a stripe code errors::ApiErrorResponse::ExternalConnectorError { code, message, diff --git a/crates/router/src/core/errors/api_error_response.rs b/crates/router/src/core/errors/api_error_response.rs index 54ec4ec1e29..023e1f4b7fb 100644 --- a/crates/router/src/core/errors/api_error_response.rs +++ b/crates/router/src/core/errors/api_error_response.rs @@ -238,6 +238,11 @@ pub enum ApiErrorResponse { WebhookInvalidMerchantSecret, #[error(error_type = ErrorType::InvalidRequestError, code = "IR_19", message = "{message}")] CurrencyNotSupported { message: String }, + #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failiing with error: {message}")] + HealthCheckError { + component: &'static str, + message: String, + }, #[error(error_type = ErrorType::InvalidRequestError, code = "IR_24", message = "Merchant connector account is configured with invalid {config}")] InvalidConnectorConfiguration { config: String }, #[error(error_type = ErrorType::ValidationError, code = "HE_01", message = "Failed to convert currency to minor unit")] diff --git a/crates/router/src/core/errors/transformers.rs b/crates/router/src/core/errors/transformers.rs index ff764cafed6..0119335b7c4 100644 --- a/crates/router/src/core/errors/transformers.rs +++ b/crates/router/src/core/errors/transformers.rs @@ -123,7 +123,10 @@ impl ErrorSwitch for ApiErrorRespon }, Self::MandateUpdateFailed | Self::MandateSerializationFailed | Self::MandateDeserializationFailed | Self::InternalServerError => { AER::InternalServerError(ApiError::new("HE", 0, "Something went wrong", None)) - } + }, + Self::HealthCheckError { message,component } => { + AER::InternalServerError(ApiError::new("HE",0,format!("{} health check failed with error: {}",component,message),None)) + }, Self::PayoutFailed { data } => { AER::BadRequest(ApiError::new("CE", 4, "Payout failed while processing with 
connector.", Some(Extra { data: data.clone(), ..Default::default()}))) }, diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 2ea36919cb6..83454ccb590 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -2,8 +2,6 @@ use actix_web::{web, HttpRequest}; use api_models::health_check::RouterHealthCheckResponse; use router_env::{instrument, logger, tracing, Flow}; -use error_stack::ResultExt; - use super::app; use crate::{ core::{api_locking, health_check::HealthCheckInterface}, @@ -47,11 +45,12 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse RouterResponse RouterResponse Date: Tue, 30 Jan 2024 12:32:45 +0530 Subject: [PATCH 16/23] address requested changes --- crates/router/src/bin/scheduler.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 352360692ad..e8837b36b67 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -40,7 +40,7 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { // channel for listening to redis disconnect events let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel(); let state = Box::pin(routes::AppState::new( - conf.clone(), + conf, redis_shutdown_signal_tx, api_client, )) @@ -79,7 +79,10 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { .await .expect("Failed to create the server"); - tokio::spawn(web_server); + tokio::spawn(async move { + let _ = web_server.await; + logger::error!("The health check probe stopped working!"); + }); logger::debug!(startup_config=?state.conf); From 39c0db8fbe2bc43190bb2d628279eeaf46e0da31 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Tue, 30 Jan 2024 17:30:07 +0530 Subject: [PATCH 17/23] fix: build --- crates/api_models/Cargo.toml | 1 + crates/api_models/src/health_check.rs | 1 + crates/router/Cargo.toml | 2 +- crates/router/src/core/health_check.rs | 5 +++++ crates/router/src/core/pm_auth.rs | 3 --- crates/router/src/routes/health.rs | 2 ++ 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/api_models/Cargo.toml b/crates/api_models/Cargo.toml index 8cd3ee53f21..1e8e0f47eb9 100644 --- a/crates/api_models/Cargo.toml +++ b/crates/api_models/Cargo.toml @@ -18,6 +18,7 @@ dummy_connector = ["euclid/dummy_connector", "common_enums/dummy_connector"] detailed_errors = [] payouts = [] frm = [] +olap = [] openapi = ["common_enums/openapi"] recon = [] diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 70c4fe96304..8323f135134 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -3,6 +3,7 @@ pub struct RouterHealthCheckResponse { pub database: bool, pub redis: bool, pub locker: bool, + #[cfg(feature = "olap")] pub analytics: bool, } diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index e575daf7e7a..4567b1656d7 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -17,7 +17,7 @@ email = ["external_services/email", "dep:aws-config", "olap"] frm = [] stripe = ["dep:serde_qs"] release = ["kms", "stripe", "aws_s3", "email", "backwards_compatibility", "business_profile_routing", "accounts_cache", "kv_store", "connector_choice_mca_id", "profile_specific_fallback_routing", "vergen", "recon"] -olap = ["data_models/olap", "storage_impl/olap", "scheduler/olap", "dep:analytics"] +olap = ["data_models/olap", "storage_impl/olap", 
"scheduler/olap","api_models/olap","dep:analytics"] oltp = ["storage_impl/oltp"] kv_store = ["scheduler/kv_store"] accounts_cache = [] diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index e8215dd4bee..5f2351ef726 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,4 +1,6 @@ +#[cfg(feature = "olap")] use analytics::health_check::HealthCheck; + use error_stack::ResultExt; use router_env::logger; @@ -14,6 +16,7 @@ pub trait HealthCheckInterface { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; async fn health_check_redis(&self) -> CustomResult<(), errors::HealthCheckRedisError>; async fn health_check_locker(&self) -> CustomResult<(), errors::HealthCheckLockerError>; + #[cfg(feature = "olap")] async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError>; } @@ -71,6 +74,8 @@ impl HealthCheckInterface for app::AppState { Ok(()) } + + #[cfg(feature = "olap")] async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError> { let analytics = &self.pool; match analytics { diff --git a/crates/router/src/core/pm_auth.rs b/crates/router/src/core/pm_auth.rs index d805925f318..9f70cc6baee 100644 --- a/crates/router/src/core/pm_auth.rs +++ b/crates/router/src/core/pm_auth.rs @@ -375,9 +375,6 @@ async fn store_bank_details_in_payment_methods( .await .change_context(ApiErrorResponse::InternalServerError)?; - #[cfg(not(feature = "kms"))] - let pm_auth_key = pm_auth_key; - let mut update_entries: Vec<(storage::PaymentMethod, storage::PaymentMethodUpdate)> = Vec::new(); let mut new_entries: Vec = Vec::new(); diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 83454ccb590..89132c3319b 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -82,6 +82,7 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse RouterResponse Date: Tue, 30 Jan 2024 12:00:44 +0000 Subject: [PATCH 18/23] chore: run formatter --- crates/router/src/core/health_check.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index 5f2351ef726..6fc038b82e9 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,6 +1,5 @@ #[cfg(feature = "olap")] use analytics::health_check::HealthCheck; - use error_stack::ResultExt; use router_env::logger; From 52bcec082ed4f41d0270cae346ecc2cf7a8f29e0 Mon Sep 17 00:00:00 2001 From: Chethan Date: Tue, 30 Jan 2024 19:22:21 +0530 Subject: [PATCH 19/23] refactor health check --- crates/api_models/src/health_check.rs | 2 + crates/router/src/bin/scheduler.rs | 67 +++++++++++-------- .../src/core/errors/api_error_response.rs | 2 +- 3 files changed, 43 insertions(+), 28 deletions(-) diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 116fdd90040..eab971b5fe1 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -13,3 +13,5 @@ pub struct SchedulerHealthCheckResponse { pub database: bool, pub redis: bool, } + +impl common_utils::events::ApiEventMetric for SchedulerHealthCheckResponse {} diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index e8837b36b67..62d53e279c1 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -8,7 +8,10 @@ use diesel_models::process_tracker as 
storage; use error_stack::ResultExt; use router::{ configs::settings::{CmdLineConf, Settings}, - core::errors::{self, CustomResult}, + core::{ + errors::{self, CustomResult}, + health_check::HealthCheckInterface, + }, logger, routes, services, types::storage::ProcessTrackerExt, workflows, @@ -115,7 +118,7 @@ impl Health { .app_data(web::Data::new(state)) .app_data(web::Data::new(service)) .service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::get().to(deep_health_check))) + .service(web::resource("/ready").route(web::get().to(deep_health_check))) } } @@ -124,50 +127,60 @@ pub async fn health() -> impl actix_web::Responder { logger::info!("Scheduler health was called"); actix_web::HttpResponse::Ok().body("Scheduler health is good") } - #[instrument(skip_all)] pub async fn deep_health_check( state: web::Data, service: web::Data, ) -> impl actix_web::Responder { - let db = &*state.store; - let mut status_code = 200; + let report = deep_health_check_func(state, service).await; + match report { + Ok(response) => { + services::http_response_json(serde_json::to_string(&response).unwrap_or_default()) + } + Err(err) => services::http_server_error_json_response( + serde_json::to_string(&err.current_context()).unwrap_or_default(), + ), + } +} +#[instrument(skip_all)] +pub async fn deep_health_check_func( + state: web::Data, + service: web::Data, +) -> errors::RouterResult { logger::info!("{} deep health check was called", service.into_inner()); logger::debug!("Database health check begin"); - let db_status = match db.health_check_db().await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let db_status = state.health_check_db().await.map(|_| true).map_err(|err| { + error_stack::report!(errors::ApiErrorResponse::HealthCheckError { + component: "Database", + message: err.to_string() + }) + })?; + logger::debug!("Database health check end"); logger::debug!("Redis health check begin"); - let redis_status = match db.health_check_redis(db).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let redis_status = state + .health_check_redis() + .await + .map(|_| true) + .map_err(|err| { + error_stack::report!(errors::ApiErrorResponse::HealthCheckError { + component: "Redis", + message: err.to_string() + }) + })?; logger::debug!("Redis health check end"); - let response = serde_json::to_string(&SchedulerHealthCheckResponse { + let response = SchedulerHealthCheckResponse { database: db_status, redis: redis_status, - }) - .unwrap_or_default(); + }; - if status_code == 200 { - services::http_response_json(response) - } else { - services::http_server_error_json_response(response) - } + Ok(response) } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)] diff --git a/crates/router/src/core/errors/api_error_response.rs b/crates/router/src/core/errors/api_error_response.rs index 023e1f4b7fb..e83483b0816 100644 --- a/crates/router/src/core/errors/api_error_response.rs +++ b/crates/router/src/core/errors/api_error_response.rs @@ -238,7 +238,7 @@ pub enum ApiErrorResponse { WebhookInvalidMerchantSecret, #[error(error_type = ErrorType::InvalidRequestError, code = "IR_19", message = "{message}")] CurrencyNotSupported { message: String }, - #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failiing with error: {message}")] + #[error(error_type = ErrorType::ServerNotAvailable, 
code= "HE_00", message = "{component} health check is failing with error: {message}")] HealthCheckError { component: &'static str, message: String, From 515dbea0013b9e3d66d1210c9cc28d6c16b5bf61 Mon Sep 17 00:00:00 2001 From: Chethan Date: Wed, 31 Jan 2024 13:57:31 +0530 Subject: [PATCH 20/23] log and return the health check error --- crates/router/src/bin/scheduler.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 62d53e279c1..370d477b08d 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -12,7 +12,8 @@ use router::{ errors::{self, CustomResult}, health_check::HealthCheckInterface, }, - logger, routes, services, + logger, routes, + services::{self, api}, types::storage::ProcessTrackerExt, workflows, }; @@ -137,9 +138,7 @@ pub async fn deep_health_check( Ok(response) => { services::http_response_json(serde_json::to_string(&response).unwrap_or_default()) } - Err(err) => services::http_server_error_json_response( - serde_json::to_string(&err.current_context()).unwrap_or_default(), - ), + Err(err) => api::log_and_return_error_response(err), } } #[instrument(skip_all)] From a80f156a6d2d06b60ad7f200428ee255405a2dc4 Mon Sep 17 00:00:00 2001 From: Chethan Date: Wed, 31 Jan 2024 19:26:05 +0530 Subject: [PATCH 21/23] address requested changes --- Cargo.lock | 1 + config/config.example.toml | 5 +++++ config/development.toml | 5 +++++ config/docker_compose.toml | 5 +++++ crates/router/src/bin/scheduler.rs | 23 +++++++++++++++++---- crates/scheduler/Cargo.toml | 1 + crates/scheduler/src/configs/defaults.rs | 11 ++++++++++ crates/scheduler/src/configs/settings.rs | 9 ++++++++ crates/scheduler/src/configs/validations.rs | 12 +++++++++++ 9 files changed, 68 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1aabc89a04..f0334ce9cfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5526,6 +5526,7 @@ dependencies = [ "external_services", "futures 0.3.28", "masking", + "num_cpus", "once_cell", "rand 0.8.5", "redis_interface", diff --git a/config/config.example.toml b/config/config.example.toml index f7e9fa70f6e..12830b54297 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -298,6 +298,11 @@ lower_fetch_limit = 1800 # Lower limit for fetching entries from redis lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with lock_ttl = 160 # the ttl being the expiry (in seconds) +[scheduler.server] +port = 3000 +host = "127.0.0.1" +workers = 1 + batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue # Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database diff --git a/config/development.toml b/config/development.toml index 584bdf751a2..20abb7bd6f3 100644 --- a/config/development.toml +++ b/config/development.toml @@ -228,6 +228,11 @@ stream = "SCHEDULER_STREAM" disabled = false consumer_group = "SCHEDULER_GROUP" +[scheduler.server] +port = 3000 +host = "127.0.0.1" +workers = 1 + [email] sender_email = "example@example.com" aws_region = "" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 67fca85929d..d14b7c1b740 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -227,6 +227,11 @@ stream = "SCHEDULER_STREAM" disabled = false consumer_group = "SCHEDULER_GROUP" +[scheduler.server] +port = 3000 +host = "127.0.0.1" +workers = 1 + 
#tokenization configuration which describe token lifetime and payment method for specific connector [tokenization] stripe = { long_lived_token = false, payment_method = "wallet", payment_method_type = { type = "disable_only", list = "google_pay" } } diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 370d477b08d..5f98cd88014 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -23,6 +23,7 @@ use scheduler::{ workflows::ProcessTrackerWorkflows, SchedulerAppState, }; use serde::{Deserialize, Serialize}; +use storage_impl::errors::ApplicationError; use strum::EnumString; use tokio::sync::{mpsc, oneshot}; @@ -100,11 +101,21 @@ pub async fn start_web_server( state: routes::AppState, service: String, ) -> errors::ApplicationResult { - let server = state.conf.server.clone(); + let server = state + .conf + .scheduler + .as_ref() + .ok_or(ApplicationError::InvalidConfigurationValueError( + "Scheduler server is invalidly configured".into(), + ))? + .server + .clone(); + let web_server = actix_web::HttpServer::new(move || { actix_web::App::new().service(Health::server(state.clone(), service.clone())) }) .bind((server.host.as_str(), server.port))? + .workers(server.workers) .run(); let _ = web_server.handle(); @@ -135,9 +146,13 @@ pub async fn deep_health_check( ) -> impl actix_web::Responder { let report = deep_health_check_func(state, service).await; match report { - Ok(response) => { - services::http_response_json(serde_json::to_string(&response).unwrap_or_default()) - } + Ok(response) => services::http_response_json( + serde_json::to_string(&response) + .map_err(|err| { + logger::error!(serialization_error=?err); + }) + .unwrap_or_default(), + ), Err(err) => api::log_and_return_error_response(err), } } diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml index fe090552edb..7d4a7821fc9 100644 --- a/crates/scheduler/Cargo.toml +++ b/crates/scheduler/Cargo.toml @@ -13,6 +13,7 @@ kv_store = [] async-trait = "0.1.68" error-stack = "0.3.1" futures = "0.3.28" +num_cpus = "1.15.0" once_cell = "1.18.0" rand = "0.8.5" serde = "1.0.193" diff --git a/crates/scheduler/src/configs/defaults.rs b/crates/scheduler/src/configs/defaults.rs index 25eb19e24f2..d17c20829ea 100644 --- a/crates/scheduler/src/configs/defaults.rs +++ b/crates/scheduler/src/configs/defaults.rs @@ -6,6 +6,7 @@ impl Default for super::settings::SchedulerSettings { consumer: super::settings::ConsumerSettings::default(), graceful_shutdown_interval: 60000, loop_interval: 5000, + server: super::settings::Server::default(), } } } @@ -30,3 +31,13 @@ impl Default for super::settings::ConsumerSettings { } } } + +impl Default for super::settings::Server { + fn default() -> Self { + Self { + port: 8080, + workers: num_cpus::get_physical(), + host: "localhost".into(), + } + } +} diff --git a/crates/scheduler/src/configs/settings.rs b/crates/scheduler/src/configs/settings.rs index 56a9f4079ac..723ef81e70c 100644 --- a/crates/scheduler/src/configs/settings.rs +++ b/crates/scheduler/src/configs/settings.rs @@ -15,6 +15,15 @@ pub struct SchedulerSettings { pub consumer: ConsumerSettings, pub loop_interval: u64, pub graceful_shutdown_interval: u64, + pub server: Server, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(default)] +pub struct Server { + pub port: u16, + pub workers: usize, + pub host: String, } #[derive(Debug, Clone, Deserialize)] diff --git a/crates/scheduler/src/configs/validations.rs b/crates/scheduler/src/configs/validations.rs index 
e9f6621b2a5..06052f9ff6c 100644 --- a/crates/scheduler/src/configs/validations.rs +++ b/crates/scheduler/src/configs/validations.rs @@ -19,6 +19,8 @@ impl super::settings::SchedulerSettings { self.producer.validate()?; + self.server.validate()?; + Ok(()) } } @@ -32,3 +34,13 @@ impl super::settings::ProducerSettings { }) } } + +impl super::settings::Server { + pub fn validate(&self) -> Result<(), ApplicationError> { + common_utils::fp_utils::when(self.host.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "server host must not be empty".into(), + )) + }) + } +} From f4600e50702d26a22a3fb31804a61acc90bd3b9d Mon Sep 17 00:00:00 2001 From: Chethan Date: Wed, 31 Jan 2024 21:34:58 +0530 Subject: [PATCH 22/23] add configs to deployment files --- config/deployments/scheduler/consumer.toml | 5 +++++ config/deployments/scheduler/producer.toml | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/config/deployments/scheduler/consumer.toml b/config/deployments/scheduler/consumer.toml index 907e3b8297e..d0c9e0d89bd 100644 --- a/config/deployments/scheduler/consumer.toml +++ b/config/deployments/scheduler/consumer.toml @@ -9,3 +9,8 @@ stream = "scheduler_stream" [scheduler.consumer] consumer_group = "scheduler_group" disabled = false # This flag decides if the consumer should actively consume task + +[scheduler.server] +port = 8080 +host = "127.0.0.1" +workers = 1 diff --git a/config/deployments/scheduler/producer.toml b/config/deployments/scheduler/producer.toml index 579466a23cc..881ae734086 100644 --- a/config/deployments/scheduler/producer.toml +++ b/config/deployments/scheduler/producer.toml @@ -12,3 +12,8 @@ lock_key = "producer_locking_key" # The following keys defines the producer lock lock_ttl = 160 # the ttl being the expiry (in seconds) lower_fetch_limit = 900 # Lower limit for fetching entries from redis queue (in seconds) upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds)0 + +[scheduler.server] +port = 8080 +host = "127.0.0.1" +workers = 1 From 8215494b2d34592b13e52733c27eb4d5ee02a630 Mon Sep 17 00:00:00 2001 From: Chethan Date: Wed, 31 Jan 2024 23:02:19 +0530 Subject: [PATCH 23/23] add comments --- config/config.example.toml | 7 ++++--- config/deployments/scheduler/consumer.toml | 7 ++++--- config/deployments/scheduler/producer.toml | 7 ++++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index 12830b54297..fc5d4a3a039 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -298,10 +298,11 @@ lower_fetch_limit = 1800 # Lower limit for fetching entries from redis lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with lock_ttl = 160 # the ttl being the expiry (in seconds) +# Scheduler server configuration [scheduler.server] -port = 3000 -host = "127.0.0.1" -workers = 1 +port = 3000 # Port on which the server will listen for incoming requests +host = "127.0.0.1" # Host IP address to bind the server to +workers = 1 # Number of actix workers to handle incoming requests concurrently batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue diff --git a/config/deployments/scheduler/consumer.toml b/config/deployments/scheduler/consumer.toml index d0c9e0d89bd..cdd60552668 100644 --- a/config/deployments/scheduler/consumer.toml +++ b/config/deployments/scheduler/consumer.toml @@ -10,7 +10,8 @@ stream = "scheduler_stream" 
consumer_group = "scheduler_group" disabled = false # This flag decides if the consumer should actively consume task +# Scheduler server configuration [scheduler.server] -port = 8080 -host = "127.0.0.1" -workers = 1 +port = 3000 # Port on which the server will listen for incoming requests +host = "127.0.0.1" # Host IP address to bind the server to +workers = 1 # Number of actix workers to handle incoming requests concurrently diff --git a/config/deployments/scheduler/producer.toml b/config/deployments/scheduler/producer.toml index 881ae734086..9cbaee96f03 100644 --- a/config/deployments/scheduler/producer.toml +++ b/config/deployments/scheduler/producer.toml @@ -13,7 +13,8 @@ lock_ttl = 160 # the ttl being the expiry (in seconds) lower_fetch_limit = 900 # Lower limit for fetching entries from redis queue (in seconds) upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds)0 +# Scheduler server configuration [scheduler.server] -port = 8080 -host = "127.0.0.1" -workers = 1 +port = 3000 # Port on which the server will listen for incoming requests +host = "127.0.0.1" # Host IP address to bind the server to +workers = 1 # Number of actix workers to handle incoming requests concurrently