Skip to content

Commit

Permalink
Upgrade to opentelemetry-rust 0.27
Browse files (browse the repository at this point in the history)
  • Loading branch information
divergentdave committed Jan 24, 2025
1 parent f96fe5e commit c23b10c
Show file tree
Hide file tree
Showing 29 changed files with 597 additions and 1,172 deletions.
184 changes: 49 additions & 135 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ kube = { version = "0.94.2", default-features = false, features = ["client", "ru
mockito = "1.6.1"
num_enum = "0.7.3"
ohttp = { version = "0.5.1", default-features = false }
opentelemetry = { version = "0.24", default-features = false, features = ["trace", "metrics"] }
opentelemetry-otlp = { version = "0.17", default-features = false, features = ["trace", "metrics", "grpc-tonic"] }
opentelemetry-prometheus = "0.17"
opentelemetry_sdk = { version = "0.24", default-features = false, features = ["trace", "metrics"] }
opentelemetry = { version = "0.27", default-features = false, features = ["trace", "metrics"] }
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["trace", "metrics", "grpc-tonic"] }
opentelemetry-prometheus = "0.27"
opentelemetry_sdk = { version = "0.27", default-features = false, features = ["trace", "metrics"] }
pem = "3"
postgres-protocol = "0.6.7"
postgres-types = "0.2.8"
Expand Down Expand Up @@ -108,7 +108,7 @@ thiserror = "2.0"
tracing = "0.1.41"
tracing-chrome = "0.7.2"
tracing-log = "0.2.0"
tracing-opentelemetry = "0.25"
tracing-opentelemetry = "0.28"
tracing-stackdriver = "0.10.0"
tracing-subscriber = "0.3"
tokio = { version = "1.43", features = ["full", "tracing"] }
Expand All @@ -120,7 +120,7 @@ trillium-api = { version = "0.2.0-rc.12", default-features = false }
trillium-caching-headers = "0.2.3"
trillium-head = "0.2.3"
trillium-macros = "0.0.6"
trillium-opentelemetry = "0.9.0"
trillium-opentelemetry = "0.10.0"
trillium-prometheus = "0.2.0"
trillium-proxy = { version = "0.5.5", default-features = false }
trillium-router = "0.4.1"
Expand Down
4 changes: 1 addition & 3 deletions aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ fpvec_bounded_l2 = ["dep:fixed", "janus_core/fpvec_bounded_l2"]
tokio-console = ["dep:console-subscriber"]
otlp = [
"dep:opentelemetry-otlp",
"dep:opentelemetry_sdk",
"dep:tracing-opentelemetry",
]
prometheus = [
"dep:opentelemetry-prometheus",
"dep:opentelemetry_sdk",
"dep:prometheus",
"dep:trillium-prometheus",
]
Expand Down Expand Up @@ -66,7 +64,7 @@ moka = { version = "0.12.10", features = ["future"] }
opentelemetry.workspace = true
opentelemetry-otlp = { workspace = true, features = ["metrics"], optional = true }
opentelemetry-prometheus = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"], optional = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
pem.workspace = true
postgres-protocol = { workspace = true }
postgres-types = { workspace = true, features = ["derive", "array-impls"] }
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl<C: Clock> Aggregator<C> {
"Number of decryption failures in the tasks/{task-id}/reports endpoint.",
)
.with_unit("{error}")
.init();
.build();
upload_decrypt_failure_counter.add(0, &[]);

let upload_decode_failure_counter = meter
Expand All @@ -302,7 +302,7 @@ impl<C: Clock> Aggregator<C> {
"Number of message decode failures in the tasks/{task-id}/reports endpoint.",
)
.with_unit("{error}")
.init();
.build();
upload_decode_failure_counter.add(0, &[]);

let report_aggregation_success_counter = report_aggregation_success_counter(meter);
Expand Down
7 changes: 5 additions & 2 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use janus_aggregator_core::{
Datastore,
},
task::{self, AggregatorTask},
TIME_HISTOGRAM_BOUNDARIES,
};
#[cfg(feature = "fpvec_bounded_l2")]
use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
Expand Down Expand Up @@ -128,13 +129,15 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
.f64_histogram("janus_task_update_time")
.with_description("Time spent updating tasks.")
.with_unit("s")
.init();
.with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec())
.build();
let job_creation_time_histogram = self
.meter
.f64_histogram("janus_job_creation_time")
.with_description("Time spent creating aggregation jobs.")
.with_unit("s")
.init();
.with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec())
.build();

// Set up an interval to occasionally update our view of tasks in the DB.
// (This will fire immediately, so we'll immediately load tasks from the DB when we enter
Expand Down
8 changes: 5 additions & 3 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use janus_aggregator_core::{
Datastore,
},
task::{self, AggregatorTask, VerifyKey},
TIME_HISTOGRAM_BOUNDARIES,
};
use janus_core::{
retries::{is_retryable_http_client_error, is_retryable_http_status},
Expand Down Expand Up @@ -111,14 +112,14 @@ where
.u64_counter("janus_job_cancellations")
.with_description("Count of cancelled jobs.")
.with_unit("{job}")
.init();
.build();
job_cancel_counter.add(0, &[]);

let job_retry_counter = meter
.u64_counter("janus_job_retries")
.with_description("Count of retried job steps.")
.with_unit("{step}")
.init();
.build();
job_retry_counter.add(0, &[]);

let http_request_duration_histogram = meter
Expand All @@ -127,7 +128,8 @@ where
"The amount of time elapsed while making an HTTP request to a helper.",
)
.with_unit("s")
.init();
.with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec())
.build();

Self {
batch_aggregation_shard_count,
Expand Down
15 changes: 8 additions & 7 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use janus_aggregator_core::{
models::{AcquiredCollectionJob, BatchAggregation, CollectionJobState, Lease},
Datastore,
},
task,
task, TIME_HISTOGRAM_BOUNDARIES,
};
use janus_core::{
retries::{is_retryable_http_client_error, is_retryable_http_status},
Expand Down Expand Up @@ -666,7 +666,7 @@ impl CollectionJobDriverMetrics {
.u64_counter("janus_collection_jobs_finished")
.with_description("Count of finished collection jobs.")
.with_unit("{job}")
.init();
.build();
jobs_finished_counter.add(0, &[]);

let http_request_duration_histogram = meter
Expand All @@ -675,13 +675,14 @@ impl CollectionJobDriverMetrics {
"The amount of time elapsed while making an HTTP request to a helper.",
)
.with_unit("s")
.init();
.with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec())
.build();

let jobs_abandoned_counter = meter
.u64_counter("janus_collection_jobs_abandoned")
.with_description("Count of abandoned collection jobs.")
.with_unit("{job}")
.init();
.build();
jobs_abandoned_counter.add(0, &[]);

let deleted_jobs_encountered_counter = meter
Expand All @@ -691,7 +692,7 @@ impl CollectionJobDriverMetrics {
deleted.",
)
.with_unit("{job}")
.init();
.build();
deleted_jobs_encountered_counter.add(0, &[]);

let unexpected_job_state_counter = meter
Expand All @@ -701,14 +702,14 @@ impl CollectionJobDriverMetrics {
state.",
)
.with_unit("{job}")
.init();
.build();
unexpected_job_state_counter.add(0, &[]);

let job_steps_retried_counter = meter
.u64_counter("janus_job_retries")
.with_description("Count of retried job steps.")
.with_unit("{step}")
.init();
.build();
job_steps_retried_counter.add(0, &[]);

Self {
Expand Down
6 changes: 3 additions & 3 deletions aggregator/src/aggregator/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ impl<C: Clock> GarbageCollector<C> {
.u64_counter("janus_gc_deleted_reports")
.with_description("Count of client reports deleted by the garbage collector.")
.with_unit("{report}")
.init();
.build();
let deleted_aggregation_job_counter = meter
.u64_counter("janus_gc_deleted_aggregation_jobs")
.with_description("Count of aggregation jobs deleted by the garbage collector.")
.with_unit("{job}")
.init();
.build();
let deleted_batch_counter = meter
.u64_counter("janus_gc_deleted_batches")
.with_description("Count of batches deleted by the garbage collector.")
.with_unit("{batch}")
.init();
.build();

deleted_report_counter.add(0, &[]);
deleted_aggregation_job_counter.add(0, &[]);
Expand Down
32 changes: 20 additions & 12 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use super::{
use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument};
use async_trait::async_trait;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use janus_aggregator_core::{datastore::Datastore, instrumented, taskprov::taskprov_task_id};
use janus_aggregator_api::BYTES_HISTOGRAM_BOUNDARIES;
use janus_aggregator_core::{
datastore::Datastore, instrumented, taskprov::taskprov_task_id, TIME_HISTOGRAM_BOUNDARIES,
};
use janus_core::{
auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER},
http::extract_bearer_token,
Expand All @@ -32,7 +35,7 @@ use tracing::warn;
use trillium::{Conn, Handler, KnownHeaderName, Status};
use trillium_api::{api, State, TryFromConn};
use trillium_caching_headers::{CacheControlDirective, CachingHeadersExt as _};
use trillium_opentelemetry::metrics;
use trillium_opentelemetry::Metrics;
use trillium_router::{Router, RouterConnExt};

#[cfg(test)]
Expand Down Expand Up @@ -253,7 +256,7 @@ impl StatusCounter {
"Count of requests handled by the aggregator, by method, route, and response status.",
)
.with_unit("{request}")
.init(),
.build(),
)
}
}
Expand Down Expand Up @@ -420,17 +423,22 @@ where
instrumented(api(aggregate_shares::<C>)),
);

let metrics = Metrics::new(self.meter.clone())
.with_route(|conn| {
conn.route()
.map(|route_spec| Cow::Owned(route_spec.to_string()))
})
.with_error_type(|conn| {
conn.state::<ErrorCode>()
.map(|error_code| Cow::Borrowed(error_code.0))
})
.with_duration_histogram_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec())
.with_request_size_histogram_boundaries(BYTES_HISTOGRAM_BOUNDARIES.to_vec())
.with_response_size_histogram_boundaries(BYTES_HISTOGRAM_BOUNDARIES.to_vec());

Ok((
State(self.aggregator),
metrics(self.meter)
.with_route(|conn| {
conn.route()
.map(|route_spec| Cow::Owned(route_spec.to_string()))
})
.with_error_type(|conn| {
conn.state::<ErrorCode>()
.map(|error_code| Cow::Borrowed(error_code.0))
}),
metrics,
router,
StatusCounter::new(self.meter),
))
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/http_handlers/tests/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
},
test_util::{create_report, create_report_custom, default_aggregator_config},
},
metrics::test_util::InMemoryMetricsInfrastructure,
metrics::test_util::InMemoryMetricInfrastructure,
};
use janus_aggregator_core::{
datastore::test_util::{ephemeral_datastore, EphemeralDatastoreBuilder},
Expand Down Expand Up @@ -527,7 +527,7 @@ async fn upload_handler_error_fanout() {
async fn upload_client_early_disconnect() {
install_test_trace_subscriber();

let in_memory_metrics = InMemoryMetricsInfrastructure::new();
let in_memory_metrics = InMemoryMetricInfrastructure::new();
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
Expand Down
5 changes: 3 additions & 2 deletions aggregator/src/aggregator/problem_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ mod tests {
use bytes::Bytes;
use futures::future::join_all;
use http::Method;
use janus_aggregator_core::test_util::noop_meter;
use janus_aggregator_core::{test_util::noop_meter, TIME_HISTOGRAM_BOUNDARIES};
use janus_core::{
retries::test_util::LimitedRetryer,
time::{Clock, RealClock},
Expand Down Expand Up @@ -179,7 +179,8 @@ mod tests {
let request_histogram = noop_meter()
.f64_histogram("janus_http_request_duration")
.with_unit("s")
.init();
.with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec())
.build();

struct TestCase {
error_factory: Box<dyn Fn() -> Error + Send + Sync>,
Expand Down
Loading

0 comments on commit c23b10c

Please sign in to comment.