Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Add 'otel' feature for OpenTelemetry support #97

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 292 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tracing-subscriber = { version = "0.3.11", default-features = false, features =
"env-filter",
"ansi",
] }

clap = { workspace = true, features = [
"std",
"env",
Expand All @@ -68,6 +69,12 @@ axum = { version = "0.6.20", optional = true, features = ["ws"] }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

# OTEL
opentelemetry = { version = "0.19.0", optional = true, features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version="0.12.0", optional = true, features = ["tonic", "metrics"] }
opentelemetry-semantic-conventions = { version="0.11.0", optional = true }
tracing-opentelemetry = { version="0.19.0", optional = true }

# systemd related dependency, only relevant on linux systems
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify = "0.4.1"
Expand All @@ -78,6 +85,7 @@ tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []
otel = ["dep:chrono", "dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tracing-opentelemetry"]

[build-dependencies]
anyhow = "1.0"
Expand Down
84 changes: 79 additions & 5 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ use crate::query::{CompiledQuery, ExecutionInput};
use crate::types::ExecutionInputImplData;
use tracing::{debug, info, warn};

#[cfg(feature="otel")]
use {
tonic::{metadata::MetadataMap, metadata::MetadataValue, metadata::MetadataKey, metadata::KeyAndValueRef},
opentelemetry,
tracing_opentelemetry::OpenTelemetrySpanExt,
};

use crate::glob;

const MAX_SUBSCRIBE_BUFFER_SIZE: usize = 1000;
Expand Down Expand Up @@ -234,6 +241,7 @@ pub struct EntryUpdate {
}

impl Entry {
#[cfg_attr(feature="otel",tracing::instrument(name="broker_diff", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate {
if let Some(datapoint) = &update.datapoint {
if self.metadata.change_type != ChangeType::Continuous {
Expand All @@ -258,7 +266,7 @@ impl Entry {
self.validate_allowed(data_value)?;
Ok(())
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn validate(&self, update: &EntryUpdate) -> Result<(), UpdateError> {
if let Some(datapoint) = &update.datapoint {
self.validate_value(&datapoint.value)?;
Expand All @@ -276,6 +284,7 @@ impl Entry {
Ok(())
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed_type", skip(self, allowed), fields(timestamp=chrono::Utc::now().to_string())))]
/**
* DataType is VSS type, where we have also smaller type based on 8/16 bits
* That we do not have for DataValue
Expand Down Expand Up @@ -319,6 +328,7 @@ impl Entry {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))]
fn validate_allowed(&self, value: &DataValue) -> Result<(), UpdateError> {
// check if allowed value
if let Some(allowed_values) = &self.metadata.allowed {
Expand Down Expand Up @@ -470,7 +480,8 @@ impl Entry {
}
Ok(())
}


#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_value", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))]
fn validate_value(&self, value: &DataValue) -> Result<(), UpdateError> {
// Not available is always valid
if value == &DataValue::NotAvailable {
Expand Down Expand Up @@ -706,10 +717,12 @@ impl Entry {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="apply_lag_after_execute", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn apply_lag_after_execute(&mut self) {
self.lag_datapoint = self.datapoint.clone();
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_apply", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn apply(&mut self, update: EntryUpdate) -> HashSet<Field> {
let mut changed = HashSet::new();
if let Some(datapoint) = update.datapoint {
Expand All @@ -721,7 +734,6 @@ impl Entry {
self.actuator_target = actuator_target;
changed.insert(Field::ActuatorTarget);
}

if let Some(updated_allowed) = update.allowed {
if updated_allowed != self.metadata.allowed {
self.metadata.allowed = updated_allowed;
Expand Down Expand Up @@ -749,10 +761,12 @@ impl Subscriptions {
self.query_subscriptions.push(subscription)
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_add_change_subscription",skip(self, subscription), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn add_change_subscription(&mut self, subscription: ChangeSubscription) {
self.change_subscriptions.push(subscription)
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_Subscriptions_notify", skip(self, changed, db)))]
pub async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -799,6 +813,7 @@ impl Subscriptions {
self.change_subscriptions.clear();
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_cleanup", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn cleanup(&mut self) {
self.query_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
Expand Down Expand Up @@ -834,13 +849,52 @@ impl Subscriptions {
}
}


#[cfg(feature="otel")]
struct MetadataMapInjector<'a>(&'a mut MetadataMap);

#[cfg(feature="otel")]
impl<'a> opentelemetry::propagation::Injector for MetadataMapInjector<'a> {
fn set(&mut self, key: &str, value: String) {
if let Ok(metadata_key) = MetadataKey::from_bytes(key.as_bytes()) {
let metadata_value = MetadataValue::try_from(value.as_str()).unwrap();
self.0.insert(metadata_key, metadata_value); // Insert key and value into metadata
}
}
}

#[cfg(feature="otel")]
fn metadatamap_to_string(metadata: &MetadataMap) -> String {
let mut result = String::new();

for entry in metadata.iter() {
match entry {
// Handle ASCII metadata
KeyAndValueRef::Ascii(key, value) => {
// `.to_str()` returns a `Result<&str, ToStrError>`, so we need to handle it
let value_str = value.to_str().unwrap_or("<invalid UTF-8>");
result.push_str(&format!("{}: {}\n", key, value_str));
}

// Handle binary metadata separately
_ => (),
}
}
result
}

impl ChangeSubscription {
#[cfg_attr(feature="otel", tracing::instrument(name="broker_ChangeSubscription_notify", skip(self, changed, db)))]
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
db: &Database,
) -> Result<(), NotificationError> {
let db_read = db.authorized_read_access(&self.permissions);

#[cfg(feature="otel")]
let current_span = tracing::Span::current();

match changed {
Some(changed) => {
let mut matches = false;
Expand Down Expand Up @@ -959,6 +1013,7 @@ impl ChangeSubscription {
}

impl QuerySubscription {
#[cfg_attr(feature="otel", tracing::instrument(name="broker_find_in_db_and_add", skip(self, name, db, input), fields(timestamp=chrono::Utc::now().to_string())))]
fn find_in_db_and_add(
&self,
name: &String,
Expand Down Expand Up @@ -987,6 +1042,7 @@ impl QuerySubscription {
}
}
}
#[cfg_attr(feature="otel", tracing::instrument(name="broker_check_if_changes_match", skip(query, changed_origin, db), fields(timestamp=chrono::Utc::now().to_string())))]
fn check_if_changes_match(
query: &CompiledQuery,
changed_origin: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -1022,6 +1078,7 @@ impl QuerySubscription {
}
false
}
#[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input_list", skip(self, query, db, input), fields(timestamp=chrono::Utc::now().to_string())))]
fn generate_input_list(
&self,
query: &CompiledQuery,
Expand All @@ -1037,6 +1094,7 @@ impl QuerySubscription {
}
}
}
#[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))]
fn generate_input(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand All @@ -1053,6 +1111,7 @@ impl QuerySubscription {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_query_subscription_notify", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))]
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -1110,8 +1169,9 @@ pub enum EntryReadAccess<'a> {
Entry(&'a Entry),
Err(&'a Metadata, ReadError),
}

impl EntryReadAccess<'_> {
#[cfg_attr(feature="otel", tracing::instrument(name="broker_datapoint", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn datapoint(&self) -> Result<&Datapoint, ReadError> {
match self {
Self::Entry(entry) => Ok(&entry.datapoint),
Expand All @@ -1126,6 +1186,7 @@ impl EntryReadAccess<'_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_metadata", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn metadata(&self) -> &Metadata {
match self {
Self::Entry(entry) => &entry.metadata,
Expand Down Expand Up @@ -1168,6 +1229,7 @@ impl<'a> Iterator for EntryReadIterator<'a, '_> {
}

impl DatabaseReadAccess<'_, '_> {
#[cfg_attr(feature="otel", tracing::instrument(name="get_entry_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn get_entry_by_id(&self, id: i32) -> Result<&Entry, ReadError> {
match self.db.entries.get(&id) {
Some(entry) => match self.permissions.can_read(&entry.metadata.path) {
Expand All @@ -1186,15 +1248,18 @@ impl DatabaseReadAccess<'_, '_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn get_metadata_by_id(&self, id: i32) -> Option<&Metadata> {
self.db.entries.get(&id).map(|entry| &entry.metadata)
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_path", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn get_metadata_by_path(&self, path: &str) -> Option<&Metadata> {
let id = self.db.path_to_id.get(path)?;
self.get_metadata_by_id(*id)
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_iter_entries", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn iter_entries(&self) -> EntryReadIterator {
EntryReadIterator {
inner: self.db.entries.values(),
Expand All @@ -1215,6 +1280,7 @@ impl DatabaseWriteAccess<'_, '_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_update_entry_lag_to_be_equal", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> {
match self.db.path_to_id.get(path) {
Some(id) => match self.db.entries.get_mut(id) {
Expand All @@ -1228,13 +1294,13 @@ impl DatabaseWriteAccess<'_, '_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_update", skip(self, id, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<HashSet<Field>, UpdateError> {
match self.db.entries.get_mut(&id) {
Some(entry) => {
if update.path.is_some()
|| update.entry_type.is_some()
|| update.data_type.is_some()
|| update.description.is_some()
{
return Err(UpdateError::PermissionDenied);
}
Expand Down Expand Up @@ -1376,6 +1442,7 @@ impl Database {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_read_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn authorized_read_access<'a, 'b>(
&'a self,
permissions: &'b Permissions,
Expand All @@ -1386,6 +1453,7 @@ impl Database {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_write_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn authorized_write_access<'a, 'b>(
&'a mut self,
permissions: &'b Permissions,
Expand Down Expand Up @@ -1453,6 +1521,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.authorized_read_access(self.permissions))
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_get_id_by_path", skip(self, name) fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn get_id_by_path(&self, name: &str) -> Option<i32> {
self.broker
.database
Expand Down Expand Up @@ -1483,6 +1552,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.map(|entry| entry.datapoint.clone())
}

#[cfg_attr(feature="otel", tracing::instrument(name="get_metadata", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn get_metadata(&self, id: i32) -> Option<Metadata> {
self.broker
.database
Expand Down Expand Up @@ -1523,6 +1593,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.cloned()
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_for_each_entry", skip(self, f), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn for_each_entry(&self, f: impl FnMut(EntryReadAccess)) {
self.broker
.database
Expand Down Expand Up @@ -1558,6 +1629,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.collect()
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_update_entries",skip(self, updates), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn update_entries(
&self,
updates: impl IntoIterator<Item = (i32, EntryUpdate)>,
Expand Down Expand Up @@ -1629,6 +1701,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_subscribe", skip(self, valid_entries), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
Expand Down Expand Up @@ -2023,6 +2096,7 @@ impl DataBroker {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_authorized_access",skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn authorized_access<'a, 'b>(
&'a self,
permissions: &'b Permissions,
Expand Down
3 changes: 3 additions & 0 deletions databroker/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Matcher {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex_string", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex_string(glob: &str) -> String {
// Construct regular expression

Expand Down Expand Up @@ -121,6 +122,7 @@ pub fn to_regex_string(glob: &str) -> String {
re
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex(glob: &str) -> Result<Regex, Error> {
let re = to_regex_string(glob);
Regex::new(&re).map_err(|_err| Error::RegexError)
Expand Down Expand Up @@ -160,6 +162,7 @@ lazy_static! {
.expect("regex compilation (of static pattern) should always succeed");
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_is_valid_pattern", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn is_valid_pattern(input: &str) -> bool {
REGEX_VALID_PATTERN.is_match(input)
}
Expand Down
2 changes: 2 additions & 0 deletions databroker/src/grpc/kuksa_val_v1/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl From<broker::DataValue> for Option<proto::Datapoint> {
}

impl From<Option<proto::datapoint::Value>> for broker::DataValue {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<Option<proto::datapoint::Value>>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: Option<proto::datapoint::Value>) -> Self {
match from {
Some(value) => match value {
Expand Down Expand Up @@ -316,6 +317,7 @@ impl From<proto::Datapoint> for broker::Datapoint {
}

impl From<broker::EntryUpdate> for proto::DataEntry {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<broker::EntryUpdate>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: broker::EntryUpdate) -> Self {
Self {
path: from.path.unwrap_or_default(),
Expand Down
Loading
Loading