Skip to content

Commit

Permalink
feat: implementation of log service
Browse files Browse the repository at this point in the history
  • Loading branch information
Zoe Spellman committed Mar 7, 2024
1 parent f8d54f7 commit dc2ba87
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 29 deletions.
20 changes: 18 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ version = "0.0.3"
edition = "2021"

[dependencies]
axum = "0.6.20" # this *must* be pinned because 0.7.x relies on hyper 1.x causing a ton of type conversion issues
dotenvy = "0.15"
envy = "0.4"
futures = "0.3"
http = "1.0"
http-body = "0.4.6"
hyper = { version = "0.14", features = ["full", "backports", "deprecated"] }
jsonwebtoken = "9.2"
log = { version = "0.4.21", features = ["serde"] }
once_cell = "1.19"
prost = "0.12"
prost-types = "0.12"
Expand All @@ -37,8 +39,6 @@ tonic-reflection = "0.10"
tower = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


[build-dependencies]
tonic-build = { version = "0.11", features = ["prost"] }

Expand Down
18 changes: 14 additions & 4 deletions src/app/interfaces/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
use std::pin::Pin;

use axum::body::Bytes;
use futures::ready;
use http_body::combinators::UnsyncBoxBody;
use hyper::{service::Service, Body};
use tonic::body::BoxBody;
use tower::Layer;

use crate::app::context::AppContext;
Expand All @@ -26,7 +27,10 @@ impl ContextMiddlewareLayer {

impl<S> Layer<S> for ContextMiddlewareLayer
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S: Service<hyper::Request<Body>, Response = hyper::Response<UnsyncBoxBody<Bytes, axum::Error>>>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
type Service = ContextMiddleware<S>;
Expand All @@ -47,7 +51,10 @@ where
#[derive(Clone)]
pub struct ContextMiddleware<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S: Service<hyper::Request<Body>, Response = hyper::Response<UnsyncBoxBody<Bytes, axum::Error>>>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
/// The current context of the app, as passed in from the [`ContextMiddlewareLayer`]
Expand All @@ -67,7 +74,10 @@ type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>

impl<S> Service<hyper::Request<Body>> for ContextMiddleware<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S: Service<hyper::Request<Body>, Response = hyper::Response<UnsyncBoxBody<Bytes, axum::Error>>>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
Expand Down
14 changes: 13 additions & 1 deletion src/app/interfaces/servers/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::pin::Pin;

use hyper::Body;
use thiserror::Error;
use tonic::transport::server::{Routes, RoutesBuilder};
use tonic::{
transport::server::{Routes, RoutesBuilder},
Status,
};
use tower::Service;

use crate::features::{chart::ChartService, rating::RatingService, user::UserService};
Expand All @@ -25,6 +28,15 @@ pub enum GrpcError {
AuthError(#[from] tonic::Status),
}

impl From<GrpcError> for Status {
fn from(value: GrpcError) -> Self {
match value {
GrpcError::AuthError(status) => status,
GrpcError::RoutesError(err) => Status::internal(format!("{err}")),
}
}
}

/// The file descriptors defining the [`tonic`] GRPC service
const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("ratings_descriptor");

Expand Down
115 changes: 115 additions & 0 deletions src/app/interfaces/servers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,121 @@
//! API endpoint definitions for different entry methods
pub mod grpc;
pub mod rest;

use std::convert::Infallible;
use std::pin::Pin;

use axum::body::Bytes;
use axum::response::IntoResponse;
use futures::ready;
#[allow(unused_imports)]
pub use grpc::{GrpcService, GrpcServiceBuilder};
use http_body::combinators::UnsyncBoxBody;
use hyper::{header::CONTENT_TYPE, Request};
pub use rest::{RestService, RestServiceBuilder};
use thiserror::Error;
use tower::Service;

use self::grpc::GrpcError;

/// Any error that can occur internally to our service
#[derive(Debug, Error)]
pub enum AppCenterRatingsError {
/// An error from the GRPC endpoints
#[error("an error from the GRPC service occurred: {0}")]
GrpcError(#[from] GrpcError),
/// Technically, an error from the Rest endpoints, but they're infallible
#[error("cannot happen")]
RestError(#[from] Infallible),
}

/// The general service for our app, containing all our endpoints
#[derive(Clone, Debug)]
#[allow(clippy::missing_docs_in_private_items)]
pub struct AppCenterRatingsService {
grpc_service: GrpcService,
grpc_ready: bool,
rest_service: RestService,
rest_ready: bool,
}

impl AppCenterRatingsService {
/// Constructs the service with all the default service endpoints for REST and GRPC
pub fn with_default_routes() -> AppCenterRatingsService {
Self {
grpc_service: GrpcServiceBuilder::default().build(),
grpc_ready: false,
rest_service: RestServiceBuilder::default().build(),
rest_ready: false,
}
}
}

/// A type definition which is simply a future that's in a pinned location in the heap.
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

impl Service<hyper::Request<hyper::Body>> for AppCenterRatingsService {
type Response = hyper::Response<UnsyncBoxBody<Bytes, axum::Error>>;

type Error = AppCenterRatingsError;

type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
loop {
match (self.grpc_ready, self.rest_ready) {
(true, true) => return std::task::Poll::Ready(Ok(())),
(false, _) => {
ready!(self.grpc_service.poll_ready(cx))?;
self.grpc_ready = true
}
(_, false) => {
ready!(self.rest_service.poll_ready(cx)).unwrap();
self.rest_ready = true
}
}
}
}

fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
assert!(
self.grpc_ready,
"grpc service not ready. Did you forget to call `poll_ready`?"
);
assert!(
self.rest_ready,
"rest service not ready. Did you forget to call `poll_ready`?"
);

// if we get a grpc request call the grpc service, otherwise call the rest service
// when calling a service it becomes not-ready so we have drive readiness again
if is_grpc_request(&req) {
self.grpc_ready = false;
let future = self.grpc_service.call(req);
Box::pin(async move {
let res = future.await?;
Ok(res.into_response())
})
} else {
self.rest_ready = false;
let future = self.rest_service.call(req);
Box::pin(async move {
let res = future.await?;
Ok(res.into_response())
})
}
}
}

/// Checks to see if this request has a GRPC header (if not we assume REST)
fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}
90 changes: 90 additions & 0 deletions src/app/interfaces/servers/rest/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! The interface for serving on REST endpoints
use std::{convert::Infallible, pin::Pin};

use axum::{body::Bytes, response::IntoResponse, Router};
use http_body::combinators::UnsyncBoxBody;
use hyper::StatusCode;
use tower::Service;

use crate::features::admin::log_level::service::LogLevelService;

/// The base path appended to all our internal endpoints
const BASE_ROUTE: &str = "/v1/";

/// Dispatches to our web endpoints
#[derive(Clone, Debug)]
pub struct RestService {
/// The axum router we use for dispatching to endpoints
router: Router,
}

/// A type definition which is simply a future that's in a pinned location in the heap.
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

impl Service<hyper::Request<hyper::Body>> for RestService {
type Response = hyper::Response<UnsyncBoxBody<Bytes, axum::Error>>;

type Error = Infallible;

type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.router
.poll_ready(cx)
.map_err(|_| unreachable!("error is infallible"))
}

fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
let future = self.router.call(req);
Box::pin(future)
}
}

/// Handles any missing paths
async fn handler_404() -> impl IntoResponse {
(StatusCode::NOT_FOUND, "no such API endpoint")
}

/// Builds the REST service
pub struct RestServiceBuilder {
/// The underlying axum router we're building up
router: Router,
}

impl RestServiceBuilder {
/// Creates a new builder with an empty path,
/// you probably actually want [`RestServiceBuilder::default`],
/// since that seeds the default API endpoint paths.
pub fn new() -> Self {
Self {
router: Router::default(),
}
}

/// Adds the log service
pub fn with_log_level(self) -> Self {
Self {
router: self
.router
.nest(BASE_ROUTE, LogLevelService.register_axum_route()),
}
}

/// Builds the REST service, applying all configured paths and
/// forcing the others to 404.
pub fn build(self) -> RestService {
RestService {
router: self.router.fallback(handler_404),
}
}
}

impl Default for RestServiceBuilder {
fn default() -> Self {
Self::new().with_log_level()
}
}
Loading

0 comments on commit dc2ba87

Please sign in to comment.