From 4243e74ac4427a8c6e99a49a6fc7613709569363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ale=C5=A1=20Bizjak?= Date: Thu, 12 Oct 2023 23:18:44 +0200 Subject: [PATCH] Add load-shedding to the GRPC V2 service. --- concordium-node/Cargo.lock | 2 +- concordium-node/Cargo.toml | 4 +-- concordium-node/src/grpc2.rs | 56 +++++++++++++++++++++++++----------- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/concordium-node/Cargo.lock b/concordium-node/Cargo.lock index f51ada2a4a..209cfe0f7f 100644 --- a/concordium-node/Cargo.lock +++ b/concordium-node/Cargo.lock @@ -680,7 +680,7 @@ dependencies = [ [[package]] name = "concordium_node" -version = "6.1.6" +version = "6.1.7" dependencies = [ "anyhow", "app_dirs2", diff --git a/concordium-node/Cargo.toml b/concordium-node/Cargo.toml index 0a6e2f271f..2f45f647b4 100644 --- a/concordium-node/Cargo.toml +++ b/concordium-node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "concordium_node" -version = "6.1.6" # must be kept in sync with 'is_compatible_version' in 'src/configuration.rs' +version = "6.1.7" # must be kept in sync with 'is_compatible_version' in 'src/configuration.rs' description = "Concordium Node" authors = ["Concordium "] exclude = [".gitignore", ".gitlab-ci.yml", "test/**/*","**/**/.gitignore","**/**/.gitlab-ci.yml"] @@ -78,7 +78,7 @@ tempfile = { version = "3.1" } tonic = { version = "0.9", features = ["tls"] } tonic-reflection = "0.9" tower-http = { version = "0.4", features = ["trace", "metrics"] } -tower = "0.4" +tower = {version = "0.4", features = ["load-shed"]} tonic-web = "0.9" prost = "0.11" tokio = { version = "1.20", features = ["macros", "rt-multi-thread", "signal", "io-util", "time"] } diff --git a/concordium-node/src/grpc2.rs b/concordium-node/src/grpc2.rs index 3791416a7a..5fdef19f5d 100644 --- a/concordium-node/src/grpc2.rs +++ b/concordium-node/src/grpc2.rs @@ -1064,12 +1064,18 @@ pub mod server { .http2_keepalive_timeout(Some(std::time::Duration::from_secs( config.keepalive_timeout, ))) - .layer(log_layer) - .layer(tower::limit::ConcurrencyLimitLayer::new(config.max_concurrent_requests)) - // Note: the in-flight request layer applies after the limit layer. This is what we want so that the - // metric reflects the actual number of in-flight requests. + // Note: the in-flight request layer applies first here. Since we are using a load-shed + // layer just below this corresponds very directly to the number of requests being actually handled. + // The technical reason for this is that we cannot really stack the in flight requests layer + // below the stats layer since we want to transform some `Err` responses in the stats layer + // to Ok responses with a meaningful gRPC status code, + // but since the in flight request layer adds a guard to count in-flight requests this would + // mean we'd have to construct such a guard in the response, which is not possible. .layer(in_flight_request_layer) - .layer(stats_layer); + .layer(stats_layer) + .layer(tower::load_shed::LoadShedLayer::new()) + .layer(tower::limit::ConcurrencyLimitLayer::new(config.max_concurrent_requests)) + .layer(log_layer); if let Some(identity) = identity { builder = builder .tls_config(ServerTlsConfig::new().identity(identity)) @@ -2560,19 +2566,21 @@ fn get_grpc_code_label(code: tonic::Code) -> &'static str { /// Actual middleware implementation updating the stats. /// The middleware is called once for each gRPC request, even for the streaming /// gRPC methods. -impl tower::Service> for StatsMiddleware +impl + tower::Service> for StatsMiddleware where S: tower::Service< hyper::Request, - Response = hyper::Response, + Response = hyper::Response, + Error = tower::BoxError, > + Clone + Send + 'static, S::Future: Send + 'static, { - type Error = S::Error; + type Error = tower::BoxError; type Future = futures::future::BoxFuture<'static, Result>; - type Response = S::Response; + type Response = hyper::Response; fn poll_ready( &mut self, @@ -2599,15 +2607,29 @@ where // Time taken for the inner service to send back a response, meaning for // streaming gRPC methods this is the duration for it to first return a stream. let duration = request_received.elapsed().as_secs_f64(); - if result.is_err() { - grpc_request_duration - .with_label_values(&[ - endpoint_name.as_str(), - get_grpc_code_label(tonic::Code::Internal), - ]) - .observe(duration); + match result { + Err(e) => { + // If the load shed service terminated the request this will be signalled as + // an Err(Overloaded). So record resource exhaustion + // in the metrics. + let (code, response) = if e.is::() { + let new_response = + tonic::Status::resource_exhausted("Too many concurrent requests.") + .to_http(); + ( + tonic::Code::ResourceExhausted, + Ok(new_response.map(|_| Default::default())), + ) + } else { + (tonic::Code::Internal, Err(e)) + }; + grpc_request_duration + .with_label_values(&[endpoint_name.as_str(), get_grpc_code_label(code)]) + .observe(duration); + return response; + } + Ok(result) => (result, duration), } - (result?, duration) }; // Check if the gRPC status header is part of the HTTP headers, if not check for