From d575c1f1e5932499ef51a101acfecaf6b255b4aa Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 31 Dec 2024 14:03:27 +0530 Subject: [PATCH] Update - modified error message - added filters in alerts - added multiple alert conditions for one alert --- Cargo.toml | 2 +- src/alerts/alerts_utils.rs | 332 ++++++++++++++++++++++++++++++++-- src/alerts/mod.rs | 313 +++++++++++++++++++++++++++----- src/handlers/http/alerts.rs | 30 +-- src/storage/localfs.rs | 11 +- src/storage/mod.rs | 2 +- src/storage/object_storage.rs | 9 +- src/sync.rs | 2 +- 8 files changed, 616 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 21938aadd..01d2ef4cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ build = "build.rs" [dependencies] ### apache arrow/datafusion dependencies +arrow = "53.0.0" arrow-schema = { version = "53.0.0", features = ["serde"] } arrow-array = { version = "53.0.0" } arrow-json = "53.0.0" @@ -120,7 +121,6 @@ prost-build = "0.13.3" [dev-dependencies] maplit = "1.0" rstest = "0.23.0" -arrow = "53.0.0" [package.metadata.parseable_ui] assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.12/build.zip" diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index c5dde83e0..506930e83 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -16,6 +16,7 @@ * */ +use arrow_array::{Float64Array, Int64Array}; use datafusion::{ common::tree_node::TreeNode, functions_aggregate::{ @@ -39,7 +40,7 @@ use crate::{ }; use super::{ - Aggregate, AlertConfig, AlertError, AlertOperator, AlertState, ThresholdConfig, ALERTS, + Aggregate, AggregateConfig, AlertConfig, AlertError, AlertOperator, AlertState, ALERTS, }; async fn get_tables_from_query(query: &str) -> Result { @@ -83,8 +84,18 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul Ok(()) } -/// This function contains the logic to run the alert evaluation task -pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), 
AlertError> { +/// accept the alert +/// +/// alert contains aggregate_config +/// +/// aggregate_config contains the filters which need to be applied +/// +/// iterate over each agg config, apply filters, the evaluate for that config +/// +/// collect the results in the end +/// +/// check whether notification needs to be triggered or not +pub async fn evaluate_alert_the_second(alert: &AlertConfig) -> Result<(), AlertError> { trace!("RUNNING EVAL TASK FOR- {alert:?}"); let (start_time, end_time) = match &alert.eval_type { @@ -99,6 +110,7 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> { // TODO: Filter tags should be taken care of!!! let time_range = TimeRange::parse_human_time(start_time, end_time) .map_err(|err| AlertError::CustomError(err.to_string()))?; + let query = crate::query::Query { raw_logical_plan, time_range, @@ -116,41 +128,331 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> { ))); }; - let df = query + let base_df = query .get_dataframe(stream_name) .await .map_err(|err| AlertError::CustomError(err.to_string()))?; - let (group_expr, aggr_expr, filter_expr) = get_exprs(&alert.thresholds); - let df = df.aggregate(group_expr, aggr_expr)?; + trace!("got base_df"); + + let mut agg_results = vec![]; + for agg_config in &alert.aggregate_config { + // agg expression + let mut aggr_expr: Vec = vec![]; + + let filtered_df = if let Some(where_clause) = &agg_config.condition_config { + let filter_expr = match where_clause { + crate::alerts::Conditions::AND((expr1, expr2)) => { + let mut expr = + Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); + for e in [expr1, expr2] { + let ex = match e.operator { + AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)), + AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)), + AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)), + AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)), + 
AlertOperator::GreaterThanEqualTo => { + col(&e.column).gt_eq(lit(&e.value)) + } + AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)), + AlertOperator::Like => col(&e.column).like(lit(&e.value)), + AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)), + }; + expr = expr.and(ex); + } + expr + } + crate::alerts::Conditions::OR((expr1, expr2)) => { + let mut expr = + Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); + for e in [expr1, expr2] { + let ex = match e.operator { + AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)), + AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)), + AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)), + AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)), + AlertOperator::GreaterThanEqualTo => { + col(&e.column).gt_eq(lit(&e.value)) + } + AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)), + AlertOperator::Like => col(&e.column).like(lit(&e.value)), + AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)), + }; + expr = expr.or(ex); + } + expr + } + crate::alerts::Conditions::Condition(expr) => match expr.operator { + AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)), + AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)), + AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)), + AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)), + AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)), + AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)), + AlertOperator::Like => col(&expr.column).like(lit(&expr.value)), + AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)), + }, + }; + + trace!("filter_expr-\n{filter_expr:?}"); + + base_df.clone().filter(filter_expr)? 
+ } else { + base_df.clone() + }; + + trace!("got filter_df"); + + aggr_expr.push(match agg_config.agg { + Aggregate::Avg => avg(col(&agg_config.column)), //.alias(&agg_config.column), + Aggregate::Count => count(col(&agg_config.column)), //.alias(&agg_config.column), + Aggregate::Min => min(col(&agg_config.column)), //.alias(&agg_config.column), + Aggregate::Max => max(col(&agg_config.column)), //.alias(&agg_config.column), + Aggregate::Sum => sum(col(&agg_config.column)), //.alias(&agg_config.column), + }); + + trace!("Aggregating"); + // now that base_df has been filtered, apply aggregate + let row = filtered_df.aggregate(vec![], aggr_expr)?.collect().await?; + + trace!("row-\n{row:?}"); + + let final_value = if let Some(f) = row + .first() + .and_then(|batch| { + trace!("batch.column(0)-\n{:?}", batch.column(0)); + batch.column(0).as_any().downcast_ref::() + }) + .map(|array| { + trace!("array-\n{array:?}"); + array.value(0) + }) { + f + } else { + let final_value = row + .first() + .and_then(|batch| { + trace!("batch.column(0)-\n{:?}", batch.column(0)); + batch.column(0).as_any().downcast_ref::() + }) + .map(|array| { + trace!("array-\n{array:?}"); + array.value(0) + }) + .unwrap_or_default(); + final_value as f64 + }; + + // let final_value = String::from_utf8(final_value.to_vec()).unwrap().parse::().unwrap(); + + // now compare + let res = match &agg_config.operator { + AlertOperator::GreaterThan => final_value > agg_config.value, + AlertOperator::LessThan => final_value < agg_config.value, + AlertOperator::EqualTo => final_value == agg_config.value, + AlertOperator::NotEqualTo => final_value != agg_config.value, + AlertOperator::GreaterThanEqualTo => final_value >= agg_config.value, + AlertOperator::LessThanEqualTo => final_value <= agg_config.value, + _ => unreachable!(), + }; + + let message = if res { + if agg_config.condition_config.is_some() { + Some( + agg_config + .condition_config + .as_ref() + .unwrap() + .generate_filter_message(), + ) + } else { 
+ Some(String::new()) + } + } else { + None + }; + + agg_results.push((res, message, agg_config, final_value)); + } + + trace!("agg_results-\n{agg_results:?}"); - let nrows = df.clone().filter(filter_expr)?.count().await?; - trace!("dataframe-\n{:?}", df.collect().await); + // this is the final result of this evaluation + let res = if let Some(agg_condition) = &alert.agg_condition { + match agg_condition { + crate::alerts::AggregateCondition::AND => agg_results.iter().all(|(res, _, _, _)| *res), + crate::alerts::AggregateCondition::OR => agg_results.iter().any(|(res, _, _, _)| *res), + } + } else { + assert!(agg_results.len() == 1); + agg_results[0].0 + }; - if nrows > 0 { + if res { trace!("ALERT!!!!!!"); + let mut message = String::new(); + for (_, filter_msg, agg_config, final_value) in agg_results { + if let Some(msg) = filter_msg { + message.extend([format!( + "|{}({}) WHERE ({}) {} {} (ActualValue: {})|", + agg_config.agg, + agg_config.column, + msg, + agg_config.operator, + agg_config.value, + final_value + )]); + } else { + message.extend([format!( + "|{}({}) {} {} (ActualValue: {})", + agg_config.agg, + agg_config.column, + agg_config.operator, + agg_config.value, + final_value + )]); + } + } + + // let outbound_message = format!("AlertName: {}, Triggered TimeStamp: {}, Severity: {}, Message: {}",alert.title, Utc::now().to_rfc3339(), alert.severity, message); + // update state ALERTS - .update_state(&alert.id.to_string(), AlertState::Triggered, true) + .update_state(&alert.id.to_string(), AlertState::Triggered, Some(message)) .await?; } else { ALERTS - .update_state(&alert.id.to_string(), AlertState::Resolved, false) + .update_state(&alert.id.to_string(), AlertState::Resolved, None) .await?; } Ok(()) } -fn get_exprs(thresholds: &Vec) -> (Vec, Vec, Expr) { +// /// This function contains the logic to run the alert evaluation task +// pub async fn evaluate_alert2(alert: &AlertConfig) -> Result<(), AlertError> { +// trace!("RUNNING EVAL TASK FOR- {alert:?}"); 
+ +// let (start_time, end_time) = match &alert.eval_type { +// super::EvalConfig::RollingWindow(rolling_window) => { +// (&rolling_window.eval_start, &rolling_window.eval_end) +// } +// }; + +// let session_state = QUERY_SESSION.state(); +// let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?; + +// // TODO: Filter tags should be taken care of!!! +// let time_range = TimeRange::parse_human_time(start_time, end_time) +// .map_err(|err| AlertError::CustomError(err.to_string()))?; + +// let query = crate::query::Query { +// raw_logical_plan, +// time_range, +// filter_tag: None, +// }; + +// // for now proceed in a similar fashion as we do in query +// // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data) +// let stream_name = if let Some(stream_name) = query.first_table_name() { +// stream_name +// } else { +// return Err(AlertError::CustomError(format!( +// "Table name not found in query- {}", +// alert.query +// ))); +// }; + +// let df = query +// .get_dataframe(stream_name) +// .await +// .map_err(|err| AlertError::CustomError(err.to_string()))?; + +// for agg_config in &alert.thresholds { +// if let Some(condition) = &agg_config.condition_config { +// match condition { +// crate::alerts::Conditions::AND((expr1,expr2)) => { +// let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); +// for e in [expr1, expr2] { +// let ex = match e.operator { +// AlertOperator::GreaterThan => expr.and(col(e.column).gt(lit(e.value))), +// AlertOperator::LessThan => expr.and(col(e.column).lt(lit(e.value))), +// AlertOperator::EqualTo => expr.and(col(e.column).eq(lit(e.value))), +// AlertOperator::NotEqualTo => expr.and(col(e.column).not_eq(lit(e.value))), +// AlertOperator::GreaterThanEqualTo => expr.and(col(e.column).gt_eq(lit(e.value))), +// AlertOperator::LessThanEqualTo => expr.and(col(e.column).lt_eq(lit(e.value))), +// 
AlertOperator::Like => expr.and(col(e.column).like(lit(e.value))), +// AlertOperator::NotLike => expr.and(col(e.column).not_like(lit(e.value))), +// }; +// expr = expr.and(ex); +// } +// expr +// } +// crate::alerts::Conditions::OR((expr1,expr2)) => { +// let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); +// for e in [expr1, expr2] { +// let ex = match e.operator { +// AlertOperator::GreaterThan => expr.and(col(e.column).gt(lit(e.value))), +// AlertOperator::LessThan => expr.and(col(e.column).lt(lit(e.value))), +// AlertOperator::EqualTo => expr.and(col(e.column).eq(lit(e.value))), +// AlertOperator::NotEqualTo => expr.and(col(e.column).not_eq(lit(e.value))), +// AlertOperator::GreaterThanEqualTo => expr.and(col(e.column).gt_eq(lit(e.value))), +// AlertOperator::LessThanEqualTo => expr.and(col(e.column).lt_eq(lit(e.value))), +// AlertOperator::Like => expr.and(col(e.column).like(lit(e.value))), +// AlertOperator::NotLike => expr.and(col(e.column).not_like(lit(e.value))), +// }; +// expr = expr.or(ex); +// } +// expr +// }, +// crate::alerts::Conditions::Condition(expr) => { +// let expr = match expr.operator { +// AlertOperator::GreaterThan => col(expr.column).gt(lit(expr.value)), +// AlertOperator::LessThan => col(expr.column).lt(lit(expr.value)), +// AlertOperator::EqualTo => col(expr.column).eq(lit(expr.value)), +// AlertOperator::NotEqualTo => col(expr.column).not_eq(lit(expr.value)), +// AlertOperator::GreaterThanEqualTo => col(expr.column).gt_eq(lit(expr.value)), +// AlertOperator::LessThanEqualTo => col(expr.column).lt_eq(lit(expr.value)), +// AlertOperator::Like => col(expr.column).like(lit(expr.value)), +// AlertOperator::NotLike => col(expr.column).not_like(lit(expr.value)), +// }; +// expr +// }, +// } +// } +// } + +// let (group_expr, aggr_expr, filter_expr) = get_exprs(&alert.thresholds); +// let df = df.aggregate(group_expr, aggr_expr)?; + +// let nrows = df.clone().filter(filter_expr)?.count().await?; +// 
trace!("dataframe-\n{:?}", df.collect().await); + +// if nrows > 0 { +// trace!("ALERT!!!!!!"); + +// // update state +// ALERTS +// .update_state(&alert.id.to_string(), AlertState::Triggered, true) +// .await?; +// } else { +// ALERTS +// .update_state(&alert.id.to_string(), AlertState::Resolved, false) +// .await?; +// } + +// Ok(()) +// } + +fn get_exprs(thresholds: &Vec) -> (Vec, Vec, Expr) { // for now group by is empty, we can include this later let group_expr: Vec = vec![]; // agg expression let mut aggr_expr: Vec = vec![]; - let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); + let mut filter_expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); for threshold in thresholds { let res = match threshold.operator { AlertOperator::GreaterThan => col(&threshold.column).gt(lit(threshold.value)), @@ -170,8 +472,8 @@ fn get_exprs(thresholds: &Vec) -> (Vec, Vec, Expr) Aggregate::Max => max(col(&threshold.column)).alias(&threshold.column), Aggregate::Sum => sum(col(&threshold.column)).alias(&threshold.column), }); - expr = expr.and(res); + filter_expr = filter_expr.and(res); } - (group_expr, aggr_expr, expr) + (group_expr, aggr_expr, filter_expr) } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 0de84b4e2..3b6ee4109 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -19,12 +19,14 @@ use actix_web::http::header::ContentType; use actix_web::web::Json; use actix_web::{FromRequest, HttpRequest}; -use alerts_utils::{evaluate_alert, user_auth_for_query}; +use alerts_utils::{evaluate_alert_the_second, user_auth_for_query}; use async_trait::async_trait; +use chrono::Utc; use http::StatusCode; +use itertools::Itertools; use once_cell::sync::Lazy; use serde_json::Error as SerdeError; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display}; use std::future::Future; use std::pin::Pin; @@ -32,24 +34,24 @@ use tokio::sync::oneshot::{Receiver, Sender}; use 
tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{trace, warn}; -use ulid::Ulid; pub mod alerts_utils; pub mod target; use crate::option::CONFIG; +use crate::query::QUERY_SESSION; use crate::rbac::map::SessionKey; use crate::storage; use crate::storage::ObjectStorageError; use crate::sync::schedule_alert_task; -use crate::utils::uid; -use crate::utils::uid::Uid; +use crate::utils::time::TimeRange; +use crate::utils::{get_hash, uid}; use self::target::Target; // these types describe the scheduled task for an alert pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); -pub type ScheduledTasks = RwLock>; +pub type ScheduledTasks = RwLock>; pub static ALERTS: Lazy = Lazy::new(Alerts::default); @@ -75,22 +77,25 @@ pub trait CallableTarget { pub struct Context { alert_info: AlertInfo, deployment_info: DeploymentInfo, + message: String, } impl Context { - pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo) -> Self { + pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self { Self { alert_info, deployment_info, + message, } } fn default_alert_string(&self) -> String { format!( - "triggered on {}", + "AlertName: {}, Triggered TimeStamp: {}, Severity: {}, Message: {}", self.alert_info.alert_name, - // self.alert_info.message, - // self.alert_info.reason + Utc::now().to_rfc3339(), + self.alert_info.severity, + self.message ) } @@ -113,14 +118,21 @@ pub struct AlertInfo { // message: String, // reason: String, alert_state: AlertState, + severity: String, } impl AlertInfo { - pub fn new(alert_id: String, alert_name: String, alert_state: AlertState) -> Self { + pub fn new( + alert_id: String, + alert_name: String, + alert_state: AlertState, + severity: String, + ) -> Self { Self { alert_id, alert_name, alert_state, + severity, } } } @@ -155,16 +167,39 @@ pub enum AlertType { #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum AlertOperator 
{ + #[serde(rename = ">")] GreaterThan, + #[serde(rename = "<")] LessThan, + #[serde(rename = "=")] EqualTo, + #[serde(rename = "<>")] NotEqualTo, + #[serde(rename = ">=")] GreaterThanEqualTo, + #[serde(rename = "<=")] LessThanEqualTo, + #[serde(rename = "like")] Like, + #[serde(rename = "not like")] NotLike, } +impl Display for AlertOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertOperator::GreaterThan => write!(f, ">"), + AlertOperator::LessThan => write!(f, "<"), + AlertOperator::EqualTo => write!(f, "="), + AlertOperator::NotEqualTo => write!(f, "<>"), + AlertOperator::GreaterThanEqualTo => write!(f, ">="), + AlertOperator::LessThanEqualTo => write!(f, "<="), + AlertOperator::Like => write!(f, "like"), + AlertOperator::NotLike => write!(f, "not like"), + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum Aggregate { @@ -175,12 +210,91 @@ pub enum Aggregate { Sum, } +impl Display for Aggregate { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Aggregate::Avg => write!(f, "Avg"), + Aggregate::Count => write!(f, "Count"), + Aggregate::Min => write!(f, "Min"), + Aggregate::Max => write!(f, "Max"), + Aggregate::Sum => write!(f, "Sum"), + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct ThresholdConfig { +pub struct OperationConfig { + pub column: String, + pub operator: Option, + pub value: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct FilterConfig { + pub conditions: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct ConditionConfig { + pub column: String, + pub operator: AlertOperator, + pub value: String, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub enum Conditions { + AND((ConditionConfig, ConditionConfig)), + OR((ConditionConfig, ConditionConfig)), + 
Condition(ConditionConfig), +} + +impl Conditions { + pub fn generate_filter_message(&self) -> String { + match self { + Conditions::AND((expr1, expr2)) => { + format!( + "[{} {} {} AND {} {} {}]", + expr1.column, + expr1.operator, + expr1.value, + expr2.column, + expr2.operator, + expr2.value + ) + } + Conditions::OR((expr1, expr2)) => { + format!( + "[{} {} {} OR {} {} {}]", + expr1.column, + expr1.operator, + expr1.value, + expr2.column, + expr2.operator, + expr2.value + ) + } + Conditions::Condition(expr) => { + format!("[{} {} {}]", expr.column, expr.operator, expr.value) + } + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AggregateConfig { pub agg: Aggregate, + pub condition_config: Option, pub column: String, pub operator: AlertOperator, - pub value: f32, + pub value: f64, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub enum AggregateCondition { + AND, + OR, } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -223,14 +337,38 @@ impl Display for AlertState { } } +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub enum Severity { + Critical, + High, + #[default] + Medium, + Low, +} + +impl Display for Severity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Severity::Critical => write!(f, "Critical (P0)"), + Severity::High => write!(f, "High (P1)"), + Severity::Medium => write!(f, "Medium (P2)"), + Severity::Low => write!(f, "Low (P3)"), + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct AlertRequest { pub version: AlertVerison, + #[serde(default = "Severity::default")] + pub severity: Severity, pub title: String, pub query: String, pub alert_type: AlertType, - pub thresholds: Vec, + pub aggregate_config: Vec, + pub agg_condition: Option, pub eval_type: EvalConfig, pub targets: Vec, } 
@@ -255,13 +393,15 @@ impl AlertRequest { AlertConfig { version: self.version, id: alert.id, + severity: alert.severity, title: self.title, query: self.query, alert_type: self.alert_type, - thresholds: self.thresholds, + aggregate_config: self.aggregate_config, + agg_condition: self.agg_condition, eval_type: self.eval_type, targets: self.targets, - state: alert.state, + state: AlertState::default(), } } } @@ -270,11 +410,13 @@ impl From for AlertConfig { fn from(val: AlertRequest) -> AlertConfig { AlertConfig { version: val.version, - id: crate::utils::uid::gen(), + id: get_hash(Utc::now().timestamp_micros().to_string().as_str()), + severity: val.severity, title: val.title, query: val.query, alert_type: val.alert_type, - thresholds: val.thresholds, + aggregate_config: val.aggregate_config, + agg_condition: val.agg_condition, eval_type: val.eval_type, targets: val.targets, state: AlertState::default(), @@ -286,12 +428,13 @@ impl From for AlertConfig { #[serde(rename_all = "camelCase")] pub struct AlertConfig { pub version: AlertVerison, - #[serde(default = "crate::utils::uid::gen")] - pub id: uid::Uid, + pub id: String, + pub severity: Severity, pub title: String, pub query: String, pub alert_type: AlertType, - pub thresholds: Vec, + pub aggregate_config: Vec, + pub agg_condition: Option, pub eval_type: EvalConfig, pub targets: Vec, // for new alerts, state should be resolved @@ -300,6 +443,84 @@ pub struct AlertConfig { } impl AlertConfig { + /// Validate whether the query and the + /// aggregate config are correct or not + pub async fn validate(&self) -> Result<(), AlertError> { + // if agg_conditions limited to two + // if multiple conditions, agg_condition should be present + if self.aggregate_config.len() > 2 { + return Err(AlertError::Metadata( + "aggregateConfig can't be more than two", + )); + } + + if self.aggregate_config.len() > 1 { + if self.agg_condition.is_none() { + return Err(AlertError::Metadata( + "aggCondition can't be null with multiple 
aggregateConfigs", + )); + } + } + + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state.create_logical_plan(&self.query).await?; + + // TODO: Filter tags should be taken care of!!! + let time_range = TimeRange::parse_human_time("1m", "now") + .map_err(|err| AlertError::CustomError(err.to_string()))?; + + let query = crate::query::Query { + raw_logical_plan, + time_range, + filter_tag: None, + }; + + // for now proceed in a similar fashion as we do in query + // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data) + let stream_name = if let Some(stream_name) = query.first_table_name() { + stream_name + } else { + return Err(AlertError::CustomError(format!( + "Table name not found in query- {}", + self.query + ))); + }; + + let base_df = query + .get_dataframe(stream_name) + .await + .map_err(|err| AlertError::CustomError(err.to_string()))?; + + warn!("got base_df"); + // now that we have base_df, verify that it has + // columns from aggregate config + let mut columns = HashSet::new(); + for agg_config in &self.aggregate_config { + columns.insert(&agg_config.column); + + if let Some(filters) = &agg_config.condition_config { + match filters { + Conditions::AND((e1, e2)) => { + columns.insert(&e1.column); + columns.insert(&e2.column); + } + Conditions::OR((e1, e2)) => { + columns.insert(&e1.column); + columns.insert(&e2.column); + } + Conditions::Condition(e1) => { + columns.insert(&e1.column); + } + } + } + } + + warn!("select_columns- {columns:?}"); + + base_df.select_columns(columns.iter().map(|c| c.as_str()).collect_vec().as_slice())?; + Ok(()) + } + pub fn get_eval_frequency(&self) -> u32 { match &self.eval_type { EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency, @@ -322,13 +543,20 @@ impl AlertConfig { // .expect("can be flattened"); Context::new( - AlertInfo::new(self.id.to_string(), self.title.clone(), 
alert_state), + AlertInfo::new( + self.id.to_string(), + self.title.clone(), + alert_state, + self.severity.clone().to_string(), + ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), + String::new(), ) } - pub async fn trigger_notifications(&self) -> Result<(), AlertError> { - let context = self.get_context(self.state); + pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> { + let mut context = self.get_context(self.state); + context.message = message; for target in &self.targets { target.call(context.clone()); } @@ -396,7 +624,7 @@ impl Alerts { let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; - self.update_task(alert.id, handle, rx, tx).await; + self.update_task(&alert.id, handle, rx, tx).await; this.push(alert); } @@ -407,7 +635,7 @@ impl Alerts { // run eval task once for each alert for alert in this.iter() { - evaluate_alert(alert).await?; + evaluate_alert_the_second(alert).await?; } Ok(()) @@ -433,7 +661,7 @@ impl Alerts { /// Returns a sigle alert that the user has access to (based on query auth) pub async fn get_alert_by_id(&self, id: &str) -> Result { let read_access = self.alerts.read().await; - let alert = read_access.iter().find(|a| a.id.to_string() == id); + let alert = read_access.iter().find(|a| a.id == id); if let Some(alert) = alert { Ok(alert.clone()) @@ -456,7 +684,7 @@ impl Alerts { &self, alert_id: &str, new_state: AlertState, - trigger_notif: bool, + trigger_notif: Option, ) -> Result<(), AlertError> { let store = CONFIG.storage().get_object_store(); @@ -470,16 +698,14 @@ impl Alerts { // modify in memory let mut writer = self.alerts.write().await; - let alert_to_update = writer - .iter_mut() - .find(|alert| alert.id.to_string() == alert_id); + let alert_to_update = writer.iter_mut().find(|alert| alert.id == alert_id); if let Some(alert) = alert_to_update { alert.state = new_state; }; drop(writer); - if trigger_notif { - 
alert.trigger_notifications().await?; + if trigger_notif.is_some() { + alert.trigger_notifications(trigger_notif.unwrap()).await?; } Ok(()) @@ -493,7 +719,7 @@ impl Alerts { let index = read_access .iter() .enumerate() - .find(|(_, alert)| alert.id.to_string() == alert_id) + .find(|(_, alert)| alert.id == alert_id) .to_owned(); if let Some((index, _)) = index { @@ -510,7 +736,7 @@ impl Alerts { /// Get state of alert using alert_id pub async fn get_state(&self, alert_id: &str) -> Result { let read_access = self.alerts.read().await; - let alert = read_access.iter().find(|a| a.id.to_string() == alert_id); + let alert = read_access.iter().find(|a| a.id == alert_id); if let Some(alert) = alert { Ok(alert.state) @@ -523,35 +749,28 @@ impl Alerts { /// Update the scheduled alert tasks in-memory map pub async fn update_task( &self, - id: Uid, + id: &str, handle: JoinHandle<()>, rx: Receiver<()>, tx: Sender<()>, ) { let mut s = self.scheduled_tasks.write().await; - s.insert(id, (handle, rx, tx)); + s.remove(id); + s.insert(id.to_owned(), (handle, rx, tx)); } /// Remove a scheduled alert task pub async fn delete_task(&self, alert_id: &str) -> Result<(), AlertError> { let read_access = self.scheduled_tasks.read().await; - let hashed_object = read_access - .iter() - .find(|(id, _)| id.to_string() == alert_id); + let hashed_object = read_access.iter().find(|(id, _)| *id == alert_id); if hashed_object.is_some() { // drop the read access in order to get exclusive write access drop(read_access); // now delete from hashmap - let removed = - self.scheduled_tasks - .write() - .await - .remove(&Ulid::from_string(alert_id).map_err(|_| { - AlertError::CustomError("Unable to decode Ulid".to_owned()) - })?); + let removed = self.scheduled_tasks.write().await.remove(alert_id); if removed.is_none() { trace!("Unable to remove alert task {alert_id} from hashmap"); diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 684a8034a..d94baa67d 100644 --- 
a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -18,7 +18,7 @@ use crate::{ option::CONFIG, - storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY}, + storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY}, sync::schedule_alert_task, utils::actix::extract_session_key_from_req, }; @@ -43,11 +43,16 @@ pub async fn list(req: HttpRequest) -> Result { // POST /alerts pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result { let alert: AlertConfig = alert.into(); + alert.validate().await?; + // validate the incoming alert query // does the user have access to these tables or not? let session_key = extract_session_key_from_req(&req)?; user_auth_for_query(&session_key, &alert.query).await?; + // create scheduled tasks + let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + // now that we've validated that the user can run this query // move on to saving the alert in ObjectStore ALERTS.update(&alert).await; @@ -58,10 +63,7 @@ pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result Result Result Result Result { // from here, the user can only go to Resolved if new_state == AlertState::Resolved { // update state on disk and in memory - ALERTS.update_state(alert_id, new_state, true).await?; + ALERTS + .update_state(alert_id, new_state, Some("".into())) + .await?; } else { let msg = format!("Not allowed to manually go from Silenced to {new_state}"); return Err(AlertError::InvalidStateChange(msg)); diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 1229425b1..622c263ac 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -39,7 +39,7 @@ use crate::{ }; use super::{ - LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, + LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, ALERTS_ROOT_DIRECTORY, CORRELATIONS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, 
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; @@ -297,10 +297,14 @@ impl ObjectStorage for LocalFS { async fn list_streams(&self) -> Result, ObjectStorageError> { let ignore_dir = &[ + "lost+found", + PARSEABLE_ROOT_DIRECTORY, + USERS_ROOT_DIR, CORRELATIONS_ROOT_DIRECTORY, + ALERTS_ROOT_DIRECTORY, ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; @@ -322,9 +327,13 @@ impl ObjectStorage for LocalFS { async fn list_old_streams(&self) -> Result, ObjectStorageError> { let ignore_dir = &[ + "lost+found", + PARSEABLE_ROOT_DIRECTORY, CORRELATIONS_ROOT_DIRECTORY, + USERS_ROOT_DIR, + ALERTS_ROOT_DIRECTORY, ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 297dfa1dc..c03b3b046 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -44,7 +44,7 @@ pub use store_metadata::{ }; // metadata file names in a Stream prefix -pub const ALERTS_ROOT_DIRECTORY: &str = ".alert"; +pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts"; pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; pub const STREAM_ROOT_DIRECTORY: &str = ".stream"; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 76ee6f0fb..dc483491d 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -301,8 +301,7 @@ } async fn get_alerts(&self) -> Result, ObjectStorageError> { - let alerts_path = - RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, ALERTS_ROOT_DIRECTORY]); + let alerts_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]); let alerts_bytes = self .get_objects( Some(&alerts_path), @@ -728,11 +727,7 @@ pub fn parseable_json_path() -> RelativePathBuf { /// TODO: Needs to be updated for distributed mode #[inline(always)] pub 
fn alert_json_path(alert_id: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([ - PARSEABLE_ROOT_DIRECTORY, - ALERTS_ROOT_DIRECTORY, - &format!("{alert_id}.json"), - ]) + RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")]) // RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, ALERT_FILE_NAME]) } diff --git a/src/sync.rs b/src/sync.rs index d843e3a3a..d064ad9ca 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -158,7 +158,7 @@ pub async fn schedule_alert_task( scheduler.every((eval_frequency).minutes()).run(move || { let alert_val = alert.clone(); async move { - match alerts_utils::evaluate_alert(&alert_val).await { + match alerts_utils::evaluate_alert_the_second(&alert_val).await { Ok(_) => {} Err(err) => error!("Error while evaluation- {err}"), }