Commit

refactor: deepsource
parmesant committed Jan 27, 2025
1 parent 94793e6 commit cab2dfa
Showing 3 changed files with 137 additions and 159 deletions.
src/alerts/alerts_utils.rs (287 changes: 130 additions & 157 deletions)
@@ -25,7 +25,7 @@ use datafusion::{
         min_max::{max, min},
         sum::sum,
     },
-    prelude::{col, lit, Expr},
+    prelude::{col, lit, DataFrame, Expr},
 };
 use tracing::trace;

@@ -41,8 +41,8 @@ use crate::{
 };

 use super::{
-    AggregateConfig, AggregateOperation, Aggregations, AlertConfig, AlertError, AlertOperator,
-    AlertState, ConditionConfig, Conditions, ALERTS,
+    AggregateConfig, AggregateOperation, AggregateResult, Aggregations, AlertConfig, AlertError,
+    AlertOperator, AlertState, ConditionConfig, Conditions, ALERTS,
 };

 async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
@@ -100,6 +100,16 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul
 pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
     trace!("RUNNING EVAL TASK FOR- {alert:?}");

+    let query = prepare_query(alert).await?;
+    let base_df = execute_base_query(&query, &alert.query).await?;
+    let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?;
+    let final_res = calculate_final_result(&alert.aggregate_config, &agg_results);
+
+    update_alert_state(alert, final_res, &agg_results).await?;
+    Ok(())
+}
+
+async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, AlertError> {
     let (start_time, end_time) = match &alert.eval_type {
         super::EvalConfig::RollingWindow(rolling_window) => {
             (&rolling_window.eval_start, &rolling_window.eval_end)
@@ -109,195 +119,158 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
     let session_state = QUERY_SESSION.state();
     let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;

     // TODO: Filter tags should be taken care of!!!
     let time_range = TimeRange::parse_human_time(start_time, end_time)
         .map_err(|err| AlertError::CustomError(err.to_string()))?;

-    let query = crate::query::Query {
+    Ok(crate::query::Query {
         raw_logical_plan,
         time_range,
         filter_tag: None,
-    };
+    })
+}

-    // for now proceed in a similar fashion as we do in query
-    // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-    let stream_name = if let Some(stream_name) = query.first_table_name() {
-        stream_name
-    } else {
-        return Err(AlertError::CustomError(format!(
-            "Table name not found in query- {}",
-            alert.query
-        )));
-    };
+async fn execute_base_query(
+    query: &crate::query::Query,
+    original_query: &str,
+) -> Result<DataFrame, AlertError> {
+    let stream_name = query.first_table_name().ok_or_else(|| {
+        AlertError::CustomError(format!("Table name not found in query- {}", original_query))
+    })?;

-    let base_df = query
+    query
         .get_dataframe(stream_name)
         .await
-        .map_err(|err| AlertError::CustomError(err.to_string()))?;
+        .map_err(|err| AlertError::CustomError(err.to_string()))
+}

-    let mut agg_results = vec![];
-
-    let agg_filter_exprs = get_exprs(&alert.aggregate_config);
-
-    let final_res = match &alert.aggregate_config.operator {
-        Some(op) => {
-            match op {
-                AggregateCondition::And | AggregateCondition::Or => {
-                    let agg1 = &alert.aggregate_config.aggregate_conditions[0];
-                    let agg2 = &alert.aggregate_config.aggregate_conditions[1];
-
-                    for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip([agg1, agg2])
-                    {
-                        let filtered_df = if let Some(filter) = filter {
-                            base_df.clone().filter(filter)?
-                        } else {
-                            base_df.clone()
-                        };
-
-                        let aggregated_rows = filtered_df
-                            .aggregate(vec![], vec![agg_expr])?
-                            .collect()
-                            .await?;
-
-                        let final_value = get_final_value(aggregated_rows);
-
-                        // now compare
-                        let res = match &agg.operator {
-                            AlertOperator::GreaterThan => final_value > agg.value,
-                            AlertOperator::LessThan => final_value < agg.value,
-                            AlertOperator::EqualTo => final_value == agg.value,
-                            AlertOperator::NotEqualTo => final_value != agg.value,
-                            AlertOperator::GreaterThanEqualTo => final_value >= agg.value,
-                            AlertOperator::LessThanEqualTo => final_value <= agg.value,
-                            _ => unreachable!(),
-                        };
-
-                        let message = if res {
-                            if agg.condition_config.is_some() {
-                                Some(
-                                    agg.condition_config
-                                        .as_ref()
-                                        .unwrap()
-                                        .generate_filter_message(),
-                                )
-                            } else {
-                                Some(String::default())
-                            }
-                        } else {
-                            None
-                        };
-
-                        agg_results.push((res, message, agg, final_value));
-                    }
-                    let res = match &alert.aggregate_config.operator.clone().unwrap() {
-                        AggregateCondition::And => agg_results.iter().all(|(r, _, _, _)| *r),
-                        AggregateCondition::Or => agg_results.iter().any(|(r, _, _, _)| *r),
-                    };
+async fn evaluate_aggregates(
+    agg_config: &Aggregations,
+    base_df: &DataFrame,
+) -> Result<Vec<AggregateResult>, AlertError> {
+    let agg_filter_exprs = get_exprs(agg_config);
+    let mut results = Vec::new();

-                    res
-                }
-            }
-        }
-        None => {
-            let agg = &alert.aggregate_config.aggregate_conditions[0];
-            let (agg_expr, filter) = &agg_filter_exprs[0];
-            let filtered_df = if let Some(filter) = filter {
-                base_df.filter(filter.clone())?
-            } else {
-                base_df
-            };
+    let conditions = match &agg_config.operator {
+        Some(_) => &agg_config.aggregate_conditions[0..2],
+        None => &agg_config.aggregate_conditions[0..1],
+    };

-            let aggregated_rows = filtered_df
-                .aggregate(vec![], vec![agg_expr.clone()])?
-                .collect()
-                .await?;
-
-            let final_value = get_final_value(aggregated_rows);
-
-            // now compare
-            let res = match &agg.operator {
-                AlertOperator::GreaterThan => final_value > agg.value,
-                AlertOperator::LessThan => final_value < agg.value,
-                AlertOperator::EqualTo => final_value == agg.value,
-                AlertOperator::NotEqualTo => final_value != agg.value,
-                AlertOperator::GreaterThanEqualTo => final_value >= agg.value,
-                AlertOperator::LessThanEqualTo => final_value <= agg.value,
-                _ => unreachable!(),
-            };
+    for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip(conditions) {
+        let result = evaluate_single_aggregate(base_df, filter, agg_expr, agg).await?;
+        results.push(result);
+    }

-            let message = if res {
-                if agg.condition_config.is_some() {
-                    Some(
-                        agg.condition_config
-                            .as_ref()
-                            .unwrap()
-                            .generate_filter_message(),
-                    )
-                } else {
-                    Some(String::default())
-                }
-            } else {
-                None
-            };
+    Ok(results)
+}
+
+async fn evaluate_single_aggregate(
+    base_df: &DataFrame,
+    filter: Option<Expr>,
+    agg_expr: Expr,
+    agg: &AggregateConfig,
+) -> Result<AggregateResult, AlertError> {
+    let filtered_df = if let Some(filter) = filter {
+        base_df.clone().filter(filter)?
+    } else {
+        base_df.clone()
+    };

-            agg_results.push((res, message, agg, final_value));
+    let aggregated_rows = filtered_df
+        .aggregate(vec![], vec![agg_expr])?
+        .collect()
+        .await?;

-            res
-        }
+    let final_value = get_final_value(aggregated_rows);
+    let result = evaluate_condition(&agg.operator, final_value, agg.value);
+
+    let message = if result {
+        agg.condition_config
+            .as_ref()
+            .map(|config| config.generate_filter_message())
+            .or(Some(String::default()))
+    } else {
+        None
     };

-    trace!(
-        "alert.state.eq(&AlertState::Triggered)-\n{}",
-        alert.state.eq(&AlertState::Triggered)
-    );
-    trace!("final_res- {final_res}");
+    Ok(AggregateResult {
+        result,
+        message,
+        config: agg.clone(),
+        value: final_value,
+    })
+}

-    if final_res {
-        trace!("ALERT!!!!!!");
+fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool {
+    match operator {
+        AlertOperator::GreaterThan => actual > expected,
+        AlertOperator::LessThan => actual < expected,
+        AlertOperator::EqualTo => actual == expected,
+        AlertOperator::NotEqualTo => actual != expected,
+        AlertOperator::GreaterThanEqualTo => actual >= expected,
+        AlertOperator::LessThanEqualTo => actual <= expected,
+        _ => unreachable!(),
+    }
+}

-        let mut message = String::default();
-        for (_, filter_msg, agg_config, final_value) in agg_results {
-            if let Some(msg) = filter_msg {
-                message.extend([format!(
-                    "|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
-                    agg_config.agg,
-                    agg_config.column,
-                    msg,
-                    agg_config.operator,
-                    agg_config.value,
-                    final_value
-                )]);
-            } else {
-                message.extend([format!(
-                    "|{}({}) {} {} (ActualValue: {})",
-                    agg_config.agg,
-                    agg_config.column,
-                    agg_config.operator,
-                    agg_config.value,
-                    final_value
-                )]);
-            }
-        }
+fn calculate_final_result(agg_config: &Aggregations, results: &[AggregateResult]) -> bool {
+    match &agg_config.operator {
+        Some(AggregateCondition::And) => results.iter().all(|r| r.result),
+        Some(AggregateCondition::Or) => results.iter().any(|r| r.result),
+        None => results.first().is_some_and(|r| r.result),
+    }
+}

-        // update state
+async fn update_alert_state(
+    alert: &AlertConfig,
+    final_res: bool,
+    agg_results: &[AggregateResult],
+) -> Result<(), AlertError> {
+    if final_res {
+        trace!("ALERT!!!!!!");
+        let message = format_alert_message(agg_results);
         ALERTS
             .update_state(&alert.id.to_string(), AlertState::Triggered, Some(message))
-            .await?;
+            .await
     } else if ALERTS
         .get_state(&alert.id)
         .await?
         .eq(&AlertState::Triggered)
     {
         ALERTS
             .update_state(&alert.id.to_string(), AlertState::Resolved, Some("".into()))
-            .await?;
+            .await
     } else {
         ALERTS
             .update_state(&alert.id.to_string(), AlertState::Resolved, None)
-            .await?;
+            .await
     }
+}

-    Ok(())
+fn format_alert_message(agg_results: &[AggregateResult]) -> String {
+    let mut message = String::default();
+    for result in agg_results {
+        if let Some(msg) = &result.message {
+            message.extend([format!(
+                "|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
+                result.config.agg,
+                result.config.column,
+                msg,
+                result.config.operator,
+                result.config.value,
+                result.value
+            )]);
+        } else {
+            message.extend([format!(
+                "|{}({}) {} {} (ActualValue: {})",
+                result.config.agg,
+                result.config.column,
+                result.config.operator,
+                result.config.value,
+                result.value
+            )]);
+        }
+    }
+    message
 }

 fn get_final_value(aggregated_rows: Vec<RecordBatch>) -> f64 {
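The two pure helpers split out above, evaluate_condition and calculate_final_result, carry the whole trigger decision. A minimal standalone sketch of that logic follows; the enums are local stand-ins for the crate's AlertOperator and AggregateCondition (the stand-in omits the extra operator variants covered by the real code's `_ => unreachable!()` arm), and the per-aggregate results are reduced to plain booleans.

// Standalone illustration only; not part of the commit.
#[allow(dead_code)]
enum AlertOperator {
    GreaterThan,
    LessThan,
    EqualTo,
    NotEqualTo,
    GreaterThanEqualTo,
    LessThanEqualTo,
}

enum AggregateCondition {
    And,
    Or,
}

// Mirrors evaluate_condition from the diff: compare the aggregated value
// against the configured threshold.
fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool {
    match operator {
        AlertOperator::GreaterThan => actual > expected,
        AlertOperator::LessThan => actual < expected,
        AlertOperator::EqualTo => actual == expected,
        AlertOperator::NotEqualTo => actual != expected,
        AlertOperator::GreaterThanEqualTo => actual >= expected,
        AlertOperator::LessThanEqualTo => actual <= expected,
    }
}

// Mirrors calculate_final_result: fold the per-aggregate booleans with
// AND / OR, or take the single result when no operator is configured.
fn calculate_final_result(operator: &Option<AggregateCondition>, results: &[bool]) -> bool {
    match operator {
        Some(AggregateCondition::And) => results.iter().all(|r| *r),
        Some(AggregateCondition::Or) => results.iter().any(|r| *r),
        None => results.first().copied().unwrap_or(false),
    }
}

fn main() {
    // Two hypothetical aggregate checks, e.g. avg(latency) > 200 and count(errors) > 10.
    let checks = [
        evaluate_condition(&AlertOperator::GreaterThan, 250.0, 200.0), // true
        evaluate_condition(&AlertOperator::GreaterThan, 4.0, 10.0),    // false
    ];
    assert!(calculate_final_result(&Some(AggregateCondition::Or), &checks));
    assert!(!calculate_final_result(&Some(AggregateCondition::And), &checks));
}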
src/alerts/mod.rs (7 changes: 7 additions & 0 deletions)
@@ -80,6 +80,13 @@ impl From<&str> for AlertVerison {
     }
 }

+pub struct AggregateResult {
+    result: bool,
+    message: Option<String>,
+    config: AggregateConfig,
+    value: f64,
+}
+
 #[async_trait]
 pub trait CallableTarget {
     async fn call(&self, payload: &Context);
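For a sense of how an AggregateResult ends up rendered by the new format_alert_message, here is a standalone sketch that reuses the same format strings from alerts_utils.rs. The plain-string agg, column, and operator fields stand in for the crate's AggregateConfig members and their Display implementations, which are not shown in this diff.

// Standalone illustration only; not part of the commit.
struct AggregateResultSketch {
    message: Option<String>, // filter message, present when a condition_config is set
    agg: String,             // stands in for config.agg
    column: String,          // stands in for config.column
    operator: String,        // stands in for config.operator
    threshold: f64,          // stands in for config.value
    value: f64,              // the aggregated value that was compared
}

// Same two format strings used by format_alert_message.
fn render(r: &AggregateResultSketch) -> String {
    match &r.message {
        Some(msg) => format!(
            "|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
            r.agg, r.column, msg, r.operator, r.threshold, r.value
        ),
        None => format!(
            "|{}({}) {} {} (ActualValue: {})",
            r.agg, r.column, r.operator, r.threshold, r.value
        ),
    }
}

fn main() {
    let triggered = AggregateResultSketch {
        message: Some("status = 500".to_string()),
        agg: "count".to_string(),
        column: "status".to_string(),
        operator: ">".to_string(),
        threshold: 10.0,
        value: 42.0,
    };
    // Prints: |count(status) WHERE (status = 500) > 10 (ActualValue: 42)|
    println!("{}", render(&triggered));
}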
src/handlers/http/logstream.rs (2 changes: 0 additions & 2 deletions)
@@ -24,8 +24,6 @@ use super::modal::utils::logstream_utils::{
     create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
 };
 use super::query::update_schema_when_distributed;
-use crate::alerts::Alerts;
-use crate::catalog::get_first_event;
 use crate::event::format::{override_data_type, LogSource};
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
