Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement and provide access to request/response timing information. #2

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/twirp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod test;

pub use client::{Client, ClientBuilder, ClientError, Middleware, Next, Result};
pub use error::*; // many constructors like `invalid_argument()`
pub use server::{serve, Router};
pub use server::{serve, Router, Timings};

// Re-export `reqwest` so that it's easy to implement middleware.
pub use reqwest;
Expand Down
121 changes: 113 additions & 8 deletions crates/twirp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ use futures::Future;
use hyper::{header, Body, Method, Request, Response};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::time::{Duration, Instant};

use crate::headers::{CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF};
use crate::{error, to_proto_body, GenericError, TwirpErrorResponse};

/// A function that handles a request and returns a response.
type HandlerFn = Box<dyn Fn(Request<Body>) -> HandlerResponse + Send + Sync>;

/// Type alias for a handler response.
type HandlerResponse =
Box<dyn Future<Output = Result<Response<Body>, GenericError>> + Unpin + Send>;

type HandlerFn = Box<dyn Fn(Request<Body>) -> HandlerResponse + Send + Sync>;

/// A Router maps a request to a handler.
/// A Router maps a request (method, path) tuple to a handler.
pub struct Router {
routes: HashMap<(Method, String), HandlerFn>,
prefix: &'static str,
}

/// The canonical twirp path prefix. You don't have to use this, but it's the default.
pub const DEFAULT_TWIRP_PATH_PREFIX: &str = "/twirp";

impl Default for Router {
Expand Down Expand Up @@ -48,7 +52,7 @@ impl Router {
}
}

/// Adds a handler to the router for the given method and path.
/// Adds a sync handler to the router for the given method and path.
pub fn add_sync_handler<F>(&mut self, method: Method, path: &str, f: F)
where
F: Fn(Request<Body>) -> Result<Response<Body>, GenericError>
Expand Down Expand Up @@ -96,8 +100,16 @@ impl Router {
> {
let f = f.clone();
Box::new(Box::pin(async move {
match parse_request(req).await {
Ok((req, resp_fmt)) => write_response(f(req).await, resp_fmt),
let mut timings = *req
.extensions()
.get::<Timings>()
.expect("invariant violated: timing info not present in request");
match parse_request(req, &mut timings).await {
Ok((req, resp_fmt)) => {
let res = f(req).await;
timings.set_response_handled();
write_response(res, resp_fmt)
}
Err(err) => {
// This is the only place we use tracing (would be nice to remove)
// tracing::error!(?err, "failed to parse request");
Expand All @@ -109,17 +121,27 @@ impl Router {
twirp_err.to_response()
}
}
.map(|mut resp| {
timings.set_response_written();
resp.extensions_mut().insert(timings);
resp
})
}))
};
let key = (Method::POST, [self.prefix, path].join("/"));
self.routes.insert(key, Box::new(g));
}
}

/// Serve a request using the given router.
pub async fn serve(
router: Arc<Router>,
req: Request<Body>,
mut req: Request<Body>,
) -> Result<Response<Body>, GenericError> {
if req.extensions().get::<Timings>().is_none() {
let start = tokio::time::Instant::now();
req.extensions_mut().insert(Timings::new(start));
}
let key = (req.method().clone(), req.uri().path().to_string());
if let Some(handler) = router.routes.get(&key) {
handler(req).await
Expand Down Expand Up @@ -150,16 +172,21 @@ impl BodyFormat {
}
}

async fn parse_request<T>(req: Request<Body>) -> Result<(T, BodyFormat), GenericError>
async fn parse_request<T>(
req: Request<Body>,
timings: &mut Timings,
) -> Result<(T, BodyFormat), GenericError>
where
T: prost::Message + Default + DeserializeOwned,
{
let format = BodyFormat::from_content_type(&req);
let bytes = hyper::body::to_bytes(req.into_body()).await?;
timings.set_received();
let request = match format {
BodyFormat::Pb => T::decode(bytes)?,
BodyFormat::JsonPb => serde_json::from_slice(&bytes)?,
};
timings.set_parsed();
Ok((request, format))
}

Expand Down Expand Up @@ -191,6 +218,84 @@ where
Ok(res)
}

/// Contains timing information associated with a request.
/// To access the timings in a given request, use the [extensions](Request::extensions)
/// method and specialize to `Timings` appropriately.
#[derive(Debug, Clone, Copy)]
pub struct Timings {
// When the request started.
pub start: Instant,
// When the request was received (headers and body).
pub request_received: Option<Instant>,
// When the request body was parsed.
pub request_parsed: Option<Instant>,
// When the response handler returned.
pub response_handled: Option<Instant>,
// When the response was written.
pub response_written: Option<Instant>,
}

impl Timings {
#[allow(clippy::new_without_default)]
pub fn new(start: Instant) -> Self {
Self {
start,
request_received: None,
request_parsed: None,
response_handled: None,
response_written: None,
}
}

fn set_received(&mut self) {
self.request_received = Some(Instant::now());
}

fn set_parsed(&mut self) {
self.request_parsed = Some(Instant::now());
}

fn set_response_handled(&mut self) {
self.response_handled = Some(Instant::now());
}

fn set_response_written(&mut self) {
self.response_written = Some(Instant::now());
}

pub fn received(&self) -> Option<Duration> {
self.request_received.map(|x| x - self.start)
}

pub fn parsed(&self) -> Option<Duration> {
match (self.request_parsed, self.request_received) {
(Some(parsed), Some(received)) => Some(parsed - received),
_ => None,
}
}

pub fn response_handled(&self) -> Option<Duration> {
match (self.response_handled, self.request_parsed) {
(Some(handled), Some(parsed)) => Some(handled - parsed),
_ => None,
}
}

pub fn response_written(&self) -> Option<Duration> {
match (self.response_written, self.response_handled) {
(Some(written), Some(handled)) => Some(written - handled),
(Some(written), None) => {
if let Some(parsed) = self.request_parsed {
Some(written - parsed)
} else {
self.request_received.map(|received| written - received)
}
}
_ => None,
}
}
}

#[cfg(test)]
mod tests {

Expand Down