Skip to content

Commit

Permalink
console-api,console-subscriber-tokio-console: attempt to genericize o…
Browse files Browse the repository at this point in the history
…ver subscriber

This is currently a build failure. Not sure how to fix it.
  • Loading branch information
jswrenn committed Aug 4, 2022
1 parent bb94af6 commit 7137c44
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 312 deletions.
479 changes: 256 additions & 223 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion console-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ tonic = { version = "0.7", default-features = false, features = [
] }
prost = "0.10"
prost-types = "0.10"
tracing-core = "0.1.17"
tracing-core = "0.1.29"

[dev-dependencies]
tonic-build = { version = "0.7", default-features = false, features = [
Expand Down
2 changes: 0 additions & 2 deletions console-api/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ message Span {
repeated Field fields = 3;
// Timestamp for the span.
google.protobuf.Timestamp at = 4;
// An Id that uniquely identifies this span's parent (if any).
optional SpanId parent_id = 5;
}

// Any new metadata that was registered since the last update.
Expand Down
Empty file.
3 changes: 0 additions & 3 deletions console-api/src/generated/rs.tokio.console.common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ pub struct Span {
/// Timestamp for the span.
#[prost(message, optional, tag="4")]
pub at: ::core::option::Option<::prost_types::Timestamp>,
/// An Id that uniquely identifies this span's parent (if any).
#[prost(message, optional, tag="5")]
pub parent_id: ::core::option::Option<SpanId>,
}
/// Any new metadata that was registered since the last update.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
17 changes: 8 additions & 9 deletions console-api/src/generated/rs.tokio.console.instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub mod instrument_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Default + Body<Data = Bytes> + Send + 'static,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
Expand All @@ -97,6 +97,7 @@ pub mod instrument_client {
) -> InstrumentClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
Expand Down Expand Up @@ -129,9 +130,9 @@ pub mod instrument_client {
&mut self,
request: impl tonic::IntoRequest<super::InstrumentRequest>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::Update>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::Update>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -152,11 +153,9 @@ pub mod instrument_client {
&mut self,
request: impl tonic::IntoRequest<super::TaskDetailsRequest>,
) -> Result<
tonic::Response<
tonic::codec::Streaming<super::super::tasks::TaskDetails>,
>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::super::tasks::TaskDetails>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand Down
9 changes: 5 additions & 4 deletions console-api/src/generated/rs.tokio.console.trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub mod trace_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Default + Body<Data = Bytes> + Send + 'static,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
Expand All @@ -117,6 +117,7 @@ pub mod trace_client {
) -> TraceClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
Expand Down Expand Up @@ -149,9 +150,9 @@ pub mod trace_client {
&mut self,
request: impl tonic::IntoRequest<super::WatchRequest>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::TraceEvent>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::TraceEvent>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand Down
4 changes: 2 additions & 2 deletions console-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub mod async_ops;
/// Represents unique id's and Rust source locations.
mod common;
/// Represents updates to the causality tree.
pub mod consequences;
/// Represents interactions between the console-subscriber and a console client observing it.
pub mod instrument;
/// Represents updates to the resources in an async runtime.
Expand All @@ -12,6 +14,4 @@ pub mod resources;
pub mod tasks;
/// Represents events on the tracing subsystem: thread registration and span activities.
pub mod trace;
/// Represents updates to the causality tree.
pub mod consequences;
pub use common::*;
4 changes: 2 additions & 2 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ tokio-stream = "0.1"
thread_local = "1.1.3"
console-api = { version = "0.2.0", path = "../console-api", features = ["transport"] }
tonic = { version = "0.7", features = ["transport"] }
tracing-core = "0.1.24"
tracing-core = "0.1.29"
tracing = "0.1.26"
tracing-subscriber = { version = "0.3.11", default-features = false, features = ["fmt", "registry"] }
tracing-causality = { path = "../../tracing-causality" }
tracing-causality = { version = "0.1.0" }
futures = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
# The parking_lot dependency is renamed, because we want our `parking_lot`
Expand Down
13 changes: 8 additions & 5 deletions console-subscriber/examples/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

static HELP: &str = r#"
Example console-instrumented app
Expand All @@ -17,13 +17,16 @@ OPTIONS:
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tracing_subscriber::prelude::*;

// initialize an underlying `Registry`
let registry = Arc::new(tracing_subscriber::registry());

// spawn the console server in the background,
// returning a `Layer`:
let console_layer = console_subscriber::spawn();
let console_layer = console_subscriber::spawn(registry.clone());

// build a `Subscriber` by combining layers with a
// `tracing_subscriber::Registry`:
tracing_subscriber::registry().with(console_layer).init();
// build a `Subscriber` by combining layers with the
// `registry`:
registry.with(console_layer).init();

// spawn optional extras from CLI args
// skip first which is command name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use std::sync::Arc;

use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tracing_subscriber::prelude::*;

let subscriber = tracing_subscriber::registry()
.with(console_subscriber::spawn())
.with(tracing_subscriber::filter::filter_fn(|_| true));
// initialize an underlying `Registry`
let registry = Arc::new(tracing_subscriber::registry());

// spawn the console server in the background,
// returning a `Layer`:
let console_layer = console_subscriber::spawn(registry.clone());

tracing::subscriber::set_global_default(subscriber).unwrap();
registry
.with(console_layer)
.with(tracing_subscriber::filter::filter_fn(|_| true))
.init();

task::Builder::default()
.name("main-task")
Expand Down
35 changes: 20 additions & 15 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::{Command, Event, Shared, Watch};
use crate::{
consequences,
stats::{self, Unsent},
ToProto, WatchRequest,
};
Expand All @@ -16,14 +15,16 @@ use std::{
},
time::{Duration, Instant},
};
use tracing::Subscriber;
use tracing_core::{span::Id, Metadata};
use tracing_subscriber::registry::LookupSpan;

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

pub(crate) struct Aggregator {
pub(crate) struct Aggregator<S> {
/// Channel of incoming events emitted by `TaskLayer`s.
events: mpsc::Receiver<Event>,

Expand Down Expand Up @@ -86,6 +87,9 @@ pub(crate) struct Aggregator {
/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
/// timestamp that can be sent over the wire.
base_time: stats::TimeAnchor,

/// The subscriber used by the application.
subscriber: Arc<S>,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -129,13 +133,17 @@ struct AsyncOp {
source: String,
}

impl Aggregator {
impl<S> Aggregator<S>
where
S: Subscriber + for<'s> LookupSpan<'s>,
{
pub(crate) fn new(
events: mpsc::Receiver<Event>,
rpcs: mpsc::Receiver<Command>,
builder: &crate::Builder,
shared: Arc<crate::Shared>,
base_time: stats::TimeAnchor,
subscriber: Arc<S>,
) -> Self {
Self {
shared,
Expand All @@ -156,6 +164,7 @@ impl Aggregator {
poll_ops: Default::default(),
temporality: Temporality::Live,
base_time,
subscriber,
}
}

Expand Down Expand Up @@ -324,19 +333,15 @@ impl Aggregator {
let now = Some(self.base_time.to_timestamp(Instant::now()));

let span = id.clone();
// XXX(jswrenn): A new thread is spawned solely for the sake of
// getting access to the global subscriber.
let (trace, updates) = std::thread::spawn(move || {
// XXX(jswrenn): Need to gracefully handle the case where the
// task span is already closed.
consequences::trace(&span, buffer)
.expect("the subscriber isn't a Registry")
.expect("id is already closed")
})
.join()
.unwrap();

let initial = crate::consequences::trace_into_proto(trace);
let maybe_trace = tracing_causality::trace(&self.subscriber, &span, buffer);
let (initial, updates) = if let Some((trace, updates)) = maybe_trace {
(crate::consequences::trace_into_proto(trace), updates)
} else {
// a trace could not be produced, because the span is already closed
//(vec![], tracing_causality::Updates::default())
panic!();
};

// Send back the stream receiver.
// Then send the initial state --- if this fails, the subscription is already dead.
Expand Down
45 changes: 33 additions & 12 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use super::{ConsoleLayer, Server};
use std::{
net::{SocketAddr, ToSocketAddrs},
path::PathBuf,
sync::Arc,
thread,
time::Duration,
};
use tokio::runtime;
use tracing::Subscriber;
use tracing_subscriber::{
filter,
filter::{self, FilterFn},
layer::{Layer, SubscriberExt},
prelude::*,
registry::LookupSpan,
Expand Down Expand Up @@ -51,7 +52,7 @@ impl Default for Builder {
client_buffer_capacity: ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
server_addr: SocketAddr::new(Server::<()>::DEFAULT_IP, Server::<()>::DEFAULT_PORT),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
Expand Down Expand Up @@ -190,8 +191,11 @@ impl Builder {
}

/// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task.
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
pub fn build<S>(self, subscriber: Arc<S>) -> (ConsoleLayer, Server<S>)
where
S: Subscriber + for<'s> LookupSpan<'s>,
{
ConsoleLayer::build(self, subscriber)
}

/// Configures this builder from a standard set of environment variables:
Expand Down Expand Up @@ -318,9 +322,11 @@ impl Builder {
.expect("`error` filter should always parse successfully")
});

let console_layer = self.spawn();
let registry = Arc::new(tracing_subscriber::registry());

let console_layer = self.spawn(registry.clone());

tracing_subscriber::registry()
registry
.with(console_layer)
.with(tracing_subscriber::fmt::layer().with_filter(fmt_filter))
.init();
Expand Down Expand Up @@ -382,13 +388,28 @@ impl Builder {
/// [`fmt::Layer`]: https://docs.rs/tracing-subscriber/latest/tracing-subscriber/fmt/struct.Layer.html
/// [`console_subscriber::init`]: crate::init()
#[must_use = "a `Layer` must be added to a `tracing::Subscriber` in order to be used"]
pub fn spawn<S>(self) -> impl Layer<S>
pub fn spawn<S>(self, subscriber: Arc<S>) -> impl Layer<Arc<S>>
where
S: Subscriber + for<'a> LookupSpan<'a>,
S: Send + Sync + Subscriber + for<'a> LookupSpan<'a> + 'static,
{
fn console_filter(meta: &tracing::Metadata<'_>) -> bool {
// events will have *targets* beginning with "runtime"
if meta.is_event() {
return meta.target().starts_with("runtime") || meta.target().starts_with("tokio");
}

// spans will have *names* beginning with "runtime". for backwards
// compatibility with older Tokio versions, enable anything with the `tokio`
// target as well.
meta.name().starts_with("runtime.") || meta.target().starts_with("tokio")
}

let self_trace = self.self_trace;

let (layer, server) = self.build();
let (layer, server) = self.build(subscriber);
let filter =
FilterFn::new(console_filter as for<'r, 's> fn(&'r tracing::Metadata<'s>) -> bool);
let layer = layer.with_filter(filter);

thread::Builder::new()
.name("console_subscriber".into())
Expand Down Expand Up @@ -553,11 +574,11 @@ pub fn init() {
/// [`fmt::Layer`]: https://docs.rs/tracing-subscriber/latest/tracing-subscriber/fmt/struct.Layer.html
/// [`console_subscriber::init`]: crate::init()
#[must_use = "a `Layer` must be added to a `tracing::Subscriber`in order to be used"]
pub fn spawn<S>() -> impl Layer<S>
pub fn spawn<S>(subscriber: Arc<S>) -> impl Layer<Arc<S>>
where
S: Subscriber + for<'a> LookupSpan<'a>,
S: Send + Sync + Subscriber + for<'a> LookupSpan<'a> + 'static,
{
ConsoleLayer::builder().with_default_env().spawn::<S>()
ConsoleLayer::builder().with_default_env().spawn(subscriber)
}

fn duration_from_env(var_name: &str) -> Option<Duration> {
Expand Down
Loading

0 comments on commit 7137c44

Please sign in to comment.