Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Do not remove features cache if upstream goes away #362

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repository = "https://github.com/Unleash/unleash-edge"
version = "16.0.6"

[dependencies]
actix-cors = "0.6.4"
actix-cors = "0.6.5"
actix-http = { version = "3.4.0", features = ["compress-zstd", "rustls-0_21"] }
actix-middleware-etag = "0.3.0"
actix-service = "2.0.2"
Expand All @@ -26,7 +26,7 @@ anyhow = "1.0.75"
async-trait = "0.1.74"
chrono = { version = "0.4.31", features = ["serde"] }
cidr = "0.2.2"
clap = { version = "4.4.8", features = ["derive", "env"] }
clap = { version = "4.4.11", features = ["derive", "env"] }
clap-markdown = "0.1.3"
dashmap = "5.5.3"
futures = "0.3.29"
Expand Down Expand Up @@ -62,7 +62,7 @@ rustls-pemfile = "1.0.4"
serde = { version = "1.0.192", features = ["derive"] }
serde_json = "1.0.108"
serde_qs = { version = "0.12.0", features = ["actix4", "tracing"] }
shadow-rs = "0.24.1"
shadow-rs = { version = "0.25.0" }
tokio = { version = "1.34.0", features = [
"macros",
"rt-multi-thread",
Expand All @@ -75,7 +75,7 @@ ulid = "1.1.0"
unleash-types = { version = "0.10", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.8.0" }
utoipa = { version = "4.1.0", features = ["actix_extras", "chrono"] }
utoipa-swagger-ui = { version = "4", features = ["actix-web"] }
utoipa-swagger-ui = { version = "5", features = ["actix-web"] }
[dev-dependencies]
actix-http = "3.4.0"
actix-http-test = "3.1.0"
Expand All @@ -89,4 +89,4 @@ testcontainers-modules = { version = "0.2.0", features = ["redis"] }
tracing-test = "0.2.4"

[build-dependencies]
shadow-rs = "0.24.1"
shadow-rs = "0.25.0"
2 changes: 1 addition & 1 deletion server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ security(
)
)]
#[get("")]
#[instrument(skip(engine_cache, token_cache))]
#[instrument(skip(edge_token, req, engine_cache, token_cache))]
thomasheartman marked this conversation as resolved.
Show resolved Hide resolved
async fn get_enabled_frontend(
edge_token: EdgeToken,
engine_cache: Data<DashMap<String, EngineState>>,
Expand Down
71 changes: 56 additions & 15 deletions server/src/http/feature_refresher.rs
thomasheartman marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::http::header::EntityTag;
use chrono::Utc;
use dashmap::DashMap;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use tracing::{debug, info};
use unleash_types::client_features::Segment;
use unleash_types::client_metrics::ClientApplication;
use unleash_types::{
Expand Down Expand Up @@ -376,11 +376,11 @@ impl FeatureRefresher {
self.backoff(&refresh.token);
}
_ => {
warn!("Couldn't refresh features, but will retry next go")
info!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
warn!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
Expand All @@ -392,23 +392,15 @@ impl FeatureRefresher {
}
}
FeatureError::NotFound => {
warn!("Had a bad URL when trying to fetch features. Removing ourselves");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
}) {
let cache_key = cache_key(&refresh.token);
// No tokens left that access the environment of our current refresh. Deleting client features and engine cache
self.features_cache.remove(&cache_key);
self.engine_cache.remove(&cache_key);
}
info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again");
self.backoff(&refresh.token);
}
}
}
EdgeError::ClientCacheError => {
warn!("Couldn't refresh features, but will retry next go")
info!("Couldn't refresh features, but will retry next go")
}
_ => warn!("Couldn't refresh features: {e:?}. Will retry next pass"),
_ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
Expand Down Expand Up @@ -898,6 +890,55 @@ mod tests {
assert!(feature_refresher.engine_cache.is_empty());
}

#[tokio::test]
pub async fn getting_404_removes_tokens_from_token_to_refresh_but_not_its_features() {
let mut token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap();
token.status = Validated;
token.token_type = Some(TokenType::Client);
let token_cache = DashMap::default();
token_cache.insert(token.token.clone(), token.clone());
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(token_cache);
let example_features = features_from_disk("../examples/features.json");
let cache_key = cache_key(&token);
let mut engine_state = EngineState::default();
engine_state.take_state(example_features.clone());
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let mut server = client_api_test_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap();
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let feature_refresher = FeatureRefresher::new(
Arc::new(unleash_client),
features_cache,
engine_cache,
Duration::milliseconds(1),
None,
);
feature_refresher
.register_token_for_refresh(token, None)
.await;
assert!(!feature_refresher.tokens_to_refresh.is_empty());
feature_refresher.refresh_features().await;
assert!(!feature_refresher.tokens_to_refresh.is_empty());
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
server.stop().await;
tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due
feature_refresher.refresh_features().await;
assert_eq!(feature_refresher.tokens_to_refresh.get("*:development.secret123").unwrap().failure_count, 1);
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
}

#[tokio::test]
pub async fn when_we_have_a_cache_and_token_gets_removed_caches_are_emptied() {
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Expand Down
4 changes: 2 additions & 2 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ async fn main() -> Result<(), anyhow::Error> {
tracing::info!("Persister was unexpectedly shut down");
}
_ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => {
tracing::info!("Token validator validator was unexpectedly shut down");
tracing::info!("Token validator validation of known tokens was unexpectedly shut down");
}
_ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => {
tracing::info!("Token validator validator was unexpectedly shut down");
tracing::info!("Token validator validation of startup tokens was unexpectedly shut down");
}
}
}
Expand Down