feat: add deep health check for scheduler #3304

Merged Feb 1, 2024 · 39 commits (changes shown from all commits)

Commits
cec252e
feat: add deep health check
Chethan-rao Dec 27, 2023
ef5d8b3
chore: run formatter
hyperswitch-bot[bot] Dec 27, 2023
2e1d559
update response type to json and impl health check interface for kafk…
Chethan-rao Dec 28, 2023
28c7d92
Merge branch 'health_checks' of github.com:juspay/hyperswitch into he…
Chethan-rao Dec 28, 2023
7e75ed4
chore: run formatter
hyperswitch-bot[bot] Dec 28, 2023
1a6943d
Merge branch 'health_checks' of github.com:juspay/hyperswitch into he…
Chethan-rao Dec 28, 2023
b2edae1
Merge branch 'health_checks' of github.com:juspay/hyperswitch into he…
Chethan-rao Dec 28, 2023
5040182
return 5xx in case at least one component health is down
Chethan-rao Dec 29, 2023
605c65b
change type names
Chethan-rao Dec 29, 2023
ab5110b
Merge branch 'main' of github.com:juspay/hyperswitch into health_checks
Chethan-rao Jan 2, 2024
0cdfb9d
remove key custodian status and refactor transaction code
Chethan-rao Jan 2, 2024
e3cf92c
chore: run formatter
hyperswitch-bot[bot] Jan 2, 2024
f76c667
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 4, 2024
a6e01b9
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 10, 2024
14de558
feat: add deep health check for scheduler
Chethan-rao Jan 10, 2024
e66bfb8
feat: add deep health check for analytics
dracarys18 Jan 24, 2024
b79ea49
chore: run formatter
hyperswitch-bot[bot] Jan 24, 2024
630a259
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 25, 2024
e4f3b7b
update the deep health check http method to get request
Chethan-rao Jan 25, 2024
6b06075
refactor(health): refactor deep health check
dracarys18 Jan 24, 2024
7b0f7e3
Merge branch 'add_analytics_health' of github.com:juspay/hyperswitch …
dracarys18 Jan 29, 2024
bea745a
Merge branch 'main' of github.com:juspay/hyperswitch into add_analyti…
dracarys18 Jan 29, 2024
1bcd774
chore: run formatter
hyperswitch-bot[bot] Jan 29, 2024
3e67fbf
fix: fix health check error
dracarys18 Jan 30, 2024
a82dfea
Merge branch 'add_analytics_health' of github.com:juspay/hyperswitch …
dracarys18 Jan 30, 2024
3159984
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 30, 2024
461081d
address requested changes
Chethan-rao Jan 30, 2024
39c0db8
fix: build
dracarys18 Jan 30, 2024
b0a65e8
chore: run formatter
hyperswitch-bot[bot] Jan 30, 2024
59117d0
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 30, 2024
1acd7dd
Merge branch 'add_analytics_health' of github.com:juspay/hyperswitch …
Chethan-rao Jan 30, 2024
52bcec0
refactor health check
Chethan-rao Jan 30, 2024
2e83aed
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 31, 2024
515dbea
log and return the health check error
Chethan-rao Jan 31, 2024
1dc29bd
Merge branch 'main' of github.com:juspay/hyperswitch into scheduler-h…
Chethan-rao Jan 31, 2024
a80f156
address requested changes
Chethan-rao Jan 31, 2024
f4600e5
add configs to deployment files
Chethan-rao Jan 31, 2024
8215494
add comments
Chethan-rao Jan 31, 2024
12f6e46
Merge branch 'main' into scheduler-health-checks
dracarys18 Feb 1, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

6 changes: 6 additions & 0 deletions config/config.example.toml
@@ -298,6 +298,12 @@ lower_fetch_limit = 1800 # Lower limit for fetching entries from redis
lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with
lock_ttl = 160 # the ttl being the expiry (in seconds)

# Scheduler server configuration
[scheduler.server]
port = 3000 # Port on which the server will listen for incoming requests
host = "127.0.0.1" # Host IP address to bind the server to
workers = 1 # Number of actix workers to handle incoming requests concurrently

batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue

# Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database
6 changes: 6 additions & 0 deletions config/deployments/scheduler/consumer.toml
@@ -9,3 +9,9 @@ stream = "scheduler_stream"
[scheduler.consumer]
consumer_group = "scheduler_group"
disabled = false # This flag decides if the consumer should actively consume task

# Scheduler server configuration
[scheduler.server]
port = 3000 # Port on which the server will listen for incoming requests
host = "127.0.0.1" # Host IP address to bind the server to
workers = 1 # Number of actix workers to handle incoming requests concurrently
6 changes: 6 additions & 0 deletions config/deployments/scheduler/producer.toml
@@ -12,3 +12,9 @@ lock_key = "producer_locking_key" # The following keys defines the producer lock
lock_ttl = 160 # the ttl being the expiry (in seconds)
lower_fetch_limit = 900 # Lower limit for fetching entries from redis queue (in seconds)
upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds)

# Scheduler server configuration
[scheduler.server]
port = 3000 # Port on which the server will listen for incoming requests
host = "127.0.0.1" # Host IP address to bind the server to
workers = 1 # Number of actix workers to handle incoming requests concurrently
5 changes: 5 additions & 0 deletions config/development.toml
@@ -228,6 +228,11 @@ stream = "SCHEDULER_STREAM"
disabled = false
consumer_group = "SCHEDULER_GROUP"

[scheduler.server]
port = 3000
host = "127.0.0.1"
workers = 1

[email]
sender_email = "[email protected]"
aws_region = ""
5 changes: 5 additions & 0 deletions config/docker_compose.toml
@@ -227,6 +227,11 @@ stream = "SCHEDULER_STREAM"
disabled = false
consumer_group = "SCHEDULER_GROUP"

[scheduler.server]
port = 3000
host = "127.0.0.1"
workers = 1

#tokenization configuration which describe token lifetime and payment method for specific connector
[tokenization]
stripe = { long_lived_token = false, payment_method = "wallet", payment_method_type = { type = "disable_only", list = "google_pay" } }
7 changes: 7 additions & 0 deletions crates/api_models/src/health_check.rs
@@ -8,3 +8,10 @@ pub struct RouterHealthCheckResponse {
}

impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchedulerHealthCheckResponse {
pub database: bool,
pub redis: bool,
}

impl common_utils::events::ApiEventMetric for SchedulerHealthCheckResponse {}
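
Side note: SchedulerHealthCheckResponse derives serde's Serialize/Deserialize with default field naming, so the deep health endpoint returns a flat JSON object. A minimal standalone sketch (not part of the diff; assumes the serde and serde_json crates) of the payload shape:

// Sketch: the JSON produced by serializing SchedulerHealthCheckResponse.
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerHealthCheckResponse {
    pub database: bool,
    pub redis: bool,
}

fn main() {
    let response = SchedulerHealthCheckResponse {
        database: true,
        redis: true,
    };
    // Prints: {"database":true,"redis":true}
    println!("{}", serde_json::to_string(&response).unwrap());
}
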
125 changes: 123 additions & 2 deletions crates/router/src/bin/scheduler.rs
@@ -1,21 +1,29 @@
#![recursion_limit = "256"]
use std::{str::FromStr, sync::Arc};

use actix_web::{dev::Server, web, Scope};
use api_models::health_check::SchedulerHealthCheckResponse;
use common_utils::ext_traits::{OptionExt, StringExt};
use diesel_models::process_tracker as storage;
use error_stack::ResultExt;
use router::{
configs::settings::{CmdLineConf, Settings},
core::errors::{self, CustomResult},
logger, routes, services,
core::{
errors::{self, CustomResult},
health_check::HealthCheckInterface,
},
logger, routes,
services::{self, api},
types::storage::ProcessTrackerExt,
workflows,
};
use router_env::{instrument, tracing};
use scheduler::{
consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError,
workflows::ProcessTrackerWorkflows, SchedulerAppState,
};
use serde::{Deserialize, Serialize};
use storage_impl::errors::ApplicationError;
use strum::EnumString;
use tokio::sync::{mpsc, oneshot};

@@ -68,6 +76,19 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
[router_env::service_name!()],
);

#[allow(clippy::expect_used)]
let web_server = Box::pin(start_web_server(
state.clone(),
scheduler_flow_str.to_string(),
))
.await
.expect("Failed to create the server");

tokio::spawn(async move {
let _ = web_server.await;
logger::error!("The health check probe stopped working!");
});

logger::debug!(startup_config=?state.conf);

start_scheduler(&state, scheduler_flow, (tx, rx)).await?;
@@ -76,6 +97,106 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
Ok(())
}

pub async fn start_web_server(
state: routes::AppState,
service: String,
) -> errors::ApplicationResult<Server> {
let server = state
.conf
.scheduler
.as_ref()
.ok_or(ApplicationError::InvalidConfigurationValueError(
"Scheduler server is invalidly configured".into(),
))?
.server
.clone();

let web_server = actix_web::HttpServer::new(move || {
actix_web::App::new().service(Health::server(state.clone(), service.clone()))
})
.bind((server.host.as_str(), server.port))?
.workers(server.workers)
.run();
let _ = web_server.handle();

Ok(web_server)
}

pub struct Health;

impl Health {
pub fn server(state: routes::AppState, service: String) -> Scope {
web::scope("health")
.app_data(web::Data::new(state))
.app_data(web::Data::new(service))
.service(web::resource("").route(web::get().to(health)))
.service(web::resource("/ready").route(web::get().to(deep_health_check)))
}
}

#[instrument(skip_all)]
pub async fn health() -> impl actix_web::Responder {
logger::info!("Scheduler health was called");
actix_web::HttpResponse::Ok().body("Scheduler health is good")
}
#[instrument(skip_all)]
pub async fn deep_health_check(
state: web::Data<routes::AppState>,
service: web::Data<String>,
) -> impl actix_web::Responder {
let report = deep_health_check_func(state, service).await;
match report {
Ok(response) => services::http_response_json(
serde_json::to_string(&response)
.map_err(|err| {
logger::error!(serialization_error=?err);
})
.unwrap_or_default(),
),
Err(err) => api::log_and_return_error_response(err),
}
}
#[instrument(skip_all)]
pub async fn deep_health_check_func(
state: web::Data<routes::AppState>,
service: web::Data<String>,
) -> errors::RouterResult<SchedulerHealthCheckResponse> {
logger::info!("{} deep health check was called", service.into_inner());

logger::debug!("Database health check begin");

let db_status = state.health_check_db().await.map(|_| true).map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
component: "Database",
message: err.to_string()
})
})?;

logger::debug!("Database health check end");

logger::debug!("Redis health check begin");

let redis_status = state
.health_check_redis()
.await
.map(|_| true)
.map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
component: "Redis",
message: err.to_string()
})
})?;

logger::debug!("Redis health check end");

let response = SchedulerHealthCheckResponse {
database: db_status,
redis: redis_status,
};

Ok(response)
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
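
Taken together, the scheduler now exposes GET /health (shallow liveness) and GET /health/ready (deep readiness) on the configured host and port. A hypothetical client-side probe (not part of the PR; assumes the reqwest and tokio crates and the development config of 127.0.0.1:3000):

// Sketch: probing both scheduler health endpoints.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Shallow check: 200 as long as the web server is alive.
    let shallow = reqwest::get("http://127.0.0.1:3000/health").await?;
    println!("/health -> {}", shallow.status());

    // Deep check: 200 with {"database":true,"redis":true} when both
    // dependencies respond; a 5xx if at least one component is down.
    let deep = reqwest::get("http://127.0.0.1:3000/health/ready").await?;
    println!("/health/ready -> {}", deep.status());
    println!("{}", deep.text().await?);
    Ok(())
}
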
2 changes: 1 addition & 1 deletion crates/router/src/core/errors/api_error_response.rs
@@ -238,7 +238,7 @@ pub enum ApiErrorResponse {
WebhookInvalidMerchantSecret,
#[error(error_type = ErrorType::InvalidRequestError, code = "IR_19", message = "{message}")]
CurrencyNotSupported { message: String },
- #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failiing with error: {message}")]
+ #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failing with error: {message}")]
HealthCheckError {
component: &'static str,
message: String,
1 change: 1 addition & 0 deletions crates/scheduler/Cargo.toml
@@ -13,6 +13,7 @@ kv_store = []
async-trait = "0.1.68"
error-stack = "0.3.1"
futures = "0.3.28"
num_cpus = "1.15.0"
once_cell = "1.18.0"
rand = "0.8.5"
serde = "1.0.193"
11 changes: 11 additions & 0 deletions crates/scheduler/src/configs/defaults.rs
@@ -6,6 +6,7 @@ impl Default for super::settings::SchedulerSettings {
consumer: super::settings::ConsumerSettings::default(),
graceful_shutdown_interval: 60000,
loop_interval: 5000,
server: super::settings::Server::default(),
}
}
}
@@ -30,3 +31,13 @@ impl Default for super::settings::ConsumerSettings {
}
}
}

impl Default for super::settings::Server {
fn default() -> Self {
Self {
port: 8080,
workers: num_cpus::get_physical(),
host: "localhost".into(),
}
}
}
9 changes: 9 additions & 0 deletions crates/scheduler/src/configs/settings.rs
@@ -15,6 +15,15 @@ pub struct SchedulerSettings {
pub consumer: ConsumerSettings,
pub loop_interval: u64,
pub graceful_shutdown_interval: u64,
pub server: Server,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct Server {
pub port: u16,
pub workers: usize,
pub host: String,
}

#[derive(Debug, Clone, Deserialize)]
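
Since Server is marked #[serde(default)] at the container level, any key missing from the [scheduler.server] TOML section falls back to the Default impl added in defaults.rs (port 8080, physical core count, "localhost"). A small sketch of that behavior (not part of the diff; assumes the toml, serde, and num_cpus crates):

// Sketch: container-level #[serde(default)] fills missing fields
// from the Default impl, so a partial section still parses.
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct Server {
    pub port: u16,
    pub workers: usize,
    pub host: String,
}

impl Default for Server {
    fn default() -> Self {
        Self {
            port: 8080,
            workers: num_cpus::get_physical(),
            host: "localhost".into(),
        }
    }
}

fn main() {
    // Only `port` is given; `workers` and `host` come from Default.
    let server: Server = toml::from_str("port = 3000").expect("valid TOML");
    assert_eq!(server.host, "localhost");
    println!("{server:?}");
}
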
12 changes: 12 additions & 0 deletions crates/scheduler/src/configs/validations.rs
@@ -19,6 +19,8 @@ impl super::settings::SchedulerSettings {

self.producer.validate()?;

self.server.validate()?;

Ok(())
}
}
@@ -32,3 +34,13 @@ impl super::settings::ProducerSettings {
})
}
}

impl super::settings::Server {
pub fn validate(&self) -> Result<(), ApplicationError> {
common_utils::fp_utils::when(self.host.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"server host must not be empty".into(),
))
})
}
}
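
The host validation above leans on common_utils::fp_utils::when, which (under the assumption that it mirrors the usual functional helper) runs the error closure only when the condition holds. A standalone sketch of the pattern, with a local stand-in for the real helper:

// Sketch: the `when` validation pattern used by Server::validate.
fn when<E, F: FnOnce() -> Result<(), E>>(condition: bool, then: F) -> Result<(), E> {
    if condition {
        then()
    } else {
        Ok(())
    }
}

fn validate_host(host: &str) -> Result<(), String> {
    // Reject an empty host, mirroring `host.is_default_or_empty()`.
    when(host.is_empty(), || {
        Err("server host must not be empty".to_string())
    })
}

fn main() {
    assert!(validate_host("127.0.0.1").is_ok());
    assert!(validate_host("").is_err());
}
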