Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(app): Backend frame count metrics #3308

Merged
merged 8 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,9 @@ dependencies = [
"ahash",
"bytes",
"futures",
"futures-util",
"http",
"http-body",
"hyper",
"linkerd-app-core",
"linkerd-app-test",
Expand Down Expand Up @@ -1744,6 +1746,7 @@ dependencies = [
name = "linkerd-http-prom"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http",
"http-body",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }

[dev-dependencies]
futures-util = "0.3"
http-body = "0.4"
hyper = { version = "0.14", features = ["http1", "http2"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-rustls = "0.24"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{metrics::prom, svc};
use linkerd_http_prom::{
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies},
record_response::{self, NewResponseDuration, StreamLabel},
NewCountRequests, RequestCount, RequestCountFamilies,
};
Expand All @@ -15,6 +16,7 @@ mod tests;
/// Metric families recorded for a route's backend.
///
/// Each field is populated by the corresponding `register` call in
/// `RouteBackendMetrics::register` and is handed to one of the middleware
/// layers assembled by `layer`.
pub struct RouteBackendMetrics<L: StreamLabel> {
/// Request-count families, keyed by `labels::RouteBackend`.
requests: RequestCountFamilies<labels::RouteBackend>,
/// Response metrics, parameterized by the stream label type `L`.
responses: ResponseMetrics<L>,
/// Response-body metric families, keyed by `labels::RouteBackend`.
body_metrics: ResponseBodyFamilies<labels::RouteBackend>,
}

type ResponseMetrics<L> = record_response::ResponseMetrics<
Expand All @@ -26,14 +28,24 @@ pub fn layer<T, N>(
metrics: &RouteBackendMetrics<T::StreamLabel>,
) -> impl svc::Layer<
N,
Service = NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Service = NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
>,
> + Clone
where
T: MkStreamLabel,
N: svc::NewService<T>,
NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>: svc::NewService<T>,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Expand All @@ -44,28 +56,37 @@ where
let RouteBackendMetrics {
requests,
responses,
body_metrics,
} = metrics.clone();

svc::layer::mk(move |inner| {
use svc::Layer;
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
),
)
})
}

/// Newtype around the request-count metric families.
///
/// `layer` passes this to `NewCountRequests::layer_via` as the parameter
/// extractor.
#[derive(Clone, Debug)]
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);

/// Newtype around the response-body metric families.
///
/// `layer` passes this to `NewRecordBodyData::layer_via`; its
/// `svc::ExtractParam` implementation yields the `BodyDataMetrics` for a
/// target's parent/route/backend labels.
#[derive(Clone, Debug)]
pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies<labels::RouteBackend>);

// === impl RouteBackendMetrics ===

impl<L: StreamLabel> RouteBackendMetrics<L> {
/// Registers every route-backend metric family with `reg` and returns the
/// resulting handle set.
///
/// `histo` is forwarded to the response metrics' `register` call. Struct
/// field expressions are evaluated in the order written, so the families are
/// registered as: requests, responses, body metrics.
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
    Self {
        requests: RequestCountFamilies::register(reg),
        responses: record_response::ResponseMetrics::register(reg, histo),
        body_metrics: ResponseBodyFamilies::register(reg),
    }
}

Expand All @@ -83,13 +104,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
self.responses.get_statuses(l)
}

/// Test-only accessor: returns the response-body metrics associated with the
/// route-backend labels `l`.
#[cfg(test)]
pub(crate) fn get_response_body_metrics(
    &self,
    l: &labels::RouteBackend,
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
    let Self { body_metrics, .. } = self;
    body_metrics.metrics(l)
}
}

// Written by hand rather than derived; presumably so that no `L: Default`
// bound is imposed on the stream label type (TODO confirm).
impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
    /// Builds a metrics set in which every family takes its own default value.
    fn default() -> Self {
        let requests = Default::default();
        let responses = Default::default();
        let body_metrics = Default::default();

        Self {
            requests,
            responses,
            body_metrics,
        }
    }
}
Expand All @@ -99,6 +129,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
Self {
requests: self.requests.clone(),
responses: self.responses.clone(),
body_metrics: self.body_metrics.clone(),
}
}
}
Expand All @@ -114,3 +145,17 @@ where
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
}
}

// === impl ExtractRecordBodyDataParams ===

impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
where
    T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
{
    /// Resolves the body-data metrics for `target`.
    ///
    /// The parent, route, and backend references are read from the target (in
    /// that order) and combined into a `labels::RouteBackend`, which selects
    /// the metrics from the wrapped families.
    fn extract_param(&self, target: &T) -> BodyDataMetrics {
        let parent: ParentRef = target.param();
        let route: RouteRef = target.param();
        let backend: BackendRef = target.param();

        self.0
            .metrics(&labels::RouteBackend(parent, route, backend))
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use super::{
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
};
use crate::http::{concrete, logical::Concrete};
use bytes::Buf;
use linkerd_app_core::{
svc::{self, http::BoxBody, Layer, NewService},
transport::{Remote, ServerAddr},
Error,
};
use linkerd_proxy_client_policy as policy;

Expand Down Expand Up @@ -114,6 +116,145 @@ async fn http_request_statuses() {
assert_eq!(mixed.get(), 1);
}

/// Tests that metrics count frames in the backend response body.
///
/// The test drives a single request through the mocked route-backend metrics
/// stack, answers it with a channel-backed response body, and then feeds two
/// chunks through that body before closing it. After each step the frame-size
/// histogram is checked (when the `prometheus-client-rust-242` feature makes
/// its count and sum observable).
///
/// Sanity checks use `assert_eq!` rather than `debug_assert_eq!` so that they
/// are still enforced when the tests are built without debug assertions.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn body_data_layer_records_frames() -> Result<(), Error> {
    use http_body::Body;
    use linkerd_app_core::proxy::http;
    use linkerd_http_prom::body_data::response::BodyDataMetrics;
    use tower::{Service, ServiceExt};

    let _trace = linkerd_tracing::test::trace_init();

    let metrics = super::RouteBackendMetrics::default();
    let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
    let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
    let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));

    let (mut svc, mut handle) =
        mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
    handle.allow(1);

    // Create a request.
    let req = {
        let empty = hyper::Body::empty();
        let body = BoxBody::new(empty);
        http::Request::builder().method("DOOT").body(body).unwrap()
    };

    // Call the service once it is ready to accept a request.
    tracing::info!("calling service");
    svc.ready().await.expect("ready");
    let call = svc.call(req);
    let (req, send_resp) = handle.next_request().await.unwrap();
    assert_eq!(req.method().as_str(), "DOOT");

    // Acquire the counters for this backend.
    tracing::info!("acquiring response body metrics");
    let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
    let BodyDataMetrics {
        // TODO(kate): currently, histograms do not expose their observation count or sum. so,
        // we're left unable to exercise these metrics until prometheus/client_rust#242 lands.
        //   - https://github.com/prometheus/client_rust/pull/241
        //   - https://github.com/prometheus/client_rust/pull/242
        #[cfg(feature = "prometheus-client-rust-242")]
        frame_size,
        ..
    } = metrics.get_response_body_metrics(&labels);

    // Before we've sent a response, the counter should be zero.
    #[cfg(feature = "prometheus-client-rust-242")]
    {
        assert_eq!(frame_size.count(), 0);
        assert_eq!(frame_size.sum(), 0);
    }

    // Create a response whose body is backed by a channel that we can send chunks to, send it.
    tracing::info!("sending response");
    let mut resp_tx = {
        let (tx, body) = hyper::Body::channel();
        let body = BoxBody::new(body);
        let resp = http::Response::builder()
            .status(http::StatusCode::IM_A_TEAPOT)
            .body(body)
            .unwrap();
        send_resp.send_response(resp);
        tx
    };

    // Before we've sent any bytes, the counter should be zero.
    #[cfg(feature = "prometheus-client-rust-242")]
    {
        assert_eq!(frame_size.count(), 0);
        assert_eq!(frame_size.sum(), 0);
    }

    // On the client end, poll our call future and await the response.
    tracing::info!("polling service future");
    let (parts, body) = call.await?.into_parts();
    assert_eq!(parts.status, 418);

    let mut body = Box::pin(body);

    /// Returns the next chunk from a boxed body.
    ///
    /// The body is polled exactly once with a no-op waker; this panics if a
    /// data frame is not immediately ready.
    async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
        use std::task::{Context, Poll};
        let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
        let mut data = match body.as_mut().poll_data(&mut ctx) {
            Poll::Ready(Some(Ok(d))) => d,
            _ => panic!("next chunk should be ready"),
        };
        // `Buf::chunk` may return fewer bytes than `remaining()`, so copy the
        // entire buffer rather than only its first contiguous slice.
        let len = data.remaining();
        let chunk = data.copy_to_bytes(len).to_vec();
        Ok(chunk)
    }

    {
        // Send a chunk, confirm that our counters are incremented.
        tracing::info!("sending first chunk");
        resp_tx.send_data("hello".into()).await?;
        let chunk = read_chunk(&mut body).await?;
        assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.count(), 1);
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.sum(), 5);
    }

    {
        // Send another chunk, confirm that our counters are incremented once more.
        tracing::info!("sending second chunk");
        resp_tx.send_data(", world!".into()).await?;
        let chunk = read_chunk(&mut body).await?;
        assert_eq!(
            ", world!".as_bytes(),
            chunk,
            "should get same value back out"
        );
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.count(), 2);
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.sum(), 5 + 8);
    }

    {
        // Close the body, show that the counters remain at the same values.
        use std::task::{Context, Poll};
        tracing::info!("closing response body");
        drop(resp_tx);
        let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
        match body.as_mut().poll_data(&mut ctx) {
            Poll::Ready(None) => {}
            _ => panic!("got unexpected poll result"),
        };
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.count(), 2);
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.sum(), 5 + 8);
    }

    Ok(())
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_request_statuses_ok() {
let _trace = linkerd_tracing::test::trace_init();
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/prom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics.
test-util = []

[dependencies]
bytes = "1"
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
Expand Down
5 changes: 5 additions & 0 deletions linkerd/http/prom/src/body_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Body-data metrics.
//!
//! The `request` and `response` submodules form the public surface of this
//! module; `body` and `metrics` are private to it.
pub mod request;
pub mod response;

mod body;
mod metrics;
Loading
Loading