Skip to content

Commit

Permalink
elastic: don't use runtime mappings
Browse files Browse the repository at this point in the history
As Opensearch doesn't support runtime mappings, use scripted fields and
nested boolean queries to handle the 2 types of DNS records found over
Suricata 7 and 8.

Fixes #322.
  • Loading branch information
jasonish committed Dec 18, 2024
1 parent d00502b commit 11cb8b7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 94 deletions.
4 changes: 0 additions & 4 deletions src/elastic/eventrepo/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ impl ElasticEventRepo {
"size": size,
});

if self.runtime_mappings_supported {
body["runtime_mappings"] = self.runtime_mappings();
}

if !should.is_empty() {
body["query"]["bool"]["should"] = should.into();
body["query"]["bool"][MINIMUM_SHOULD_MATCH] = 1.into();
Expand Down
174 changes: 88 additions & 86 deletions src/elastic/eventrepo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub(crate) struct ElasticEventRepo {
pub index_pattern: String,
pub client: Client,
pub ecs: bool,
pub runtime_mappings_supported: bool,
}

impl ElasticEventRepo {
Expand Down Expand Up @@ -111,25 +110,10 @@ impl ElasticEventRepo {
"dest_ip" => "dest_ip.keyword",
"dhcp.assigned_ip" => "dhcp.assigned_ip.keyword",
"dhcp.client_mac" => "dhcp.client_mac.keyword",
"dns.type" => {
if self.runtime_mappings_supported {
"dns_type.keyword"
} else {
"dns.type.keyword"
}
}
"dns.type" => "dns.type.keyword",
"dns.rcode" => "dns.rcode.keyword",
"dns.rdata" => "dns.rdata.keyword",
"dns.rrname" => {
if self.runtime_mappings_supported {
"dns_query_rrname.keyword"
} else {
// Assume Suricata 7 for these older
// Opensearch versions.
"dns.rrname.keyword"
}
}
"dns.queries.rrname" => "dns_query_rrname.keyword",
"dns.rrname" | "dns.queries.rrname" => "dns.queries.rrname.keyword",
"dns.rrtype" => "dns.rrtype.keyword",
"event_type" => "event_type.keyword",
"host" => "host.keyword",
Expand Down Expand Up @@ -440,16 +424,72 @@ impl ElasticEventRepo {
should.push(json!({"term": {self.map_field("dhcp.subnet_mask"): v}}));
}
}
"dns.type" => match v.as_ref() {
"query" | "request" => {
let bool_should = json!({
"bool": {
"should": [
{"term": {self.map_field(k): "query"}},
{"term": {self.map_field(k): "request"}},
]
}
});
if el.negated {
must_not.push(bool_should);
} else {
filter.push(bool_should);
}
}
"answer" | "response" => {
let bool_should = json!({
"bool": {
"should": [
{"term": {self.map_field(k): "answer"}},
{"term": {self.map_field(k): "response"}},
]
}
});
if el.negated {
must_not.push(bool_should);
} else {
filter.push(bool_should);
}
}
_ => {
if el.negated {
must_not.push(json!({"term": {self.map_field(k): v}}));
} else {
filter.push(json!({"term": {self.map_field(k): v}}));
}
}
},
_ => {
if k.starts_with('@') {
warn!("Unhandled @ parameter in query string: {k}");
} else if k.starts_with(' ') {
warn!("Query parameter starting with a space: {k}");
}

let mapped_field = self.map_field(k);

let expression = match mapped_field.as_ref() {
"dns.queries.rrname.keyword" => {
json!({
"bool": {
"should": [
{"term": {"dns.queries.rrname.keyword": v}},
{"term": {"dns.rrname.keyword": v}}
]
}
})
}
_ => request::term_filter(&mapped_field, v),
};

if el.negated {
must_not.push(request::term_filter(&self.map_field(k), v))
must_not.push(expression);
} else {
filter.push(request::term_filter(&self.map_field(k), v))
filter.push(expression);
}
}
},
Expand Down Expand Up @@ -679,53 +719,6 @@ impl ElasticEventRepo {
Ok(data)
}

fn runtime_mappings(&self) -> serde_json::Value {
json!({
"dns_type.keyword": {
"type": "keyword",
"script": {
"source": r#"
try {
if (doc.containsKey('dns.type')) {
if (doc['dns.type.keyword'].value == "request") {
emit("query");
} else if (doc['dns.type.keyword'].value == "response") {
emit("answer");
} else {
emit(doc['dns.type.keyword'].value);
}
}
}
catch (Exception e) {
}
"#
}
},
"dns_query_rrname.keyword": {
"type": "keyword",
"script": {
"source": r#"
try {
if (doc['dns.version'].size() != 0 && doc['dns.version'].value > 2) {
if (doc.containsKey('dns.queries.rrname')) {
emit(doc['dns.queries.rrname.keyword'].value);
}
} else {
if (doc.containsKey('dns.rrname')) {
if (doc['dns.rrname.keyword'].size() != 0) {
emit(doc['dns.rrname.keyword'].value);
}
}
}
}
catch (Exception e) {
}
"#,
}
}
})
}

pub async fn agg(
&self,
field: &str,
Expand All @@ -741,26 +734,39 @@ impl ElasticEventRepo {

let field = self.map_field(field);

#[rustfmt::skip]
let mut agg = json!({});

match field.as_ref() {
"dns.queries.rrname.keyword" => {
agg["script"] = json!({
"source": r#"
if (doc.containsKey('dns.queries.rrname.keyword') && doc['dns.queries.rrname.keyword'].size() != 0) {
return doc['dns.queries.rrname.keyword'];
}
else if (doc.containsKey('dns.rrname.keyword') && doc['dns.rrname.keyword'].size() != 0) {
return doc['dns.rrname.keyword'];
}
"#,
});
}
_ => {
agg["field"] = field.clone().into();
}
}

let agg = if order == "asc" {
// We're after a rare terms...
// Increase the max_doc_count, otherwise only
// terms that appear once will be returned, but
// we're after the least occurring, but those
// numbers could still be high.
agg["max_doc_count"] = 100.into();
json!({
"rare_terms": {
"field": field,
// Increase the max_doc_count, otherwise only
// terms that appear once will be returned, but
// we're after the least occurring, but those
// numbers could still be high.
"max_doc_count": 100,
}
"rare_terms": agg
})
} else {
// This is a normal "Top 10"...
agg["size"] = size.into();
json!({
"terms": {
"field": &field,
"size": size,
},
"terms": agg
})
};

Expand All @@ -782,10 +788,6 @@ impl ElasticEventRepo {
},
});

if self.runtime_mappings_supported {
query["runtime_mappings"] = self.runtime_mappings();
}

if !should.is_empty() {
query["query"]["bool"]["should"] = should.into();
query["query"]["bool"][MINIMUM_SHOULD_MATCH] = 1.into();
Expand Down
4 changes: 0 additions & 4 deletions src/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ async fn configure_datastore(config: Config, server_config: &ServerConfig) -> Re

let client = client.build();

let mut runtime_mappings_supported = true;

let server_info = client.wait_for_info().await;
if matches!(
server_info.version.distribution.as_deref(),
Expand All @@ -464,7 +462,6 @@ async fn configure_datastore(config: Config, server_config: &ServerConfig) -> Re
error!("Failed to parse Opensearch version, EveBox likely won't work properly");
}
warn!("Opensearch support is still a work in progress");
runtime_mappings_supported = false;
} else {
info!(
"Found Elasticsearch version {}; Index={}; ECS={}",
Expand Down Expand Up @@ -494,7 +491,6 @@ async fn configure_datastore(config: Config, server_config: &ServerConfig) -> Re
index_pattern,
client: client.clone(),
ecs: server_config.elastic_ecs,
runtime_mappings_supported,
};
debug!("Elasticsearch base index: {}", &eventstore.base_index);
debug!(
Expand Down

0 comments on commit 11cb8b7

Please sign in to comment.