
Commit

misc: error cleanups
AppError is essentially an ApiError. The idea is that we can use
"anyhow" everywhere except in the API handlers, where errors need to
be turned into specific error codes such as bad request, internal
server error, not implemented, etc.
jasonish committed Dec 21, 2024
1 parent 1d17ad0 commit 78b9f3f
Showing 19 changed files with 190 additions and 210 deletions.
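
The pattern described in the commit message, as a minimal standalone sketch (not EveBox code): internal code returns `anyhow::Result` and uses `?`/`bail!`, and only the API handler boundary maps failures onto concrete error codes. The helper and handler names below are hypothetical; only the `BadRequest` variant mirrors `src/error.rs`.

```rust
use anyhow::{bail, Context, Result};

// Minimal stand-in for the API error type; only BadRequest is shown here.
#[derive(Debug)]
enum AppError {
    BadRequest(String),
}

// Internal code: plain anyhow, no API-specific error types.
fn parse_min_score(raw: &str) -> Result<u32> {
    if raw.is_empty() {
        bail!("empty min-score parameter");
    }
    raw.trim().parse::<u32>().context("min-score must be an integer")
}

// API handler boundary: decide which failures become which error codes.
fn min_score_handler(raw: &str) -> std::result::Result<u32, AppError> {
    parse_min_score(raw).map_err(|err| AppError::BadRequest(format!("{err:#}")))
}

fn main() {
    println!("{:?}", min_score_handler("42")); // Ok(42)
    println!("{:?}", min_score_handler(""));   // Err(BadRequest("empty min-score parameter"))
}
```
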
7 changes: 3 additions & 4 deletions src/elastic/eventrepo/alerts.rs
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: (C) 2020 Jason Ish <[email protected]>
// SPDX-License-Identifier: MIT

use tracing::{debug, error, warn};
use crate::prelude::*;

use crate::{
elastic::{AlertQueryOptions, ElasticResponse},
@@ -10,7 +10,6 @@ use crate::{
};

use super::{ElasticEventRepo, MINIMUM_SHOULD_MATCH};
use crate::error::AppError;

impl ElasticEventRepo {
pub fn build_inbox_query(&self, options: AlertQueryOptions) -> serde_json::Value {
@@ -161,14 +160,14 @@ impl ElasticEventRepo {
query
}

pub async fn alerts(&self, options: AlertQueryOptions) -> Result<AlertsResult, AppError> {
pub async fn alerts(&self, options: AlertQueryOptions) -> Result<AlertsResult> {
let mut query = self.build_inbox_query(options);
query["timeout"] = "3s".into();
let start = std::time::Instant::now();
let body = self.search(&query).await?.text().await?;
let response: ElasticResponse = serde_json::from_str(&body)?;
if let Some(error) = &response.error {
return Err(AppError::ElasticSearchError(error.first_reason()));
bail!("elasticsearch: {}", error.first_reason());
}

debug!(
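
The `use crate::prelude::*;` line above replaces the per-file `tracing` and `AppError` imports, and `bail!`/`Result` are then used unqualified. The prelude module itself is not part of the hunks shown here; a sketch of what it presumably re-exports, inferred only from the imports it replaces:

```rust
// Hypothetical sketch of src/prelude.rs (the real module is not shown in
// this commit): re-export anyhow's error helpers and the tracing macros so
// individual files can drop their explicit imports.
pub(crate) use anyhow::{anyhow, bail, Context, Result};
pub(crate) use tracing::{debug, error, info, warn};
```
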
9 changes: 5 additions & 4 deletions src/elastic/eventrepo/dhcp.rs
@@ -1,8 +1,9 @@
// SPDX-FileCopyrightText: (C) 2023 Jason Ish <[email protected]>
// SPDX-License-Identifier: MIT

use crate::prelude::*;

use super::ElasticEventRepo;
use crate::error::AppError;
use crate::{
datetime::DateTime,
elastic::request::{term_filter, timestamp_gte_filter},
@@ -14,7 +15,7 @@ impl ElasticEventRepo {
earliest: Option<DateTime>,
dhcp_type: &str,
sensor: Option<String>,
) -> Result<Vec<serde_json::Value>, AppError> {
) -> Result<Vec<serde_json::Value>> {
let mut filters = vec![];

if let Some(earliest) = &earliest {
@@ -67,15 +68,15 @@
&self,
earliest: Option<DateTime>,
sensor: Option<String>,
) -> Result<Vec<serde_json::Value>, AppError> {
) -> Result<Vec<serde_json::Value>> {
self.dhcp(earliest, "request", sensor).await
}

pub async fn dhcp_ack(
&self,
earliest: Option<DateTime>,
sensor: Option<String>,
) -> Result<Vec<serde_json::Value>, AppError> {
) -> Result<Vec<serde_json::Value>> {
self.dhcp(earliest, "ack", sensor).await
}
}
4 changes: 2 additions & 2 deletions src/elastic/eventrepo/dns.rs
@@ -6,7 +6,7 @@ use super::{
ElasticEventRepo,
};
use crate::elastic::DateTime;
use crate::error::AppError;
use crate::prelude::*;

impl ElasticEventRepo {
pub(crate) async fn dns_reverse_lookup(
@@ -15,7 +15,7 @@ impl ElasticEventRepo {
sensor: Option<String>,
src_ip: String,
dest_ip: String,
) -> Result<serde_json::Value, AppError> {
) -> Result<serde_json::Value> {
let mut filters = vec![];

filters.push(term_filter(&self.map_field("event_type"), "dns"));
10 changes: 3 additions & 7 deletions src/elastic/eventrepo/events.rs
@@ -1,22 +1,18 @@
// SPDX-FileCopyrightText: (C) 2020 Jason Ish <[email protected]>
// SPDX-License-Identifier: MIT

use crate::prelude::*;

use super::ElasticEventRepo;
use crate::elastic::request;
use crate::error::AppError;
use crate::eventrepo::{self};
use crate::LOG_QUERIES;
use serde_json::json;
use tracing::info;
use tracing::warn;

const MINIMUM_SHOULD_MATCH: &str = "minimum_should_match";

impl ElasticEventRepo {
pub async fn events(
&self,
params: eventrepo::EventQueryParams,
) -> Result<serde_json::Value, AppError> {
pub async fn events(&self, params: eventrepo::EventQueryParams) -> Result<serde_json::Value> {
let mut filters = vec![request::exists_filter(&self.map_field("event_type"))];
let mut should = vec![];
let mut must_not = vec![];
52 changes: 20 additions & 32 deletions src/elastic/eventrepo/mod.rs
@@ -10,7 +10,7 @@ use crate::datetime;
use crate::elastic::importer::ElasticEventSink;
use crate::elastic::request::exists_filter;
use crate::elastic::{request, ElasticResponse, TAGS_ARCHIVED, TAGS_ESCALATED, TAG_ARCHIVED};
use crate::error::AppError;
use crate::prelude::*;
use crate::queryparser;
use crate::queryparser::QueryElement;
use crate::queryparser::QueryParser;
@@ -20,9 +20,6 @@ use crate::util;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use tracing::debug;
use tracing::error;
use tracing::warn;

mod alerts;
mod dhcp;
@@ -142,7 +139,7 @@ impl ElasticEventRepo {
query: serde_json::Value,
tag: &str,
action: &HistoryEntry,
) -> Result<(), AppError> {
) -> Result<()> {
self.add_tags_by_query(query, &[tag], action).await
}

@@ -151,7 +148,7 @@
query: serde_json::Value,
tags: &[&str],
action: &HistoryEntry,
) -> Result<(), AppError> {
) -> Result<()> {
let script = json!({
"lang": "painless",
"inline": "
@@ -202,7 +199,7 @@ impl ElasticEventRepo {
query: serde_json::Value,
tag: &str,
action: &HistoryEntry,
) -> Result<(), AppError> {
) -> Result<()> {
self.remove_tags_by_query(query, &[tag], action).await
}

@@ -211,7 +208,7 @@
query: serde_json::Value,
tags: &[&str],
action: &HistoryEntry,
) -> Result<(), AppError> {
) -> Result<()> {
let script = json!({
"lang": "painless",
"inline": "
@@ -247,7 +244,7 @@ impl ElasticEventRepo {
alert_group: api::AlertGroupSpec,
tags: &[&str],
action: &HistoryEntry,
) -> Result<(), AppError> {
) -> Result<()> {
let mut must_not = Vec::new();
for tag in tags {
must_not.push(json!({"term": {"tags": tag}}));
@@ -268,7 +265,7 @@
alert_group: api::AlertGroupSpec,
tags: &[&str],
action: &HistoryEntry,
) -> Result<(), AppError> {
) -> Result<()> {
let mut filters = self.build_alert_group_filter(&alert_group);
for tag in tags {
filters.push(json!({"term": {"tags": tag}}));
@@ -281,7 +278,7 @@
self.remove_tags_by_query(query, tags, action).await
}

pub async fn archive_event_by_id(&self, event_id: &str) -> Result<(), AppError> {
pub async fn archive_event_by_id(&self, event_id: &str) -> Result<()> {
let query = json!({
"bool": {
"filter": {
@@ -293,7 +290,7 @@
self.add_tag_by_query(query, TAG_ARCHIVED, &action).await
}

pub async fn escalate_event_by_id(&self, event_id: &str) -> Result<(), AppError> {
pub async fn escalate_event_by_id(&self, event_id: &str) -> Result<()> {
let query = json!({
"bool": {
"filter": {
@@ -305,7 +302,7 @@
self.add_tag_by_query(query, TAG_ESCALATED, &action).await
}

pub async fn deescalate_event_by_id(&self, event_id: &str) -> Result<(), AppError> {
pub async fn deescalate_event_by_id(&self, event_id: &str) -> Result<()> {
let query = json!({
"bool": {
"filter": {
@@ -323,7 +320,7 @@
event_id: &str,
comment: String,
session: Arc<Session>,
) -> Result<(), AppError> {
) -> Result<()> {
let query = json!({
"bool": {
"filter": {
@@ -338,10 +335,7 @@
self.add_tags_by_query(query, &[], &action).await
}

pub async fn get_event_by_id(
&self,
event_id: String,
) -> Result<Option<serde_json::Value>, AppError> {
pub async fn get_event_by_id(&self, event_id: String) -> Result<Option<serde_json::Value>> {
let query = json!({
"query": {
"bool": {
@@ -353,7 +347,7 @@
});
let response: ElasticResponse = self.search(&query).await?.json().await?;
if let Some(error) = response.error {
return Err(AppError::ElasticSearchError(error.first_reason()));
bail!("elasticsearch: {}", error.first_reason());
} else if let Some(hits) = &response.hits {
if let serde_json::Value::Array(hits) = &hits["hits"] {
if !hits.is_empty() {
@@ -584,10 +578,7 @@ impl ElasticEventRepo {
}
}

pub async fn archive_by_alert_group(
&self,
alert_group: api::AlertGroupSpec,
) -> Result<(), AppError> {
pub async fn archive_by_alert_group(&self, alert_group: api::AlertGroupSpec) -> Result<()> {
let action = HistoryEntryBuilder::new_archive().build();
self.add_tags_by_alert_group(alert_group, &TAGS_ARCHIVED, &action)
.await
@@ -597,24 +588,21 @@
&self,
alert_group: api::AlertGroupSpec,
session: Arc<Session>,
) -> Result<(), AppError> {
) -> Result<()> {
let action = HistoryEntryBuilder::new_escalate()
.username(session.username.clone())
.build();
self.add_tags_by_alert_group(alert_group, &TAGS_ESCALATED, &action)
.await
}

pub async fn deescalate_by_alert_group(
&self,
alert_group: api::AlertGroupSpec,
) -> Result<(), AppError> {
pub async fn deescalate_by_alert_group(&self, alert_group: api::AlertGroupSpec) -> Result<()> {
let action = HistoryEntryBuilder::new_deescalate().build();
self.remove_tags_by_alert_group(alert_group, &TAGS_ESCALATED, &action)
.await
}

async fn get_earliest_timestamp(&self) -> Result<Option<crate::datetime::DateTime>, AppError> {
async fn get_earliest_timestamp(&self) -> Result<Option<crate::datetime::DateTime>> {
#[rustfmt::skip]
let request = json!({
"query": {
@@ -647,7 +635,7 @@ impl ElasticEventRepo {
&self,
interval: Option<u64>,
query: &[QueryElement],
) -> Result<Vec<serde_json::Value>, AppError> {
) -> Result<Vec<serde_json::Value>> {
let qs = QueryParser::new(query.to_vec());
let mut filters = vec![exists_filter(&self.map_field("event_type"))];
let mut should = vec![];
@@ -723,7 +711,7 @@ impl ElasticEventRepo {
size: usize,
order: &str,
query: Vec<queryparser::QueryElement>,
) -> Result<Vec<serde_json::Value>, AppError> {
) -> Result<Vec<serde_json::Value>> {
let mut filter = vec![];
let mut should = vec![];
let mut must_not = vec![];
@@ -795,7 +783,7 @@ impl ElasticEventRepo {

if let Some(error) = response["error"].as_object() {
error!("Elasticsearch \"group_by\" query failed: {error:?}");
Err(AppError::ElasticSearchError(format!("{error:?}")))
bail!("elasticsearch: {:?}", error);
} else {
let mut data = vec![];
if let serde_json::Value::Array(buckets) = &response["aggregations"]["agg"]["buckets"] {
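
For context on the `bail!` calls that replace the `AppError::ElasticSearchError` returns in this file: `bail!(...)` is anyhow's early-return macro, shorthand for `return Err(anyhow::anyhow!(...))`, so the surrounding functions still return a plain `anyhow::Result`. A standalone illustration with a hypothetical function (not EveBox code):

```rust
use anyhow::{bail, Result};

// Hypothetical stand-in for the error checks above.
fn check(error: Option<&str>) -> Result<()> {
    if let Some(reason) = error {
        // Equivalent to: return Err(anyhow::anyhow!("elasticsearch: {}", reason));
        bail!("elasticsearch: {}", reason);
    }
    Ok(())
}

fn main() {
    assert!(check(None).is_ok());
    assert!(check(Some("shard failure")).is_err());
}
```
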
9 changes: 0 additions & 9 deletions src/error.rs
@@ -22,21 +22,12 @@ pub(crate) enum AppError {
#[error("bad request: {0}")]
BadRequest(String),

#[error("unimplemented")]
Unimplemented,

#[error("elasticsearch error: {0}")]
ElasticSearchError(String),

#[error("{0}")]
ReqwestError(#[from] reqwest::Error),

#[error("serde: {0}")]
SerdeJsonError(#[from] serde_json::Error),

#[error("event not found")]
EventNotFound,

#[error("failed to parse integer")]
ParseIntError(#[from] ParseIntError),

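
The `AppError` variants that remain keep their thiserror attributes, so `?` still converts the wrapped errors through the `From` impls that `#[from]` generates, and handlers can construct `BadRequest` directly. A trimmed-down, standalone sketch using only the two variants visible in this hunk (everything else is illustrative, not the actual `src/error.rs`):

```rust
use std::num::ParseIntError;
use thiserror::Error;

// Only BadRequest and ParseIntError are copied from the hunk above; the
// real enum has more variants.
#[derive(Debug, Error)]
enum AppError {
    #[error("bad request: {0}")]
    BadRequest(String),
    #[error("failed to parse integer")]
    ParseIntError(#[from] ParseIntError),
}

// `?` converts ParseIntError into AppError via the generated From impl.
fn parse_limit(raw: &str) -> Result<u32, AppError> {
    Ok(raw.parse()?)
}

fn main() {
    assert!(parse_limit("10").is_ok());
    assert!(matches!(parse_limit("ten"), Err(AppError::ParseIntError(_))));
    println!("{}", AppError::BadRequest("missing query".into()));
}
```
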
