From 1794f93a3dc8b13e59328ff8d8e5ca767cd0ca13 Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 27 Dec 2024 08:19:37 +0530 Subject: [PATCH] restructured code --- .../http => }/alerts/alerts_utils.rs | 57 ++++++------------- src/{handlers/http => }/alerts/mod.rs | 26 +-------- src/{handlers/http => }/alerts/parser.rs | 0 src/{handlers/http => }/alerts/rule.rs | 0 src/{handlers/http => }/alerts/target.rs | 2 +- .../{alerts/http_handlers.rs => alerts.rs} | 15 ++++- src/handlers/http/modal/query_server.rs | 2 +- src/handlers/http/modal/server.rs | 30 +++------- src/lib.rs | 1 + src/storage/object_storage.rs | 2 +- src/sync.rs | 2 +- 11 files changed, 44 insertions(+), 93 deletions(-) rename src/{handlers/http => }/alerts/alerts_utils.rs (71%) rename src/{handlers/http => }/alerts/mod.rs (95%) rename src/{handlers/http => }/alerts/parser.rs (100%) rename src/{handlers/http => }/alerts/rule.rs (100%) rename src/{handlers/http => }/alerts/target.rs (99%) rename src/handlers/http/{alerts/http_handlers.rs => alerts.rs} (94%) diff --git a/src/handlers/http/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs similarity index 71% rename from src/handlers/http/alerts/alerts_utils.rs rename to src/alerts/alerts_utils.rs index c173d23a0..c5dde83e0 100644 --- a/src/handlers/http/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -29,7 +29,6 @@ use datafusion::{ use tracing::trace; use crate::{ - handlers::http::alerts::{AlertState, ALERTS}, query::{TableScanVisitor, QUERY_SESSION}, rbac::{ map::SessionKey, @@ -39,7 +38,9 @@ use crate::{ utils::time::TimeRange, }; -use super::{AlertConfig, AlertError, ThresholdConfig}; +use super::{ + Aggregate, AlertConfig, AlertError, AlertOperator, AlertState, ThresholdConfig, ALERTS, +}; async fn get_tables_from_query(query: &str) -> Result { let session_state = QUERY_SESSION.state(); @@ -152,48 +153,22 @@ fn get_exprs(thresholds: &Vec) -> (Vec, Vec, Expr) let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); for threshold in thresholds { let res = match threshold.operator { - crate::handlers::http::alerts::AlertOperator::GreaterThan => { - col(&threshold.column).gt(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::LessThan => { - col(&threshold.column).lt(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::EqualTo => { - col(&threshold.column).eq(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::NotEqualTo => { - col(&threshold.column).not_eq(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::GreaterThanEqualTo => { - col(&threshold.column).gt_eq(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::LessThanEqualTo => { - col(&threshold.column).lt_eq(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::Like => { - col(&threshold.column).like(lit(threshold.value)) - } - crate::handlers::http::alerts::AlertOperator::NotLike => { - col(&threshold.column).not_like(lit(threshold.value)) - } + AlertOperator::GreaterThan => col(&threshold.column).gt(lit(threshold.value)), + AlertOperator::LessThan => col(&threshold.column).lt(lit(threshold.value)), + AlertOperator::EqualTo => col(&threshold.column).eq(lit(threshold.value)), + AlertOperator::NotEqualTo => col(&threshold.column).not_eq(lit(threshold.value)), + AlertOperator::GreaterThanEqualTo => col(&threshold.column).gt_eq(lit(threshold.value)), + AlertOperator::LessThanEqualTo => col(&threshold.column).lt_eq(lit(threshold.value)), + AlertOperator::Like => col(&threshold.column).like(lit(threshold.value)), + AlertOperator::NotLike => col(&threshold.column).not_like(lit(threshold.value)), }; aggr_expr.push(match threshold.agg { - crate::handlers::http::alerts::Aggregate::Avg => { - avg(col(&threshold.column)).alias(&threshold.column) - } - crate::handlers::http::alerts::Aggregate::Count => { - count(col(&threshold.column)).alias(&threshold.column) - } - crate::handlers::http::alerts::Aggregate::Min => { - min(col(&threshold.column)).alias(&threshold.column) - } - crate::handlers::http::alerts::Aggregate::Max => { - max(col(&threshold.column)).alias(&threshold.column) - } - crate::handlers::http::alerts::Aggregate::Sum => { - sum(col(&threshold.column)).alias(&threshold.column) - } + Aggregate::Avg => avg(col(&threshold.column)).alias(&threshold.column), + Aggregate::Count => count(col(&threshold.column)).alias(&threshold.column), + Aggregate::Min => min(col(&threshold.column)).alias(&threshold.column), + Aggregate::Max => max(col(&threshold.column)).alias(&threshold.column), + Aggregate::Sum => sum(col(&threshold.column)).alias(&threshold.column), }); expr = expr.and(res); } diff --git a/src/handlers/http/alerts/mod.rs b/src/alerts/mod.rs similarity index 95% rename from src/handlers/http/alerts/mod.rs rename to src/alerts/mod.rs index 3df06aa76..0de84b4e2 100644 --- a/src/handlers/http/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -35,13 +35,11 @@ use tracing::{trace, warn}; use ulid::Ulid; pub mod alerts_utils; -pub mod http_handlers; pub mod target; use crate::option::CONFIG; use crate::rbac::map::SessionKey; use crate::storage; -use crate::storage::object_storage::alert_json_path; use crate::storage::ObjectStorageError; use crate::sync::schedule_alert_task; use crate::utils::uid; @@ -461,14 +459,12 @@ impl Alerts { trigger_notif: bool, ) -> Result<(), AlertError> { let store = CONFIG.storage().get_object_store(); - // let alert_path = alert_json_path(alert_id); - // // read and modify alert - // let mut alert: AlertConfig = serde_json::from_slice(&store.get_object(&alert_path).await?)?; - // alert.state = new_state; + // read and modify alert let mut alert = self.get_alert_by_id(alert_id).await?; alert.state = new_state; + // save to disk store.put_alert(alert_id, &alert).await?; @@ -482,12 +478,6 @@ impl Alerts { }; drop(writer); - // self.alerts.write().await.iter_mut().for_each(|alert| { - // if alert.id.to_string() == alert_id { - // alert.state = new_state; - // } - // }); - if trigger_notif { alert.trigger_notifications().await?; } @@ -497,17 +487,7 @@ impl Alerts { /// Remove alert and scheduled task from disk and memory pub async fn delete(&self, alert_id: &str) -> Result<(), AlertError> { - let store = CONFIG.storage().get_object_store(); - let alert_path = alert_json_path(alert_id); - - // delete from disk - store - .delete_object(&alert_path) - .await - .map_err(AlertError::ObjectStorage)?; - trace!("Deleted from disk"); - - // now delete from memory + // delete from memory let read_access = self.alerts.read().await; let index = read_access diff --git a/src/handlers/http/alerts/parser.rs b/src/alerts/parser.rs similarity index 100% rename from src/handlers/http/alerts/parser.rs rename to src/alerts/parser.rs diff --git a/src/handlers/http/alerts/rule.rs b/src/alerts/rule.rs similarity index 100% rename from src/handlers/http/alerts/rule.rs rename to src/alerts/rule.rs diff --git a/src/handlers/http/alerts/target.rs b/src/alerts/target.rs similarity index 99% rename from src/handlers/http/alerts/target.rs rename to src/alerts/target.rs index 1f3e36d86..5d536932d 100644 --- a/src/handlers/http/alerts/target.rs +++ b/src/alerts/target.rs @@ -30,7 +30,7 @@ use humantime_serde::re::humantime; use reqwest::ClientBuilder; use tracing::{error, trace, warn}; -use crate::handlers::http::alerts::ALERTS; +use super::ALERTS; use super::{AlertState, CallableTarget, Context}; diff --git a/src/handlers/http/alerts/http_handlers.rs b/src/handlers/http/alerts.rs similarity index 94% rename from src/handlers/http/alerts/http_handlers.rs rename to src/handlers/http/alerts.rs index b9ab9a7b2..684a8034a 100644 --- a/src/handlers/http/alerts/http_handlers.rs +++ b/src/handlers/http/alerts.rs @@ -26,7 +26,7 @@ use actix_web::{web, HttpRequest, Responder}; use bytes::Bytes; use relative_path::RelativePathBuf; -use super::{ +use crate::alerts::{ alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS, }; @@ -95,6 +95,15 @@ pub async fn delete(req: HttpRequest) -> Result { // validate that the user has access to the tables mentioned user_auth_for_query(&session_key, &alert.query).await?; + let store = CONFIG.storage().get_object_store(); + let alert_path = alert_json_path(alert_id); + + // delete from disk + store + .delete_object(&alert_path) + .await + .map_err(AlertError::ObjectStorage)?; + // delete from disk and memory ALERTS.delete(alert_id).await?; @@ -115,9 +124,11 @@ pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result