Skip to content

Commit

Permalink
fix: Do not remove features cache if upstream goes away
Browse files Browse the repository at this point in the history
If upstream goes away, edge should remain durable and keep the features
it has gotten for the tokens. Since we already keep the authentication
status for the tokens we've seen this fix allows you to start edge,
manage to talk to upstream once, then sever connection to upstream, but
keep serving feature toggles for known tokens
  • Loading branch information
chriswk committed Dec 19, 2023
1 parent 2d02100 commit 6c7337a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 31 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repository = "https://github.com/Unleash/unleash-edge"
version = "16.0.6"

[dependencies]
actix-cors = "0.6.4"
actix-cors = "0.6.5"
actix-http = { version = "3.4.0", features = ["compress-zstd", "rustls-0_21"] }
actix-middleware-etag = "0.3.0"
actix-service = "2.0.2"
Expand All @@ -26,7 +26,7 @@ anyhow = "1.0.75"
async-trait = "0.1.74"
chrono = { version = "0.4.31", features = ["serde"] }
cidr = "0.2.2"
clap = { version = "4.4.8", features = ["derive", "env"] }
clap = { version = "4.4.11", features = ["derive", "env"] }
clap-markdown = "0.1.3"
dashmap = "5.5.3"
futures = "0.3.29"
Expand Down Expand Up @@ -62,7 +62,7 @@ rustls-pemfile = "1.0.4"
serde = { version = "1.0.192", features = ["derive"] }
serde_json = "1.0.108"
serde_qs = { version = "0.12.0", features = ["actix4", "tracing"] }
shadow-rs = "0.24.1"
shadow-rs = { version = "0.25.0" }
tokio = { version = "1.34.0", features = [
"macros",
"rt-multi-thread",
Expand All @@ -75,7 +75,7 @@ ulid = "1.1.0"
unleash-types = { version = "0.10", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.8.0" }
utoipa = { version = "4.1.0", features = ["actix_extras", "chrono"] }
utoipa-swagger-ui = { version = "4", features = ["actix-web"] }
utoipa-swagger-ui = { version = "5", features = ["actix-web"] }
[dev-dependencies]
actix-http = "3.4.0"
actix-http-test = "3.1.0"
Expand All @@ -89,4 +89,4 @@ testcontainers-modules = { version = "0.2.0", features = ["redis"] }
tracing-test = "0.2.4"

[build-dependencies]
shadow-rs = "0.24.1"
shadow-rs = "0.25.0"
4 changes: 2 additions & 2 deletions server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use actix_web::{
};
use dashmap::DashMap;
use serde_qs::actix::QsQuery;
use tracing::{debug, instrument};
use tracing::{debug, instrument, warn};

Check warning

Code scanning / clippy

unused import: warn Warning

unused import: warn
use unleash_types::client_features::Context;
use unleash_types::client_metrics::{ClientApplication, ConnectVia};
use unleash_types::{
Expand Down Expand Up @@ -214,7 +214,7 @@ security(
)
)]
#[get("")]
#[instrument(skip(engine_cache, token_cache))]
#[instrument(skip(edge_token, req, engine_cache, token_cache))]
async fn get_enabled_frontend(
edge_token: EdgeToken,
engine_cache: Data<DashMap<String, EngineState>>,
Expand Down
71 changes: 57 additions & 14 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::http::header::EntityTag;
use chrono::Utc;
use dashmap::DashMap;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use tracing::{debug, info};
use unleash_types::client_features::Segment;
use unleash_types::client_metrics::ClientApplication;
use unleash_types::{
Expand Down Expand Up @@ -376,11 +376,11 @@ impl FeatureRefresher {
self.backoff(&refresh.token);
}
_ => {
warn!("Couldn't refresh features, but will retry next go")
info!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
warn!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
Expand All @@ -392,23 +392,15 @@ impl FeatureRefresher {
}
}
FeatureError::NotFound => {
warn!("Had a bad URL when trying to fetch features. Removing ourselves");
info!("Had a bad URL when trying to fetch features. Removing ourselves from the list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
}) {
let cache_key = cache_key(&refresh.token);
// No tokens left that access the environment of our current refresh. Deleting client features and engine cache
self.features_cache.remove(&cache_key);
self.engine_cache.remove(&cache_key);
}
}
}
}
EdgeError::ClientCacheError => {
warn!("Couldn't refresh features, but will retry next go")
info!("Couldn't refresh features, but will retry next go")
}
_ => warn!("Couldn't refresh features: {e:?}. Will retry next pass"),
_ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
Expand Down Expand Up @@ -448,6 +440,7 @@ mod tests {
use chrono::{Duration, Utc};
use dashmap::DashMap;
use reqwest::Url;
use tracing_test::traced_test;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_yggdrasil::EngineState;

Expand Down Expand Up @@ -898,6 +891,56 @@ mod tests {
assert!(feature_refresher.engine_cache.is_empty());
}

#[tokio::test]
#[traced_test]
pub async fn getting_404_removes_tokens_from_token_to_refresh_but_not_its_features() {
let mut token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap();
token.status = Validated;
token.token_type = Some(TokenType::Client);
let token_cache = DashMap::default();
token_cache.insert(token.token.clone(), token.clone());
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(token_cache);
let example_features = features_from_disk("../examples/features.json");
let cache_key = cache_key(&token);
let mut engine_state = EngineState::default();
engine_state.take_state(example_features.clone());
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let mut server = client_api_test_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap();
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let feature_refresher = FeatureRefresher::new(
Arc::new(unleash_client),
features_cache,
engine_cache,
Duration::milliseconds(1),
None,
);
feature_refresher
.register_token_for_refresh(token, None)
.await;
assert!(!feature_refresher.tokens_to_refresh.is_empty());
feature_refresher.refresh_features().await;
assert!(!feature_refresher.tokens_to_refresh.is_empty());
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
server.stop().await;
tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due
feature_refresher.refresh_features().await;
assert!(feature_refresher.tokens_to_refresh.is_empty());
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
}

#[tokio::test]
pub async fn when_we_have_a_cache_and_token_gets_removed_caches_are_emptied() {
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Expand Down

0 comments on commit 6c7337a

Please sign in to comment.