From 33a511df756b6423a51d77e608bcf8b573cd3f99 Mon Sep 17 00:00:00 2001
From: Christopher Kolstad
Date: Thu, 23 Nov 2023 16:05:06 +0100
Subject: [PATCH] feat: obey http status responses. backoff when 429 or 50x
 (#339)

---
 server/Cargo.toml                          |  1 +
 server/src/client_api.rs                   |  6 +-
 server/src/error.rs                        | 12 +++-
 server/src/http/background_send_metrics.rs | 57 ++++++++++++++++++-
 server/src/http/feature_refresher.rs       | 58 ++++++++++++++-----
 server/src/http/unleash_client.rs          | 11 +++-
 server/src/persistence/file.rs             |  4 ++
 server/src/types.rs                        | 65 ++++++++++++++++++++++
 8 files changed, 189 insertions(+), 25 deletions(-)

diff --git a/server/Cargo.toml b/server/Cargo.toml
index 52c6f193..556c8cf5 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -49,6 +49,7 @@ opentelemetry_sdk = { version = "0.21.1", features = [
 ] }
 prometheus = { version = "0.13.3", features = ["process"] }
 prometheus-static-metric = "0.5.1"
+rand = "0.8.5"
 redis = { version = "0.23.3", features = ["tokio-comp", "tokio-rustls-comp"] }
 reqwest = { version = "0.11.22", default-features = false, features = [
     "rustls",
diff --git a/server/src/client_api.rs b/server/src/client_api.rs
index 5863c9b1..7a4925cb 100644
--- a/server/src/client_api.rs
+++ b/server/src/client_api.rs
@@ -1,4 +1,4 @@
-use crate::error::{EdgeError, FeatureError};
+use crate::error::EdgeError;
 use crate::filters::{
     filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
 };
@@ -94,7 +94,7 @@ async fn resolve_features(
         None => features_cache
             .get(&cache_key(&validated_token))
             .map(|client_features| filter_client_features(&client_features, &filter_set))
-            .ok_or(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)),
+            .ok_or(EdgeError::ClientCacheError),
     }?;

     Ok(Json(ClientFeatures {
@@ -140,7 +140,7 @@ pub async fn get_feature(
         None => features_cache
             .get(&cache_key(&validated_token))
             .map(|client_features| filter_client_features(&client_features, &filter_set))
-            .ok_or(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)),
+            .ok_or(EdgeError::ClientCacheError),
     }
     .map(|client_features| client_features.features.into_iter().next())?
     .ok_or(EdgeError::FeatureNotFound(feature_name.into_inner()))
diff --git a/server/src/error.rs b/server/src/error.rs
index 5c33ff5f..de1b2394 100644
--- a/server/src/error.rs
+++ b/server/src/error.rs
@@ -15,7 +15,7 @@ pub const TRUST_PROXY_PARSE_ERROR: &str =
 pub enum FeatureError {
     AccessDenied,
     NotFound,
-    Retriable,
+    Retriable(StatusCode),
 }

 #[derive(Debug, Serialize)]
@@ -89,6 +89,7 @@ pub enum EdgeError {
     AuthorizationDenied,
     AuthorizationPending,
     ClientBuildError(String),
+    ClientCacheError,
     ClientCertificateError(CertificateError),
     ClientFeaturesFetchError(FeatureError),
     ClientFeaturesParseError(String),
@@ -131,7 +132,10 @@ impl Display for EdgeError {
             EdgeError::PersistenceError(msg) => write!(f, "{msg}"),
             EdgeError::JsonParseError(msg) => write!(f, "{msg}"),
             EdgeError::ClientFeaturesFetchError(fe) => match fe {
-                FeatureError::Retriable => write!(f, "Could not fetch client features. Will retry"),
+                FeatureError::Retriable(status_code) => write!(
+                    f,
+                    "Could not fetch client features. Will retry {status_code}"
+                ),
                 FeatureError::AccessDenied => write!(
                     f,
                     "Could not fetch client features because api key was not allowed"
                 ),
@@ -196,6 +200,9 @@ impl Display for EdgeError {
                     "Client hydration failed. 
Somehow we said [{message}] when it did"
                 )
             }
+            EdgeError::ClientCacheError => {
+                write!(f, "Fetching client features from cache failed")
+            }
         }
     }
 }
@@ -230,6 +237,7 @@ impl ResponseError for EdgeError {
             EdgeError::HealthCheckError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             EdgeError::ReadyCheckError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             EdgeError::ClientHydrationFailed(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            EdgeError::ClientCacheError => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
diff --git a/server/src/http/background_send_metrics.rs b/server/src/http/background_send_metrics.rs
index a8d11a5d..4fd1cf23 100644
--- a/server/src/http/background_send_metrics.rs
+++ b/server/src/http/background_send_metrics.rs
@@ -1,5 +1,6 @@
 use actix_web::http::StatusCode;
-use tracing::{error, warn};
+use std::cmp::max;
+use tracing::{error, info, trace, warn};

 use super::unleash_client::UnleashClient;
 use std::time::Duration;
@@ -10,6 +11,7 @@ use crate::{
 };
 use lazy_static::lazy_static;
 use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
+use rand::Rng;
 use std::sync::Arc;

 lazy_static! {
@@ -30,8 +32,11 @@ pub async fn send_metrics_task(
     unleash_client: Arc<UnleashClient>,
     send_interval: u64,
 ) {
+    let mut failures = 0;
+    let mut interval = Duration::from_secs(send_interval);
     loop {
         let batches = metrics_cache.get_appropriately_sized_batches();
+        trace!("Posting {} batches", batches.len());
         for batch in batches {
             if !batch.applications.is_empty() || !batch.metrics.is_empty() {
                 if let Err(edge_error) = unleash_client.send_batch_metrics(batch.clone()).await {
@@ -48,6 +53,34 @@ pub async fn send_metrics_task(
                     StatusCode::BAD_REQUEST => {
                         error!("Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory");
                     }
+                    StatusCode::NOT_FOUND => {
+                        failures = 10;
+                        interval = new_interval(send_interval, failures, 5);
+                        error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.as_secs());
+                    }
+                    StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
+                        failures = 10;
+                        interval = new_interval(send_interval, failures, 5);
+                        error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.as_secs());
+                    }
+                    StatusCode::TOO_MANY_REQUESTS => {
+                        failures = max(10, failures + 1);
+                        interval = new_interval(send_interval, failures, 5);
+                        info!(
+                            "Upstream said it was too busy, backing off to {} seconds",
+                            interval.as_secs()
+                        );
+                        metrics_cache.reinsert_batch(batch);
+                    }
+                    StatusCode::INTERNAL_SERVER_ERROR
+                    | StatusCode::BAD_GATEWAY
+                    | StatusCode::SERVICE_UNAVAILABLE
+                    | StatusCode::GATEWAY_TIMEOUT => {
+                        failures = max(10, failures + 1);
+                        interval = new_interval(send_interval, failures, 5);
+                        info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.as_secs());
+                        metrics_cache.reinsert_batch(batch);
+                    }
                     _ => {
                         warn!("Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt");
                         metrics_cache.reinsert_batch(batch);
@@ -59,9 +92,29 @@ pub async fn send_metrics_task(
                     METRICS_UNEXPECTED_ERRORS.inc();
                 }
             }
+            } else {
+                failures = max(0, failures - 1);
+                interval = new_interval(send_interval, failures, 5);
             }
         }
-        tokio::time::sleep(Duration::from_secs(send_interval)).await;
+        trace!(
+            "Done posting traces. 
Sleeping for {} seconds and then going again",
+            interval.as_secs()
+        );
+        tokio::time::sleep(interval).await;
     }
 }
+
+fn new_interval(send_interval: u64, failures: u64, max_jitter_seconds: u64) -> Duration {
+    let initial = Duration::from_secs(send_interval);
+    let added_interval_from_failure = Duration::from_secs(send_interval * failures);
+    let jitter = random_jitter_milliseconds(max_jitter_seconds);
+    initial + added_interval_from_failure + jitter
+}
+
+fn random_jitter_milliseconds(max_jitter_seconds: u64) -> Duration {
+    let mut rng = rand::thread_rng();
+    let jitter = rng.gen_range(0..(max_jitter_seconds * 1000));
+    Duration::from_millis(jitter)
+}
diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs
index 4b8354dd..5ff3cb83 100644
--- a/server/src/http/feature_refresher.rs
+++ b/server/src/http/feature_refresher.rs
@@ -4,7 +4,8 @@ use std::{sync::Arc, time::Duration};
 use actix_web::http::header::EntityTag;
 use chrono::Utc;
 use dashmap::DashMap;
-use tracing::{debug, warn};
+use reqwest::StatusCode;
+use tracing::{debug, info, warn};
 use unleash_types::client_features::Segment;
 use unleash_types::client_metrics::ClientApplication;
 use unleash_types::{
@@ -167,8 +168,8 @@ impl FeatureRefresher {
             .map(|e| e.value().clone())
             .filter(|token| {
                 token
-                    .last_check
-                    .map(|last| Utc::now() - last > self.refresh_interval)
+                    .next_refresh
+                    .map(|refresh| Utc::now() > refresh)
                     .unwrap_or(true)
             })
             .collect()
@@ -362,9 +363,22 @@ impl FeatureRefresher {
             match e {
                 EdgeError::ClientFeaturesFetchError(fe) => {
                     match fe {
-                        FeatureError::Retriable => {
-                            warn!("Couldn't refresh features, but will retry next go")
-                        }
+                        FeatureError::Retriable(status_code) => match status_code {
+                            StatusCode::INTERNAL_SERVER_ERROR
+                            | StatusCode::BAD_GATEWAY
+                            | StatusCode::SERVICE_UNAVAILABLE
+                            | StatusCode::GATEWAY_TIMEOUT => {
+                                info!("Upstream is having some problems, increasing my waiting period");
+                                self.backoff(&refresh.token);
+                            }
+                            StatusCode::TOO_MANY_REQUESTS => {
+                                info!("Got told that upstream is receiving too many requests");
+                                self.backoff(&refresh.token);
+                            }
+                            _ => {
+                                warn!("Couldn't refresh features, but will retry next go")
+                            }
+                        },
                         FeatureError::AccessDenied => {
                             warn!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
                             self.tokens_to_refresh.remove(&refresh.token.token);
@@ -391,25 +405,31 @@ impl FeatureRefresher {
                         }
                     }
                 }
+            EdgeError::ClientCacheError => {
+                warn!("Couldn't refresh features, but will retry next go")
+            }
             _ => warn!("Couldn't refresh features: {e:?}. 
Will retry next pass"), } } } } - + pub fn backoff(&self, token: &EdgeToken) { + self.tokens_to_refresh + .alter(&token.token, |_k, old_refresh| { + old_refresh.backoff(&self.refresh_interval) + }); + } pub fn update_last_check(&self, token: &EdgeToken) { - if let Some(mut token) = self.tokens_to_refresh.get_mut(&token.token) { - token.last_check = Some(Utc::now()); - } + self.tokens_to_refresh + .alter(&token.token, |_k, old_refresh| { + old_refresh.successful_check(&self.refresh_interval) + }); } pub fn update_last_refresh(&self, token: &EdgeToken, etag: Option) { self.tokens_to_refresh - .entry(token.token.clone()) - .and_modify(|token_to_refresh| { - token_to_refresh.last_check = Some(Utc::now()); - token_to_refresh.last_refreshed = Some(Utc::now()); - token_to_refresh.etag = etag + .alter(&token.token, |_k, old_refresh| { + old_refresh.successful_refresh(&self.refresh_interval, etag) }); } } @@ -768,8 +788,10 @@ mod tests { let no_etag_so_is_due_for_refresh = TokenRefresh { token: no_etag_due_for_refresh_token, etag: None, + next_refresh: None, last_refreshed: None, last_check: None, + failure_count: 0, }; let etag_and_last_refreshed_token = EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string()) @@ -777,8 +799,10 @@ mod tests { let etag_and_last_refreshed_less_than_duration_ago = TokenRefresh { token: etag_and_last_refreshed_token, etag: Some(EntityTag::new_weak("abcde".into())), + next_refresh: Some(Utc::now() + Duration::seconds(10)), last_refreshed: Some(Utc::now()), last_check: Some(Utc::now()), + failure_count: 0, }; let etag_but_old_token = EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap(); @@ -787,8 +811,10 @@ mod tests { let etag_but_last_refreshed_ten_seconds_ago = TokenRefresh { token: etag_but_old_token, etag: Some(EntityTag::new_weak("abcde".into())), + next_refresh: None, last_refreshed: Some(ten_seconds_ago), last_check: Some(ten_seconds_ago), + failure_count: 0, }; feature_refresher.tokens_to_refresh.insert( etag_but_last_refreshed_ten_seconds_ago.token.token.clone(), @@ -1110,8 +1136,10 @@ mod tests { let token_refresh = TokenRefresh { token: wildcard_token.clone(), etag: None, + next_refresh: None, last_refreshed: None, last_check: None, + failure_count: 0, }; current_tokens.insert(wildcard_token.token, token_refresh); diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 0a2b5158..0dd2dcf4 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -349,8 +349,11 @@ impl UnleashClient { .send() .await .map_err(|e| { - warn!("Failed to fetch errors due to [{e:?}] - Will retry"); - EdgeError::ClientFeaturesFetchError(FeatureError::Retriable) + warn!("Failed to fetch. 
Due to [{e:?}] - Will retry");
+                match e.status() {
+                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
+                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
+                }
             })?;
         let stop_time = Utc::now();
         CLIENT_FEATURE_FETCH
@@ -407,7 +410,9 @@ impl UnleashClient {
             CLIENT_FEATURE_FETCH_FAILURES
                 .with_label_values(&[response.status().as_str()])
                 .inc();
-            Err(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable))
+            Err(EdgeError::ClientFeaturesFetchError(
+                FeatureError::Retriable(response.status()),
+            ))
         }
     }
diff --git a/server/src/persistence/file.rs b/server/src/persistence/file.rs
index 8c9573e6..c53383f9 100644
--- a/server/src/persistence/file.rs
+++ b/server/src/persistence/file.rs
@@ -250,8 +250,10 @@ mod tests {
                     status: TokenValidationStatus::Validated,
                 },
                 etag: Some(EntityTag::new_weak("1234".into())),
+                next_refresh: None,
                 last_refreshed: Some(Utc::now()),
                 last_check: Some(Utc::now()),
+                failure_count: 0,
             },
             TokenRefresh {
                 token: EdgeToken {
@@ -259,8 +261,10 @@ mod tests {
                     ..EdgeToken::default()
                 },
                 etag: None,
+                next_refresh: None,
                 last_refreshed: None,
                 last_check: None,
+                failure_count: 0,
             },
         ];
diff --git a/server/src/types.rs b/server/src/types.rs
index ed89526f..46858f50 100644
--- a/server/src/types.rs
+++ b/server/src/types.rs
@@ -1,6 +1,8 @@
+use std::cmp::min;
 use std::fmt;
 use std::fmt::{Display, Formatter};
 use std::net::IpAddr;
+
 use std::sync::Arc;
 use std::{
     hash::{Hash, Hasher},
@@ -181,8 +183,10 @@ pub struct TokenRefresh {
         serialize_with = "serialize_entity_tag"
     )]
     pub etag: Option<EntityTag>,
+    pub next_refresh: Option<DateTime<Utc>>,
     pub last_refreshed: Option<DateTime<Utc>>,
     pub last_check: Option<DateTime<Utc>>,
+    pub failure_count: u32,
 }

 #[derive(Clone, Deserialize, Serialize, Debug)]
@@ -207,8 +211,69 @@ impl TokenRefresh {
             etag,
             last_refreshed: None,
             last_check: None,
+            next_refresh: None,
+            failure_count: 0,
         }
     }
+
+    /// Something went wrong (but it was retriable). Increment our failure count and set last_check and next_refresh
+    pub fn backoff(&self, refresh_interval: &Duration) -> Self {
+        let failure_count: u32 = min(self.failure_count + 1, 10);
+        let now = Utc::now();
+        let next_refresh = calculate_next_refresh(now, *refresh_interval, failure_count as u64);
+        Self {
+            failure_count,
+            next_refresh: Some(next_refresh),
+            last_check: Some(now),
+            ..self.clone()
+        }
+    }
+    /// We successfully talked to upstream, but there were no updates. Update our next_refresh, decrement our failure count and set when we last checked
+    pub fn successful_check(&self, refresh_interval: &Duration) -> Self {
+        let failure_count = if self.failure_count > 0 {
+            self.failure_count - 1
+        } else {
+            0
+        };
+        let now = Utc::now();
+        let next_refresh = calculate_next_refresh(now, *refresh_interval, failure_count as u64);
+        Self {
+            failure_count,
+            next_refresh: Some(next_refresh),
+            last_check: Some(now),
+            ..self.clone()
+        }
+    }
+    /// We successfully talked to upstream. There were updates. 
+    /// Update next_refresh, last_refreshed and last_check, and decrement our failure count
+    pub fn successful_refresh(&self, refresh_interval: &Duration, etag: Option<EntityTag>) -> Self {
+        let failure_count = if self.failure_count > 0 {
+            self.failure_count - 1
+        } else {
+            0
+        };
+        let now = Utc::now();
+        let next_refresh = calculate_next_refresh(now, *refresh_interval, failure_count as u64);
+        Self {
+            failure_count,
+            next_refresh: Some(next_refresh),
+            last_refreshed: Some(now),
+            last_check: Some(now),
+            etag,
+            ..self.clone()
+        }
+    }
 }

+fn calculate_next_refresh(
+    now: DateTime<Utc>,
+    refresh_interval: Duration,
+    failure_count: u64,
+) -> DateTime<Utc> {
+    if failure_count == 0 {
+        now + refresh_interval
+    } else {
+        now + refresh_interval + (refresh_interval * (failure_count.try_into().unwrap_or(0)))
+    }
+}
+
 impl fmt::Debug for TokenRefresh {
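
Note on the backoff arithmetic above: both new_interval and calculate_next_refresh grow the wait linearly with the number of consecutive failures (base interval plus base times failure count), with the failure count capped at 10 and, on the metrics side, a few seconds of random jitter added. As a rough standalone illustration of that growth (not part of the patch; the helper name backoff_interval and the 15 second base interval are assumptions made up for the example, and jitter is left out):

use std::time::Duration;

// Illustrative helper mirroring the capped linear backoff sketched in the patch:
// wait = base + base * failure_count, with failure_count capped at 10.
fn backoff_interval(base_seconds: u64, failure_count: u64) -> Duration {
    let capped = failure_count.min(10);
    Duration::from_secs(base_seconds + base_seconds * capped)
}

fn main() {
    // With an assumed 15 second base interval the wait grows linearly:
    // 0 failures -> 15s, 1 -> 30s, 2 -> 45s, ... 10 (the cap) -> 165s.
    for failures in 0..=10 {
        println!("{failures} failures -> {:?}", backoff_interval(15, failures));
    }
}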