Skip to content

Commit

Permalink
fix: Do not remove features cache if upstream goes away (#362)
Browse files Browse the repository at this point in the history
* fix: Do not remove features cache if upstream goes away

If upstream goes away, edge should remain durable and keep the features
it has gotten for the tokens. Now use backoff to save time.
  • Loading branch information
Christopher Kolstad authored Dec 20, 2023
1 parent 2d02100 commit 32b3c00
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 33 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repository = "https://github.com/Unleash/unleash-edge"
version = "16.0.6"

[dependencies]
actix-cors = "0.6.4"
actix-cors = "0.6.5"
actix-http = { version = "3.4.0", features = ["compress-zstd", "rustls-0_21"] }
actix-middleware-etag = "0.3.0"
actix-service = "2.0.2"
Expand All @@ -26,7 +26,7 @@ anyhow = "1.0.75"
async-trait = "0.1.74"
chrono = { version = "0.4.31", features = ["serde"] }
cidr = "0.2.2"
clap = { version = "4.4.8", features = ["derive", "env"] }
clap = { version = "4.4.11", features = ["derive", "env"] }
clap-markdown = "0.1.3"
dashmap = "5.5.3"
futures = "0.3.29"
Expand Down Expand Up @@ -62,7 +62,7 @@ rustls-pemfile = "1.0.4"
serde = { version = "1.0.192", features = ["derive"] }
serde_json = "1.0.108"
serde_qs = { version = "0.12.0", features = ["actix4", "tracing"] }
shadow-rs = "0.24.1"
shadow-rs = { version = "0.25.0" }
tokio = { version = "1.34.0", features = [
"macros",
"rt-multi-thread",
Expand All @@ -75,7 +75,7 @@ ulid = "1.1.0"
unleash-types = { version = "0.10", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.8.0" }
utoipa = { version = "4.1.0", features = ["actix_extras", "chrono"] }
utoipa-swagger-ui = { version = "4", features = ["actix-web"] }
utoipa-swagger-ui = { version = "5", features = ["actix-web"] }
[dev-dependencies]
actix-http = "3.4.0"
actix-http-test = "3.1.0"
Expand All @@ -89,4 +89,4 @@ testcontainers-modules = { version = "0.2.0", features = ["redis"] }
tracing-test = "0.2.4"

[build-dependencies]
shadow-rs = "0.24.1"
shadow-rs = "0.25.0"
2 changes: 1 addition & 1 deletion server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ security(
)
)]
#[get("")]
#[instrument(skip(engine_cache, token_cache))]
#[instrument(skip(edge_token, req, engine_cache, token_cache))]
async fn get_enabled_frontend(
edge_token: EdgeToken,
engine_cache: Data<DashMap<String, EngineState>>,
Expand Down
71 changes: 56 additions & 15 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::http::header::EntityTag;
use chrono::Utc;
use dashmap::DashMap;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use tracing::{debug, info};
use unleash_types::client_features::Segment;
use unleash_types::client_metrics::ClientApplication;
use unleash_types::{
Expand Down Expand Up @@ -376,11 +376,11 @@ impl FeatureRefresher {
self.backoff(&refresh.token);
}
_ => {
warn!("Couldn't refresh features, but will retry next go")
info!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
warn!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
Expand All @@ -392,23 +392,15 @@ impl FeatureRefresher {
}
}
FeatureError::NotFound => {
warn!("Had a bad URL when trying to fetch features. Removing ourselves");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
}) {
let cache_key = cache_key(&refresh.token);
// No tokens left that access the environment of our current refresh. Deleting client features and engine cache
self.features_cache.remove(&cache_key);
self.engine_cache.remove(&cache_key);
}
info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again");
self.backoff(&refresh.token);
}
}
}
EdgeError::ClientCacheError => {
warn!("Couldn't refresh features, but will retry next go")
info!("Couldn't refresh features, but will retry next go")
}
_ => warn!("Couldn't refresh features: {e:?}. Will retry next pass"),
_ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
Expand Down Expand Up @@ -898,6 +890,55 @@ mod tests {
assert!(feature_refresher.engine_cache.is_empty());
}

#[tokio::test]
pub async fn getting_404_removes_tokens_from_token_to_refresh_but_not_its_features() {
let mut token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap();
token.status = Validated;
token.token_type = Some(TokenType::Client);
let token_cache = DashMap::default();
token_cache.insert(token.token.clone(), token.clone());
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(token_cache);
let example_features = features_from_disk("../examples/features.json");
let cache_key = cache_key(&token);
let mut engine_state = EngineState::default();
engine_state.take_state(example_features.clone());
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let mut server = client_api_test_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap();
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let feature_refresher = FeatureRefresher::new(
Arc::new(unleash_client),
features_cache,
engine_cache,
Duration::milliseconds(1),
None,
);
feature_refresher
.register_token_for_refresh(token, None)
.await;
assert!(!feature_refresher.tokens_to_refresh.is_empty());
feature_refresher.refresh_features().await;
assert!(!feature_refresher.tokens_to_refresh.is_empty());
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
server.stop().await;
tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due
feature_refresher.refresh_features().await;
assert_eq!(feature_refresher.tokens_to_refresh.get("*:development.secret123").unwrap().failure_count, 1);
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
}

#[tokio::test]
pub async fn when_we_have_a_cache_and_token_gets_removed_caches_are_emptied() {
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Expand Down
4 changes: 2 additions & 2 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ async fn main() -> Result<(), anyhow::Error> {
tracing::info!("Persister was unexpectedly shut down");
}
_ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => {
tracing::info!("Token validator validator was unexpectedly shut down");
tracing::info!("Token validator validation of known tokens was unexpectedly shut down");
}
_ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => {
tracing::info!("Token validator validator was unexpectedly shut down");
tracing::info!("Token validator validation of startup tokens was unexpectedly shut down");
}
}
}
Expand Down

0 comments on commit 32b3c00

Please sign in to comment.