Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: obey http status responses. backoff when 429 or 50x #339

Merged
merged 3 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ opentelemetry_sdk = { version = "0.21.1", features = [
] }
prometheus = { version = "0.13.3", features = ["process"] }
prometheus-static-metric = "0.5.1"
rand = "0.8.5"
redis = { version = "0.23.3", features = ["tokio-comp", "tokio-rustls-comp"] }
reqwest = { version = "0.11.22", default-features = false, features = [
"rustls",
Expand Down
6 changes: 3 additions & 3 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{EdgeError, FeatureError};
use crate::error::EdgeError;
use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
};
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn resolve_features(
None => features_cache
.get(&cache_key(&validated_token))
.map(|client_features| filter_client_features(&client_features, &filter_set))
.ok_or(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)),
.ok_or(EdgeError::ClientCacheError),
}?;

Ok(Json(ClientFeatures {
Expand Down Expand Up @@ -140,7 +140,7 @@ pub async fn get_feature(
None => features_cache
.get(&cache_key(&validated_token))
.map(|client_features| filter_client_features(&client_features, &filter_set))
.ok_or(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)),
.ok_or(EdgeError::ClientCacheError),
}
.map(|client_features| client_features.features.into_iter().next())?
.ok_or(EdgeError::FeatureNotFound(feature_name.into_inner()))
Expand Down
12 changes: 10 additions & 2 deletions server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub const TRUST_PROXY_PARSE_ERROR: &str =
pub enum FeatureError {
AccessDenied,
NotFound,
Retriable,
Retriable(StatusCode),
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -89,6 +89,7 @@ pub enum EdgeError {
AuthorizationDenied,
AuthorizationPending,
ClientBuildError(String),
ClientCacheError,
ClientCertificateError(CertificateError),
ClientFeaturesFetchError(FeatureError),
ClientFeaturesParseError(String),
Expand Down Expand Up @@ -131,7 +132,10 @@ impl Display for EdgeError {
EdgeError::PersistenceError(msg) => write!(f, "{msg}"),
EdgeError::JsonParseError(msg) => write!(f, "{msg}"),
EdgeError::ClientFeaturesFetchError(fe) => match fe {
FeatureError::Retriable => write!(f, "Could not fetch client features. Will retry"),
FeatureError::Retriable(status_code) => write!(
f,
"Could not fetch client features. Will retry {status_code}"
),
FeatureError::AccessDenied => write!(
f,
"Could not fetch client features because api key was not allowed"
Expand Down Expand Up @@ -196,6 +200,9 @@ impl Display for EdgeError {
"Client hydration failed. Somehow we said [{message}] when it did"
)
}
EdgeError::ClientCacheError => {
write!(f, "Fetching client features from cache failed")
}
}
}
}
Expand Down Expand Up @@ -230,6 +237,7 @@ impl ResponseError for EdgeError {
EdgeError::HealthCheckError(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::ReadyCheckError(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::ClientHydrationFailed(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::ClientCacheError => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
57 changes: 55 additions & 2 deletions server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::http::StatusCode;
use tracing::{error, warn};
use std::cmp::max;
use tracing::{error, info, trace, warn};

use super::unleash_client::UnleashClient;
use std::time::Duration;
Expand All @@ -10,6 +11,7 @@ use crate::{
};
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
use rand::Rng;
use std::sync::Arc;

lazy_static! {
Expand All @@ -30,8 +32,11 @@ pub async fn send_metrics_task(
unleash_client: Arc<UnleashClient>,
send_interval: u64,
) {
let mut failures = 0;
let mut interval = Duration::from_secs(send_interval);
loop {
let batches = metrics_cache.get_appropriately_sized_batches();
trace!("Posting {} batches", batches.len());
for batch in batches {
if !batch.applications.is_empty() || !batch.metrics.is_empty() {
if let Err(edge_error) = unleash_client.send_batch_metrics(batch.clone()).await {
Expand All @@ -48,6 +53,34 @@ pub async fn send_metrics_task(
StatusCode::BAD_REQUEST => {
error!("Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory");
}
StatusCode::NOT_FOUND => {
failures = 10;
interval = new_interval(send_interval, failures, 5);
error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.as_secs());
}
StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
failures = 10;
interval = new_interval(send_interval, failures, 5);
error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.as_secs());
}
StatusCode::TOO_MANY_REQUESTS => {
failures = max(10, failures + 1);
interval = new_interval(send_interval, failures, 5);
info!(
"Upstream said it was too busy, backing off to {} seconds",
interval.as_secs()
);
metrics_cache.reinsert_batch(batch);
}
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
failures = max(10, failures + 1);
interval = new_interval(send_interval, failures, 5);
info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.as_secs());
metrics_cache.reinsert_batch(batch);
}
_ => {
warn!("Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt");
metrics_cache.reinsert_batch(batch);
Expand All @@ -59,9 +92,29 @@ pub async fn send_metrics_task(
METRICS_UNEXPECTED_ERRORS.inc();
}
}
} else {
failures = max(0, failures - 1);
interval = new_interval(send_interval, failures, 5);
}
}
}
tokio::time::sleep(Duration::from_secs(send_interval)).await;
trace!(
"Done posting traces. Sleeping for {} seconds and then going again",
interval.as_secs()
);
tokio::time::sleep(interval).await;
}
}

/// Computes the delay to wait before the next attempt.
///
/// The result is the base `send_interval` (in seconds), plus one additional
/// `send_interval` for every recorded failure, plus a random jitter of less than
/// `max_jitter_seconds` seconds.
///
/// All arithmetic saturates, so a very large `send_interval` or `failures` value
/// yields a very long delay instead of an overflow panic (debug builds) or a
/// wrapped-around, unexpectedly short delay (release builds).
fn new_interval(send_interval: u64, failures: u64, max_jitter_seconds: u64) -> Duration {
    let initial = Duration::from_secs(send_interval);
    let added_interval_from_failure = Duration::from_secs(send_interval.saturating_mul(failures));
    let jitter = random_jitter_milliseconds(max_jitter_seconds);
    initial
        .saturating_add(added_interval_from_failure)
        .saturating_add(jitter)
}

/// Returns a random jitter, with millisecond resolution, that is at least zero and
/// strictly less than `max_jitter_seconds` seconds.
///
/// A `max_jitter_seconds` of zero returns `Duration::ZERO`: sampling from the empty
/// range `0..0` would otherwise panic inside `gen_range`.
fn random_jitter_milliseconds(max_jitter_seconds: u64) -> Duration {
    // Saturate so an extreme upper bound cannot overflow the seconds -> milliseconds conversion.
    let max_jitter_millis = max_jitter_seconds.saturating_mul(1000);
    if max_jitter_millis == 0 {
        return Duration::ZERO;
    }
    let jitter = rand::thread_rng().gen_range(0..max_jitter_millis);
    Duration::from_millis(jitter)
}
58 changes: 43 additions & 15 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::{sync::Arc, time::Duration};
use actix_web::http::header::EntityTag;
use chrono::Utc;
use dashmap::DashMap;
use tracing::{debug, warn};
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::Segment;
use unleash_types::client_metrics::ClientApplication;
use unleash_types::{
Expand Down Expand Up @@ -167,8 +168,8 @@ impl FeatureRefresher {
.map(|e| e.value().clone())
.filter(|token| {
token
.last_check
.map(|last| Utc::now() - last > self.refresh_interval)
.next_refresh
.map(|refresh| Utc::now() > refresh)
.unwrap_or(true)
})
.collect()
Expand Down Expand Up @@ -362,9 +363,22 @@ impl FeatureRefresher {
match e {
EdgeError::ClientFeaturesFetchError(fe) => {
match fe {
FeatureError::Retriable => {
warn!("Couldn't refresh features, but will retry next go")
}
FeatureError::Retriable(status_code) => match status_code {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
info!("Upstream is having some problems, increasing my waiting period");
self.backoff(&refresh.token);
}
StatusCode::TOO_MANY_REQUESTS => {
info!("Got told that upstream is receiving too many requests");
self.backoff(&refresh.token);
}
_ => {
warn!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
warn!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
Expand All @@ -391,25 +405,31 @@ impl FeatureRefresher {
}
}
}
EdgeError::ClientCacheError => {
warn!("Couldn't refresh features, but will retry next go")
}
_ => warn!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
}

/// Registers a failed refresh for `token`.
///
/// The entry stored in `tokens_to_refresh` under `token.token` is replaced by the
/// result of calling `backoff` on it with this refresher's `refresh_interval`.
pub fn backoff(&self, token: &EdgeToken) {
    let interval = &self.refresh_interval;
    self.tokens_to_refresh
        .alter(&token.token, |_key, current| current.backoff(interval));
}
pub fn update_last_check(&self, token: &EdgeToken) {
if let Some(mut token) = self.tokens_to_refresh.get_mut(&token.token) {
token.last_check = Some(Utc::now());
}
self.tokens_to_refresh
.alter(&token.token, |_k, old_refresh| {
old_refresh.successful_check(&self.refresh_interval)
});
}

pub fn update_last_refresh(&self, token: &EdgeToken, etag: Option<EntityTag>) {
self.tokens_to_refresh
.entry(token.token.clone())
.and_modify(|token_to_refresh| {
token_to_refresh.last_check = Some(Utc::now());
token_to_refresh.last_refreshed = Some(Utc::now());
token_to_refresh.etag = etag
.alter(&token.token, |_k, old_refresh| {
old_refresh.successful_refresh(&self.refresh_interval, etag)
});
}
}
Expand Down Expand Up @@ -768,17 +788,21 @@ mod tests {
let no_etag_so_is_due_for_refresh = TokenRefresh {
token: no_etag_due_for_refresh_token,
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
};
let etag_and_last_refreshed_token =
EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string())
.unwrap();
let etag_and_last_refreshed_less_than_duration_ago = TokenRefresh {
token: etag_and_last_refreshed_token,
etag: Some(EntityTag::new_weak("abcde".into())),
next_refresh: Some(Utc::now() + Duration::seconds(10)),
last_refreshed: Some(Utc::now()),
last_check: Some(Utc::now()),
failure_count: 0,
};
let etag_but_old_token =
EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap();
Expand All @@ -787,8 +811,10 @@ mod tests {
let etag_but_last_refreshed_ten_seconds_ago = TokenRefresh {
token: etag_but_old_token,
etag: Some(EntityTag::new_weak("abcde".into())),
next_refresh: None,
last_refreshed: Some(ten_seconds_ago),
last_check: Some(ten_seconds_ago),
failure_count: 0,
};
feature_refresher.tokens_to_refresh.insert(
etag_but_last_refreshed_ten_seconds_ago.token.token.clone(),
Expand Down Expand Up @@ -1110,8 +1136,10 @@ mod tests {
let token_refresh = TokenRefresh {
token: wildcard_token.clone(),
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
};

current_tokens.insert(wildcard_token.token, token_refresh);
Expand Down
11 changes: 8 additions & 3 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ impl UnleashClient {
.send()
.await
.map_err(|e| {
warn!("Failed to fetch errors due to [{e:?}] - Will retry");
EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)
warn!("Failed to fetch. Due to [{e:?}] - Will retry");
match e.status() {
Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
}
})?;
let stop_time = Utc::now();
CLIENT_FEATURE_FETCH
Expand Down Expand Up @@ -407,7 +410,9 @@ impl UnleashClient {
CLIENT_FEATURE_FETCH_FAILURES
.with_label_values(&[response.status().as_str()])
.inc();
Err(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable))
Err(EdgeError::ClientFeaturesFetchError(
FeatureError::Retriable(response.status()),
))
}
}

Expand Down
4 changes: 4 additions & 0 deletions server/src/persistence/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,21 @@ mod tests {
status: TokenValidationStatus::Validated,
},
etag: Some(EntityTag::new_weak("1234".into())),
next_refresh: None,
last_refreshed: Some(Utc::now()),
last_check: Some(Utc::now()),
failure_count: 0,
},
TokenRefresh {
token: EdgeToken {
token: "otherthing:otherthing:aljjsdnasd".into(),
..EdgeToken::default()
},
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
},
];

Expand Down
Loading