From 2b159243cd7b46f56f8e68ef619988c5f407f61e Mon Sep 17 00:00:00 2001 From: Likhith Thammegowda Date: Tue, 5 Nov 2024 18:16:38 +0100 Subject: [PATCH] "Add 'otel' feature for OpenTelemetry support Enabling the 'otel' feature conditionally includes OpenTelemetry and related tracing functionalities, allowing method-level tracing and context propagation." Signed-off-by: Likhith Thammegowda --- Cargo.lock | 329 +++++++++++++++--- databroker/Cargo.toml | 8 + databroker/src/broker.rs | 98 +++++- databroker/src/glob.rs | 3 + .../src/grpc/kuksa_val_v1/conversions.rs | 3 + databroker/src/grpc/kuksa_val_v1/val.rs | 92 ++++- databroker/src/lib.rs | 36 ++ databroker/src/open_telemetry.rs | 26 ++ databroker/src/permissions.rs | 4 + databroker/src/query/executor.rs | 5 + 10 files changed, 552 insertions(+), 52 deletions(-) create mode 100644 databroker/src/open_telemetry.rs diff --git a/Cargo.lock b/Cargo.lock index 4dc45ed1..6a18de88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,7 +160,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -171,7 +171,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -434,7 +434,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -498,6 +498,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -553,7 +562,7 @@ dependencies = [ "globwalk", "humantime", "inventory", - "itertools", + "itertools 0.12.1", "lazy-regex", "linked-hash-map", "once_cell", @@ -573,11 +582,11 @@ checksum = "01091e28d1f566c8b31b67948399d2efd6c0a8f6228a9785519ed7b73f7f0aef" dependencies = [ "cucumber-expressions", "inflections", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "regex", - "syn", + "syn 2.0.77", "synthez", ] @@ -595,6 +604,19 @@ dependencies = [ "regex-syntax 0.7.5", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -618,7 +640,10 @@ dependencies = [ "kuksa", "kuksa-common", "lazy_static", - "prost", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "prost 0.12.6", "prost-types", "regex", "sd-notify", @@ -628,9 +653,10 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", "tonic-mock", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", "vergen", @@ -648,22 +674,22 @@ dependencies = [ "kuksa-common", "kuksa-sdv", "linefeed", - "prost", + "prost 0.12.6", "prost-types", "regex", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] name = "databroker-proto" version = "0.4.7-dev.0" dependencies = [ - "prost", + "prost 0.12.6", "prost-types", "protobuf-src", - "tonic", + "tonic 0.11.0", "tonic-build", ] @@ -684,7 +710,7 @@ checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -879,7 +905,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -957,7 +983,7 @@ dependencies = [ "quote", "serde", "serde_json", - "syn", + "syn 2.0.77", "textwrap", "thiserror", "typed-builder", @@ -1241,7 +1267,7 @@ checksum = "999ce923619f88194171a67fb3e6d613653b8d4d6078b529b15a765da0edcc17" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -1731,6 +1757,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -1799,7 +1834,7 @@ dependencies = [ "kuksa-common", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -1810,7 +1845,7 @@ dependencies = [ "http", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -1822,7 +1857,7 @@ dependencies = [ "kuksa-common", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -1845,7 +1880,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn", + "syn 2.0.77", ] [[package]] @@ -2095,6 +2130,94 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca" +dependencies = [ + "async-trait", + "futures", + "futures-util", + "http", + "opentelemetry", + "opentelemetry-proto", + "prost 0.11.9", + "thiserror", + "tokio", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c" +dependencies = [ + "futures", + "futures-util", + "opentelemetry", + "prost 0.11.9", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_api" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "overload" version = "0.1.1" @@ -2232,7 +2355,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2269,7 +2392,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.77", ] [[package]] @@ -2287,6 +2410,16 @@ version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "744a264d26b88a6a7e37cbad97953fa233b94d585236310bcbc88474b4092d79" +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.12.6" @@ -2294,7 +2427,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.6", ] [[package]] @@ -2305,19 +2438,32 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools", + "itertools 0.12.1", "log", "multimap", "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.12.6", "prost-types", "regex", - "syn", + "syn 2.0.77", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.12.6" @@ -2325,10 +2471,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2337,7 +2483,7 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" dependencies = [ - "prost", + "prost 0.12.6", ] [[package]] @@ -2604,7 +2750,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2633,7 +2779,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2771,7 +2917,7 @@ checksum = "0eb01866308440fc64d6c44d9e86c5cc17adfe33c4d6eed55da9145044d0ffc1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2817,6 +2963,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.77" @@ -2840,7 +2997,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3d2c2202510a1e186e63e596d9318c91a8cbe85cd1a56a7be0c333e5f59ec8d" dependencies = [ - "syn", + "syn 2.0.77", "synthez-codegen", "synthez-core", ] @@ -2851,7 +3008,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f724aa6d44b7162f3158a57bccd871a77b39a4aef737e01bcdff41f4772c7746" dependencies = [ - "syn", + "syn 2.0.77", "synthez-core", ] @@ -2864,7 +3021,7 @@ dependencies = [ "proc-macro2", "quote", "sealed", - "syn", + "syn 2.0.77", ] [[package]] @@ -2931,7 +3088,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -3027,7 +3184,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -3078,6 +3235,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.9", + "prost-derive 0.11.9", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic" version = "0.11.0" @@ -3096,7 +3285,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.6", "rustls-pemfile", "rustls-pki-types", "tokio", @@ -3118,7 +3307,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -3131,8 +3320,8 @@ dependencies = [ "futures", "http", "http-body", - "prost", - "tonic", + "prost 0.12.6", + "tonic 0.11.0", ] [[package]] @@ -3187,7 +3376,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -3197,6 +3386,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", ] [[package]] @@ -3257,7 +3482,7 @@ checksum = "29a3151c41d0b13e3d011f98adc24434560ef06673a155a6c7f66b9879eecce2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -3322,6 +3547,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -3343,6 +3574,12 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vergen" version = "8.3.2" @@ -3417,7 +3654,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.77", "wasm-bindgen-shared", ] @@ -3439,7 +3676,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3665,7 +3902,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index 492b0db7..688afb74 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -42,6 +42,7 @@ tracing-subscriber = { version = "0.3.11", default-features = false, features = "env-filter", "ansi", ] } + clap = { workspace = true, features = [ "std", "env", @@ -66,6 +67,12 @@ futures = { version = "0.3.28", optional = true } chrono = { version = "0.4.31", optional = true, features = ["std"] } uuid = { version = "1.4.1", optional = true, features = ["v4"] } +# OTEL +opentelemetry = { version = "0.19.0", optional = true, features = ["rt-tokio", "trace"] } +opentelemetry-otlp = { version="0.12.0", optional = true, features = ["tonic", "metrics"] } +opentelemetry-semantic-conventions = { version="0.11.0", optional = true } +tracing-opentelemetry = { version="0.19.0", optional = true } + # systemd related dependency, only relevant on linux systems [target.'cfg(target_os = "linux")'.dependencies] sd-notify = "0.4.1" @@ -76,6 +83,7 @@ tls = ["tonic/tls"] jemalloc = ["dep:jemallocator"] viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"] libtest = [] +otel = ["dep:chrono", "dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tracing-opentelemetry"] [build-dependencies] anyhow = "1.0" diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 4e098067..ea631358 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -31,6 +31,13 @@ use crate::query::{CompiledQuery, ExecutionInput}; use crate::types::ExecutionInputImplData; use tracing::{debug, info, warn}; +#[cfg(feature="otel")] +use { +tonic::{metadata::MetadataMap, metadata::MetadataValue, metadata::MetadataKey, metadata::KeyAndValueRef}, +opentelemetry, +tracing_opentelemetry::OpenTelemetrySpanExt, +}; + use crate::glob; #[derive(Debug)] @@ -186,6 +193,7 @@ pub struct EntryUpdate { } impl Entry { + #[cfg_attr(feature="otel",tracing::instrument(name="broker_diff", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate { if let Some(datapoint) = &update.datapoint { if self.metadata.change_type != ChangeType::Continuous { @@ -205,6 +213,7 @@ impl Entry { update } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn validate(&self, update: &EntryUpdate) -> Result<(), UpdateError> { if let Some(datapoint) = &update.datapoint { self.validate_value(&datapoint.value)?; @@ -222,6 +231,7 @@ impl Entry { Ok(()) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed_type", skip(self, allowed), fields(timestamp=chrono::Utc::now().to_string())))] pub fn validate_allowed_type(&self, allowed: &Option) -> Result<(), UpdateError> { if let Some(allowed_values) = allowed { match (allowed_values, &self.metadata.data_type) { @@ -249,6 +259,7 @@ impl Entry { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))] fn validate_allowed(&self, value: &DataValue) -> Result<(), UpdateError> { // check if allowed value if let Some(allowed_values) = &self.metadata.allowed { @@ -380,6 +391,7 @@ impl Entry { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_value", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))] fn validate_value(&self, value: &DataValue) -> Result<(), UpdateError> { // Not available is always valid if value == &DataValue::NotAvailable { @@ -564,10 +576,12 @@ impl Entry { } } + #[cfg_attr(feature="otel", tracing::instrument(name="apply_lag_after_execute", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn apply_lag_after_execute(&mut self) { self.lag_datapoint = self.datapoint.clone(); } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_apply", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn apply(&mut self, update: EntryUpdate) -> HashSet { let mut changed = HashSet::new(); if let Some(datapoint) = update.datapoint { @@ -579,7 +593,10 @@ impl Entry { self.actuator_target = actuator_target; changed.insert(Field::ActuatorTarget); } - + if let Some(metadata_description) = update.description { + self.metadata.description = metadata_description; + // changed.insert(Field::ActuatorTarget); + } if let Some(updated_allowed) = update.allowed { if updated_allowed != self.metadata.allowed { self.metadata.allowed = updated_allowed; @@ -603,10 +620,12 @@ impl Subscriptions { self.query_subscriptions.push(subscription) } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_add_change_subscription",skip(self, subscription), fields(timestamp=chrono::Utc::now().to_string())))] pub fn add_change_subscription(&mut self, subscription: ChangeSubscription) { self.change_subscriptions.push(subscription) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_Subscriptions_notify", skip(self, changed, db)))] pub async fn notify( &self, changed: Option<&HashMap>>, @@ -652,6 +671,7 @@ impl Subscriptions { self.change_subscriptions.clear(); } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_cleanup", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn cleanup(&mut self) { self.query_subscriptions.retain(|sub| { if sub.sender.is_closed() { @@ -682,13 +702,52 @@ impl Subscriptions { } } + +#[cfg(feature="otel")] +struct MetadataMapInjector<'a>(&'a mut MetadataMap); + +#[cfg(feature="otel")] +impl<'a> opentelemetry::propagation::Injector for MetadataMapInjector<'a> { + fn set(&mut self, key: &str, value: String) { + if let Ok(metadata_key) = MetadataKey::from_bytes(key.as_bytes()) { + let metadata_value = MetadataValue::try_from(value.as_str()).unwrap(); + self.0.insert(metadata_key, metadata_value); // Insert key and value into metadata + } + } +} + +#[cfg(feature="otel")] +fn metadatamap_to_string(metadata: &MetadataMap) -> String { + let mut result = String::new(); + + for entry in metadata.iter() { + match entry { + // Handle ASCII metadata + KeyAndValueRef::Ascii(key, value) => { + // `.to_str()` returns a `Result<&str, ToStrError>`, so we need to handle it + let value_str = value.to_str().unwrap_or(""); + result.push_str(&format!("{}: {}\n", key, value_str)); + } + + // Handle binary metadata separately + _ => (), + } + } + result +} + impl ChangeSubscription { + #[cfg_attr(feature="otel", tracing::instrument(name="broker_ChangeSubscription_notify", skip(self, changed, db)))] async fn notify( &self, changed: Option<&HashMap>>, db: &Database, ) -> Result<(), NotificationError> { let db_read = db.authorized_read_access(&self.permissions); + + #[cfg(feature="otel")] + let current_span = tracing::Span::current(); + match changed { Some(changed) => { let mut matches = false; @@ -728,6 +787,19 @@ impl ChangeSubscription { } // fill unit field always update.unit.clone_from(&entry.metadata.unit); + update.description = Some(entry.metadata.description.clone()); + #[cfg(feature = "otel")] // This block will only compile if the "otel" feature is enabled + { + let mut metadata = MetadataMap::new(); + // @TODO: Speak to Kuksa team regarding MetadataMap in proto file + let mut injector = MetadataMapInjector(&mut metadata); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(¤t_span.context(), &mut injector); + }); + let description = metadatamap_to_string(&metadata); + + update.description = Some(description); + } notifications.updates.push(ChangeNotification { update, fields: notify_fields, @@ -769,6 +841,7 @@ impl ChangeSubscription { let mut notify_fields = HashSet::new(); // TODO: Perhaps make path optional update.path = Some(entry.metadata.path.clone()); + update.description = Some(entry.metadata.description.clone()); if fields.contains(&Field::Datapoint) { update.datapoint = Some(entry.datapoint.clone()); notify_fields.insert(Field::Datapoint); @@ -799,6 +872,7 @@ impl ChangeSubscription { } impl QuerySubscription { + #[cfg_attr(feature="otel", tracing::instrument(name="broker_find_in_db_and_add", skip(self, name, db, input), fields(timestamp=chrono::Utc::now().to_string())))] fn find_in_db_and_add( &self, name: &String, @@ -827,6 +901,7 @@ impl QuerySubscription { } } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_check_if_changes_match", skip(query, changed_origin, db), fields(timestamp=chrono::Utc::now().to_string())))] fn check_if_changes_match( query: &CompiledQuery, changed_origin: Option<&HashMap>>, @@ -862,6 +937,7 @@ impl QuerySubscription { } false } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input_list", skip(self, query, db, input), fields(timestamp=chrono::Utc::now().to_string())))] fn generate_input_list( &self, query: &CompiledQuery, @@ -877,6 +953,7 @@ impl QuerySubscription { } } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))] fn generate_input( &self, changed: Option<&HashMap>>, @@ -893,6 +970,7 @@ impl QuerySubscription { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_query_subscription_notify", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))] async fn notify( &self, changed: Option<&HashMap>>, @@ -952,6 +1030,7 @@ pub enum EntryReadAccess<'a> { } impl<'a> EntryReadAccess<'a> { + #[cfg_attr(feature="otel", tracing::instrument(name="broker_datapoint", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn datapoint(&self) -> Result<&Datapoint, ReadError> { match self { Self::Entry(entry) => Ok(&entry.datapoint), @@ -966,6 +1045,7 @@ impl<'a> EntryReadAccess<'a> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_metadata", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn metadata(&self) -> &Metadata { match self { Self::Entry(entry) => &entry.metadata, @@ -1008,6 +1088,7 @@ impl<'a, 'b> Iterator for EntryReadIterator<'a, 'b> { } impl<'a, 'b> DatabaseReadAccess<'a, 'b> { + #[cfg_attr(feature="otel", tracing::instrument(name="get_entry_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))] pub fn get_entry_by_id(&self, id: i32) -> Result<&Entry, ReadError> { match self.db.entries.get(&id) { Some(entry) => match self.permissions.can_read(&entry.metadata.path) { @@ -1026,15 +1107,18 @@ impl<'a, 'b> DatabaseReadAccess<'a, 'b> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))] pub fn get_metadata_by_id(&self, id: i32) -> Option<&Metadata> { self.db.entries.get(&id).map(|entry| &entry.metadata) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_path", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn get_metadata_by_path(&self, path: &str) -> Option<&Metadata> { let id = self.db.path_to_id.get(path)?; self.get_metadata_by_id(*id) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_iter_entries", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn iter_entries(&self) -> EntryReadIterator { EntryReadIterator { inner: self.db.entries.values(), @@ -1055,6 +1139,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_update_entry_lag_to_be_equal", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> { match self.db.path_to_id.get(path) { Some(id) => match self.db.entries.get_mut(id) { @@ -1068,13 +1153,14 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_update", skip(self, id, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result, UpdateError> { match self.db.entries.get_mut(&id) { Some(entry) => { if update.path.is_some() || update.entry_type.is_some() || update.data_type.is_some() - || update.description.is_some() + // || update.description.is_some() { return Err(UpdateError::PermissionDenied); } @@ -1212,6 +1298,7 @@ impl Database { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_read_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))] pub fn authorized_read_access<'a, 'b>( &'a self, permissions: &'b Permissions, @@ -1222,6 +1309,7 @@ impl Database { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_write_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))] pub fn authorized_write_access<'a, 'b>( &'a mut self, permissions: &'b Permissions, @@ -1285,6 +1373,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .authorized_read_access(self.permissions)) } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_get_id_by_path", skip(self, name) fields(timestamp=chrono::Utc::now().to_string())))] pub async fn get_id_by_path(&self, name: &str) -> Option { self.broker .database @@ -1315,6 +1404,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .map(|entry| entry.datapoint.clone()) } + #[cfg_attr(feature="otel", tracing::instrument(name="get_metadata", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn get_metadata(&self, id: i32) -> Option { self.broker .database @@ -1355,6 +1445,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .cloned() } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_for_each_entry", skip(self, f), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn for_each_entry(&self, f: impl FnMut(EntryReadAccess)) { self.broker .database @@ -1390,6 +1481,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .collect() } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_update_entries",skip(self, updates), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn update_entries( &self, updates: impl IntoIterator, @@ -1461,6 +1553,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { } } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_subscribe", skip(self, valid_entries), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn subscribe( &self, valid_entries: HashMap>, @@ -1544,6 +1637,7 @@ impl DataBroker { } } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_authorized_access",skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))] pub fn authorized_access<'a, 'b>( &'a self, permissions: &'b Permissions, diff --git a/databroker/src/glob.rs b/databroker/src/glob.rs index bc3f4327..ca071364 100644 --- a/databroker/src/glob.rs +++ b/databroker/src/glob.rs @@ -74,6 +74,7 @@ impl Matcher { } } +#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex_string", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))] pub fn to_regex_string(glob: &str) -> String { // Construct regular expression @@ -121,6 +122,7 @@ pub fn to_regex_string(glob: &str) -> String { re } +#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))] pub fn to_regex(glob: &str) -> Result { let re = to_regex_string(glob); Regex::new(&re).map_err(|_err| Error::RegexError) @@ -160,6 +162,7 @@ lazy_static! { .expect("regex compilation (of static pattern) should always succeed"); } +#[cfg_attr(feature="otel", tracing::instrument(name="glob_is_valid_pattern", skip(input), fields(timestamp=chrono::Utc::now().to_string())))] pub fn is_valid_pattern(input: &str) -> bool { REGEX_VALID_PATTERN.is_match(input) } diff --git a/databroker/src/grpc/kuksa_val_v1/conversions.rs b/databroker/src/grpc/kuksa_val_v1/conversions.rs index d9b972d1..6dc5d197 100644 --- a/databroker/src/grpc/kuksa_val_v1/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v1/conversions.rs @@ -236,6 +236,7 @@ impl From for Option { } impl From> for broker::DataValue { + #[cfg_attr(feature="otel", tracing::instrument(name="conversion_From>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))] fn from(from: Option) -> Self { match from { Some(value) => match value { @@ -316,6 +317,7 @@ impl From for broker::Datapoint { } impl From for proto::DataEntry { + #[cfg_attr(feature="otel", tracing::instrument(name="conversion_From", skip(from), fields(timestamp=chrono::Utc::now().to_string())))] fn from(from: broker::EntryUpdate) -> Self { Self { path: from.path.unwrap_or_default(), @@ -331,6 +333,7 @@ impl From for proto::DataEntry { metadata: { let metadata = proto::Metadata { unit: from.unit, + description: from.description, ..Default::default() }; Some(metadata) diff --git a/databroker/src/grpc/kuksa_val_v1/val.rs b/databroker/src/grpc/kuksa_val_v1/val.rs index 9cbbfc8d..8d6ddb9c 100644 --- a/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/databroker/src/grpc/kuksa_val_v1/val.rs @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ - + use std::collections::HashMap; use std::collections::HashSet; use std::iter::FromIterator; @@ -24,8 +24,18 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::Stream; use tokio_stream::StreamExt; use tonic::{Response, Status, Streaming}; -use tracing::debug; -use tracing::info; +use tracing::{debug, info}; + +#[cfg(feature="otel")] +use { + tracing_opentelemetry::OpenTelemetrySpanExt, + tonic::metadata::KeyAndValueRef, + opentelemetry::global, +}; + + + + use crate::broker; use crate::broker::ReadError; @@ -255,11 +265,24 @@ impl proto::val_server::Val for broker::DataBroker { } } + #[cfg_attr(feature="otel",tracing::instrument(name="val_set",skip(self, request), fields(trace_id, timestamp= chrono::Utc::now().to_string())))] async fn set( &self, request: tonic::Request, ) -> Result, tonic::Status> { debug!(?request); + + #[cfg(feature="otel")] + let request = (||{ + let (trace_id, request) = read_incoming_trace_id(request); + let metadata = request.metadata(); + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&MetadataMapExtractor(&metadata)) + }); + tracing::Span::current().record("trace_id", &trace_id).set_parent(cx); + request + })(); + let permissions = match request.extensions().get::() { Some(permissions) => { debug!(?permissions); @@ -471,6 +494,7 @@ impl proto::val_server::Val for broker::DataBroker { >, >; + #[cfg_attr(feature="otel", tracing::instrument(name="subscribe", skip(self, request), fields(trace_id, timestamp=chrono::Utc::now().to_string())))] async fn subscribe( &self, request: tonic::Request, @@ -661,6 +685,7 @@ async fn validate_entry_update( Ok((id, update)) } +#[cfg_attr(feature="otel", tracing::instrument(name="val_convert_to_data_entry_error", skip(path, error), fields(timestamp=chrono::Utc::now().to_string())))] fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError { match error { broker::UpdateError::NotFound => DataEntryError { @@ -714,6 +739,7 @@ fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> Da } } +#[cfg_attr(feature="otel", tracing::instrument(name = "val_convert_to_proto_stream", skip(input), fields(timestamp=chrono::Utc::now().to_string())))] fn convert_to_proto_stream( input: impl Stream, ) -> impl Stream> { @@ -955,7 +981,54 @@ fn combine_view_and_fields( combined } +// Metadata extractor for gRPC +#[cfg(feature="otel")] +struct MetadataMapExtractor<'a>(&'a tonic::metadata::MetadataMap); + +#[cfg(feature="otel")] +impl<'a> opentelemetry::propagation::Extractor for MetadataMapExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|val| val.to_str().ok()) + } + + /// Collect all the keys from the HeaderMap. + fn keys(&self) -> Vec<&str> { + self.0.iter() + .filter_map(|kv| { + if let KeyAndValueRef::Ascii(key, _) = kv { + Some(key.as_str()) + } else { + None + } + }) + .collect() + } +} + +#[cfg(feature="otel")] +#[cfg_attr(feature="otel", tracing::instrument(name="val_read_incoming_trace_id", skip(request), fields(timestamp=chrono::Utc::now().to_string())))] +fn read_incoming_trace_id(request: tonic::Request) -> (String, tonic::Request){ + let mut trace_id: String = String::from(""); + let request_copy = tonic::Request::new(request.get_ref().clone()); + for request in request_copy.into_inner().updates { + match &request.entry { + Some(entry) => match &entry.metadata { + Some(metadata) => match &metadata.description{ + Some(description)=> { + trace_id = String::from(description); + } + None => trace_id = String::from("") + } + None => trace_id = String::from("") + } + None => trace_id = String::from("") + } + } + return(trace_id, request); +} + impl broker::EntryUpdate { + #[cfg_attr(feature="otel", tracing::instrument(name = "val_from_proto_entry_and_fields",skip(entry,fields), fields(timestamp=chrono::Utc::now().to_string())))] fn from_proto_entry_and_fields( entry: &proto::DataEntry, fields: HashSet, @@ -976,13 +1049,24 @@ impl broker::EntryUpdate { } else { None }; + let metadata_description = if fields.contains(&proto::Field::MetadataDescription) { + match &entry.metadata { + Some(metadata) => match &metadata.description { + Some(description) => Some(description), + None => None, + } + None => None, + } + } else { + None + }; Self { path: None, datapoint, actuator_target, entry_type: None, data_type: None, - description: None, + description: metadata_description.cloned(), allowed: None, unit: None, } diff --git a/databroker/src/lib.rs b/databroker/src/lib.rs index 49672dba..456cb8eb 100644 --- a/databroker/src/lib.rs +++ b/databroker/src/lib.rs @@ -19,6 +19,8 @@ pub mod permissions; pub mod query; pub mod types; pub mod vss; +pub mod open_telemetry; + #[cfg(feature = "viss")] pub mod viss; @@ -28,6 +30,15 @@ use std::fmt::Write; use tracing::info; use tracing_subscriber::filter::EnvFilter; +#[cfg(feature="otel")] +use { +tracing_subscriber::layer::SubscriberExt, +open_telemetry::init_trace, +opentelemetry::global, +opentelemetry::sdk::propagation::TraceContextPropagator, +}; + +#[cfg(not(feature="otel"))] pub fn init_logging() { let mut output = String::from("Init logging from RUST_LOG"); let filter = EnvFilter::try_from_default_env().unwrap_or_else(|err| { @@ -42,3 +53,28 @@ pub fn init_logging() { info!("{}", output); } + +#[cfg(feature="otel")] +pub fn init_logging() { + let output = String::from("Init logging from RUST_LOG"); + + // Set OpenTelemetry trace propagator + global::set_text_map_propagator(TraceContextPropagator::new()); + + // Initialize OpenTelemetry tracer + let tracer = init_trace().expect("Failed to initialize tracer"); + + // telemetry layer + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let subscriber = tracing_subscriber::fmt::Subscriber::builder() + .with_max_level(tracing::Level::INFO) // adjust this log level as needed + .finish() + .with(telemetry); // Add telemetry layer + + // Set the subscriber as the global default for tracing + tracing::subscriber::set_global_default(subscriber) + .expect("Unable to install global logging subscriber"); + + info!("{}", output); +} diff --git a/databroker/src/open_telemetry.rs b/databroker/src/open_telemetry.rs new file mode 100644 index 00000000..b2e618ec --- /dev/null +++ b/databroker/src/open_telemetry.rs @@ -0,0 +1,26 @@ +#[cfg(feature="otel")] +use { + opentelemetry::{KeyValue, runtime}, + opentelemetry::sdk::{Resource, trace}, + opentelemetry::trace::TraceError, + opentelemetry_otlp::WithExportConfig, + std::env +}; + +#[cfg(feature="otel")] +pub fn init_trace() -> Result { + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(env::var("OTEL_ENDPOINT").unwrap_or_else(|_| "http://localhost:4317".to_string())), + ).with_batch_config(trace::BatchConfig::default()) // to change default of max_queue_size use .with_max_queue_size(8192) or set env OTEL_BSP_MAX_QUEUE_SIZE, by default it is set to 2_048 + .with_trace_config( + trace::config().with_resource(Resource::new(vec![KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + "kuksa-rust-app", + )])), + ) + .install_batch(runtime::Tokio) +} diff --git a/databroker/src/permissions.rs b/databroker/src/permissions.rs index 7da1eae1..f301c1af 100644 --- a/databroker/src/permissions.rs +++ b/databroker/src/permissions.rs @@ -186,6 +186,7 @@ impl Permissions { Err(PermissionError::Denied) } + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_actuator_target", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn can_write_actuator_target(&self, path: &str) -> Result<(), PermissionError> { self.expired()?; @@ -195,6 +196,7 @@ impl Permissions { Err(PermissionError::Denied) } + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_datapoint", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn can_write_datapoint(&self, path: &str) -> Result<(), PermissionError> { self.expired()?; @@ -213,6 +215,7 @@ impl Permissions { Err(PermissionError::Denied) } + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_expired", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] #[inline] pub fn expired(&self) -> Result<(), PermissionError> { if let Some(expires_at) = self.expires_at { @@ -225,6 +228,7 @@ impl Permissions { } impl PathMatcher { + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_is_match", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn is_match(&self, path: &str) -> bool { match self { PathMatcher::Nothing => false, diff --git a/databroker/src/query/executor.rs b/databroker/src/query/executor.rs index afbd9932..c6935547 100644 --- a/databroker/src/query/executor.rs +++ b/databroker/src/query/executor.rs @@ -37,6 +37,7 @@ pub trait ExecutionInput { } impl CompiledQuery { + #[cfg_attr(feature="otel", tracing::instrument(name="executor_execute_internal", skip(query, input), fields(timestamp=chrono::Utc::now().to_string())))] fn execute_internal( query: &CompiledQuery, input: &impl ExecutionInput, @@ -157,6 +158,8 @@ impl CompiledQuery { Ok(None) } } + + #[cfg_attr(feature="otel", tracing::instrument(name="executor_execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))] pub fn execute( &self, input: &impl ExecutionInput, @@ -166,6 +169,7 @@ impl CompiledQuery { } impl Expr { + #[cfg_attr(feature="otel", tracing::instrument(name="execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))] pub fn execute(&self, input: &impl ExecutionInput) -> Result { match &self { Expr::Datapoint { @@ -396,6 +400,7 @@ impl ExecutionInput for ExecutionInputImpl { } } + #[cfg_attr(feature="otel", tracing::instrument(name="executor_get_fields", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] fn get_fields(&self) -> &HashMap { &self.fields }