Skip to content

Commit

Permalink
feat: add deep health check for scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Chethan-rao committed Jan 10, 2024
1 parent a6e01b9 commit 14de558
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 1 deletion.
6 changes: 6 additions & 0 deletions crates/api_models/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ pub struct RouterHealthCheckResponse {
pub redis: String,
pub locker: String,
}

/// Response body of the scheduler's deep health check.
///
/// Each field carries a human-readable status for one dependency: the
/// literal `"Health is good"` when the probe succeeded, otherwise the
/// stringified error returned by the probe.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchedulerHealthCheckResponse {
/// Status reported by the database health probe.
pub database: String,
/// Status reported by the Redis health probe.
pub redis: String,
}
93 changes: 92 additions & 1 deletion crates/router/src/bin/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![recursion_limit = "256"]
use std::{str::FromStr, sync::Arc};

use actix_web::{dev::Server, web, Scope};
use api_models::health_check::SchedulerHealthCheckResponse;
use common_utils::ext_traits::{OptionExt, StringExt};
use diesel_models::process_tracker as storage;
use error_stack::ResultExt;
Expand All @@ -11,6 +13,7 @@ use router::{
types::storage::ProcessTrackerExt,
workflows,
};
use router_env::{instrument, tracing};
use scheduler::{
consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError,
workflows::ProcessTrackerWorkflows, SchedulerAppState,
Expand All @@ -37,7 +40,7 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
// channel for listening to redis disconnect events
let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel();
let state = Box::pin(routes::AppState::new(
conf,
conf.clone(),
redis_shutdown_signal_tx,
api_client,
))
Expand Down Expand Up @@ -68,6 +71,16 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
[router_env::service_name!()],
);

#[allow(clippy::expect_used)]
let web_server = Box::pin(start_web_server(
state.clone(),
scheduler_flow_str.to_string(),
))
.await
.expect("Failed to create the server");

tokio::spawn(web_server);

logger::debug!(startup_config=?state.conf);

start_scheduler(&state, scheduler_flow, (tx, rx)).await?;
Expand All @@ -76,6 +89,84 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
Ok(())
}

/// Builds the scheduler's HTTP server, which serves only the [`Health`] routes.
///
/// The listen address is taken from `state.conf.server` (`host` and `port`).
/// Every worker receives its own clone of `state` and `service`, which are
/// registered as application data for the health handlers.
///
/// The returned [`Server`] is handed back to the caller, which is expected to
/// drive it (for example by spawning it onto the runtime).
///
/// # Errors
///
/// Returns an error if the server cannot bind to the configured host/port.
pub async fn start_web_server(
    state: routes::AppState,
    service: String,
) -> errors::ApplicationResult<Server> {
    // Clone the server settings up front: `state` is moved into the factory closure below.
    let server = state.conf.server.clone();
    let web_server = actix_web::HttpServer::new(move || {
        actix_web::App::new().service(Health::server(state.clone(), service.clone()))
    })
    .bind((server.host.as_str(), server.port))?
    .run();

    Ok(web_server)
}

/// Marker type grouping the scheduler's health-check routes.
pub struct Health;

impl Health {
    /// Builds the `health` scope with the shared `state` and `service` name
    /// registered as application data.
    ///
    /// Routes:
    /// - `GET  health` → [`health`]
    /// - `POST health/deep_check` → [`deep_health_check`]
    pub fn server(state: routes::AppState, service: String) -> Scope {
        let liveness = web::resource("").route(web::get().to(health));
        let deep_check = web::resource("/deep_check").route(web::post().to(deep_health_check));

        web::scope("health")
            .app_data(web::Data::new(state))
            .app_data(web::Data::new(service))
            .service(liveness)
            .service(deep_check)
    }
}

/// Shallow health endpoint: logs the call and answers `200 OK` with a fixed
/// text body, without probing any dependency.
#[instrument(skip_all)]
pub async fn health() -> impl actix_web::Responder {
    logger::info!("Scheduler health was called");

    let mut builder = actix_web::HttpResponse::Ok();
    builder.body("Scheduler health is good")
}

/// Deep health endpoint: probes the database and Redis through the store and
/// reports the outcome of each as a JSON-serialized
/// [`SchedulerHealthCheckResponse`].
///
/// Both probes always run, one after the other. The success response helper is
/// used only when both probes succeed; if either fails, the server-error
/// response helper is used, with the failing probe's error text in its field.
#[instrument(skip_all)]
pub async fn deep_health_check(
    state: web::Data<routes::AppState>,
    service: web::Data<String>,
) -> impl actix_web::Responder {
    let store = &*state.store;
    logger::info!("{} deep health check was called", service.into_inner());

    logger::debug!("Database health check begin");
    let database_result = store.health_check_db().await;
    let database_healthy = database_result.is_ok();
    let database =
        database_result.map_or_else(|err| err.to_string(), |_| "Health is good".to_string());
    logger::debug!("Database health check end");

    logger::debug!("Redis health check begin");
    let redis_result = store.health_check_redis(store).await;
    let redis_healthy = redis_result.is_ok();
    let redis = redis_result.map_or_else(|err| err.to_string(), |_| "Health is good".to_string());
    logger::debug!("Redis health check end");

    // Serialization failure falls back to an empty body, as before.
    let body = serde_json::to_string(&SchedulerHealthCheckResponse { database, redis })
        .unwrap_or_default();

    if database_healthy && redis_healthy {
        services::http_response_json(body)
    } else {
        services::http_server_error_json_response(body)
    }
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
Expand Down

0 comments on commit 14de558

Please sign in to comment.