Skip to content

Commit

Permalink
"Add 'otel' feature for OpenTelemetry support
Browse files Browse the repository at this point in the history
Enabling the 'otel' feature conditionally includes OpenTelemetry and related tracing functionalities, allowing method-level tracing and context propagation."

Signed-off-by: Likhith Thammegowda <[email protected]>
  • Loading branch information
LikhithST committed Nov 5, 2024
1 parent 1247130 commit 2b15924
Show file tree
Hide file tree
Showing 10 changed files with 552 additions and 52 deletions.
329 changes: 283 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tracing-subscriber = { version = "0.3.11", default-features = false, features =
"env-filter",
"ansi",
] }

clap = { workspace = true, features = [
"std",
"env",
Expand All @@ -66,6 +67,12 @@ futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

# OTEL
opentelemetry = { version = "0.19.0", optional = true, features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version="0.12.0", optional = true, features = ["tonic", "metrics"] }
opentelemetry-semantic-conventions = { version="0.11.0", optional = true }
tracing-opentelemetry = { version="0.19.0", optional = true }

# systemd related dependency, only relevant on linux systems
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify = "0.4.1"
Expand All @@ -76,6 +83,7 @@ tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
libtest = []
otel = ["dep:chrono", "dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tracing-opentelemetry"]

[build-dependencies]
anyhow = "1.0"
Expand Down
98 changes: 96 additions & 2 deletions databroker/src/broker.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions databroker/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Matcher {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex_string", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex_string(glob: &str) -> String {
// Construct regular expression

Expand Down Expand Up @@ -121,6 +122,7 @@ pub fn to_regex_string(glob: &str) -> String {
re
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex(glob: &str) -> Result<Regex, Error> {
let re = to_regex_string(glob);
Regex::new(&re).map_err(|_err| Error::RegexError)
Expand Down Expand Up @@ -160,6 +162,7 @@ lazy_static! {
.expect("regex compilation (of static pattern) should always succeed");
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_is_valid_pattern", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn is_valid_pattern(input: &str) -> bool {
REGEX_VALID_PATTERN.is_match(input)
}
Expand Down
3 changes: 3 additions & 0 deletions databroker/src/grpc/kuksa_val_v1/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl From<broker::DataValue> for Option<proto::Datapoint> {
}

impl From<Option<proto::datapoint::Value>> for broker::DataValue {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<Option<proto::datapoint::Value>>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: Option<proto::datapoint::Value>) -> Self {
match from {
Some(value) => match value {
Expand Down Expand Up @@ -316,6 +317,7 @@ impl From<proto::Datapoint> for broker::Datapoint {
}

impl From<broker::EntryUpdate> for proto::DataEntry {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<broker::EntryUpdate>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: broker::EntryUpdate) -> Self {
Self {
path: from.path.unwrap_or_default(),
Expand All @@ -331,6 +333,7 @@ impl From<broker::EntryUpdate> for proto::DataEntry {
metadata: {
let metadata = proto::Metadata {
unit: from.unit,
description: from.description,
..Default::default()
};
Some(metadata)
Expand Down
92 changes: 88 additions & 4 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::collections::HashMap;
use std::collections::HashSet;
use std::iter::FromIterator;
Expand All @@ -24,8 +24,18 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use tonic::{Response, Status, Streaming};
use tracing::debug;
use tracing::info;
use tracing::{debug, info};

#[cfg(feature="otel")]
use {
tracing_opentelemetry::OpenTelemetrySpanExt,
tonic::metadata::KeyAndValueRef,
opentelemetry::global,
};





use crate::broker;
use crate::broker::ReadError;
Expand Down Expand Up @@ -255,11 +265,24 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

#[cfg_attr(feature="otel",tracing::instrument(name="val_set",skip(self, request), fields(trace_id, timestamp= chrono::Utc::now().to_string())))]
async fn set(
&self,
request: tonic::Request<proto::SetRequest>,
) -> Result<tonic::Response<proto::SetResponse>, tonic::Status> {
debug!(?request);

#[cfg(feature="otel")]
let request = (||{
let (trace_id, request) = read_incoming_trace_id(request);
let metadata = request.metadata();
let cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&MetadataMapExtractor(&metadata))
});
tracing::Span::current().record("trace_id", &trace_id).set_parent(cx);
request
})();

let permissions = match request.extensions().get::<Permissions>() {
Some(permissions) => {
debug!(?permissions);
Expand Down Expand Up @@ -471,6 +494,7 @@ impl proto::val_server::Val for broker::DataBroker {
>,
>;

#[cfg_attr(feature="otel", tracing::instrument(name="subscribe", skip(self, request), fields(trace_id, timestamp=chrono::Utc::now().to_string())))]
async fn subscribe(
&self,
request: tonic::Request<proto::SubscribeRequest>,
Expand Down Expand Up @@ -661,6 +685,7 @@ async fn validate_entry_update(
Ok((id, update))
}

#[cfg_attr(feature="otel", tracing::instrument(name="val_convert_to_data_entry_error", skip(path, error), fields(timestamp=chrono::Utc::now().to_string())))]
fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError {
match error {
broker::UpdateError::NotFound => DataEntryError {
Expand Down Expand Up @@ -714,6 +739,7 @@ fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> Da
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "val_convert_to_proto_stream", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
fn convert_to_proto_stream(
input: impl Stream<Item = broker::EntryUpdates>,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
Expand Down Expand Up @@ -955,7 +981,54 @@ fn combine_view_and_fields(
combined
}

// Metadata extractor for gRPC
#[cfg(feature="otel")]
struct MetadataMapExtractor<'a>(&'a tonic::metadata::MetadataMap);

#[cfg(feature="otel")]
impl<'a> opentelemetry::propagation::Extractor for MetadataMapExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|val| val.to_str().ok())
}

/// Collect all the keys from the HeaderMap.
fn keys(&self) -> Vec<&str> {
self.0.iter()
.filter_map(|kv| {
if let KeyAndValueRef::Ascii(key, _) = kv {
Some(key.as_str())
} else {
None
}
})
.collect()
}
}

#[cfg(feature="otel")]
#[cfg_attr(feature="otel", tracing::instrument(name="val_read_incoming_trace_id", skip(request), fields(timestamp=chrono::Utc::now().to_string())))]
fn read_incoming_trace_id(request: tonic::Request<proto::SetRequest>) -> (String, tonic::Request<proto::SetRequest>){
let mut trace_id: String = String::from("");
let request_copy = tonic::Request::new(request.get_ref().clone());
for request in request_copy.into_inner().updates {
match &request.entry {
Some(entry) => match &entry.metadata {
Some(metadata) => match &metadata.description{
Some(description)=> {
trace_id = String::from(description);
}
None => trace_id = String::from("")
}
None => trace_id = String::from("")
}
None => trace_id = String::from("")
}
}
return(trace_id, request);
}

impl broker::EntryUpdate {
#[cfg_attr(feature="otel", tracing::instrument(name = "val_from_proto_entry_and_fields",skip(entry,fields), fields(timestamp=chrono::Utc::now().to_string())))]
fn from_proto_entry_and_fields(
entry: &proto::DataEntry,
fields: HashSet<proto::Field>,
Expand All @@ -976,13 +1049,24 @@ impl broker::EntryUpdate {
} else {
None
};
let metadata_description = if fields.contains(&proto::Field::MetadataDescription) {
match &entry.metadata {
Some(metadata) => match &metadata.description {
Some(description) => Some(description),
None => None,
}
None => None,
}
} else {
None
};
Self {
path: None,
datapoint,
actuator_target,
entry_type: None,
data_type: None,
description: None,
description: metadata_description.cloned(),
allowed: None,
unit: None,
}
Expand Down
36 changes: 36 additions & 0 deletions databroker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub mod permissions;
pub mod query;
pub mod types;
pub mod vss;
pub mod open_telemetry;


#[cfg(feature = "viss")]
pub mod viss;
Expand All @@ -28,6 +30,15 @@ use std::fmt::Write;
use tracing::info;
use tracing_subscriber::filter::EnvFilter;

#[cfg(feature="otel")]
use {
tracing_subscriber::layer::SubscriberExt,
open_telemetry::init_trace,
opentelemetry::global,
opentelemetry::sdk::propagation::TraceContextPropagator,
};

#[cfg(not(feature="otel"))]
pub fn init_logging() {
let mut output = String::from("Init logging from RUST_LOG");
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|err| {
Expand All @@ -42,3 +53,28 @@ pub fn init_logging() {

info!("{}", output);
}

#[cfg(feature="otel")]
pub fn init_logging() {
let output = String::from("Init logging from RUST_LOG");

// Set OpenTelemetry trace propagator
global::set_text_map_propagator(TraceContextPropagator::new());

// Initialize OpenTelemetry tracer
let tracer = init_trace().expect("Failed to initialize tracer");

// telemetry layer
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

let subscriber = tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(tracing::Level::INFO) // adjust this log level as needed
.finish()
.with(telemetry); // Add telemetry layer

// Set the subscriber as the global default for tracing
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to install global logging subscriber");

info!("{}", output);
}
26 changes: 26 additions & 0 deletions databroker/src/open_telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#[cfg(feature="otel")]
use {
opentelemetry::{KeyValue, runtime},
opentelemetry::sdk::{Resource, trace},
opentelemetry::trace::TraceError,
opentelemetry_otlp::WithExportConfig,
std::env
};

#[cfg(feature="otel")]
pub fn init_trace() -> Result<trace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(env::var("OTEL_ENDPOINT").unwrap_or_else(|_| "http://localhost:4317".to_string())),
).with_batch_config(trace::BatchConfig::default()) // to change default of max_queue_size use .with_max_queue_size(8192) or set env OTEL_BSP_MAX_QUEUE_SIZE, by default it is set to 2_048
.with_trace_config(
trace::config().with_resource(Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"kuksa-rust-app",
)])),
)
.install_batch(runtime::Tokio)
}
4 changes: 4 additions & 0 deletions databroker/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl Permissions {
Err(PermissionError::Denied)
}

#[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_actuator_target", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn can_write_actuator_target(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;

Expand All @@ -195,6 +196,7 @@ impl Permissions {
Err(PermissionError::Denied)
}

#[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_datapoint", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn can_write_datapoint(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;

Expand All @@ -213,6 +215,7 @@ impl Permissions {
Err(PermissionError::Denied)
}

#[cfg_attr(feature="otel", tracing::instrument(name="permissions_expired", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
#[inline]
pub fn expired(&self) -> Result<(), PermissionError> {
if let Some(expires_at) = self.expires_at {
Expand All @@ -225,6 +228,7 @@ impl Permissions {
}

impl PathMatcher {
#[cfg_attr(feature="otel", tracing::instrument(name="permissions_is_match", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn is_match(&self, path: &str) -> bool {
match self {
PathMatcher::Nothing => false,
Expand Down
5 changes: 5 additions & 0 deletions databroker/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub trait ExecutionInput {
}

impl CompiledQuery {
#[cfg_attr(feature="otel", tracing::instrument(name="executor_execute_internal", skip(query, input), fields(timestamp=chrono::Utc::now().to_string())))]
fn execute_internal(
query: &CompiledQuery,
input: &impl ExecutionInput,
Expand Down Expand Up @@ -157,6 +158,8 @@ impl CompiledQuery {
Ok(None)
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="executor_execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn execute(
&self,
input: &impl ExecutionInput,
Expand All @@ -166,6 +169,7 @@ impl CompiledQuery {
}

impl Expr {
#[cfg_attr(feature="otel", tracing::instrument(name="execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn execute(&self, input: &impl ExecutionInput) -> Result<DataValue, ExecutionError> {
match &self {
Expr::Datapoint {
Expand Down Expand Up @@ -396,6 +400,7 @@ impl ExecutionInput for ExecutionInputImpl {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="executor_get_fields", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
fn get_fields(&self) -> &HashMap<String, ExecutionInputImplData> {
&self.fields
}
Expand Down

0 comments on commit 2b15924

Please sign in to comment.