From c23b10ce4027cfc4ac4418a0c26406fe6de4665e Mon Sep 17 00:00:00 2001 From: David Cook Date: Fri, 24 Jan 2025 14:12:53 -0600 Subject: [PATCH] Upgrade to opentelemetry-rust 0.27 --- Cargo.lock | 184 +--- Cargo.toml | 12 +- aggregator/Cargo.toml | 4 +- aggregator/src/aggregator.rs | 4 +- .../src/aggregator/aggregation_job_creator.rs | 7 +- .../src/aggregator/aggregation_job_driver.rs | 8 +- .../src/aggregator/collection_job_driver.rs | 15 +- .../src/aggregator/garbage_collector.rs | 6 +- aggregator/src/aggregator/http_handlers.rs | 32 +- .../aggregator/http_handlers/tests/report.rs | 4 +- aggregator/src/aggregator/problem_details.rs | 5 +- aggregator/src/aggregator/queue.rs | 54 +- aggregator/src/binary_utils.rs | 102 +- aggregator/src/binary_utils/job_driver.rs | 11 +- aggregator/src/config.rs | 16 +- aggregator/src/metrics.rs | 205 +--- aggregator/src/metrics/test_util.rs | 18 +- aggregator/src/metrics/tests/prometheus.rs | 16 +- aggregator/src/metrics/tokio_runtime.rs | 890 ++++++------------ aggregator/src/trace.rs | 18 +- aggregator_api/src/lib.rs | 22 +- aggregator_core/src/datastore.rs | 21 +- aggregator_core/src/lib.rs | 49 +- .../aggregation_job_creator.yaml | 11 - .../aggregation_job_driver.yaml | 11 - docs/samples/advanced_config/aggregator.yaml | 11 - .../collection_job_driver.yaml | 11 - .../advanced_config/garbage_collector.yaml | 11 - docs/samples/advanced_config/janus_cli.yaml | 11 - 29 files changed, 597 insertions(+), 1172 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00e3d5f0f..17b989f8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,33 +286,13 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.2.0", "async-executor", - "async-io 2.3.2", + "async-io", "async-lock 3.3.0", "blocking", "futures-lite 2.3.0", "once_cell", ] -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock 2.8.0", - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-lite 1.13.0", - "log", - "parking", - "polling 2.8.0", - "rustix 0.37.27", - "slab", - "socket2 0.4.10", - "waker-fn", -] - [[package]] name = "async-io" version = "2.3.2" @@ -325,8 +305,8 @@ dependencies = [ "futures-io", "futures-lite 2.3.0", "parking", - "polling 3.6.0", - "rustix 0.38.40", + "polling", + "rustix", "slab", "tracing", "windows-sys 0.52.0", @@ -354,19 +334,21 @@ dependencies = [ [[package]] name = "async-process" -version = "1.8.1" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" dependencies = [ - "async-io 1.13.0", - "async-lock 2.8.0", + "async-channel 2.2.0", + "async-io", + "async-lock 3.3.0", "async-signal", + "async-task", "blocking", "cfg-if", - "event-listener 3.1.0", - "futures-lite 1.13.0", - "rustix 0.38.40", - "windows-sys 0.48.0", + "event-listener 5.3.1", + "futures-lite 2.3.0", + "rustix", + "tracing", ] [[package]] @@ -375,13 +357,13 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" dependencies = [ - "async-io 2.3.2", + "async-io", "async-lock 2.8.0", "atomic-waker", "cfg-if", "futures-core", "futures-io", - "rustix 0.38.40", + "rustix", "signal-hook-registry", "slab", "windows-sys 0.48.0", @@ -389,20 +371,20 @@ dependencies = [ [[package]] name = "async-std" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" dependencies = [ "async-channel 1.9.0", 
"async-global-executor", - "async-io 1.13.0", - "async-lock 2.8.0", + "async-io", + "async-lock 3.3.0", "async-process", "crossbeam-utils", "futures-channel", "futures-core", "futures-io", - "futures-lite 1.13.0", + "futures-lite 2.3.0", "gloo-timers", "kv-log-macro", "log", @@ -1608,17 +1590,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "event-listener" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - [[package]] name = "event-listener" version = "4.0.3" @@ -2029,9 +2000,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "gloo-timers" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" dependencies = [ "futures-channel", "futures-core", @@ -2461,7 +2432,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2 0.5.6", + "socket2", "tokio", "tower", "tower-service", @@ -2691,17 +2662,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys 0.48.0", -] - [[package]] name = "ipnet" version = "2.9.0" @@ -3372,12 +3332,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -3718,23 +3672,23 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "opentelemetry" -version = "0.24.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" dependencies = [ "futures-core", "futures-sink", "js-sys", - "once_cell", "pin-project-lite", "thiserror 1.0.69", + "tracing", ] [[package]] name = "opentelemetry-otlp" -version = "0.17.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" dependencies = [ "async-trait", "futures-core", @@ -3750,22 +3704,23 @@ dependencies = [ [[package]] name = "opentelemetry-prometheus" -version = "0.17.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc4191ce34aa274621861a7a9d68dbcf618d5b6c66b10081631b61fd81fbc015" +checksum = "1b834e966ea5e2d03dfe5f2253f03d22cce21403ee940265070eeee96cee0bcc" dependencies = [ "once_cell", "opentelemetry", "opentelemetry_sdk", "prometheus", "protobuf", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.7.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -3775,15 +3730,15 @@ dependencies = [ [[package]] name = "opentelemetry-semantic-conventions" -version = "0.16.0" +version = "0.27.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cefe0543875379e47eb5f1e68ff83f45cc41366a92dfd0d073d513bf68e9a05" +checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52" [[package]] name = "opentelemetry_sdk" -version = "0.24.1" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692eac490ec80f24a17828d49b40b60f5aeaccdfe6a503f939713afd22bc28df" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" dependencies = [ "async-std", "async-trait", @@ -3791,7 +3746,6 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "once_cell", "opentelemetry", "percent-encoding", "rand", @@ -4087,22 +4041,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "polling" version = "3.6.0" @@ -4113,7 +4051,7 @@ dependencies = [ "concurrent-queue", "hermit-abi", "pin-project-lite", - "rustix 0.38.40", + "rustix", "tracing", "windows-sys 0.52.0", ] @@ -4465,7 +4403,7 @@ checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" dependencies = [ "libc", "once_cell", - "socket2 0.5.6", + "socket2", "tracing", "windows-sys 0.52.0", ] @@ -4781,20 +4719,6 @@ dependencies = [ "semver", ] -[[package]] -name = "rustix" -version = "0.37.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 
0.3.8", - "windows-sys 0.48.0", -] - [[package]] name = "rustix" version = "0.38.40" @@ -4804,7 +4728,7 @@ dependencies = [ "bitflags 2.6.0", "errno", "libc", - "linux-raw-sys 0.4.14", + "linux-raw-sys", "windows-sys 0.52.0", ] @@ -5334,16 +5258,6 @@ dependencies = [ "anstream", ] -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.6" @@ -5720,7 +5634,7 @@ dependencies = [ "fastrand 2.1.1", "getrandom", "once_cell", - "rustix 0.38.40", + "rustix", "windows-sys 0.59.0", ] @@ -5871,7 +5785,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.6", + "socket2", "tokio-macros", "tracing", "windows-sys 0.52.0", @@ -5908,7 +5822,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand", - "socket2 0.5.6", + "socket2", "tokio", "tokio-util", "whoami", @@ -6039,7 +5953,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "socket2 0.5.6", + "socket2", "tokio", "tokio-stream", "tower", @@ -6156,9 +6070,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.25.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" dependencies = [ "js-sys", "once_cell", @@ -6344,9 +6258,9 @@ dependencies = [ [[package]] name = "trillium-opentelemetry" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "369989011133b91f356bc790cfda4eae9243ffd33929dfb067fb135a728f88af" +checksum = "3b5e36d8e79ad6b8858715b9342b4d761d936dcd884f5680ecfb15c0076fa46a" dependencies = [ "opentelemetry", "opentelemetry-semantic-conventions", @@ 
-6802,7 +6716,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.40", + "rustix", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8e2afeb17..e9a51d414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,10 +65,10 @@ kube = { version = "0.94.2", default-features = false, features = ["client", "ru mockito = "1.6.1" num_enum = "0.7.3" ohttp = { version = "0.5.1", default-features = false } -opentelemetry = { version = "0.24", default-features = false, features = ["trace", "metrics"] } -opentelemetry-otlp = { version = "0.17", default-features = false, features = ["trace", "metrics", "grpc-tonic"] } -opentelemetry-prometheus = "0.17" -opentelemetry_sdk = { version = "0.24", default-features = false, features = ["trace", "metrics"] } +opentelemetry = { version = "0.27", default-features = false, features = ["trace", "metrics"] } +opentelemetry-otlp = { version = "0.27", default-features = false, features = ["trace", "metrics", "grpc-tonic"] } +opentelemetry-prometheus = "0.27" +opentelemetry_sdk = { version = "0.27", default-features = false, features = ["trace", "metrics"] } pem = "3" postgres-protocol = "0.6.7" postgres-types = "0.2.8" @@ -108,7 +108,7 @@ thiserror = "2.0" tracing = "0.1.41" tracing-chrome = "0.7.2" tracing-log = "0.2.0" -tracing-opentelemetry = "0.25" +tracing-opentelemetry = "0.28" tracing-stackdriver = "0.10.0" tracing-subscriber = "0.3" tokio = { version = "1.43", features = ["full", "tracing"] } @@ -120,7 +120,7 @@ trillium-api = { version = "0.2.0-rc.12", default-features = false } trillium-caching-headers = "0.2.3" trillium-head = "0.2.3" trillium-macros = "0.0.6" -trillium-opentelemetry = "0.9.0" +trillium-opentelemetry = "0.10.0" trillium-prometheus = "0.2.0" trillium-proxy = { version = "0.5.5", default-features = false } trillium-router = "0.4.1" diff --git a/aggregator/Cargo.toml b/aggregator/Cargo.toml index b4a0ea6b9..c0ed1a11d 100644 --- a/aggregator/Cargo.toml +++ b/aggregator/Cargo.toml @@ -18,12 +18,10 @@ 
fpvec_bounded_l2 = ["dep:fixed", "janus_core/fpvec_bounded_l2"] tokio-console = ["dep:console-subscriber"] otlp = [ "dep:opentelemetry-otlp", - "dep:opentelemetry_sdk", "dep:tracing-opentelemetry", ] prometheus = [ "dep:opentelemetry-prometheus", - "dep:opentelemetry_sdk", "dep:prometheus", "dep:trillium-prometheus", ] @@ -66,7 +64,7 @@ moka = { version = "0.12.10", features = ["future"] } opentelemetry.workspace = true opentelemetry-otlp = { workspace = true, features = ["metrics"], optional = true } opentelemetry-prometheus = { workspace = true, optional = true } -opentelemetry_sdk = { workspace = true, features = ["rt-tokio"], optional = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } pem.workspace = true postgres-protocol = { workspace = true } postgres-types = { workspace = true, features = ["derive", "array-impls"] } diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 581087ed7..fdb5e69c1 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -293,7 +293,7 @@ impl Aggregator { "Number of decryption failures in the tasks/{task-id}/reports endpoint.", ) .with_unit("{error}") - .init(); + .build(); upload_decrypt_failure_counter.add(0, &[]); let upload_decode_failure_counter = meter @@ -302,7 +302,7 @@ impl Aggregator { "Number of message decode failures in the tasks/{task-id}/reports endpoint.", ) .with_unit("{error}") - .init(); + .build(); upload_decode_failure_counter.add(0, &[]); let report_aggregation_success_counter = report_aggregation_success_counter(meter); diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 33361c157..77ea24605 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -21,6 +21,7 @@ use janus_aggregator_core::{ Datastore, }, task::{self, AggregatorTask}, + TIME_HISTOGRAM_BOUNDARIES, }; #[cfg(feature = 
"fpvec_bounded_l2")] use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize; @@ -128,13 +129,15 @@ impl AggregationJobCreator { .f64_histogram("janus_task_update_time") .with_description("Time spent updating tasks.") .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); let job_creation_time_histogram = self .meter .f64_histogram("janus_job_creation_time") .with_description("Time spent creating aggregation jobs.") .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); // Set up an interval to occasionally update our view of tasks in the DB. // (This will fire immediately, so we'll immediately load tasks from the DB when we enter diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index f42db03ed..67357692a 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -29,6 +29,7 @@ use janus_aggregator_core::{ Datastore, }, task::{self, AggregatorTask, VerifyKey}, + TIME_HISTOGRAM_BOUNDARIES, }; use janus_core::{ retries::{is_retryable_http_client_error, is_retryable_http_status}, @@ -111,14 +112,14 @@ where .u64_counter("janus_job_cancellations") .with_description("Count of cancelled jobs.") .with_unit("{job}") - .init(); + .build(); job_cancel_counter.add(0, &[]); let job_retry_counter = meter .u64_counter("janus_job_retries") .with_description("Count of retried job steps.") .with_unit("{step}") - .init(); + .build(); job_retry_counter.add(0, &[]); let http_request_duration_histogram = meter @@ -127,7 +128,8 @@ where "The amount of time elapsed while making an HTTP request to a helper.", ) .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); Self { batch_aggregation_shard_count, diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 
03baa5498..a23d5fd1c 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -16,7 +16,7 @@ use janus_aggregator_core::{ models::{AcquiredCollectionJob, BatchAggregation, CollectionJobState, Lease}, Datastore, }, - task, + task, TIME_HISTOGRAM_BOUNDARIES, }; use janus_core::{ retries::{is_retryable_http_client_error, is_retryable_http_status}, @@ -666,7 +666,7 @@ impl CollectionJobDriverMetrics { .u64_counter("janus_collection_jobs_finished") .with_description("Count of finished collection jobs.") .with_unit("{job}") - .init(); + .build(); jobs_finished_counter.add(0, &[]); let http_request_duration_histogram = meter @@ -675,13 +675,14 @@ impl CollectionJobDriverMetrics { "The amount of time elapsed while making an HTTP request to a helper.", ) .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); let jobs_abandoned_counter = meter .u64_counter("janus_collection_jobs_abandoned") .with_description("Count of abandoned collection jobs.") .with_unit("{job}") - .init(); + .build(); jobs_abandoned_counter.add(0, &[]); let deleted_jobs_encountered_counter = meter @@ -691,7 +692,7 @@ impl CollectionJobDriverMetrics { deleted.", ) .with_unit("{job}") - .init(); + .build(); deleted_jobs_encountered_counter.add(0, &[]); let unexpected_job_state_counter = meter @@ -701,14 +702,14 @@ impl CollectionJobDriverMetrics { state.", ) .with_unit("{job}") - .init(); + .build(); unexpected_job_state_counter.add(0, &[]); let job_steps_retried_counter = meter .u64_counter("janus_job_retries") .with_description("Count of retried job steps.") .with_unit("{step}") - .init(); + .build(); job_steps_retried_counter.add(0, &[]); Self { diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index 6f87a05ba..4343ac455 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -42,17 
+42,17 @@ impl GarbageCollector { .u64_counter("janus_gc_deleted_reports") .with_description("Count of client reports deleted by the garbage collector.") .with_unit("{report}") - .init(); + .build(); let deleted_aggregation_job_counter = meter .u64_counter("janus_gc_deleted_aggregation_jobs") .with_description("Count of aggregation jobs deleted by the garbage collector.") .with_unit("{job}") - .init(); + .build(); let deleted_batch_counter = meter .u64_counter("janus_gc_deleted_batches") .with_description("Count of batches deleted by the garbage collector.") .with_unit("{batch}") - .init(); + .build(); deleted_report_counter.add(0, &[]); deleted_aggregation_job_counter.add(0, &[]); diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index ac3ebcce7..1a25df3c2 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -6,7 +6,10 @@ use super::{ use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument}; use async_trait::async_trait; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; -use janus_aggregator_core::{datastore::Datastore, instrumented, taskprov::taskprov_task_id}; +use janus_aggregator_api::BYTES_HISTOGRAM_BOUNDARIES; +use janus_aggregator_core::{ + datastore::Datastore, instrumented, taskprov::taskprov_task_id, TIME_HISTOGRAM_BOUNDARIES, +}; use janus_core::{ auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER}, http::extract_bearer_token, @@ -32,7 +35,7 @@ use tracing::warn; use trillium::{Conn, Handler, KnownHeaderName, Status}; use trillium_api::{api, State, TryFromConn}; use trillium_caching_headers::{CacheControlDirective, CachingHeadersExt as _}; -use trillium_opentelemetry::metrics; +use trillium_opentelemetry::Metrics; use trillium_router::{Router, RouterConnExt}; #[cfg(test)] @@ -253,7 +256,7 @@ impl StatusCounter { "Count of requests handled by the aggregator, by method, route, and response status.", ) 
.with_unit("{request}") - .init(), + .build(), ) } } @@ -420,17 +423,22 @@ where instrumented(api(aggregate_shares::)), ); + let metrics = Metrics::new(self.meter.clone()) + .with_route(|conn| { + conn.route() + .map(|route_spec| Cow::Owned(route_spec.to_string())) + }) + .with_error_type(|conn| { + conn.state::() + .map(|error_code| Cow::Borrowed(error_code.0)) + }) + .with_duration_histogram_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .with_request_size_histogram_boundaries(BYTES_HISTOGRAM_BOUNDARIES.to_vec()) + .with_response_size_histogram_boundaries(BYTES_HISTOGRAM_BOUNDARIES.to_vec()); + Ok(( State(self.aggregator), - metrics(self.meter) - .with_route(|conn| { - conn.route() - .map(|route_spec| Cow::Owned(route_spec.to_string())) - }) - .with_error_type(|conn| { - conn.state::() - .map(|error_code| Cow::Borrowed(error_code.0)) - }), + metrics, router, StatusCounter::new(self.meter), )) diff --git a/aggregator/src/aggregator/http_handlers/tests/report.rs b/aggregator/src/aggregator/http_handlers/tests/report.rs index c1fe47d1a..754c59667 100644 --- a/aggregator/src/aggregator/http_handlers/tests/report.rs +++ b/aggregator/src/aggregator/http_handlers/tests/report.rs @@ -7,7 +7,7 @@ use crate::{ }, test_util::{create_report, create_report_custom, default_aggregator_config}, }, - metrics::test_util::InMemoryMetricsInfrastructure, + metrics::test_util::InMemoryMetricInfrastructure, }; use janus_aggregator_core::{ datastore::test_util::{ephemeral_datastore, EphemeralDatastoreBuilder}, @@ -527,7 +527,7 @@ async fn upload_handler_error_fanout() { async fn upload_client_early_disconnect() { install_test_trace_subscriber(); - let in_memory_metrics = InMemoryMetricsInfrastructure::new(); + let in_memory_metrics = InMemoryMetricInfrastructure::new(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); diff --git 
a/aggregator/src/aggregator/problem_details.rs b/aggregator/src/aggregator/problem_details.rs index 5f380e4d5..4cb66dfda 100644 --- a/aggregator/src/aggregator/problem_details.rs +++ b/aggregator/src/aggregator/problem_details.rs @@ -137,7 +137,7 @@ mod tests { use bytes::Bytes; use futures::future::join_all; use http::Method; - use janus_aggregator_core::test_util::noop_meter; + use janus_aggregator_core::{test_util::noop_meter, TIME_HISTOGRAM_BOUNDARIES}; use janus_core::{ retries::test_util::LimitedRetryer, time::{Clock, RealClock}, @@ -179,7 +179,8 @@ mod tests { let request_histogram = noop_meter() .f64_histogram("janus_http_request_duration") .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); struct TestCase { error_factory: Box Error + Send + Sync>, diff --git a/aggregator/src/aggregator/queue.rs b/aggregator/src/aggregator/queue.rs index 22dd6303e..4ca2cb0ee 100644 --- a/aggregator/src/aggregator/queue.rs +++ b/aggregator/src/aggregator/queue.rs @@ -7,7 +7,8 @@ use std::{ }; use itertools::Itertools; -use opentelemetry::metrics::{Meter, MetricsError}; +use opentelemetry::metrics::Meter; +use opentelemetry_sdk::metrics::MetricError; use tokio::{ select, sync::{ @@ -300,17 +301,21 @@ impl Metrics { meter: &Meter, prefix: &str, max_outstanding_requests: u64, - ) -> Result { + ) -> Result { let outstanding_requests = Arc::new(AtomicU64::new(0)); - let outstanding_requests_gauge = meter + let _outstanding_requests_gauge = meter .u64_observable_gauge(Self::get_outstanding_requests_name(prefix)) .with_description(concat!( "The approximate number of requests currently being serviced by the ", "aggregator." 
)) .with_unit("{request}") - .init(); - let max_outstanding_requests_gauge = meter + .with_callback({ + let outstanding_requests = Arc::clone(&outstanding_requests); + move |observer| observer.observe(outstanding_requests.load(Ordering::Relaxed), &[]) + }) + .build(); + let _max_outstanding_requests_gauge = meter .u64_observable_gauge( [prefix, Self::MAX_OUTSTANDING_REQUESTS_METRIC_NAME] .into_iter() @@ -320,29 +325,8 @@ impl Metrics { "The maximum number of requests that the aggregator can service at a time." )) .with_unit("{request}") - .init(); - - meter.register_callback( - &[ - outstanding_requests_gauge.as_any(), - max_outstanding_requests_gauge.as_any(), - ], - { - let outstanding_requests = Arc::clone(&outstanding_requests); - move |observer| { - observer.observe_u64( - &outstanding_requests_gauge, - outstanding_requests.load(Ordering::Relaxed), - &[], - ); - observer.observe_u64( - &max_outstanding_requests_gauge, - max_outstanding_requests, - &[], - ); - } - }, - )?; + .with_callback(move |observer| observer.observe(max_outstanding_requests, &[])) + .build(); Ok(Self { outstanding_requests, @@ -385,7 +369,7 @@ mod tests { use crate::{ aggregator::queue::{queued_lifo, LIFORequestQueue, Metrics}, - metrics::test_util::InMemoryMetricsInfrastructure, + metrics::test_util::InMemoryMetricInfrastructure, }; /// Some tests busy loop waiting for a condition to become true. 
Avoid hanging broken tests @@ -393,7 +377,7 @@ mod tests { const TEST_TIMEOUT: Duration = Duration::from_secs(15); async fn get_outstanding_requests_gauge( - metrics: &InMemoryMetricsInfrastructure, + metrics: &InMemoryMetricInfrastructure, meter_prefix: &str, ) -> Option { Some( @@ -415,7 +399,7 @@ mod tests { } async fn wait_for( - metrics: &InMemoryMetricsInfrastructure, + metrics: &InMemoryMetricInfrastructure, meter_prefix: &str, condition: impl Fn(usize) -> bool, ) { @@ -445,7 +429,7 @@ mod tests { handler: Arc, concurrency: u32, depth: usize, - metrics: &InMemoryMetricsInfrastructure, + metrics: &InMemoryMetricInfrastructure, meter_prefix: &str, ) -> Vec> { debug!("filling queue"); @@ -605,7 +589,7 @@ mod tests { runtime_flavor.run(async move { let meter_prefix = "test"; - let metrics = InMemoryMetricsInfrastructure::new(); + let metrics = InMemoryMetricInfrastructure::new(); let unhang = Arc::new(Notify::new()); let queue = Arc::new( LIFORequestQueue::new(concurrency, depth, &metrics.meter, meter_prefix) @@ -687,7 +671,7 @@ mod tests { runtime_flavor.run(async move { let unhang = Arc::new(Notify::new()); let meter_prefix = "test"; - let metrics = InMemoryMetricsInfrastructure::new(); + let metrics = InMemoryMetricInfrastructure::new(); let queue = Arc::new( LIFORequestQueue::new(concurrency, depth, &metrics.meter, meter_prefix) .unwrap(), @@ -752,7 +736,7 @@ mod tests { runtime_flavor.run(async move { let unhang = Arc::new(Notify::new()); let meter_prefix = "test"; - let metrics = InMemoryMetricsInfrastructure::new(); + let metrics = InMemoryMetricInfrastructure::new(); let queue = Arc::new( LIFORequestQueue::new(concurrency, depth, &metrics.meter, meter_prefix) .unwrap(), diff --git a/aggregator/src/binary_utils.rs b/aggregator/src/binary_utils.rs index 083c1d882..12413eea0 100644 --- a/aggregator/src/binary_utils.rs +++ b/aggregator/src/binary_utils.rs @@ -18,7 +18,8 @@ use futures::StreamExt; use janus_aggregator_api::git_revision; use 
janus_aggregator_core::datastore::{Crypter, Datastore}; use janus_core::time::Clock; -use opentelemetry::metrics::{Meter, MetricsError}; +use opentelemetry::metrics::Meter; +use opentelemetry_sdk::metrics::MetricError; use rayon::{ThreadPoolBuildError, ThreadPoolBuilder}; use rustls::RootCertStore; use std::{ @@ -268,17 +269,6 @@ where let mut runtime_builder = runtime::Builder::new_multi_thread(); runtime_builder.enable_all(); - if let Some(tokio_metrics_config) = config.common_config().metrics_config.tokio.as_ref() { - if tokio_metrics_config.enabled { - #[cfg(feature = "prometheus")] - { - crate::metrics::tokio_runtime::configure_runtime( - &mut runtime_builder, - tokio_metrics_config, - ); - } - } - } let runtime = runtime_builder.build()?; runtime.block_on(async { @@ -353,58 +343,60 @@ where } /// Set up metrics to monitor the database connection pool's status. -fn register_database_pool_status_metrics(pool: Pool, meter: &Meter) -> Result<(), MetricsError> { - let available_connections_gauge = meter +fn register_database_pool_status_metrics(pool: Pool, meter: &Meter) -> Result<(), MetricError> { + let _available_connections_gauge = meter .u64_observable_gauge("janus_database_pool_available_connections") .with_description( "Number of available database connections in the database connection pool.", ) - .init(); - let total_connections_gauge = meter + .with_callback({ + let pool = pool.clone(); + move |observer| { + observer.observe( + u64::try_from(pool.status().available).unwrap_or(u64::MAX), + &[], + ) + } + }) + .build(); + let _total_connections_gauge = meter .u64_observable_gauge("janus_database_pool_total_connections") .with_description("Total number of connections in the database connection pool.") - .init(); - let maximum_size_gauge = meter + .with_callback({ + let pool = pool.clone(); + move |observer| { + observer.observe(u64::try_from(pool.status().size).unwrap_or(u64::MAX), &[]) + } + }) + .build(); + let _maximum_size_gauge = meter 
.u64_observable_gauge("janus_database_pool_maximum_size_connections") .with_description("Maximum size of the database connection pool.") - .init(); - let waiting_tasks_gauge = meter + .with_callback({ + let pool = pool.clone(); + move |observer| { + observer.observe( + u64::try_from(pool.status().max_size).unwrap_or(u64::MAX), + &[], + ) + } + }) + .build(); + let _waiting_tasks_gauge = meter .u64_observable_gauge("janus_database_pool_waiting_tasks") .with_description( "Number of tasks waiting for a connection from the database connection pool.", ) - .init(); - meter.register_callback( - &[ - available_connections_gauge.as_any(), - total_connections_gauge.as_any(), - maximum_size_gauge.as_any(), - waiting_tasks_gauge.as_any(), - ], - move |observer| { - let status = pool.status(); - observer.observe_u64( - &available_connections_gauge, - u64::try_from(status.available).unwrap_or(u64::MAX), - &[], - ); - observer.observe_u64( - &total_connections_gauge, - u64::try_from(status.size).unwrap_or(u64::MAX), - &[], - ); - observer.observe_u64( - &maximum_size_gauge, - u64::try_from(status.max_size).unwrap_or(u64::MAX), - &[], - ); - observer.observe_u64( - &waiting_tasks_gauge, - u64::try_from(status.waiting).unwrap_or(u64::MAX), - &[], - ); - }, - )?; + .with_callback({ + let pool = pool.clone(); + move |observer| { + observer.observe( + u64::try_from(pool.status().waiting).unwrap_or(u64::MAX), + &[], + ) + } + }) + .build(); Ok(()) } @@ -559,7 +551,7 @@ mod tests { zpages_handler, CommonBinaryOptions, }, config::DbConfig, - metrics::test_util::InMemoryMetricsInfrastructure, + metrics::test_util::InMemoryMetricInfrastructure, }; use clap::CommandFactory; use janus_aggregator_core::datastore::test_util::ephemeral_datastore; @@ -722,7 +714,7 @@ mod tests { let ephemeral_datastore = ephemeral_datastore().await; let pool = ephemeral_datastore.pool(); - let in_memory_metrics = InMemoryMetricsInfrastructure::new(); + let in_memory_metrics = InMemoryMetricInfrastructure::new(); 
register_database_pool_status_metrics(pool.clone(), &in_memory_metrics.meter).unwrap(); @@ -736,7 +728,7 @@ mod tests { } async fn check_database_pool_gauges( - in_memory_metrics: &InMemoryMetricsInfrastructure, + in_memory_metrics: &InMemoryMetricInfrastructure, expected_available: u64, expected_total: u64, expected_waiting: u64, diff --git a/aggregator/src/binary_utils/job_driver.rs b/aggregator/src/binary_utils/job_driver.rs index ecba9686b..2531efff6 100644 --- a/aggregator/src/binary_utils/job_driver.rs +++ b/aggregator/src/binary_utils/job_driver.rs @@ -2,7 +2,10 @@ use anyhow::Context as _; use chrono::NaiveDateTime; -use janus_aggregator_core::datastore::{self, models::Lease}; +use janus_aggregator_core::{ + datastore::{self, models::Lease}, + TIME_HISTOGRAM_BOUNDARIES, +}; use janus_core::{time::Clock, Runtime}; use opentelemetry::{metrics::Meter, KeyValue}; use rand::{thread_rng, Rng}; @@ -101,13 +104,15 @@ where .f64_histogram("janus_job_acquire_time") .with_description("Time spent acquiring jobs.") .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); let job_step_time_histogram = self .meter .f64_histogram("janus_job_step_time") .with_description("Time spent stepping jobs.") .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); // Set up state for the job driver run. 
let sem = Arc::new(Semaphore::new(self.max_concurrent_job_workers)); diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs index 1df28946a..5c0f1bb4e 100644 --- a/aggregator/src/config.rs +++ b/aggregator/src/config.rs @@ -323,7 +323,7 @@ mod tests { test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, CommonConfig, DbConfig, JobDriverConfig, }, - metrics::{MetricsExporterConfiguration, PollTimeHistogramConfiguration}, + metrics::MetricsExporterConfiguration, trace::OpenTelemetryTraceConfiguration, }; use assert_matches::assert_matches; @@ -427,23 +427,9 @@ metrics_config: port: 9464 tokio: enabled: true - enable_poll_time_histogram: true - poll_time_histogram: !log - min_value_us: 100 - max_value_us: 3000000 - max_relative_error: 0.25 "; let config: CommonConfig = serde_yaml::from_str(input).unwrap(); let tokio_config = config.metrics_config.tokio.unwrap(); assert!(tokio_config.enabled); - assert!(tokio_config.enable_poll_time_histogram); - assert_eq!( - tokio_config.poll_time_histogram, - PollTimeHistogramConfiguration::Log { - min_value_us: Some(100), - max_value_us: Some(3_000_000), - max_relative_error: Some(0.25), - } - ); } } diff --git a/aggregator/src/metrics.rs b/aggregator/src/metrics.rs index 22c6e4c73..944661d33 100644 --- a/aggregator/src/metrics.rs +++ b/aggregator/src/metrics.rs @@ -24,24 +24,15 @@ use { #[cfg(feature = "otlp")] use { opentelemetry_otlp::WithExportConfig, - opentelemetry_sdk::{ - metrics::{ - reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, - PeriodicReader, - }, - runtime::Tokio, - }, + opentelemetry_sdk::{metrics::PeriodicReader, runtime::Tokio}, }; #[cfg(any(feature = "otlp", feature = "prometheus"))] use { janus_aggregator_api::git_revision, - janus_aggregator_core::datastore::TRANSACTION_RETRIES_METER_NAME, - opentelemetry::{global::set_meter_provider, metrics::MetricsError}, + opentelemetry::global::set_meter_provider, opentelemetry_sdk::{ - metrics::{ - new_view, 
Aggregation, Instrument, InstrumentKind, SdkMeterProvider, Stream, View, - }, + metrics::{MetricError, SdkMeterProvider}, Resource, }, }; @@ -60,7 +51,7 @@ pub enum Error { #[error("bad IP address: {0}")] IpAddress(#[from] AddrParseError), #[error(transparent)] - OpenTelemetry(#[from] opentelemetry::metrics::MetricsError), + OpenTelemetry(#[from] opentelemetry_sdk::metrics::MetricError), #[error("{0}")] Other(#[from] anyhow::Error), } @@ -105,59 +96,6 @@ pub struct TokioMetricsConfiguration { /// to the compiler if this is enabled. #[serde(default)] pub enabled: bool, - - /// Enable Tokio's poll time histogram. This introduces some additional overhead by calling - /// [`Instant::now`](std::time::Instant::now) twice per task poll. - #[serde(default)] - pub enable_poll_time_histogram: bool, - - /// Choose whether poll times should be tracked on a linear scale or a logarithmic scale, and - /// set the parameters for the histogram. - #[serde(default)] - pub poll_time_histogram: PollTimeHistogramConfiguration, -} - -/// Configuration for the poll time histogram. -#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] -#[serde(deny_unknown_fields, rename_all = "lowercase")] -pub enum PollTimeHistogramConfiguration { - /// Linear histogram scale. - Linear { - /// Width of each histogram bucket, in microseconds. - #[serde(default = "default_linear_histogram_resolution_us")] - resolution_us: u64, - /// Number of histogram buckets. - #[serde(default = "default_linear_histogram_num_buckets")] - num_buckets: usize, - }, - /// Logarithmic histogram scale. - Log { - /// Sets the minimum duration that can be accurately recorded, in microseconds. - min_value_us: Option, - /// Sets the maximum value that can be accurately recorded, in microseconds. - max_value_us: Option, - /// Sets the maximum relative error. This should be between 0.0 and 1.0. 
- max_relative_error: Option, - }, -} - -impl Default for PollTimeHistogramConfiguration { - /// This uses the default configuration values of - /// [`tokio::runtime::Builder`]. - fn default() -> Self { - Self::Linear { - resolution_us: default_linear_histogram_resolution_us(), - num_buckets: default_linear_histogram_num_buckets(), - } - } -} - -fn default_linear_histogram_resolution_us() -> u64 { - 100 -} - -fn default_linear_histogram_num_buckets() -> usize { - 10 } /// Choice of OpenTelemetry metrics exporter implementation. @@ -172,94 +110,6 @@ pub enum MetricsExporterHandle { Noop, } -#[cfg(any(feature = "prometheus", feature = "otlp"))] -struct CustomView { - retries_histogram_view: Box, - vdaf_dimension_histogram_view: Box, - bytes_histogram_view: Box, - default_histogram_view: Box, -} - -#[cfg(any(feature = "prometheus", feature = "otlp"))] -impl CustomView { - /// These boundaries are for the number of times a database transaction was retried. - const RETRIES_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ - 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0, - 16384.0, - ]; - - /// These boundaries are for the dimensions of VDAF measurements. - const VDAF_DIMENSION_HISTOGRAM_VALUES: &'static [f64] = &[ - 1.0, 4.0, 16.0, 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, - ]; - - /// These boundaries are intended to be used with measurements having the unit of "bytes". - const BYTES_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ - 1024.0, 2048.0, 4096.0, 8192.0, 16384.0, 32768.0, 65536.0, 131072.0, 262144.0, 524288.0, - 1048576.0, 2097152.0, 4194304.0, 8388608.0, 16777216.0, 33554432.0, - ]; - - /// These boundaries are intended to be able to capture the length of short-lived operations - /// (e.g HTTP requests) as well as longer-running operations. 
- const DEFAULT_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ - 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 90.0, 300.0, - ]; - - pub fn new() -> Result { - let wildcard_instrument = Instrument::new().name("*"); - Ok(Self { - retries_histogram_view: new_view( - wildcard_instrument.clone(), - Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: Vec::from(Self::RETRIES_HISTOGRAM_BOUNDARIES), - record_min_max: true, - }), - )?, - vdaf_dimension_histogram_view: new_view( - wildcard_instrument.clone(), - Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: Vec::from(Self::VDAF_DIMENSION_HISTOGRAM_VALUES), - record_min_max: true, - }), - )?, - bytes_histogram_view: new_view( - wildcard_instrument.clone(), - Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: Vec::from(Self::BYTES_HISTOGRAM_BOUNDARIES), - record_min_max: true, - }), - )?, - default_histogram_view: new_view( - wildcard_instrument, - Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: Vec::from(Self::DEFAULT_HISTOGRAM_BOUNDARIES), - record_min_max: true, - }), - )?, - }) - } -} - -#[cfg(any(feature = "prometheus", feature = "otlp"))] -impl View for CustomView { - fn match_inst(&self, inst: &Instrument) -> Option { - match (inst.kind, inst.name.as_ref()) { - (Some(InstrumentKind::Histogram), TRANSACTION_RETRIES_METER_NAME) => { - self.retries_histogram_view.match_inst(inst) - } - (Some(InstrumentKind::Histogram), AGGREGATED_REPORT_SHARE_DIMENSION_METER_NAME) => { - self.vdaf_dimension_histogram_view.match_inst(inst) - } - ( - Some(InstrumentKind::Histogram), - "http.server.request.body.size" | "http.server.response.body.size", - ) => self.bytes_histogram_view.match_inst(inst), - (Some(InstrumentKind::Histogram), _) => self.default_histogram_view.match_inst(inst), - _ => None, - } - } -} - /// Construct and return an opentelemetry-prometheus MeterProvider. 
/// /// # Arguments @@ -270,18 +120,12 @@ impl View for CustomView { #[cfg(feature = "prometheus")] fn build_opentelemetry_prometheus_meter_provider( registry: Registry, - runtime_opt: Option<&Runtime>, -) -> Result { +) -> Result { let mut reader_builder = opentelemetry_prometheus::exporter(); reader_builder = reader_builder.with_registry(registry); - if let Some(runtime) = runtime_opt { - reader_builder = reader_builder - .with_producer(tokio_runtime::TokioRuntimeMetrics::new(runtime.metrics())); - } let reader = reader_builder.build()?; let meter_provider = SdkMeterProvider::builder() .with_reader(reader) - .with_view(CustomView::new()?) .with_resource(resource()) .build(); Ok(meter_provider) @@ -332,21 +176,18 @@ pub async fn install_metrics_exporter( host: config_exporter_host, port: config_exporter_port, }) => { - let runtime_opt = if config + let registry = Registry::new(); + let meter_provider = build_opentelemetry_prometheus_meter_provider(registry.clone())?; + set_meter_provider(meter_provider.clone()); + + if config .tokio .as_ref() .map(|tokio_metrics_config| tokio_metrics_config.enabled) .unwrap_or_default() { - Some(_runtime) - } else { - None - }; - - let registry = Registry::new(); - let meter_provider = - build_opentelemetry_prometheus_meter_provider(registry.clone(), runtime_opt)?; - set_meter_provider(meter_provider); + tokio_runtime::initialize(_runtime.metrics(), &meter_provider); + } let host = config_exporter_host .as_ref() @@ -381,17 +222,13 @@ pub async fn install_metrics_exporter( ))); } - let exporter = opentelemetry_otlp::new_exporter() - .tonic() + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() .with_endpoint(otlp_config.endpoint.clone()) - .build_metrics_exporter( - Box::new(DefaultAggregationSelector::new()), - Box::new(DefaultTemporalitySelector::new()), - )?; + .build()?; let reader = PeriodicReader::builder(exporter, Tokio).build(); let meter_provider = SdkMeterProvider::builder() .with_reader(reader) 
- .with_view(CustomView::new()?) .with_resource(resource()) .build(); set_meter_provider(meter_provider.clone()); @@ -435,7 +272,7 @@ pub(crate) fn report_aggregation_success_counter(meter: &Meter) -> Counter .u64_counter("janus_report_aggregation_success_counter") .with_description("Number of successfully-aggregated report shares") .with_unit("{report}") - .init(); + .build(); report_aggregation_success_counter.add(0, &[]); report_aggregation_success_counter } @@ -443,11 +280,17 @@ pub(crate) fn report_aggregation_success_counter(meter: &Meter) -> Counter pub const AGGREGATED_REPORT_SHARE_DIMENSION_METER_NAME: &str = "janus_aggregated_report_share_vdaf_dimension"; +/// These boundaries are for the dimensions of VDAF measurements. +const VDAF_DIMENSION_HISTOGRAM_VALUES: &[f64] = &[ + 1.0, 4.0, 16.0, 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, +]; + pub(crate) fn aggregated_report_share_dimension_histogram(meter: &Meter) -> Histogram { meter .u64_histogram(AGGREGATED_REPORT_SHARE_DIMENSION_METER_NAME) .with_description("Successfully aggregated report shares") - .init() + .with_boundaries(VDAF_DIMENSION_HISTOGRAM_VALUES.to_vec()) + .build() } pub(crate) fn aggregate_step_failure_counter(meter: &Meter) -> Counter { @@ -458,7 +301,7 @@ pub(crate) fn aggregate_step_failure_counter(meter: &Meter) -> Counter { "related to individual client reports rather than entire aggregation jobs." )) .with_unit("{error}") - .init(); + .build(); // Initialize counters with desired status labels. This causes Prometheus to see the first // non-zero value we record. 
diff --git a/aggregator/src/metrics/test_util.rs b/aggregator/src/metrics/test_util.rs index aeeb8bfbb..301bd482a 100644 --- a/aggregator/src/metrics/test_util.rs +++ b/aggregator/src/metrics/test_util.rs @@ -4,29 +4,29 @@ use opentelemetry::metrics::{Meter, MeterProvider}; use opentelemetry_sdk::{ metrics::{data::Metric, PeriodicReader, SdkMeterProvider}, runtime, - testing::metrics::InMemoryMetricsExporter, + testing::metrics::InMemoryMetricExporter, }; use tokio::task::spawn_blocking; #[derive(Clone)] -pub(crate) struct InMemoryMetricsInfrastructure { - /// The in-memory metrics exporter - pub exporter: InMemoryMetricsExporter, +pub(crate) struct InMemoryMetricInfrastructure { + /// The in-memory metric exporter + pub exporter: InMemoryMetricExporter, /// The meter provider. pub meter_provider: SdkMeterProvider, /// A meter, with the name "test". pub meter: Meter, } -impl InMemoryMetricsInfrastructure { - /// Create an [`InMemoryMetricsExporter`], then use it to create an [`SdkMeterProvider`] and +impl InMemoryMetricInfrastructure { + /// Create an [`InMemoryMetricExporter`], then use it to create an [`SdkMeterProvider`] and /// [`Meter`]. 
- pub(crate) fn new() -> InMemoryMetricsInfrastructure { - let exporter = InMemoryMetricsExporter::default(); + pub(crate) fn new() -> InMemoryMetricInfrastructure { + let exporter = InMemoryMetricExporter::default(); let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let meter = meter_provider.meter("test"); - InMemoryMetricsInfrastructure { + InMemoryMetricInfrastructure { exporter, meter_provider, meter, diff --git a/aggregator/src/metrics/tests/prometheus.rs b/aggregator/src/metrics/tests/prometheus.rs index e07b12f02..861120834 100644 --- a/aggregator/src/metrics/tests/prometheus.rs +++ b/aggregator/src/metrics/tests/prometheus.rs @@ -15,13 +15,13 @@ use prometheus::{ Registry, }; use std::{collections::HashMap, net::Ipv4Addr, sync::Arc}; +use trillium::Handler; use trillium_testing::prelude::get; #[tokio::test] async fn prometheus_metrics_pull() { let registry = Registry::new(); - let meter_provider = - build_opentelemetry_prometheus_meter_provider(registry.clone(), None).unwrap(); + let meter_provider = build_opentelemetry_prometheus_meter_provider(registry.clone()).unwrap(); let (join_handle, port) = prometheus_metrics_server(registry, Ipv4Addr::LOCALHOST.into(), 0) .await .unwrap(); @@ -30,8 +30,8 @@ async fn prometheus_metrics_pull() { meter .u64_observable_gauge("test_metric") .with_description("Gauge for test purposes") - .init() - .observe(1, &[]); + .with_callback(|observer| observer.observe(1, &[])) + .build(); let url = format!("http://127.0.0.1:{port}/metrics"); let response = retry_http_request(test_http_request_exponential_backoff(), || { @@ -76,15 +76,14 @@ async fn http_metrics() { install_test_trace_subscriber(); let registry = Registry::new(); - let meter_provider = - build_opentelemetry_prometheus_meter_provider(registry.clone(), None).unwrap(); + let meter_provider = 
build_opentelemetry_prometheus_meter_provider(registry.clone()).unwrap(); let meter = meter_provider.meter("tests"); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); datastore.put_hpke_key().await.unwrap(); - let handler = AggregatorHandlerBuilder::new( + let mut handler = AggregatorHandlerBuilder::new( datastore.clone(), clock.clone(), TestRuntime::default(), @@ -96,6 +95,9 @@ async fn http_metrics() { .build() .unwrap(); + // Call init to finish setting up metrics. + handler.init(&mut "testing".into()).await; + get("/hpke_config").run_async(&handler).await; let metric_families = registry diff --git a/aggregator/src/metrics/tokio_runtime.rs b/aggregator/src/metrics/tokio_runtime.rs index 4c6300ea0..f8dd4d165 100644 --- a/aggregator/src/metrics/tokio_runtime.rs +++ b/aggregator/src/metrics/tokio_runtime.rs @@ -1,619 +1,305 @@ #[cfg(tokio_unstable)] use std::time::Duration; -use std::time::SystemTime; -use educe::Educe; -use opentelemetry::{metrics::MetricsError, InstrumentationLibrary, KeyValue}; #[cfg(tokio_unstable)] -use opentelemetry_sdk::metrics::data::{Histogram, HistogramDataPoint, Sum, Temporality}; -use opentelemetry_sdk::metrics::{ - data::{DataPoint, Gauge, Metric, ScopeMetrics}, - reader::MetricProducer, -}; -use tokio::runtime::{self, RuntimeMetrics}; -#[cfg(tokio_unstable)] -use tokio::runtime::{HistogramConfiguration, LogHistogram}; +use opentelemetry::metrics::Meter; +use opentelemetry::{metrics::MeterProvider, KeyValue}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use tokio::runtime::RuntimeMetrics; + +pub(super) fn initialize(runtime_metrics: RuntimeMetrics, meter_provider: &SdkMeterProvider) { + let meter = meter_provider.meter("tokio-runtime-metrics"); + + let num_workers = runtime_metrics.num_workers(); + + meter + .u64_observable_gauge("tokio.thread.worker.count") + .with_description("Number of runtime worker threads") 
+ .with_unit("{thread}") + .with_callback({ + let num_workers_u64 = u64::try_from(num_workers).unwrap_or(u64::MAX); + move |observer| observer.observe(num_workers_u64, &[]) + }) + .build(); + + #[cfg(not(tokio_unstable))] + meter + .u64_observable_gauge("tokio.queue.depth") + .with_description("Number of tasks currently in the runtime's global queue") + .with_unit("{task}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + observer.observe( + u64::try_from(runtime_metrics.global_queue_depth()).unwrap_or(u64::MAX), + &[KeyValue::new("queue", "global")], + ) + } + }) + .build(); + + #[cfg(tokio_unstable)] + initialize_unstable_metrics(runtime_metrics, meter); +} #[cfg(tokio_unstable)] -use crate::metrics::PollTimeHistogramConfiguration; -use crate::metrics::TokioMetricsConfiguration; +fn initialize_unstable_metrics(runtime_metrics: RuntimeMetrics, meter: Meter) { + let num_workers = runtime_metrics.num_workers(); + + meter + .u64_observable_gauge("tokio.thread.blocking.count") + .with_description( + "Number of additional threads spawned by the runtime for blocking operations", + ) + .with_unit("{thread}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + observer.observe( + u64::try_from(runtime_metrics.num_blocking_threads()).unwrap_or(u64::MAX), + &[], + ) + } + }) + .build(); + + meter + .u64_observable_gauge("tokio.task.alive.count") + .with_description("Number of alive tasks in the runtime") + .with_unit("{task}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + observer.observe( + u64::try_from(runtime_metrics.num_alive_tasks()).unwrap_or(u64::MAX), + &[], + ) + } + }) + .build(); + + meter + .u64_observable_gauge("tokio.thread.blocking.idle.count") + .with_description("Number of additional threads for blocking operations which are idle") + .with_unit("{thread}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move 
|observer| {
+                observer.observe(
+                    u64::try_from(runtime_metrics.num_idle_blocking_threads()).unwrap_or(u64::MAX),
+                    &[],
+                )
+            }
+        })
+        .build();
+
+    meter
+        .u64_observable_counter("tokio.task.spawned")
+        .with_description("Total number of tasks spawned in the runtime")
+        .with_unit("{task}")
+        .with_callback({
+            let runtime_metrics = runtime_metrics.clone();
+            move |observer| observer.observe(runtime_metrics.spawned_tasks_count(), &[])
+        })
+        .build();
+
+    meter
+        .u64_observable_counter("tokio.task.scheduled")
+        .with_description(
+            "Number of tasks scheduled, either to the thread's own local queue, \
+             from a worker thread to the global queue due to overflow, or \
+             from a remote thread to the global queue",
+        )
+        .with_unit("{task}")
+        .with_callback({
+            let runtime_metrics = runtime_metrics.clone();
+            move |observer| {
+                let remote_schedule_count = runtime_metrics.remote_schedule_count();
+
+                let mut local_schedule_count = 0;
+                let mut overflow_count = 0;
+                for worker in 0..num_workers {
+                    local_schedule_count += runtime_metrics.worker_local_schedule_count(worker);
+                    overflow_count += runtime_metrics.worker_overflow_count(worker);
+                }
 
-pub(crate) fn configure_runtime(
-    _runtime_builder: &mut runtime::Builder,
-    _config: &TokioMetricsConfiguration,
-) {
-    #[cfg(tokio_unstable)]
-    if _config.enable_poll_time_histogram {
-        _runtime_builder.enable_metrics_poll_time_histogram();
-        match _config.poll_time_histogram {
-            PollTimeHistogramConfiguration::Linear {
-                resolution_us,
-                num_buckets,
-            } => {
-                _runtime_builder.metrics_poll_time_histogram_configuration(
-                    HistogramConfiguration::linear(
-                        Duration::from_micros(resolution_us),
-                        num_buckets,
-                    ),
-                );
+                observer.observe(local_schedule_count, &[KeyValue::new("queue", "local")]);
+                observer.observe(overflow_count, &[KeyValue::new("queue", "local_overflow")]);
+                observer.observe(remote_schedule_count, &[KeyValue::new("queue", "remote")]);
             }
-            PollTimeHistogramConfiguration::Log {
-                min_value_us,
-                
max_value_us, - max_relative_error, - } => { - let mut histogram_builder = LogHistogram::builder(); - if let Some(min_value_us) = min_value_us { - let min_value = Duration::from_micros(min_value_us); - histogram_builder = histogram_builder.min_value(min_value); + }) + .build(); + + meter + .u64_observable_counter("tokio.task.budget_forced_yield") + .with_description( + "Number of times tasks have been forced to yield because their task budget was \ + exhausted", + ) + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + observer.observe(runtime_metrics.budget_forced_yield_count(), &[]); + } + }) + .build(); + + meter + .u64_observable_counter("tokio.park") + .with_description("Total number of times worker threads have parked") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + let mut park_count = 0; + for worker in 0..num_workers { + park_count += runtime_metrics.worker_park_count(worker); } - if let Some(max_value_us) = max_value_us { - let max_value = Duration::from_micros(max_value_us); - histogram_builder = histogram_builder.max_value(max_value); + observer.observe(park_count, &[]); + } + }) + .build(); + + meter + .u64_observable_counter("tokio.noop") + .with_description( + "Total number of times worker threads unparked and parked again without doing any work", + ) + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + let mut noop_count = 0; + for worker in 0..num_workers { + noop_count += runtime_metrics.worker_noop_count(worker); } - if let Some(max_relative_error) = max_relative_error { - histogram_builder = histogram_builder.max_error(max_relative_error); + observer.observe(noop_count, &[]); + } + }) + .build(); + + meter + .u64_observable_counter("tokio.task.stolen") + .with_description("Total number of tasks stolen between worker threads") + .with_unit("{task}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move 
|observer| { + let mut steal_count = 0; + for worker in 0..num_workers { + steal_count += runtime_metrics.worker_steal_count(worker); } - _runtime_builder.metrics_poll_time_histogram_configuration( - HistogramConfiguration::log(histogram_builder.build()), - ); + observer.observe(steal_count, &[]); } - } - } -} - -#[derive(Educe)] -#[educe(Debug)] -pub(super) struct TokioRuntimeMetrics { - runtime_metrics: RuntimeMetrics, - - #[educe(Debug(ignore))] - scope: InstrumentationLibrary, - - #[educe(Debug(ignore))] - start_time: SystemTime, - - num_workers: usize, - - #[educe(Debug(ignore))] - attributes_global_queue: Vec, - - #[cfg(tokio_unstable)] - unstable: UnstableTokioRuntimeMetrics, -} - -#[cfg(tokio_unstable)] -#[derive(Educe)] -#[educe(Debug)] -struct UnstableTokioRuntimeMetrics { - poll_time_histogram_num_buckets: usize, - - poll_time_histogram_bucket_bounds: Vec, - - #[educe(Debug(ignore))] - attributes_local: Vec, - - #[educe(Debug(ignore))] - attributes_local_overflow: Vec, - - #[educe(Debug(ignore))] - attributes_remote: Vec, - - #[educe(Debug(ignore))] - attributes_local_queue_worker: Vec>, - - #[educe(Debug(ignore))] - attributes_blocking_queue: Vec, -} - -impl TokioRuntimeMetrics { - pub(super) fn new(runtime_metrics: RuntimeMetrics) -> Self { - let scope = InstrumentationLibrary::builder("tokio-runtime-metrics").build(); - - let start_time = SystemTime::now(); - - let num_workers = runtime_metrics.num_workers(); - let attributes_global_queue = Vec::from([KeyValue::new("queue", "global")].as_slice()); - - #[cfg(tokio_unstable)] - let unstable = { - let poll_time_histogram_enabled = runtime_metrics.poll_time_histogram_enabled(); - let poll_time_histogram_num_buckets = runtime_metrics.poll_time_histogram_num_buckets(); - let all_but_last_bucket = if poll_time_histogram_enabled { - 0..poll_time_histogram_num_buckets - 1 - } else { - 0..0 - }; - let poll_time_histogram_bucket_bounds = all_but_last_bucket - .map(|bucket| { - runtime_metrics - 
.poll_time_histogram_bucket_range(bucket)
-                        .end
-                        .as_secs_f64()
-                })
-                .collect();
-
-            let attributes_local = Vec::from([KeyValue::new("queue", "local")].as_slice());
-            let attributes_local_overflow =
-                Vec::from([KeyValue::new("queue", "local_overflow")].as_slice());
-            let attributes_remote = Vec::from([KeyValue::new("queue", "remote")].as_slice());
-            let attributes_local_queue_worker = (0..num_workers)
-                .map(|i| {
-                    Vec::from(
-                        [
+        })
+        .build();
+
+    meter
+        .u64_observable_counter("tokio.steals")
+        .with_description("Number of times worker threads successfully stole one or more tasks")
+        .with_unit("{operation}")
+        .with_callback({
+            let runtime_metrics = runtime_metrics.clone();
+            move |observer| {
+                let mut steal_operations = 0;
+                for worker in 0..num_workers {
+                    steal_operations += runtime_metrics.worker_steal_operations(worker);
+                }
+                observer.observe(steal_operations, &[]);
+            }
+        })
+        .build();
+
+    meter
+        .f64_observable_counter("tokio.thread.worker.busy.time")
+        .with_description("Total amount of time that all worker threads have been busy")
+        .with_unit("s")
+        .with_callback({
+            let runtime_metrics = runtime_metrics.clone();
+            move |observer| {
+                let mut total_busy_duration = Duration::from_secs(0);
+                for worker in 0..num_workers {
+                    total_busy_duration += runtime_metrics.worker_total_busy_duration(worker);
+                }
+                observer.observe(total_busy_duration.as_secs_f64(), &[]);
+            }
+        })
+        .build();
+
+    meter
+        .u64_observable_gauge("tokio.queue.depth")
+        .with_description(
+            "Number of tasks currently in the runtime's global queue, blocking thread pool queue, \
+             or a worker's local queue",
+        )
+        .with_unit("{task}")
+        .with_callback({
+            let runtime_metrics = runtime_metrics.clone();
+            move |observer| {
+                for worker in 0..num_workers {
+                    observer.observe(
+                        u64::try_from(runtime_metrics.worker_local_queue_depth(worker))
+                            .unwrap_or(u64::MAX),
+                        &[
                             KeyValue::new("queue", "local"),
-                            KeyValue::new("worker", i64::try_from(i).unwrap()),
-                        ]
-                        .as_slice(),
-                
}) - .collect(); - let attributes_blocking_queue = - Vec::from([KeyValue::new("queue", "blocking")].as_slice()); - - UnstableTokioRuntimeMetrics { - poll_time_histogram_num_buckets, - poll_time_histogram_bucket_bounds, - attributes_local, - attributes_local_overflow, - attributes_remote, - attributes_local_queue_worker, - attributes_blocking_queue, + KeyValue::new("worker", i64::try_from(worker).unwrap()), + ], + ); + } + observer.observe( + u64::try_from(runtime_metrics.global_queue_depth()).unwrap_or(u64::MAX), + &[KeyValue::new("queue", "global")], + ); + observer.observe( + u64::try_from(runtime_metrics.blocking_queue_depth()).unwrap_or(u64::MAX), + &[KeyValue::new("queue", "blocking")], + ); } - }; - - Self { - runtime_metrics, - scope, - start_time, - num_workers, - attributes_global_queue, - #[cfg(tokio_unstable)] - unstable, - } - } -} - -impl MetricProducer for TokioRuntimeMetrics { - fn produce(&self) -> Result { - let now = SystemTime::now(); - - let mut metrics = Vec::with_capacity(19); - metrics.push(Metric { - name: "tokio.thread.worker.count".into(), - description: "Number of runtime worker threads".into(), - unit: "{thread}".into(), - data: Box::new(Gauge:: { - data_points: Vec::from([DataPoint { - attributes: Vec::default(), - start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(self.num_workers).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }]), - }), - }); - - #[cfg(not(tokio_unstable))] - { - let global_queue_depth = self.runtime_metrics.global_queue_depth(); - metrics.push(Metric { - name: "tokio.queue.depth".into(), - description: "Number of tasks currently in the runtime's global queue".into(), - unit: "{task}".into(), - data: Box::new(Gauge:: { - data_points: { - let mut data_points = Vec::with_capacity(self.num_workers + 2); - data_points.push(DataPoint { - attributes: self.attributes_global_queue.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: 
u64::try_from(global_queue_depth).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }); - data_points - }, - }), - }); - } - - #[cfg(tokio_unstable)] - self.produce_unstable_metrics(&mut metrics, now); - - Ok(ScopeMetrics { - scope: self.scope.clone(), - metrics, }) - } -} - -#[cfg(tokio_unstable)] -impl TokioRuntimeMetrics { - fn produce_unstable_metrics(&self, metrics: &mut Vec, now: SystemTime) { - let num_blocking_threads = self.runtime_metrics.num_blocking_threads(); - let num_alive_tasks = self.runtime_metrics.num_alive_tasks(); - let num_idle_blocking_threads = self.runtime_metrics.num_idle_blocking_threads(); - let spawned_task_count = self.runtime_metrics.spawned_tasks_count(); - let remote_schedule_count = self.runtime_metrics.remote_schedule_count(); - let budget_forced_yield_count = self.runtime_metrics.budget_forced_yield_count(); - let global_queue_depth = self.runtime_metrics.global_queue_depth(); - let blocking_queue_depth = self.runtime_metrics.blocking_queue_depth(); - let io_driver_fd_registered_count = self.runtime_metrics.io_driver_fd_registered_count(); - let io_driver_fd_deregistered_count = - self.runtime_metrics.io_driver_fd_deregistered_count(); - let io_driver_ready_count = self.runtime_metrics.io_driver_ready_count(); - - let mut park_count = 0; - let mut noop_count = 0; - let mut steal_count = 0; - let mut steal_operations = 0; - let mut poll_count = 0; - let mut total_busy_duration = Duration::from_secs(0); - let mut local_schedule_count = 0; - let mut overflow_count = 0; - let mut local_queue_depth = vec![0; self.num_workers]; - let mut poll_time_histogram_bucket_count = - vec![0; self.unstable.poll_time_histogram_num_buckets]; - let mut worker_mean_poll_time_sum = Duration::from_secs(0); - for (worker, worker_local_queue_depth) in local_queue_depth.iter_mut().enumerate() { - park_count += self.runtime_metrics.worker_park_count(worker); - noop_count += self.runtime_metrics.worker_noop_count(worker); - steal_count += 
self.runtime_metrics.worker_steal_count(worker); - steal_operations += self.runtime_metrics.worker_steal_operations(worker); - poll_count += self.runtime_metrics.worker_poll_count(worker); - total_busy_duration += self.runtime_metrics.worker_total_busy_duration(worker); - local_schedule_count += self.runtime_metrics.worker_local_schedule_count(worker); - overflow_count += self.runtime_metrics.worker_overflow_count(worker); - - *worker_local_queue_depth = self.runtime_metrics.worker_local_queue_depth(worker); - - for (bucket, out) in poll_time_histogram_bucket_count.iter_mut().enumerate() { - *out += self - .runtime_metrics - .poll_time_histogram_bucket_count(worker, bucket); + .build(); + + meter + .u64_observable_gauge("tokio.io.fd.count") + .with_description("Number of file descriptors currently registered with the I/O driver") + .with_unit("{fd}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| { + observer.observe( + runtime_metrics + .io_driver_fd_registered_count() + .saturating_sub(runtime_metrics.io_driver_fd_deregistered_count()), + &[], + ) } - - worker_mean_poll_time_sum += self.runtime_metrics.worker_mean_poll_time(worker); - } - let mean_poll_time = worker_mean_poll_time_sum / u32::try_from(self.num_workers).unwrap(); - - metrics.extend([ - Metric { - name: "tokio.thread.blocking.count".into(), - description: "Number of additional threads spawned by the runtime for blocking \ - operations" - .into(), - unit: "{thread}".into(), - data: Box::new(Gauge:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(num_blocking_threads).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }]), - }), - }, - Metric { - name: "tokio.task.alive.count".into(), - description: "Number of alive tasks in the runtime".into(), - unit: "{task}".into(), - data: Box::new(Gauge:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - 
start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(num_alive_tasks).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }]), - }), - }, - Metric { - name: "tokio.thread.blocking.idle.count".into(), - description: "Number of additional threads for blocking operations which are idle" - .into(), - unit: "{thread}".into(), - data: Box::new(Gauge:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(num_idle_blocking_threads).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }]), - }), - }, - Metric { - name: "tokio.task.spawned".into(), - description: "Total number of tasks spawned in the runtime".into(), - unit: "{task}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: spawned_task_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.task.scheduled".into(), - description: "Number of tasks scheduled, either to the thread's own local queue, \ - from a worker thread to the global queue due to overflow, or \ - from a remote thread to the global queue" - .into(), - unit: "{task}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([ - DataPoint { - attributes: self.unstable.attributes_local.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: local_schedule_count, - exemplars: Vec::new(), - }, - DataPoint { - attributes: self.unstable.attributes_local_overflow.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: overflow_count, - exemplars: Vec::new(), - }, - DataPoint { - attributes: self.unstable.attributes_remote.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: remote_schedule_count, - exemplars: Vec::new(), - }, - ]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - 
Metric { - name: "tokio.task.budget_forced_yield".into(), - description: "Number of times tasks have been forced to yield because their task \ - budget was exhausted" - .into(), - unit: "".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: budget_forced_yield_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.park".into(), - description: "Total number of times worker threads have parked".into(), - unit: "".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: park_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.noop".into(), - description: "Total number of times worker threads unparked and parked again \ - without doing any work" - .into(), - unit: "".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: noop_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.task.stolen".into(), - description: "Total number of tasks stolen between worker threads".into(), - unit: "{task}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: steal_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.steals".into(), - description: "Number of times worker threads successfully stole one or more tasks" - .into(), - unit: "{operation}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - 
attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: steal_operations, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.thread.worker.busy.time".into(), - description: "Total amount of time that all worker threads have been busy".into(), - unit: "s".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: total_busy_duration.as_secs_f64(), - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.queue.depth".into(), - description: "Number of tasks currently in the runtime's global queue, \ - blocking thread pool queue, or a worker's local queue" - .into(), - unit: "{task}".into(), - data: Box::new(Gauge:: { - data_points: { - let mut data_points = Vec::with_capacity(self.num_workers + 2); - data_points.extend( - local_queue_depth - .into_iter() - .zip(self.unstable.attributes_local_queue_worker.iter()) - .map(|(worker_local_queue_depth, attributes)| DataPoint { - attributes: attributes.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(worker_local_queue_depth) - .unwrap_or(u64::MAX), - exemplars: Vec::new(), - }), - ); - data_points.push(DataPoint { - attributes: self.attributes_global_queue.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(global_queue_depth).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }); - data_points.push(DataPoint { - attributes: self.unstable.attributes_blocking_queue.clone(), - start_time: Some(self.start_time), - time: Some(now), - value: u64::try_from(blocking_queue_depth).unwrap_or(u64::MAX), - exemplars: Vec::new(), - }); - data_points - }, - }), - }, - Metric { - name: "tokio.task.poll.time".into(), - description: "Histogram of task poll times".into(), - unit: 
"s".into(), - data: Box::new(Histogram:: { - data_points: Vec::from([HistogramDataPoint { - attributes: Vec::new(), - start_time: self.start_time, - time: now, - count: poll_count, - bounds: self.unstable.poll_time_histogram_bucket_bounds.clone(), - bucket_counts: poll_time_histogram_bucket_count, - min: Some(f64::NAN), - max: Some(f64::NAN), - sum: f64::NAN, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - }), - }, - Metric { - name: "tokio.task.poll.time.average".into(), - description: "Exponentially weighted moving average of task poll times".into(), - unit: "s".into(), - data: Box::new(Gauge:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: mean_poll_time.as_secs_f64(), - exemplars: Vec::new(), - }]), - }), - }, - Metric { - name: "tokio.io.fd.count".into(), - description: "Number of file descriptors currently registered with the I/O driver" - .into(), - unit: "{fd}".into(), - data: Box::new(Gauge:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: io_driver_fd_registered_count - .saturating_sub(io_driver_fd_deregistered_count), - exemplars: Vec::new(), - }]), - }), - }, - Metric { - name: "tokio.io.fd.registered".into(), - description: "Total number of file descriptors registered by the I/O driver".into(), - unit: "{fd}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: io_driver_fd_registered_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.io.fd.deregistered".into(), - description: "Total number of file descriptors deregistered by the I/O driver" - .into(), - unit: "{fd}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - 
start_time: Some(self.start_time), - time: Some(now), - value: io_driver_fd_deregistered_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - Metric { - name: "tokio.io.ready_events".into(), - description: "Number of ready events processed by the I/O driver".into(), - unit: "{event}".into(), - data: Box::new(Sum:: { - data_points: Vec::from([DataPoint { - attributes: Vec::new(), - start_time: Some(self.start_time), - time: Some(now), - value: io_driver_ready_count, - exemplars: Vec::new(), - }]), - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }, - ]); - } + }) + .build(); + + meter + .u64_observable_counter("tokio.io.fd.registered") + .with_description("Total number of file descriptors registered by the I/O driver") + .with_unit("{fd}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| observer.observe(runtime_metrics.io_driver_fd_registered_count(), &[]) + }) + .build(); + + meter + .u64_observable_counter("tokio.io.fd.deregistered") + .with_description("Total number of file descriptors deregistered by the I/O driver") + .with_unit("{fd}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| observer.observe(runtime_metrics.io_driver_fd_deregistered_count(), &[]) + }) + .build(); + + meter + .u64_observable_counter("tokio.io.ready_events") + .with_description("Number of ready events processed by the I/O driver") + .with_unit("{event}") + .with_callback({ + let runtime_metrics = runtime_metrics.clone(); + move |observer| observer.observe(runtime_metrics.io_driver_ready_count(), &[]) + }) + .build(); } diff --git a/aggregator/src/trace.rs b/aggregator/src/trace.rs index 5c3147724..44009e392 100644 --- a/aggregator/src/trace.rs +++ b/aggregator/src/trace.rs @@ -14,8 +14,9 @@ use tracing_subscriber::{ #[cfg(feature = "otlp")] use { - opentelemetry::{global::set_tracer_provider, trace::TracerProvider}, + 
opentelemetry::{global::set_tracer_provider, trace::TracerProvider as _}, opentelemetry_otlp::WithExportConfig, + opentelemetry_sdk::trace::TracerProvider, }; /// Errors from initializing trace subscriber. @@ -192,14 +193,13 @@ pub fn install_trace_subscriber( #[cfg(feature = "otlp")] if let Some(OpenTelemetryTraceConfiguration::Otlp(otlp_config)) = &config.open_telemetry_config { - let tracer_provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(otlp_config.endpoint.clone()), - ) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(otlp_config.endpoint.clone()) + .build()?; + let tracer_provider = TracerProvider::builder() + .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio) + .build(); set_tracer_provider(tracer_provider.clone()); let tracer = tracer_provider.tracer("janus_aggregator"); diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs index 326e13c7c..02262047b 100644 --- a/aggregator_api/src/lib.rs +++ b/aggregator_api/src/lib.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use git_version::git_version; use janus_aggregator_core::{ datastore::{self, Datastore}, - instrumented, + instrumented, TIME_HISTOGRAM_BOUNDARIES, }; use janus_core::{auth_tokens::AuthenticationToken, hpke, http::extract_bearer_token, time::Clock}; use janus_messages::{HpkeConfigId, RoleParseError, TaskId}; @@ -23,7 +23,7 @@ use trillium::{ Status::{NotAcceptable, UnsupportedMediaType}, }; use trillium_api::{api, Halt, State}; -use trillium_opentelemetry::metrics; +use trillium_opentelemetry::Metrics; use trillium_router::{Router, RouterConnExt}; use url::Url; @@ -67,6 +67,12 @@ impl Handler for ReplaceMimeTypes { } } +/// These boundaries are intended to be used with measurements having the unit of "bytes". 
+pub const BYTES_HISTOGRAM_BOUNDARIES: &[f64] = &[ + 1024.0, 2048.0, 4096.0, 8192.0, 16384.0, 32768.0, 65536.0, 131072.0, 262144.0, 524288.0, + 1048576.0, 2097152.0, 4194304.0, 8388608.0, 16777216.0, 33554432.0, +]; + /// Returns a new handler for an instance of the aggregator API, backed by the given datastore, /// according to the given configuration. pub fn aggregator_api_handler( @@ -79,10 +85,14 @@ pub fn aggregator_api_handler( State(ds), State(Arc::new(cfg)), // Metrics. - metrics(meter).with_route(|conn| { - conn.route() - .map(|route_spec| Cow::Owned(route_spec.to_string())) - }), + Metrics::new(meter.clone()) + .with_route(|conn| { + conn.route() + .map(|route_spec| Cow::Owned(route_spec.to_string())) + }) + .with_duration_histogram_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .with_request_size_histogram_boundaries(BYTES_HISTOGRAM_BOUNDARIES.to_vec()) + .with_response_size_histogram_boundaries(BYTES_HISTOGRAM_BOUNDARIES.to_vec()), // Authorization check. api(auth_check), // Check content type and accept headers diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 5e2235aca..521de03c0 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -15,7 +15,7 @@ use crate::{ batch_mode::{AccumulableBatchMode, CollectableBatchMode}, task::{self, AggregatorTask, AggregatorTaskParameters}, taskprov::PeerAggregator, - SecretBytes, + SecretBytes, TIME_HISTOGRAM_BOUNDARIES, }; use aws_lc_rs::aead::{self, LessSafeKey, AES_128_GCM}; use chrono::NaiveDateTime; @@ -192,7 +192,7 @@ impl Datastore { .u64_counter(TRANSACTION_METER_NAME) .with_description("Count of database transactions run, with their status.") .with_unit("{transaction}") - .init(); + .build(); let rollback_error_counter = meter .u64_counter(TRANSACTION_ROLLBACK_METER_NAME) .with_description(concat!( @@ -200,12 +200,13 @@ impl Datastore { "with their PostgreSQL error code.", )) .with_unit("{error}") - .init(); + .build(); let 
transaction_retry_histogram = meter .u64_histogram(TRANSACTION_RETRIES_METER_NAME) .with_description("The number of retries before a transaction is committed or aborted.") .with_unit("{retry}") - .init(); + .with_boundaries(RETRIES_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); let transaction_duration_histogram = meter .f64_histogram(TRANSACTION_DURATION_METER_NAME) .with_description(concat!( @@ -213,7 +214,8 @@ impl Datastore { "BEGIN and COMMIT/ROLLBACK statements." )) .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); let transaction_pool_wait_histogram = meter .f64_histogram(TRANSACTION_POOL_WAIT_METER_NAME) .with_description(concat!( @@ -221,7 +223,8 @@ impl Datastore { "slot to become available in the connection pooler." )) .with_unit("s") - .init(); + .with_boundaries(TIME_HISTOGRAM_BOUNDARIES.to_vec()) + .build(); Self { pool, @@ -461,6 +464,12 @@ pub const TRANSACTION_RETRIES_METER_NAME: &str = "janus_database_transaction_ret pub const TRANSACTION_DURATION_METER_NAME: &str = "janus_database_transaction_duration"; pub const TRANSACTION_POOL_WAIT_METER_NAME: &str = "janus_database_pool_wait_duration"; +/// These boundaries are for the number of times a database transaction was retried. +const RETRIES_HISTOGRAM_BOUNDARIES: &[f64] = &[ + 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0, + 16384.0, +]; + /// Transaction represents an ongoing datastore transaction. pub struct Transaction<'a, C: Clock> { raw_tx: deadpool_postgres::Transaction<'a>, diff --git a/aggregator_core/src/lib.rs b/aggregator_core/src/lib.rs index 5727ba125..0ecc68b90 100644 --- a/aggregator_core/src/lib.rs +++ b/aggregator_core/src/lib.rs @@ -75,11 +75,58 @@ impl InstrumentedHandler { } } +/// These boundaries are intended to be able to capture the length of short-lived operations +/// (e.g. HTTP requests) as well as longer-running operations. 
+pub const TIME_HISTOGRAM_BOUNDARIES: &[f64] = &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 90.0, 300.0, +]; + #[cfg(feature = "test-util")] pub mod test_util { - use opentelemetry::metrics::{noop::NoopMeterProvider, Meter, MeterProvider}; + use std::sync::Arc; + + use opentelemetry::{ + metrics::{InstrumentProvider, Meter, MeterProvider}, + InstrumentationScope, + }; pub fn noop_meter() -> Meter { NoopMeterProvider::new().meter("janus_aggregator") } + + // TODO(https://github.com/open-telemetry/opentelemetry-rust/issues/2444): Version 0.27 of + // `opentelemetry` removed `NoopMeterProvider` from the public API. The implementation is copied + // below until it is added back to a future version. + + #[derive(Debug, Default)] + pub struct NoopMeterProvider { + _private: (), + } + + impl NoopMeterProvider { + /// Create a new no-op meter provider. + pub fn new() -> Self { + Self { _private: () } + } + } + + impl MeterProvider for NoopMeterProvider { + fn meter_with_scope(&self, _scope: InstrumentationScope) -> Meter { + Meter::new(Arc::new(NoopMeter::new())) + } + } + + #[derive(Debug, Default)] + pub struct NoopMeter { + _private: (), + } + + impl NoopMeter { + /// Create a new no-op meter core. + pub fn new() -> Self { + Self { _private: () } + } + } + + impl InstrumentProvider for NoopMeter {} } diff --git a/docs/samples/advanced_config/aggregation_job_creator.yaml b/docs/samples/advanced_config/aggregation_job_creator.yaml index 5f8f37c46..d0b8abcd6 100644 --- a/docs/samples/advanced_config/aggregation_job_creator.yaml +++ b/docs/samples/advanced_config/aggregation_job_creator.yaml @@ -76,17 +76,6 @@ metrics_config: # binary must have been compiled with the flag `--cfg tokio_unstable`. # (optional) enabled: false - # Enable a histogram of task poll times. This introduces some additional - # overhead. (optional) - enable_poll_time_histogram: false - # Configure the scale and parameters of the poll time histogram. 
Defaults - # to a linear scale. (optional) - poll_time_histogram: - linear: - # Sets the poll time histogram's resolution. (optional) - resolution_us: 100 - # Sets the number of buckets in the poll time histogram. (optional) - num_buckets: 10 # Aggregation job creator-specific parameters: diff --git a/docs/samples/advanced_config/aggregation_job_driver.yaml b/docs/samples/advanced_config/aggregation_job_driver.yaml index 47a715358..ae44b58b3 100644 --- a/docs/samples/advanced_config/aggregation_job_driver.yaml +++ b/docs/samples/advanced_config/aggregation_job_driver.yaml @@ -76,17 +76,6 @@ metrics_config: # binary must have been compiled with the flag `--cfg tokio_unstable`. # (optional) enabled: false - # Enable a histogram of task poll times. This introduces some additional - # overhead. (optional) - enable_poll_time_histogram: false - # Configure the scale and parameters of the poll time histogram. Defaults - # to a linear scale. (optional) - poll_time_histogram: - linear: - # Sets the poll time histogram's resolution. (optional) - resolution_us: 100 - # Sets the number of buckets in the poll time histogram. (optional) - num_buckets: 10 # Stack size, in bytes, for threads used for VDAF preparation. (optional) thread_pool_stack_size: 2097152 diff --git a/docs/samples/advanced_config/aggregator.yaml b/docs/samples/advanced_config/aggregator.yaml index 71590a5cc..70ae07d68 100644 --- a/docs/samples/advanced_config/aggregator.yaml +++ b/docs/samples/advanced_config/aggregator.yaml @@ -78,17 +78,6 @@ metrics_config: # binary must have been compiled with the flag `--cfg tokio_unstable`. # (optional) enabled: false - # Enable a histogram of task poll times. This introduces some additional - # overhead. (optional) - enable_poll_time_histogram: false - # Configure the scale and parameters of the poll time histogram. Defaults to - # a linear scale. (optional) - poll_time_histogram: - linear: - # Sets the poll time histogram's resolution. 
(optional) - resolution_us: 100 - # Sets the number of buckets in the poll time histogram. (optional) - num_buckets: 10 # Stack size, in bytes, for threads used for VDAF preparation. (optional) thread_pool_stack_size: 2097152 diff --git a/docs/samples/advanced_config/collection_job_driver.yaml b/docs/samples/advanced_config/collection_job_driver.yaml index e3bb287dc..715abba5e 100644 --- a/docs/samples/advanced_config/collection_job_driver.yaml +++ b/docs/samples/advanced_config/collection_job_driver.yaml @@ -76,17 +76,6 @@ metrics_config: # binary must have been compiled with the flag `--cfg tokio_unstable`. # (optional) enabled: false - # Enable a histogram of task poll times. This introduces some additional - # overhead. (optional) - enable_poll_time_histogram: false - # Configure the scale and parameters of the poll time histogram. Defaults - # to a linear scale. (optional) - poll_time_histogram: - linear: - # Sets the poll time histogram's resolution. (optional) - resolution_us: 100 - # Sets the number of buckets in the poll time histogram. (optional) - num_buckets: 10 # Collection job driver-related parameters: diff --git a/docs/samples/advanced_config/garbage_collector.yaml b/docs/samples/advanced_config/garbage_collector.yaml index 2ae61f231..6604d15a5 100644 --- a/docs/samples/advanced_config/garbage_collector.yaml +++ b/docs/samples/advanced_config/garbage_collector.yaml @@ -76,17 +76,6 @@ metrics_config: # binary must have been compiled with the flag `--cfg tokio_unstable`. # (optional) enabled: false - # Enable a histogram of task poll times. This introduces some additional - # overhead. (optional) - enable_poll_time_histogram: false - # Configure the scale and parameters of the poll time histogram. Defaults - # to a linear scale. (optional) - poll_time_histogram: - linear: - # Sets the poll time histogram's resolution. (optional) - resolution_us: 100 - # Sets the number of buckets in the poll time histogram. 
(optional) - num_buckets: 10 # Garbage collector-specific parameters: garbage_collection: diff --git a/docs/samples/advanced_config/janus_cli.yaml b/docs/samples/advanced_config/janus_cli.yaml index 4addbb9ee..cae495f8d 100644 --- a/docs/samples/advanced_config/janus_cli.yaml +++ b/docs/samples/advanced_config/janus_cli.yaml @@ -76,14 +76,3 @@ metrics_config: # binary must have been compiled with the flag `--cfg tokio_unstable`. # (optional) enabled: false - # Enable a histogram of task poll times. This introduces some additional - # overhead. (optional) - enable_poll_time_histogram: false - # Configure the scale and parameters of the poll time histogram. Defaults - # to a linear scale. (optional) - poll_time_histogram: - linear: - # Sets the poll time histogram's resolution. (optional) - resolution_us: 100 - # Sets the number of buckets in the poll time histogram. (optional) - num_buckets: 10