From 4243e74ac4427a8c6e99a49a6fc7613709569363 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ale=C5=A1=20Bizjak?=
Date: Thu, 12 Oct 2023 23:18:44 +0200
Subject: [PATCH 1/2] Add load-shedding to the GRPC V2 service.

---
 concordium-node/Cargo.lock   |  2 +-
 concordium-node/Cargo.toml   |  4 +--
 concordium-node/src/grpc2.rs | 56 +++++++++++++++++++++++++-----------
 3 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/concordium-node/Cargo.lock b/concordium-node/Cargo.lock
index f51ada2a4a..209cfe0f7f 100644
--- a/concordium-node/Cargo.lock
+++ b/concordium-node/Cargo.lock
@@ -680,7 +680,7 @@ dependencies = [
 
 [[package]]
 name = "concordium_node"
-version = "6.1.6"
+version = "6.1.7"
 dependencies = [
  "anyhow",
  "app_dirs2",
diff --git a/concordium-node/Cargo.toml b/concordium-node/Cargo.toml
index 0a6e2f271f..2f45f647b4 100644
--- a/concordium-node/Cargo.toml
+++ b/concordium-node/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "concordium_node"
-version = "6.1.6" # must be kept in sync with 'is_compatible_version' in 'src/configuration.rs'
+version = "6.1.7" # must be kept in sync with 'is_compatible_version' in 'src/configuration.rs'
 description = "Concordium Node"
 authors = ["Concordium "]
 exclude = [".gitignore", ".gitlab-ci.yml", "test/**/*","**/**/.gitignore","**/**/.gitlab-ci.yml"]
@@ -78,7 +78,7 @@ tempfile = { version = "3.1" }
 tonic = { version = "0.9", features = ["tls"] }
 tonic-reflection = "0.9"
 tower-http = { version = "0.4", features = ["trace", "metrics"] }
-tower = "0.4"
+tower = {version = "0.4", features = ["load-shed"]}
 tonic-web = "0.9"
 prost = "0.11"
 tokio = { version = "1.20", features = ["macros", "rt-multi-thread", "signal", "io-util", "time"] }
diff --git a/concordium-node/src/grpc2.rs b/concordium-node/src/grpc2.rs
index 3791416a7a..5fdef19f5d 100644
--- a/concordium-node/src/grpc2.rs
+++ b/concordium-node/src/grpc2.rs
@@ -1064,12 +1064,18 @@ pub mod server {
             .http2_keepalive_timeout(Some(std::time::Duration::from_secs(
                 config.keepalive_timeout,
             )))
-            .layer(log_layer)
-            .layer(tower::limit::ConcurrencyLimitLayer::new(config.max_concurrent_requests))
-            // Note: the in-flight request layer applies after the limit layer. This is what we want so that the
-            // metric reflects the actual number of in-flight requests.
+            // Note: the in-flight request layer applies first here. Since we are using a load-shed
+            // layer just below it, this corresponds directly to the number of requests actually handled.
+            // The technical reason for this is that we cannot stack the in-flight request layer
+            // below the stats layer: we want to transform some `Err` responses in the stats layer
+            // into `Ok` responses with a meaningful gRPC status code,
+            // but since the in-flight request layer adds a guard to count in-flight requests, this
+            // would mean we would have to construct such a guard in the response, which is not possible.
             .layer(in_flight_request_layer)
-            .layer(stats_layer);
+            .layer(stats_layer)
+            .layer(tower::load_shed::LoadShedLayer::new())
+            .layer(tower::limit::ConcurrencyLimitLayer::new(config.max_concurrent_requests))
+            .layer(log_layer);
         if let Some(identity) = identity {
             builder = builder
                 .tls_config(ServerTlsConfig::new().identity(identity))
@@ -2560,19 +2566,21 @@ fn get_grpc_code_label(code: tonic::Code) -> &'static str {
 /// Actual middleware implementation updating the stats.
 /// The middleware is called once for each gRPC request, even for the streaming
 /// gRPC methods.
-impl tower::Service> for StatsMiddleware
+impl
+    tower::Service> for StatsMiddleware
 where
     S: tower::Service<
             hyper::Request,
-            Response = hyper::Response,
+            Response = hyper::Response,
+            Error = tower::BoxError,
         > + Clone
         + Send
         + 'static,
     S::Future: Send + 'static,
 {
-    type Error = S::Error;
+    type Error = tower::BoxError;
     type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
-    type Response = S::Response;
+    type Response = hyper::Response;
 
     fn poll_ready(
         &mut self,
@@ -2599,15 +2607,29 @@ where
             // Time taken for the inner service to send back a response, meaning for
             // streaming gRPC methods this is the duration for it to first return a stream.
             let duration = request_received.elapsed().as_secs_f64();
-            if result.is_err() {
-                grpc_request_duration
-                    .with_label_values(&[
-                        endpoint_name.as_str(),
-                        get_grpc_code_label(tonic::Code::Internal),
-                    ])
-                    .observe(duration);
+            match result {
+                Err(e) => {
+                    // If the load shed service terminated the request this will be signalled as
+                    // an Err(Overloaded). So record resource exhaustion
+                    // in the metrics.
+                    let (code, response) = if e.is::<tower::load_shed::error::Overloaded>() {
+                        let new_response =
+                            tonic::Status::resource_exhausted("Too many concurrent requests.")
+                                .to_http();
+                        (
+                            tonic::Code::ResourceExhausted,
+                            Ok(new_response.map(|_| Default::default())),
+                        )
+                    } else {
+                        (tonic::Code::Internal, Err(e))
+                    };
+                    grpc_request_duration
+                        .with_label_values(&[endpoint_name.as_str(), get_grpc_code_label(code)])
+                        .observe(duration);
+                    return response;
+                }
+                Ok(result) => (result, duration),
             }
-            (result?, duration)
         };
 
         // Check if the gRPC status header is part of the HTTP headers, if not check for

From d472d01e0262177f92b58d3bab3d89f8a4b99ee8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ale=C5=A1=20Bizjak?=
Date: Fri, 13 Oct 2023 11:14:59 +0200
Subject: [PATCH 2/2] Documentation and changelog.

---
 CHANGELOG.md                 |  7 +++++++
 concordium-node/src/grpc2.rs | 11 ++++++-----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2318f67867..3d61af929b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,13 @@
 
 ## Unreleased changes
 
+## 6.1.7
+
+- Add load-shedding to the V2 GRPC API. In particular, if at the time of the
+  request the node is already handling more than
+  `CONCORDIUM_NODE_GRPC2_MAX_CONCURRENT_REQUESTS` requests then the incoming
+  request will be immediately rejected.
+
 ## 6.1.6
 
 - Fix a regression in the start up time. When upgrading from an earlier version, the first start-up
diff --git a/concordium-node/src/grpc2.rs b/concordium-node/src/grpc2.rs
index 5fdef19f5d..2284ce2312 100644
--- a/concordium-node/src/grpc2.rs
+++ b/concordium-node/src/grpc2.rs
@@ -2613,13 +2613,14 @@ where
                     // an Err(Overloaded). So record resource exhaustion
                     // in the metrics.
                     let (code, response) = if e.is::<tower::load_shed::error::Overloaded>() {
+                        // Return a response with an empty body of the correct type. `to_http`
+                        // constructs a response with a `BoxBody`, but here we need a more
+                        // general body type to keep the service generic enough.
                         let new_response =
                             tonic::Status::resource_exhausted("Too many concurrent requests.")
-                                .to_http();
-                        (
-                            tonic::Code::ResourceExhausted,
-                            Ok(new_response.map(|_| Default::default())),
-                        )
+                                .to_http()
+                                .map(|_| Default::default());
+                        (tonic::Code::ResourceExhausted, Ok(new_response))
                     } else {
                         (tonic::Code::Internal, Err(e))
                     };
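For reference, the behaviour these two patches introduce can be reproduced with a bare `tower` stack outside the node. The sketch below is illustrative only and not part of the patches: the slow handler, the request strings, and the dependency selection (`tokio` with `macros`/`rt`/`time`, `tower` 0.4 with `limit`/`load-shed`/`util`, `tonic` 0.9 for the status types) are assumptions chosen for the example. It puts `load_shed` outside `concurrency_limit`, the same relationship the builder comment describes, and maps the resulting `Overloaded` error to a `RESOURCE_EXHAUSTED` status the way the stats middleware does.

```rust
use std::time::Duration;

use tower::{load_shed::error::Overloaded, BoxError, Service, ServiceBuilder, ServiceExt};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), BoxError> {
    // Stand-in for a slow gRPC handler: it just sleeps before answering.
    let slow = tower::service_fn(|req: &'static str| async move {
        tokio::time::sleep(Duration::from_millis(200)).await;
        Ok::<_, BoxError>(format!("handled {req}"))
    });

    // Load-shed wraps the concurrency limit: when the limiter has no free slot,
    // the load-shed layer fails the call immediately with `Overloaded` instead of queueing it.
    let mut svc = ServiceBuilder::new().load_shed().concurrency_limit(1).service(slow);

    // The first request takes the only slot; the second is issued while it is still in flight.
    let first = svc.ready().await?.call("first");
    let second = svc.ready().await?.call("second");
    let (first, second) = tokio::join!(first, second);
    assert!(first.is_ok());

    // Mirror the stats middleware: a shed request becomes RESOURCE_EXHAUSTED,
    // anything else an internal error.
    let err: BoxError = second.unwrap_err();
    let status = if err.is::<Overloaded>() {
        tonic::Status::resource_exhausted("Too many concurrent requests.")
    } else {
        tonic::Status::internal(err.to_string())
    };
    assert_eq!(status.code(), tonic::Code::ResourceExhausted);
    println!("second request was rejected: {status}");
    Ok(())
}
```

The ordering is the important design point: with the load-shed layer outside the concurrency limit, a full limiter surfaces as an `Overloaded` error that the stats middleware can turn into a proper gRPC status, whereas if the limiter were outermost, callers would simply queue and no rejection would ever be observed.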