From 433287e56e67133afb783261d0d6de8f3552b5ee Mon Sep 17 00:00:00 2001 From: sjaanus Date: Wed, 8 Jan 2025 11:59:49 +0200 Subject: [PATCH 01/16] feat: git status --- Cargo.lock | 16 +++- server/src/feature_cache.rs | 63 +++++++++++++- server/src/http/feature_refresher.rs | 122 ++++++++++++++++++++++++++- server/src/http/unleash_client.rs | 100 +++++++++++++++++++++- server/src/types.rs | 8 +- server/src/urls.rs | 17 +++- 6 files changed, 312 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6de72b26..c339861c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4421,7 +4421,7 @@ dependencies = [ "tracing-subscriber", "tracing-test", "ulid", - "unleash-types", + "unleash-types 0.15.2", "unleash-yggdrasil", "utoipa", "utoipa-swagger-ui", @@ -4432,6 +4432,18 @@ name = "unleash-types" version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ead505b176bd504c31815a4804d305789c3cbd6f89e932d118cd0830a955f0" +dependencies = [ + "chrono", + "derive_builder", + "serde", + "serde_json", +] + +[[package]] +name = "unleash-types" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14bf22b052d349e647b8e97043562c0b87f93d63b4944b72dd7b56acb8ac3edf" dependencies = [ "base64 0.22.1", "chrono", @@ -4461,7 +4473,7 @@ dependencies = [ "semver", "serde", "serde_json", - "unleash-types", + "unleash-types 0.14.0", ] [[package]] diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs index d2891569..8dc0171b 100644 --- a/server/src/feature_cache.rs +++ b/server/src/feature_cache.rs @@ -4,7 +4,7 @@ use unleash_types::{ client_features::{ClientFeature, ClientFeatures, Segment}, Deduplicate, }; - +use unleash_types::client_features::ClientFeaturesDelta; use crate::types::EdgeToken; #[derive(Debug, Clone)] @@ -67,6 +67,41 @@ impl FeatureCache { self.send_full_update(key); } + pub fn apply_delta(&self, key: String, token: &EdgeToken, delta: ClientFeaturesDelta) { + let client_features = ClientFeatures { + version : 1, + features : delta.updated.clone(), + segments: delta.segments.clone(), + query: None, + meta: None, + }; + self.features + .entry(key.clone()) + .and_modify(|existing_features| { + let updated = update_client_features_delta(token, existing_features, &delta); // TODO: is this replacing or merging the flags + *existing_features = updated; + }) + .or_insert(client_features); + self.send_full_update(key); + + // let mut current_state = self.compiled_state.take().unwrap_or_default(); + // let segment_map = build_segment_map(&delta.segments); + // let mut warnings: Vec = vec![]; + // for removed in delta.removed.clone() { + // current_state.remove(&removed); + // } + // for update in delta.updated.clone() { + // let updated_state = compile(&update, &segment_map, &mut warnings); + // current_state.insert(update.name.clone(), updated_state); + // } + // self.compiled_state = Some(current_state); + // if warnings.is_empty() { + // None + // } else { + // Some(warnings) + // } + } + pub fn is_empty(&self) -> bool { self.features.is_empty() } @@ -103,6 +138,32 @@ fn update_client_features( } } +fn update_client_features_delta( + token: &EdgeToken, + old: &ClientFeatures, + delta: &ClientFeaturesDelta, +) -> ClientFeatures { + let mut updated_features = + update_projects_from_feature_update(token, &old.features, &delta.updated); + + for removed_feature in &delta.removed { + updated_features.retain(|feature| feature.name != *removed_feature); + } + updated_features.sort(); + + let segments = merge_segments_update(old.segments.clone(), delta.segments.clone()); + ClientFeatures { + version: 1, + features: updated_features, + segments: segments.map(|mut s| { + s.sort(); + s + }), + query: None, + meta: None, + } +} + pub(crate) fn update_projects_from_feature_update( token: &EdgeToken, original: &[ClientFeature], diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 1468915e..4f42aba9 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -8,7 +8,7 @@ use eventsource_client::Client; use futures::TryStreamExt; use reqwest::StatusCode; use tracing::{debug, info, warn}; -use unleash_types::client_features::ClientFeatures; +use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; use unleash_yggdrasil::EngineState; @@ -18,7 +18,7 @@ use crate::filters::{filter_client_features, FeatureFilterSet}; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; -use crate::types::{build, EdgeResult, TokenType, TokenValidationStatus}; +use crate::types::{build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus}; use crate::{ persistence::EdgePersistence, tokens::{cache_key, simplify}, @@ -48,6 +48,7 @@ pub struct FeatureRefresher { pub persistence: Option>, pub strict: bool, pub streaming: bool, + pub delta: bool, pub app_name: String, } @@ -62,6 +63,7 @@ impl Default for FeatureRefresher { persistence: None, strict: true, streaming: false, + delta: false, app_name: "unleash_edge".into(), } } @@ -99,6 +101,7 @@ pub enum FeatureRefresherMode { pub struct FeatureRefreshConfig { features_refresh_interval: chrono::Duration, mode: FeatureRefresherMode, + delta: bool, app_name: String, } @@ -133,6 +136,7 @@ impl FeatureRefresher { persistence, strict: config.mode != FeatureRefresherMode::Dynamic, streaming: config.mode == FeatureRefresherMode::Streaming, + delta : config.delta, app_name: config.app_name, } } @@ -401,13 +405,21 @@ impl FeatureRefresher { pub async fn hydrate_new_tokens(&self) { let hydrations = self.get_tokens_never_refreshed(); for hydration in hydrations { - self.refresh_single(hydration).await; + if self.delta { + self.refresh_single_delta(hydration).await; + } else { + self.refresh_single(hydration).await; + } } } pub async fn refresh_features(&self) { let refreshes = self.get_tokens_due_for_refresh(); for refresh in refreshes { - self.refresh_single(refresh).await; + if self.delta { + self.refresh_single_delta(refresh).await; + } else { + self.refresh_single(refresh).await; + } } } @@ -446,6 +458,41 @@ impl FeatureRefresher { }); } + async fn handle_client_features_delta_updated( + &self, + refresh_token: &EdgeToken, + features: ClientFeaturesDelta, + etag: Option, + ) { + debug!("Got updated client features delta. Updating features with {etag:?}"); + let key = cache_key(refresh_token); + self.update_last_refresh(refresh_token, etag, features.updated.len()); /// TODO: why we need to set updated here + self.features_cache + .modify(key.clone(), refresh_token, features.clone()); + self.engine_cache + .entry(key.clone()) + .and_modify(|engine| { + if let Some(f) = self.features_cache.get(&key) { + let mut new_state = EngineState::default(); + let warnings = new_state.take_state(f.clone()); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + *engine = new_state; + + } + }) + .or_insert_with(|| { + let mut new_state = EngineState::default(); + + let warnings = new_state.take_state(features); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + new_state + }); + } + pub async fn refresh_single(&self, refresh: TokenRefresh) { let features_result = self .unleash_client @@ -512,6 +559,73 @@ impl FeatureRefresher { } } } + + pub async fn refresh_single_delta(&self, refresh: TokenRefresh) { + let features_result = self + .unleash_client + .get_client_features_delta(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }) + .await; + + match features_result { + Ok(feature_response) => match feature_response { + ClientFeaturesDeltaResponse::NoUpdate(tag) => { + debug!("No update needed. Will update last check time with {tag}"); + self.update_last_check(&refresh.token.clone()); + } + ClientFeaturesDeltaResponse::Updated(features, etag) => { + self.handle_client_features_updated(&refresh.token, features, etag) + .await + } + }, + Err(e) => { + match e { + EdgeError::ClientFeaturesFetchError(fe) => { + match fe { + FeatureError::Retriable(status_code) => match status_code { + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => { + info!("Upstream is having some problems, increasing my waiting period"); + self.backoff(&refresh.token); + } + StatusCode::TOO_MANY_REQUESTS => { + info!("Got told that upstream is receiving too many requests"); + self.backoff(&refresh.token); + } + _ => { + info!("Couldn't refresh features, but will retry next go") + } + }, + FeatureError::AccessDenied => { + info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks"); + self.tokens_to_refresh.remove(&refresh.token.token); + if !self.tokens_to_refresh.iter().any(|e| { + e.value().token.environment == refresh.token.environment + }) { + let cache_key = cache_key(&refresh.token); + // No tokens left that access the environment of our current refresh. Deleting client features and engine cache + self.features_cache.remove(&cache_key); + self.engine_cache.remove(&cache_key); + } + } + FeatureError::NotFound => { + info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"); + self.backoff(&refresh.token); + } + } + } + EdgeError::ClientCacheError => { + info!("Couldn't refresh features, but will retry next go") + } + _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"), + } + } + } + } pub fn backoff(&self, token: &EdgeToken) { self.tokens_to_refresh .alter(&token.token, |_k, old_refresh| { diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index ae80b2be..7f1a1ead 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -14,7 +14,7 @@ use reqwest::{ClientBuilder, Identity, RequestBuilder, StatusCode, Url}; use serde::{Deserialize, Serialize}; use tracing::error; use tracing::{info, trace, warn}; -use unleash_types::client_features::ClientFeatures; +use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::ClientApplication; use crate::cli::ClientIdentity; @@ -26,7 +26,7 @@ use crate::http::headers::{ use crate::metrics::client_metrics::MetricsBatch; use crate::tls::build_upstream_certificate; use crate::types::{ - ClientFeaturesResponse, EdgeResult, EdgeToken, TokenValidationStatus, ValidateTokensRequest, + ClientFeaturesResponse, ClientFeaturesDeltaResponse, EdgeResult, EdgeToken, TokenValidationStatus, ValidateTokensRequest, }; use crate::urls::UnleashUrls; use crate::{error::EdgeError, types::ClientFeaturesRequest}; @@ -46,6 +46,13 @@ lazy_static! { &["status_code"], vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0] ) + .unwrap(); + pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!( + "client_feature_delta_fetch", + "Timings for fetching feature deltas in milliseconds", + &["status_code"], + vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0] + ) .unwrap(); pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!( Opts::new( @@ -242,6 +249,18 @@ impl UnleashClient { } } + fn client_features_delta_req(&self, req: ClientFeaturesRequest) -> RequestBuilder { + let client_req = self + .backing_client + .get(self.urls.client_features_delta_url.to_string()) + .headers(self.header_map(Some(req.api_key))); + if let Some(tag) = req.etag { + client_req.header(header::IF_NONE_MATCH, tag.to_string()) + } else { + client_req + } + } + fn header_map(&self, api_key: Option) -> HeaderMap { let mut header_map = HeaderMap::new(); let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap(); @@ -367,6 +386,83 @@ impl UnleashClient { } } + pub async fn get_client_features_delta( + &self, + request: ClientFeaturesRequest, + ) -> EdgeResult { + let start_time = Utc::now(); + let response = self + .client_features_delta_req(request.clone()) + .send() + .await + .map_err(|e| { + warn!("Failed to fetch. Due to [{e:?}] - Will retry"); + match e.status() { + Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)), + None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound), + } + })?; + let stop_time = Utc::now(); + CLIENT_FEATURE_DELTA_FETCH + .with_label_values(&[&response.status().as_u16().to_string()]) + .observe( + stop_time + .signed_duration_since(start_time) + .num_milliseconds() as f64, + ); + if response.status() == StatusCode::NOT_MODIFIED { + Ok(ClientFeaturesDeltaResponse::NoUpdate( + request.etag.expect("Got NOT_MODIFIED without an ETag"), + )) + } else if response.status().is_success() { + let etag = response + .headers() + .get("ETag") + .or_else(|| response.headers().get("etag")) + .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok()); + let features = response.json::().await.map_err(|e| { + warn!("Could not parse features response to internal representation"); + EdgeError::ClientFeaturesParseError(e.to_string()) + })?; + Ok(ClientFeaturesDeltaResponse::Updated(features, etag)) + } else if response.status() == StatusCode::FORBIDDEN { + CLIENT_FEATURE_FETCH_FAILURES + .with_label_values(&[response.status().as_str()]) + .inc(); + Err(EdgeError::ClientFeaturesFetchError( + FeatureError::AccessDenied, + )) + } else if response.status() == StatusCode::UNAUTHORIZED { + CLIENT_FEATURE_FETCH_FAILURES + .with_label_values(&[response.status().as_str()]) + .inc(); + warn!( + "Failed to get features. Url: [{}]. Status code: [401]", + self.urls.client_features_delta_url.to_string() + ); + Err(EdgeError::ClientFeaturesFetchError( + FeatureError::AccessDenied, + )) + } else if response.status() == StatusCode::NOT_FOUND { + CLIENT_FEATURE_FETCH_FAILURES + .with_label_values(&[response.status().as_str()]) + .inc(); + warn!( + "Failed to get features. Url: [{}]. Status code: [{}]", + self.urls.client_features_delta_url.to_string(), + response.status().as_str() + ); + Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound)) + } else { + CLIENT_FEATURE_FETCH_FAILURES + .with_label_values(&[response.status().as_str()]) + .inc(); + Err(EdgeError::ClientFeaturesFetchError( + FeatureError::Retriable(response.status()), + )) + } + } + pub async fn send_batch_metrics(&self, request: MetricsBatch) -> EdgeResult<()> { trace!("Sending metrics to old /edge/metrics endpoint"); let result = self diff --git a/server/src/types.rs b/server/src/types.rs index bb05b600..6b0ee23e 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -15,7 +15,7 @@ use chrono::{DateTime, Duration, Utc}; use dashmap::DashMap; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use shadow_rs::shadow; -use unleash_types::client_features::ClientFeatures; +use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_features::Context; use unleash_types::client_metrics::{ClientApplication, ClientMetricsEnv}; use unleash_yggdrasil::EngineState; @@ -96,6 +96,12 @@ pub enum ClientFeaturesResponse { Updated(ClientFeatures, Option), } +#[derive(Clone, Debug)] +pub enum ClientFeaturesDeltaResponse { + NoUpdate(EntityTag), + Updated(ClientFeaturesDelta, Option), +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Default, Deserialize, utoipa::ToSchema)] pub enum TokenValidationStatus { Invalid, diff --git a/server/src/urls.rs b/server/src/urls.rs index 7315a051..51fa0759 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -10,6 +10,7 @@ pub struct UnleashUrls { pub api_url: Url, pub client_api_url: Url, pub client_features_url: Url, + pub client_features_delta_url: Url, pub client_register_app_url: Url, pub client_metrics_url: Url, pub client_bulk_metrics_url: Url, @@ -50,6 +51,11 @@ impl UnleashUrls { .path_segments_mut() .unwrap() .push("features"); + let mut client_features_delta_url = client_api_url.clone(); + client_features_delta_url + .path_segments_mut() + .unwrap() + .push("delta"); let mut client_features_stream_url = client_api_url.clone(); client_features_stream_url .path_segments_mut() @@ -99,6 +105,7 @@ impl UnleashUrls { api_url, client_api_url, client_features_url, + client_features_delta_url, client_register_app_url, client_bulk_metrics_url, client_metrics_url, @@ -116,19 +123,21 @@ mod tests { use super::*; use test_case::test_case; - #[test_case("https://app.unleash-hosted.com/demo", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features" ; "No trailing slash, https protocol")] - #[test_case("https://app.unleash-hosted.com/demo/", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features" ; "One trailing slash, https protocol")] - #[test_case("http://app.unleash-hosted.com/demo/", "http://app.unleash-hosted.com/demo/api", "http://app.unleash-hosted.com/demo/api/client", "http://app.unleash-hosted.com/demo/api/client/features" ; "One trailing slash, http protocol")] - #[test_case("http://app.unleash-hosted.com/", "http://app.unleash-hosted.com/api", "http://app.unleash-hosted.com/api/client", "http://app.unleash-hosted.com/api/client/features" ; "One trailing slash, no subpath, http protocol")] + #[test_case("https://app.unleash-hosted.com/demo", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "No trailing slash, https protocol")] + #[test_case("https://app.unleash-hosted.com/demo/", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, https protocol")] + #[test_case("http://app.unleash-hosted.com/demo/", "http://app.unleash-hosted.com/demo/api", "http://app.unleash-hosted.com/demo/api/client", "http://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, http protocol")] + #[test_case("http://app.unleash-hosted.com/", "http://app.unleash-hosted.com/api", "http://app.unleash-hosted.com/api/client", "http://app.unleash-hosted.com/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, no subpath, http protocol")] pub fn can_handle_base_urls( base_url: &str, api_url: &str, client_url: &str, client_features_url: &str, + client_features_delta_url: &str, ) { let urls = UnleashUrls::from_str(base_url).unwrap(); assert_eq!(urls.api_url.to_string(), api_url); assert_eq!(urls.client_api_url.to_string(), client_url); assert_eq!(urls.client_features_url.to_string(), client_features_url); + assert_eq!(urls.client_features_delta_url.to_string(), client_features_delta_url); } } From 632fcce2dab98f4c5eda9356361e7e9e4ba43298 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Wed, 8 Jan 2025 15:05:17 +0200 Subject: [PATCH 02/16] Update --- Cargo.lock | 20 +++--------- server/Cargo.toml | 4 +-- server/src/builder.rs | 2 ++ server/src/cli.rs | 4 +++ server/src/client_api.rs | 1 + server/src/feature_cache.rs | 48 ++-------------------------- server/src/http/feature_refresher.rs | 25 ++++++--------- server/src/urls.rs | 4 +-- 8 files changed, 26 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c339861c..6a3597c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4421,7 +4421,7 @@ dependencies = [ "tracing-subscriber", "tracing-test", "ulid", - "unleash-types 0.15.2", + "unleash-types", "unleash-yggdrasil", "utoipa", "utoipa-swagger-ui", @@ -4432,18 +4432,6 @@ name = "unleash-types" version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ead505b176bd504c31815a4804d305789c3cbd6f89e932d118cd0830a955f0" -dependencies = [ - "chrono", - "derive_builder", - "serde", - "serde_json", -] - -[[package]] -name = "unleash-types" -version = "0.15.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14bf22b052d349e647b8e97043562c0b87f93d63b4944b72dd7b56acb8ac3edf" dependencies = [ "base64 0.22.1", "chrono", @@ -4456,9 +4444,9 @@ dependencies = [ [[package]] name = "unleash-yggdrasil" -version = "0.14.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3e124cfc3c64ca53d499373f4c356c0576923065aeb6b03ebfdb03809ab207" +checksum = "b92adb844a4723d5eed3fc757dccd37f03ac5eaeef59f2be4de5aa0962b223b8" dependencies = [ "chrono", "convert_case 0.6.0", @@ -4473,7 +4461,7 @@ dependencies = [ "semver", "serde", "serde_json", - "unleash-types 0.14.0", + "unleash-types", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index 3b2682dd..f4e7bff1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -94,8 +94,8 @@ tokio-stream = { version = "0.1.17" } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } ulid = "1.1.2" -unleash-types = { version = "0.15.3", features = ["openapi", "hashes"] } -unleash-yggdrasil = { version = "0.14.3" } +unleash-types = { version = "0.15.1", features = ["openapi", "hashes"] } +unleash-yggdrasil = { version = "0.14.5" } utoipa = { version = "5", features = ["actix_extras", "chrono"] } utoipa-swagger-ui = { version = "8", features = ["actix-web"] } [dev-dependencies] diff --git a/server/src/builder.rs b/server/src/builder.rs index 6209e93f..363a4e1a 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -268,6 +268,7 @@ async fn build_edge(args: &EdgeArgs, app_name: &str) -> EdgeResult { Duration::seconds(args.features_refresh_interval_seconds as i64), refresher_mode, app_name.to_string(), + args.delta, ); let feature_refresher = Arc::new(FeatureRefresher::new( unleash_client, @@ -373,6 +374,7 @@ mod tests { prometheus_password: None, prometheus_username: None, streaming: false, + delta: false, }; let result = build_edge(&args, "test-app").await; diff --git a/server/src/cli.rs b/server/src/cli.rs index 41c80a35..45b9029b 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -217,6 +217,10 @@ pub struct EdgeArgs { #[clap(long, env, default_value_t = false, requires = "strict")] pub streaming: bool, + /// If set to true. Edge connects to upstream using delta polling instead of normal polling. + #[clap(long, env, default_value_t = false, conflicts_with = "streaming")] + pub delta: bool, + /// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream #[clap(long, env)] pub prometheus_remote_write_url: Option, diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 059b9a92..c37a8cce 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -1011,6 +1011,7 @@ mod tests { persistence: None, strict: false, streaming: false, + delta: false, app_name: "test-app".into(), }); let token_validator = Arc::new(TokenValidator { diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs index 8dc0171b..7750fa8c 100644 --- a/server/src/feature_cache.rs +++ b/server/src/feature_cache.rs @@ -67,7 +67,7 @@ impl FeatureCache { self.send_full_update(key); } - pub fn apply_delta(&self, key: String, token: &EdgeToken, delta: ClientFeaturesDelta) { + pub fn apply_delta(&self, key: String, delta: &ClientFeaturesDelta) { let client_features = ClientFeatures { version : 1, features : delta.updated.clone(), @@ -78,28 +78,10 @@ impl FeatureCache { self.features .entry(key.clone()) .and_modify(|existing_features| { - let updated = update_client_features_delta(token, existing_features, &delta); // TODO: is this replacing or merging the flags - *existing_features = updated; + existing_features.modify_in_place(delta); }) .or_insert(client_features); self.send_full_update(key); - - // let mut current_state = self.compiled_state.take().unwrap_or_default(); - // let segment_map = build_segment_map(&delta.segments); - // let mut warnings: Vec = vec![]; - // for removed in delta.removed.clone() { - // current_state.remove(&removed); - // } - // for update in delta.updated.clone() { - // let updated_state = compile(&update, &segment_map, &mut warnings); - // current_state.insert(update.name.clone(), updated_state); - // } - // self.compiled_state = Some(current_state); - // if warnings.is_empty() { - // None - // } else { - // Some(warnings) - // } } pub fn is_empty(&self) -> bool { @@ -138,32 +120,6 @@ fn update_client_features( } } -fn update_client_features_delta( - token: &EdgeToken, - old: &ClientFeatures, - delta: &ClientFeaturesDelta, -) -> ClientFeatures { - let mut updated_features = - update_projects_from_feature_update(token, &old.features, &delta.updated); - - for removed_feature in &delta.removed { - updated_features.retain(|feature| feature.name != *removed_feature); - } - updated_features.sort(); - - let segments = merge_segments_update(old.segments.clone(), delta.segments.clone()); - ClientFeatures { - version: 1, - features: updated_features, - segments: segments.map(|mut s| { - s.sort(); - s - }), - query: None, - meta: None, - } -} - pub(crate) fn update_projects_from_feature_update( token: &EdgeToken, original: &[ClientFeature], diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 4f42aba9..906d1347 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -110,11 +110,13 @@ impl FeatureRefreshConfig { features_refresh_interval: chrono::Duration, mode: FeatureRefresherMode, app_name: String, + delta: bool, ) -> Self { Self { features_refresh_interval, mode, app_name, + delta, } } } @@ -461,31 +463,22 @@ impl FeatureRefresher { async fn handle_client_features_delta_updated( &self, refresh_token: &EdgeToken, - features: ClientFeaturesDelta, + delta: ClientFeaturesDelta, etag: Option, ) { debug!("Got updated client features delta. Updating features with {etag:?}"); let key = cache_key(refresh_token); - self.update_last_refresh(refresh_token, etag, features.updated.len()); /// TODO: why we need to set updated here - self.features_cache - .modify(key.clone(), refresh_token, features.clone()); + self.features_cache.apply_delta(key.clone(), &delta); + self.update_last_refresh(refresh_token, etag, self.features_cache.get(&key).unwrap().features.len()); self.engine_cache .entry(key.clone()) .and_modify(|engine| { - if let Some(f) = self.features_cache.get(&key) { - let mut new_state = EngineState::default(); - let warnings = new_state.take_state(f.clone()); - if let Some(warnings) = warnings { - warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); - }; - *engine = new_state; - - } + engine.take_delta(&delta); }) .or_insert_with(|| { let mut new_state = EngineState::default(); - let warnings = new_state.take_state(features); + let warnings = new_state.take_delta(&delta); if let Some(warnings) = warnings { warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); }; @@ -570,13 +563,13 @@ impl FeatureRefresher { .await; match features_result { - Ok(feature_response) => match feature_response { + Ok(delta_response) => match delta_response { ClientFeaturesDeltaResponse::NoUpdate(tag) => { debug!("No update needed. Will update last check time with {tag}"); self.update_last_check(&refresh.token.clone()); } ClientFeaturesDeltaResponse::Updated(features, etag) => { - self.handle_client_features_updated(&refresh.token, features, etag) + self.handle_client_features_delta_updated(&refresh.token, features, etag) .await } }, diff --git a/server/src/urls.rs b/server/src/urls.rs index 51fa0759..12336c4c 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -125,8 +125,8 @@ mod tests { #[test_case("https://app.unleash-hosted.com/demo", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "No trailing slash, https protocol")] #[test_case("https://app.unleash-hosted.com/demo/", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, https protocol")] - #[test_case("http://app.unleash-hosted.com/demo/", "http://app.unleash-hosted.com/demo/api", "http://app.unleash-hosted.com/demo/api/client", "http://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, http protocol")] - #[test_case("http://app.unleash-hosted.com/", "http://app.unleash-hosted.com/api", "http://app.unleash-hosted.com/api/client", "http://app.unleash-hosted.com/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, no subpath, http protocol")] + #[test_case("http://app.unleash-hosted.com/demo/", "http://app.unleash-hosted.com/demo/api", "http://app.unleash-hosted.com/demo/api/client", "http://app.unleash-hosted.com/demo/api/client/features", "http://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, http protocol")] + #[test_case("http://app.unleash-hosted.com/", "http://app.unleash-hosted.com/api", "http://app.unleash-hosted.com/api/client", "http://app.unleash-hosted.com/api/client/features", "http://app.unleash-hosted.com/api/client/delta" ; "One trailing slash, no subpath, http protocol")] pub fn can_handle_base_urls( base_url: &str, api_url: &str, From 070d4a8c04bd6fab27d996407bb5f418380e373c Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 09:31:52 +0200 Subject: [PATCH 03/16] Update --- server/src/builder.rs | 4 ++-- server/src/http/feature_refresher.rs | 12 ++++++++++-- server/src/http/unleash_client.rs | 14 +++++++------- .../middleware/client_token_from_frontend_token.rs | 4 ++-- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 363a4e1a..2c60ad02 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -13,7 +13,7 @@ use unleash_yggdrasil::EngineState; use crate::cli::RedisMode; use crate::feature_cache::FeatureCache; use crate::http::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode}; -use crate::http::unleash_client::new_reqwest_client; +use crate::http::unleash_client::new_request_client; use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache}; use crate::persistence::file::FilePersister; use crate::persistence::redis::RedisPersister; @@ -236,7 +236,7 @@ async fn build_edge(args: &EdgeArgs, app_name: &str) -> EdgeResult { let persistence = get_data_source(args).await; - let http_client = new_reqwest_client( + let http_client = new_request_client( "unleash_edge".into(), args.skip_ssl_verification, args.client_identity.clone(), diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 906d1347..bd828567 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -664,7 +664,7 @@ mod tests { use crate::feature_cache::{update_projects_from_feature_update, FeatureCache}; use crate::filters::{project_filter, FeatureFilterSet}; - use crate::http::unleash_client::new_reqwest_client; + use crate::http::unleash_client::new_request_client; use crate::tests::features_from_disk; use crate::tokens::cache_key; use crate::types::TokenValidationStatus::Validated; @@ -686,7 +686,7 @@ mod tests { } fn create_test_client() -> UnleashClient { - let http_client = new_reqwest_client( + let http_client = new_request_client( "unleash_edge".into(), false, None, @@ -1695,4 +1695,12 @@ mod tests { ); assert_eq!(updated.len(), 0); } + + mod delta_tests { + use super::*; + #[test] + fn delta_works() { + + } + } } diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 7f1a1ead..dd70fca9 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -136,7 +136,7 @@ fn build_identity(tls: Option) -> EdgeResult { ) } -pub fn new_reqwest_client( +pub fn new_request_client( instance_id: String, skip_ssl_verification: bool, client_identity: Option, @@ -201,7 +201,7 @@ impl UnleashClient { let instance_id = instance_id_opt.unwrap_or_else(|| Ulid::new().to_string()); Ok(Self { urls: UnleashUrls::from_str(server_url)?, - backing_client: new_reqwest_client( + backing_client: new_request_client( instance_id, false, None, @@ -222,7 +222,7 @@ impl UnleashClient { Ok(Self { urls: UnleashUrls::from_str(server_url)?, - backing_client: new_reqwest_client( + backing_client: new_request_client( Ulid::new().to_string(), true, None, @@ -599,7 +599,7 @@ mod tests { use unleash_types::client_features::{ClientFeature, ClientFeatures}; use crate::cli::ClientIdentity; - use crate::http::unleash_client::new_reqwest_client; + use crate::http::unleash_client::new_request_client; use crate::{ cli::TlsOptions, middleware::as_async_middleware::as_async_middleware, @@ -896,7 +896,7 @@ mod tests { pkcs12_identity_file: Some(PathBuf::from(pfx)), pkcs12_passphrase: Some(passphrase.into()), }; - let client = new_reqwest_client( + let client = new_request_client( "test_pkcs12".into(), false, Some(identity), @@ -918,7 +918,7 @@ mod tests { pkcs12_identity_file: Some(PathBuf::from(pfx)), pkcs12_passphrase: Some(passphrase.into()), }; - let client = new_reqwest_client( + let client = new_request_client( "test_pkcs12".into(), false, Some(identity), @@ -940,7 +940,7 @@ mod tests { pkcs12_identity_file: None, pkcs12_passphrase: None, }; - let client = new_reqwest_client( + let client = new_request_client( "test_pkcs8".into(), false, Some(identity), diff --git a/server/src/middleware/client_token_from_frontend_token.rs b/server/src/middleware/client_token_from_frontend_token.rs index b33eb17c..402a6b68 100644 --- a/server/src/middleware/client_token_from_frontend_token.rs +++ b/server/src/middleware/client_token_from_frontend_token.rs @@ -65,7 +65,7 @@ mod tests { use crate::auth::token_validator::TokenValidator; use crate::feature_cache::FeatureCache; use crate::http::feature_refresher::FeatureRefresher; - use crate::http::unleash_client::{new_reqwest_client, UnleashClient}; + use crate::http::unleash_client::{new_request_client, UnleashClient}; use crate::tests::upstream_server; use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; @@ -131,7 +131,7 @@ mod tests { ) .await; - let http_client = new_reqwest_client( + let http_client = new_request_client( "unleash_edge".into(), false, None, From 48ae9570ad12d567c5936da656bff593ec18a94e Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 09:35:08 +0200 Subject: [PATCH 04/16] Update --- server/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 6ffce4e1..1d60d53a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -94,7 +94,7 @@ tokio-stream = { version = "0.1.17" } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } ulid = "1.1.2" -unleash-types = { version = "0.15.1", features = ["openapi", "hashes"] } +unleash-types = { version = "0.15.3", features = ["openapi", "hashes"] } unleash-yggdrasil = { version = "0.14.5" } utoipa = { version = "5", features = ["actix_extras", "chrono"] } utoipa-swagger-ui = { version = "8", features = ["actix-web"] } From 358ad4f689c1fb51cacec74d49362efb46206b6e Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 12:25:08 +0200 Subject: [PATCH 05/16] Finish test --- server/src/http/feature_refresher.rs | 14 +-- server/src/http/unleash_client.rs | 9 +- server/tests/delta_test.rs | 147 +++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 12 deletions(-) create mode 100644 server/tests/delta_test.rs diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index bd828567..aa455dd3 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -1696,11 +1696,11 @@ mod tests { assert_eq!(updated.len(), 0); } - mod delta_tests { - use super::*; - #[test] - fn delta_works() { - - } - } + // mod delta_tests { + // use super::*; + // #[test] + // fn delta_works() { + // + // } + // } } diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index dd70fca9..13f81936 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -26,7 +26,8 @@ use crate::http::headers::{ use crate::metrics::client_metrics::MetricsBatch; use crate::tls::build_upstream_certificate; use crate::types::{ - ClientFeaturesResponse, ClientFeaturesDeltaResponse, EdgeResult, EdgeToken, TokenValidationStatus, ValidateTokensRequest, + ClientFeaturesDeltaResponse, ClientFeaturesResponse, EdgeResult, EdgeToken, + TokenValidationStatus, ValidateTokensRequest, }; use crate::urls::UnleashUrls; use crate::{error::EdgeError, types::ClientFeaturesRequest}; @@ -47,7 +48,7 @@ lazy_static! { vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0] ) .unwrap(); - pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!( + pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!( "client_feature_delta_fetch", "Timings for fetching feature deltas in milliseconds", &["status_code"], @@ -194,7 +195,6 @@ impl UnleashClient { } } - #[cfg(test)] pub fn new(server_url: &str, instance_id_opt: Option) -> Result { use ulid::Ulid; @@ -598,6 +598,7 @@ mod tests { use chrono::Duration; use unleash_types::client_features::{ClientFeature, ClientFeatures}; + use super::{EdgeTokens, UnleashClient}; use crate::cli::ClientIdentity; use crate::http::unleash_client::new_request_client; use crate::{ @@ -610,8 +611,6 @@ mod tests { }, }; - use super::{EdgeTokens, UnleashClient}; - impl ClientFeaturesRequest { pub(crate) fn new(api_key: String, etag: Option) -> Self { Self { diff --git a/server/tests/delta_test.rs b/server/tests/delta_test.rs new file mode 100644 index 00000000..ed3ec353 --- /dev/null +++ b/server/tests/delta_test.rs @@ -0,0 +1,147 @@ +mod delta_test { + use std::sync::Arc; + use actix_http::header::IF_NONE_MATCH; + use actix_http::HttpService; + use actix_http_test::{test_server, TestServer}; + use actix_service::map_config; + use actix_web::dev::AppConfig; + use actix_web::{web, App, HttpRequest, HttpResponse}; + use actix_web::http::header::{ETag, EntityTag}; + use chrono::Duration; + use dashmap::DashMap; + use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment}; + use unleash_yggdrasil::EngineState; + use unleash_edge::feature_cache::FeatureCache; + use unleash_edge::http::feature_refresher::FeatureRefresher; + use unleash_edge::http::unleash_client::UnleashClient; + use unleash_edge::types::{EdgeToken}; + + #[actix_web::test] + async fn test_delta() { + let srv = test_features_server().await; + let unleash_client = Arc::new(UnleashClient::new(srv.url("/").as_str(), None).unwrap()); + let features_cache: Arc = Arc::new(FeatureCache::default()); + let engine_cache: Arc> = Arc::new(DashMap::default()); + + let feature_refresher = Arc::new(FeatureRefresher { + unleash_client: unleash_client.clone(), + tokens_to_refresh: Arc::new(Default::default()), + features_cache: features_cache.clone(), + engine_cache: engine_cache.clone(), + refresh_interval: Duration::milliseconds(1), + persistence: None, + strict: false, + streaming: false, + delta: true, + app_name: "test-app".into(), + }); + let features = ClientFeatures { + version: 1, + features: vec![], + segments: None, + query: None, + meta: None, + }; + let initial_features = features.modify_and_copy(&revision(1)); + let final_features = initial_features.modify_and_copy(&revision(2)); + let token = + EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + feature_refresher + .register_token_for_refresh(token.clone(), None) + .await; + feature_refresher.refresh_features().await; + let refreshed_features = features_cache.get(&cache_key(&token)).unwrap().value().clone(); + assert_eq!(refreshed_features, initial_features); + + let token_refresh = feature_refresher.tokens_to_refresh.get(&token.token).unwrap(); + feature_refresher.refresh_single_delta(token_refresh.value().clone()).await; + let refreshed_features = features_cache.get(&cache_key(&token)).unwrap().value().clone(); + assert_eq!(refreshed_features, final_features); + } + + fn cache_key(token: &EdgeToken) -> String { + token + .environment + .clone() + .unwrap_or_else(|| token.token.clone()) + } + + fn revision(revision_id: u32) -> ClientFeaturesDelta { + match revision_id { + 1 => ClientFeaturesDelta { + updated: vec![ + ClientFeature { + name: "test1".into(), + feature_type: Some("release".into()), + ..Default::default() + }, + ClientFeature { + name: "test2".into(), + feature_type: Some("release".into()), + ..Default::default() + }, + ], + removed: vec![], + segments: Some(vec![ + Segment { + id: 1, + constraints: vec![Constraint { + context_name: "userId".into(), + operator: Operator::In, + case_insensitive: false, + inverted: false, + values: Some(vec!["7".into()]), + value: None, + }], + } + ]), + revision_id: 1, + }, + _ => ClientFeaturesDelta { + updated: vec![ + ClientFeature { + name: "test1".into(), + feature_type: Some("release".into()), + ..Default::default() + }, + + ], + removed: vec!["test2".to_string()], + segments: None, + revision_id: 2, + } + } + } + + async fn return_client_features_delta(etag_header: Option) -> HttpResponse { + match etag_header { + Some(value) => match value.as_str() { + "1" => HttpResponse::Ok().insert_header(ETag(EntityTag::new_strong("2".to_string()))).json(revision(2)), + "2" => HttpResponse::NotModified().finish(), + _ => HttpResponse::NotModified().finish(), + }, + None => HttpResponse::Ok().insert_header(ETag(EntityTag::new_strong("1".to_string()))).json(revision(1)) + } + + } + + async fn test_features_server() -> TestServer { + test_server(move || { + HttpService::new(map_config( + App::new() + .service( + web::resource("/api/client/delta").route(web::get().to(|req: HttpRequest| { + let etag_header = req + .headers() + .get(IF_NONE_MATCH) + .and_then(|h| h.to_str().ok()); + return_client_features_delta(etag_header.map(|s| s.to_string())) + })), + ), + |_| AppConfig::default(), + )) + .tcp() + }) + .await + } +} From 4e367157a4140d7bc61d00cd0364a9e9d98dd856 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Thu, 9 Jan 2025 11:47:40 +0100 Subject: [PATCH 06/16] fix: undo deadlock hold on value() from dashmap --- server/src/http/feature_refresher.rs | 13 +++- server/tests/delta_test.rs | 106 +++++++++++++++------------ 2 files changed, 69 insertions(+), 50 deletions(-) diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index aa455dd3..8c39fc35 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -18,7 +18,9 @@ use crate::filters::{filter_client_features, FeatureFilterSet}; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; -use crate::types::{build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus}; +use crate::types::{ + build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus, +}; use crate::{ persistence::EdgePersistence, tokens::{cache_key, simplify}, @@ -138,7 +140,7 @@ impl FeatureRefresher { persistence, strict: config.mode != FeatureRefresherMode::Dynamic, streaming: config.mode == FeatureRefresherMode::Streaming, - delta : config.delta, + delta: config.delta, app_name: config.app_name, } } @@ -469,7 +471,11 @@ impl FeatureRefresher { debug!("Got updated client features delta. Updating features with {etag:?}"); let key = cache_key(refresh_token); self.features_cache.apply_delta(key.clone(), &delta); - self.update_last_refresh(refresh_token, etag, self.features_cache.get(&key).unwrap().features.len()); + self.update_last_refresh( + refresh_token, + etag, + self.features_cache.get(&key).unwrap().features.len(), + ); self.engine_cache .entry(key.clone()) .and_modify(|engine| { @@ -561,7 +567,6 @@ impl FeatureRefresher { etag: refresh.etag, }) .await; - match features_result { Ok(delta_response) => match delta_response { ClientFeaturesDeltaResponse::NoUpdate(tag) => { diff --git a/server/tests/delta_test.rs b/server/tests/delta_test.rs index ed3ec353..be82c3e1 100644 --- a/server/tests/delta_test.rs +++ b/server/tests/delta_test.rs @@ -1,22 +1,25 @@ mod delta_test { - use std::sync::Arc; use actix_http::header::IF_NONE_MATCH; use actix_http::HttpService; use actix_http_test::{test_server, TestServer}; use actix_service::map_config; use actix_web::dev::AppConfig; - use actix_web::{web, App, HttpRequest, HttpResponse}; use actix_web::http::header::{ETag, EntityTag}; + use actix_web::{web, App, HttpRequest, HttpResponse}; use chrono::Duration; use dashmap::DashMap; - use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment}; - use unleash_yggdrasil::EngineState; + use std::sync::Arc; use unleash_edge::feature_cache::FeatureCache; use unleash_edge::http::feature_refresher::FeatureRefresher; use unleash_edge::http::unleash_client::UnleashClient; - use unleash_edge::types::{EdgeToken}; + use unleash_edge::types::EdgeToken; + use unleash_types::client_features::{ + ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment, + }; + use unleash_yggdrasil::EngineState; #[actix_web::test] + #[tracing_test::traced_test] async fn test_delta() { let srv = test_features_server().await; let unleash_client = Arc::new(UnleashClient::new(srv.url("/").as_str(), None).unwrap()); @@ -28,7 +31,7 @@ mod delta_test { tokens_to_refresh: Arc::new(Default::default()), features_cache: features_cache.clone(), engine_cache: engine_cache.clone(), - refresh_interval: Duration::milliseconds(1), + refresh_interval: Duration::seconds(6000), persistence: None, strict: false, streaming: false, @@ -50,12 +53,24 @@ mod delta_test { .register_token_for_refresh(token.clone(), None) .await; feature_refresher.refresh_features().await; - let refreshed_features = features_cache.get(&cache_key(&token)).unwrap().value().clone(); + let refreshed_features = features_cache + .get(&cache_key(&token)) + .unwrap() + .value() + .clone(); assert_eq!(refreshed_features, initial_features); - let token_refresh = feature_refresher.tokens_to_refresh.get(&token.token).unwrap(); - feature_refresher.refresh_single_delta(token_refresh.value().clone()).await; - let refreshed_features = features_cache.get(&cache_key(&token)).unwrap().value().clone(); + let token_refresh = feature_refresher + .tokens_to_refresh + .get(&token.token) + .unwrap() + .clone(); + feature_refresher.refresh_single_delta(token_refresh).await; + let refreshed_features = features_cache + .get(&cache_key(&token)) + .unwrap() + .value() + .clone(); assert_eq!(refreshed_features, final_features); } @@ -82,62 +97,61 @@ mod delta_test { }, ], removed: vec![], - segments: Some(vec![ - Segment { - id: 1, - constraints: vec![Constraint { - context_name: "userId".into(), - operator: Operator::In, - case_insensitive: false, - inverted: false, - values: Some(vec!["7".into()]), - value: None, - }], - } - ]), + segments: Some(vec![Segment { + id: 1, + constraints: vec![Constraint { + context_name: "userId".into(), + operator: Operator::In, + case_insensitive: false, + inverted: false, + values: Some(vec!["7".into()]), + value: None, + }], + }]), revision_id: 1, }, _ => ClientFeaturesDelta { - updated: vec![ - ClientFeature { - name: "test1".into(), - feature_type: Some("release".into()), - ..Default::default() - }, - - ], + updated: vec![ClientFeature { + name: "test1".into(), + feature_type: Some("release".into()), + ..Default::default() + }], removed: vec!["test2".to_string()], segments: None, revision_id: 2, - } + }, } } async fn return_client_features_delta(etag_header: Option) -> HttpResponse { match etag_header { Some(value) => match value.as_str() { - "1" => HttpResponse::Ok().insert_header(ETag(EntityTag::new_strong("2".to_string()))).json(revision(2)), - "2" => HttpResponse::NotModified().finish(), + "\"1\"" => HttpResponse::Ok() + .insert_header(ETag(EntityTag::new_strong("2".to_string()))) + .json(revision(2)), + "\"2\"" => HttpResponse::NotModified().finish(), _ => HttpResponse::NotModified().finish(), }, - None => HttpResponse::Ok().insert_header(ETag(EntityTag::new_strong("1".to_string()))).json(revision(1)) + None => HttpResponse::Ok() + .insert_header(ETag(EntityTag::new_strong("1".to_string()))) + .json(revision(1)), } - } async fn test_features_server() -> TestServer { test_server(move || { HttpService::new(map_config( - App::new() - .service( - web::resource("/api/client/delta").route(web::get().to(|req: HttpRequest| { - let etag_header = req - .headers() - .get(IF_NONE_MATCH) - .and_then(|h| h.to_str().ok()); - return_client_features_delta(etag_header.map(|s| s.to_string())) - })), - ), + App::new().service(web::resource("/api/client/delta").route(web::get().to( + |req: HttpRequest| { + println!("Got delta request"); + let etag_header = req + .headers() + .get(IF_NONE_MATCH) + .and_then(|h| h.to_str().ok()); + println!("Our etag header is {etag_header:?}"); + return_client_features_delta(etag_header.map(|s| s.to_string())) + }, + ))), |_| AppConfig::default(), )) .tcp() From ca97225561134334ace25884fc812ebdbbeb3cb1 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 12:50:53 +0200 Subject: [PATCH 07/16] Fix --- server/tests/delta_test.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/tests/delta_test.rs b/server/tests/delta_test.rs index be82c3e1..a89d761b 100644 --- a/server/tests/delta_test.rs +++ b/server/tests/delta_test.rs @@ -143,12 +143,10 @@ mod delta_test { HttpService::new(map_config( App::new().service(web::resource("/api/client/delta").route(web::get().to( |req: HttpRequest| { - println!("Got delta request"); let etag_header = req .headers() .get(IF_NONE_MATCH) .and_then(|h| h.to_str().ok()); - println!("Our etag header is {etag_header:?}"); return_client_features_delta(etag_header.map(|s| s.to_string())) }, ))), From 1dc74b880a9a3d656425c17d846442be5cd6592c Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 12:52:46 +0200 Subject: [PATCH 08/16] Fix --- server/src/builder.rs | 4 ++-- server/src/http/unleash_client.rs | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 2c60ad02..363a4e1a 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -13,7 +13,7 @@ use unleash_yggdrasil::EngineState; use crate::cli::RedisMode; use crate::feature_cache::FeatureCache; use crate::http::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode}; -use crate::http::unleash_client::new_request_client; +use crate::http::unleash_client::new_reqwest_client; use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache}; use crate::persistence::file::FilePersister; use crate::persistence::redis::RedisPersister; @@ -236,7 +236,7 @@ async fn build_edge(args: &EdgeArgs, app_name: &str) -> EdgeResult { let persistence = get_data_source(args).await; - let http_client = new_request_client( + let http_client = new_reqwest_client( "unleash_edge".into(), args.skip_ssl_verification, args.client_identity.clone(), diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 13f81936..74a1a56e 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -137,7 +137,7 @@ fn build_identity(tls: Option) -> EdgeResult { ) } -pub fn new_request_client( +pub fn new_reqwest_client( instance_id: String, skip_ssl_verification: bool, client_identity: Option, @@ -201,7 +201,7 @@ impl UnleashClient { let instance_id = instance_id_opt.unwrap_or_else(|| Ulid::new().to_string()); Ok(Self { urls: UnleashUrls::from_str(server_url)?, - backing_client: new_request_client( + backing_client: new_reqwest_client( instance_id, false, None, @@ -222,7 +222,7 @@ impl UnleashClient { Ok(Self { urls: UnleashUrls::from_str(server_url)?, - backing_client: new_request_client( + backing_client: new_reqwest_client( Ulid::new().to_string(), true, None, @@ -600,7 +600,7 @@ mod tests { use super::{EdgeTokens, UnleashClient}; use crate::cli::ClientIdentity; - use crate::http::unleash_client::new_request_client; + use crate::http::unleash_client::new_reqwest_client; use crate::{ cli::TlsOptions, middleware::as_async_middleware::as_async_middleware, @@ -895,7 +895,7 @@ mod tests { pkcs12_identity_file: Some(PathBuf::from(pfx)), pkcs12_passphrase: Some(passphrase.into()), }; - let client = new_request_client( + let client = new_reqwest_client( "test_pkcs12".into(), false, Some(identity), @@ -917,7 +917,7 @@ mod tests { pkcs12_identity_file: Some(PathBuf::from(pfx)), pkcs12_passphrase: Some(passphrase.into()), }; - let client = new_request_client( + let client = new_reqwest_client( "test_pkcs12".into(), false, Some(identity), @@ -939,7 +939,7 @@ mod tests { pkcs12_identity_file: None, pkcs12_passphrase: None, }; - let client = new_request_client( + let client = new_reqwest_client( "test_pkcs8".into(), false, Some(identity), From 0e61ea0d2d3b1ed1e64deeefc3d485c5c2b9aa2b Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 13:02:14 +0200 Subject: [PATCH 09/16] Fix --- server/src/http/feature_refresher.rs | 4 ++-- server/src/middleware/client_token_from_frontend_token.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 8c39fc35..b9b2c616 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -669,7 +669,7 @@ mod tests { use crate::feature_cache::{update_projects_from_feature_update, FeatureCache}; use crate::filters::{project_filter, FeatureFilterSet}; - use crate::http::unleash_client::new_request_client; + use crate::http::unleash_client::new_reqwest_client; use crate::tests::features_from_disk; use crate::tokens::cache_key; use crate::types::TokenValidationStatus::Validated; @@ -691,7 +691,7 @@ mod tests { } fn create_test_client() -> UnleashClient { - let http_client = new_request_client( + let http_client = new_reqwest_client( "unleash_edge".into(), false, None, diff --git a/server/src/middleware/client_token_from_frontend_token.rs b/server/src/middleware/client_token_from_frontend_token.rs index 402a6b68..b33eb17c 100644 --- a/server/src/middleware/client_token_from_frontend_token.rs +++ b/server/src/middleware/client_token_from_frontend_token.rs @@ -65,7 +65,7 @@ mod tests { use crate::auth::token_validator::TokenValidator; use crate::feature_cache::FeatureCache; use crate::http::feature_refresher::FeatureRefresher; - use crate::http::unleash_client::{new_request_client, UnleashClient}; + use crate::http::unleash_client::{new_reqwest_client, UnleashClient}; use crate::tests::upstream_server; use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; @@ -131,7 +131,7 @@ mod tests { ) .await; - let http_client = new_request_client( + let http_client = new_reqwest_client( "unleash_edge".into(), false, None, From 23f37d6aaaa814eebf10fca2cd7f5f4f2922c529 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 13:27:59 +0200 Subject: [PATCH 10/16] Fix --- server/src/http/unleash_client.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 69a30d25..44349a58 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -620,8 +620,6 @@ mod tests { }; use chrono::Duration; use unleash_types::client_features::{ClientFeature, ClientFeatures}; - - use super::{EdgeTokens, UnleashClient}; use crate::cli::ClientIdentity; use crate::http::unleash_client::new_reqwest_client; use crate::{ From e0b78f9842f16cb1fe6be7116992e68f6117f49d Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 13:31:36 +0200 Subject: [PATCH 11/16] Fix --- server/tests/delta_test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/tests/delta_test.rs b/server/tests/delta_test.rs index a89d761b..7999f86a 100644 --- a/server/tests/delta_test.rs +++ b/server/tests/delta_test.rs @@ -11,7 +11,7 @@ mod delta_test { use std::sync::Arc; use unleash_edge::feature_cache::FeatureCache; use unleash_edge::http::feature_refresher::FeatureRefresher; - use unleash_edge::http::unleash_client::UnleashClient; + use unleash_edge::http::unleash_client::{ClientMetaInformation, UnleashClient}; use unleash_edge::types::EdgeToken; use unleash_types::client_features::{ ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment, @@ -36,7 +36,7 @@ mod delta_test { strict: false, streaming: false, delta: true, - app_name: "test-app".into(), + client_meta_information:ClientMetaInformation::test_config(), }); let features = ClientFeatures { version: 1, From 8a625b52546ddf9e8af00f45e0e077eabf13cab3 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 13:52:45 +0200 Subject: [PATCH 12/16] Fix --- server/src/http/feature_refresher.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 1b4276c5..369ac994 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -1693,7 +1693,7 @@ mod tests { token_type: Some(TokenType::Client), environment: Some("dev".into()), projects: vec![String::from("testproject"), String::from("someother")], - status: TokenValidationStatus::Validated, + status: Validated, }; let updated = update_projects_from_feature_update( &token_with_access_to_both_empty_and_full_project, @@ -1702,12 +1702,4 @@ mod tests { ); assert_eq!(updated.len(), 0); } - - // mod delta_tests { - // use super::*; - // #[test] - // fn delta_works() { - // - // } - // } } From e57a6ed3bab79b9c5f3d8657078dd836a7cf8511 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 14:23:40 +0200 Subject: [PATCH 13/16] Fix --- server/src/cli.rs | 2 +- server/src/feature_cache.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 428a92fc..d46674c0 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -217,7 +217,7 @@ pub struct EdgeArgs { #[clap(long, env, default_value_t = false, requires = "strict")] pub streaming: bool, - /// If set to true. Edge connects to upstream using delta polling instead of normal polling. + /// If set to true. Edge connects to upstream using delta polling instead of normal polling. This is experimental feature and might and change. #[clap(long, env, default_value_t = false, conflicts_with = "streaming")] pub delta: bool, diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs index 7750fa8c..3e1021e3 100644 --- a/server/src/feature_cache.rs +++ b/server/src/feature_cache.rs @@ -69,7 +69,7 @@ impl FeatureCache { pub fn apply_delta(&self, key: String, delta: &ClientFeaturesDelta) { let client_features = ClientFeatures { - version : 1, + version : 2, features : delta.updated.clone(), segments: delta.segments.clone(), query: None, From f2edcbf02dcdcb7ef18626cbde36b3d30fb5ddb6 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Thu, 9 Jan 2025 14:37:22 +0200 Subject: [PATCH 14/16] Fix --- server/tests/delta_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tests/delta_test.rs b/server/tests/delta_test.rs index 7999f86a..3dd89cd4 100644 --- a/server/tests/delta_test.rs +++ b/server/tests/delta_test.rs @@ -39,7 +39,7 @@ mod delta_test { client_meta_information:ClientMetaInformation::test_config(), }); let features = ClientFeatures { - version: 1, + version: 2, features: vec![], segments: None, query: None, From b973bf2d7530893ada0d20a1bf8306126657444e Mon Sep 17 00:00:00 2001 From: sjaanus Date: Fri, 10 Jan 2025 10:57:10 +0200 Subject: [PATCH 15/16] Add flag --- server/Cargo.toml | 3 + server/src/auth/token_validator.rs | 2 +- server/src/builder.rs | 4 +- server/src/client_api.rs | 2 +- server/src/http/background_send_metrics.rs | 2 +- server/src/http/mod.rs | 2 +- .../http/refresher/delta_refresher.rs} | 126 +++++++++++++++++- .../http/{ => refresher}/feature_refresher.rs | 108 +-------------- server/src/http/refresher/mod.rs | 2 + server/src/internal_backstage.rs | 4 +- server/src/main.rs | 2 +- .../client_token_from_frontend_token.rs | 4 +- 12 files changed, 141 insertions(+), 120 deletions(-) rename server/{tests/delta_test.rs => src/http/refresher/delta_refresher.rs} (51%) rename server/src/http/{ => refresher}/feature_refresher.rs (93%) create mode 100644 server/src/http/refresher/mod.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index 3cef7b88..a9075b09 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -116,3 +116,6 @@ tracing-test = "0.2.5" [build-dependencies] shadow-rs = "0.37.0" + +[features] +delta = [] diff --git a/server/src/auth/token_validator.rs b/server/src/auth/token_validator.rs index 58c01437..f566d4bb 100644 --- a/server/src/auth/token_validator.rs +++ b/server/src/auth/token_validator.rs @@ -4,7 +4,7 @@ use dashmap::DashMap; use tracing::trace; use unleash_types::Upsert; -use crate::http::feature_refresher::FeatureRefresher; +use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::UnleashClient; use crate::persistence::EdgePersistence; use crate::types::{ diff --git a/server/src/builder.rs b/server/src/builder.rs index 4cee635c..f6130ec5 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -12,7 +12,7 @@ use unleash_yggdrasil::EngineState; use crate::cli::RedisMode; use crate::feature_cache::FeatureCache; -use crate::http::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode}; +use crate::http::refresher::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode}; use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation}; use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache}; use crate::persistence::file::FilePersister; @@ -23,7 +23,7 @@ use crate::{ auth::token_validator::TokenValidator, cli::{CliArgs, EdgeArgs, EdgeMode, OfflineArgs}, error::EdgeError, - http::{feature_refresher::FeatureRefresher, unleash_client::UnleashClient}, + http::{refresher::feature_refresher::FeatureRefresher, unleash_client::UnleashClient}, types::{EdgeResult, EdgeToken, TokenType}, }; diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 87c8a437..3d8ef9d0 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -4,7 +4,7 @@ use crate::filters::{ filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet, }; use crate::http::broadcaster::Broadcaster; -use crate::http::feature_refresher::FeatureRefresher; +use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::metrics::client_metrics::MetricsCache; use crate::tokens::cache_key; use crate::types::{ diff --git a/server/src/http/background_send_metrics.rs b/server/src/http/background_send_metrics.rs index dd6001d0..a0d9aba4 100644 --- a/server/src/http/background_send_metrics.rs +++ b/server/src/http/background_send_metrics.rs @@ -14,7 +14,7 @@ use crate::{ metrics::client_metrics::{size_of_batch, MetricsCache}, }; -use super::feature_refresher::FeatureRefresher; +use super::refresher::feature_refresher::FeatureRefresher; lazy_static! { pub static ref METRICS_UPSTREAM_HTTP_ERRORS: IntGaugeVec = register_int_gauge_vec!( diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index f7251942..c223d055 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -1,6 +1,6 @@ #[cfg(not(tarpaulin_include))] pub mod background_send_metrics; pub mod broadcaster; -pub mod feature_refresher; pub(crate) mod headers; pub mod unleash_client; +pub mod refresher; diff --git a/server/tests/delta_test.rs b/server/src/http/refresher/delta_refresher.rs similarity index 51% rename from server/tests/delta_test.rs rename to server/src/http/refresher/delta_refresher.rs index 3dd89cd4..e803fb6e 100644 --- a/server/tests/delta_test.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -1,4 +1,116 @@ -mod delta_test { +use actix_web::http::header::EntityTag; +use reqwest::StatusCode; +use tracing::{debug, info, warn}; +use unleash_types::client_features::{ClientFeaturesDelta}; +use unleash_yggdrasil::EngineState; + +use crate::error::{EdgeError, FeatureError}; +use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh}; +use crate::http::refresher::feature_refresher::FeatureRefresher; +use crate::tokens::cache_key; + +impl FeatureRefresher { + async fn handle_client_features_delta_updated( + &self, + refresh_token: &EdgeToken, + delta: ClientFeaturesDelta, + etag: Option, + ) { + debug!("Got updated client features delta. Updating features with {etag:?}"); + let key = cache_key(refresh_token); + self.features_cache.apply_delta(key.clone(), &delta); + self.update_last_refresh( + refresh_token, + etag, + self.features_cache.get(&key).unwrap().features.len(), + ); + self.engine_cache + .entry(key.clone()) + .and_modify(|engine| { + engine.take_delta(&delta); + }) + .or_insert_with(|| { + let mut new_state = EngineState::default(); + + let warnings = new_state.take_delta(&delta); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + new_state + }); + } + + pub async fn refresh_single_delta(&self, refresh: TokenRefresh) { + let features_result = self + .unleash_client + .get_client_features_delta(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }) + .await; + match features_result { + Ok(delta_response) => match delta_response { + ClientFeaturesDeltaResponse::NoUpdate(tag) => { + debug!("No update needed. Will update last check time with {tag}"); + self.update_last_check(&refresh.token.clone()); + } + ClientFeaturesDeltaResponse::Updated(features, etag) => { + self.handle_client_features_delta_updated(&refresh.token, features, etag) + .await + } + }, + Err(e) => { + match e { + EdgeError::ClientFeaturesFetchError(fe) => { + match fe { + FeatureError::Retriable(status_code) => match status_code { + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => { + info!("Upstream is having some problems, increasing my waiting period"); + self.backoff(&refresh.token); + } + StatusCode::TOO_MANY_REQUESTS => { + info!("Got told that upstream is receiving too many requests"); + self.backoff(&refresh.token); + } + _ => { + info!("Couldn't refresh features, but will retry next go") + } + }, + FeatureError::AccessDenied => { + info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks"); + self.tokens_to_refresh.remove(&refresh.token.token); + if !self.tokens_to_refresh.iter().any(|e| { + e.value().token.environment == refresh.token.environment + }) { + let cache_key = cache_key(&refresh.token); + // No tokens left that access the environment of our current refresh. Deleting client features and engine cache + self.features_cache.remove(&cache_key); + self.engine_cache.remove(&cache_key); + } + } + FeatureError::NotFound => { + info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"); + self.backoff(&refresh.token); + } + } + } + EdgeError::ClientCacheError => { + info!("Couldn't refresh features, but will retry next go") + } + _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"), + } + } + } + } +} + + +// #[cfg(feature = "delta")] +#[cfg(test)] +mod tests { use actix_http::header::IF_NONE_MATCH; use actix_http::HttpService; use actix_http_test::{test_server, TestServer}; @@ -9,10 +121,10 @@ mod delta_test { use chrono::Duration; use dashmap::DashMap; use std::sync::Arc; - use unleash_edge::feature_cache::FeatureCache; - use unleash_edge::http::feature_refresher::FeatureRefresher; - use unleash_edge::http::unleash_client::{ClientMetaInformation, UnleashClient}; - use unleash_edge::types::EdgeToken; + use crate::feature_cache::FeatureCache; + use crate::http::refresher::feature_refresher::FeatureRefresher; + use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; + use crate::types::EdgeToken; use unleash_types::client_features::{ ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment, }; @@ -152,8 +264,8 @@ mod delta_test { ))), |_| AppConfig::default(), )) - .tcp() + .tcp() }) - .await + .await } } diff --git a/server/src/http/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs similarity index 93% rename from server/src/http/feature_refresher.rs rename to server/src/http/refresher/feature_refresher.rs index 369ac994..3f647937 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -8,7 +8,7 @@ use eventsource_client::Client; use futures::TryStreamExt; use reqwest::StatusCode; use tracing::{debug, info, warn}; -use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; +use unleash_types::client_features::{ClientFeatures}; use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; use unleash_yggdrasil::EngineState; @@ -19,7 +19,7 @@ use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; use crate::types::{ - build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus, + build, EdgeResult, TokenType, TokenValidationStatus, }; use crate::{ persistence::EdgePersistence, @@ -27,7 +27,7 @@ use crate::{ types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, }; -use super::unleash_client::{ClientMetaInformation, UnleashClient}; +use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; fn frontend_token_is_covered_by_tokens( frontend_token: &EdgeToken, @@ -412,7 +412,7 @@ impl FeatureRefresher { pub async fn hydrate_new_tokens(&self) { let hydrations = self.get_tokens_never_refreshed(); for hydration in hydrations { - if self.delta { + if cfg!(feature = "delta") && self.delta { self.refresh_single_delta(hydration).await; } else { self.refresh_single(hydration).await; @@ -422,11 +422,12 @@ impl FeatureRefresher { pub async fn refresh_features(&self) { let refreshes = self.get_tokens_due_for_refresh(); for refresh in refreshes { - if self.delta { + if cfg!(feature = "delta") && self.delta { self.refresh_single_delta(refresh).await; } else { self.refresh_single(refresh).await; } + } } @@ -464,37 +465,6 @@ impl FeatureRefresher { new_state }); } - - async fn handle_client_features_delta_updated( - &self, - refresh_token: &EdgeToken, - delta: ClientFeaturesDelta, - etag: Option, - ) { - debug!("Got updated client features delta. Updating features with {etag:?}"); - let key = cache_key(refresh_token); - self.features_cache.apply_delta(key.clone(), &delta); - self.update_last_refresh( - refresh_token, - etag, - self.features_cache.get(&key).unwrap().features.len(), - ); - self.engine_cache - .entry(key.clone()) - .and_modify(|engine| { - engine.take_delta(&delta); - }) - .or_insert_with(|| { - let mut new_state = EngineState::default(); - - let warnings = new_state.take_delta(&delta); - if let Some(warnings) = warnings { - warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); - }; - new_state - }); - } - pub async fn refresh_single(&self, refresh: TokenRefresh) { let features_result = self .unleash_client @@ -561,72 +531,6 @@ impl FeatureRefresher { } } } - - pub async fn refresh_single_delta(&self, refresh: TokenRefresh) { - let features_result = self - .unleash_client - .get_client_features_delta(ClientFeaturesRequest { - api_key: refresh.token.token.clone(), - etag: refresh.etag, - }) - .await; - match features_result { - Ok(delta_response) => match delta_response { - ClientFeaturesDeltaResponse::NoUpdate(tag) => { - debug!("No update needed. Will update last check time with {tag}"); - self.update_last_check(&refresh.token.clone()); - } - ClientFeaturesDeltaResponse::Updated(features, etag) => { - self.handle_client_features_delta_updated(&refresh.token, features, etag) - .await - } - }, - Err(e) => { - match e { - EdgeError::ClientFeaturesFetchError(fe) => { - match fe { - FeatureError::Retriable(status_code) => match status_code { - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => { - info!("Upstream is having some problems, increasing my waiting period"); - self.backoff(&refresh.token); - } - StatusCode::TOO_MANY_REQUESTS => { - info!("Got told that upstream is receiving too many requests"); - self.backoff(&refresh.token); - } - _ => { - info!("Couldn't refresh features, but will retry next go") - } - }, - FeatureError::AccessDenied => { - info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks"); - self.tokens_to_refresh.remove(&refresh.token.token); - if !self.tokens_to_refresh.iter().any(|e| { - e.value().token.environment == refresh.token.environment - }) { - let cache_key = cache_key(&refresh.token); - // No tokens left that access the environment of our current refresh. Deleting client features and engine cache - self.features_cache.remove(&cache_key); - self.engine_cache.remove(&cache_key); - } - } - FeatureError::NotFound => { - info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"); - self.backoff(&refresh.token); - } - } - } - EdgeError::ClientCacheError => { - info!("Couldn't refresh features, but will retry next go") - } - _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"), - } - } - } - } pub fn backoff(&self, token: &EdgeToken) { self.tokens_to_refresh .alter(&token.token, |_k, old_refresh| { diff --git a/server/src/http/refresher/mod.rs b/server/src/http/refresher/mod.rs new file mode 100644 index 00000000..e826dcf7 --- /dev/null +++ b/server/src/http/refresher/mod.rs @@ -0,0 +1,2 @@ +pub mod feature_refresher; +pub mod delta_refresher; diff --git a/server/src/internal_backstage.rs b/server/src/internal_backstage.rs index 70468fbb..ed55f703 100644 --- a/server/src/internal_backstage.rs +++ b/server/src/internal_backstage.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::ClientApplication; -use crate::http::feature_refresher::FeatureRefresher; +use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::metrics::actix_web_metrics::PrometheusMetricsHandler; use crate::metrics::client_metrics::MetricsCache; use crate::types::{BuildInfo, EdgeJsonResult, EdgeToken, TokenInfo, TokenRefresh}; @@ -182,7 +182,7 @@ mod tests { use crate::auth::token_validator::TokenValidator; use crate::feature_cache::FeatureCache; - use crate::http::feature_refresher::FeatureRefresher; + use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::UnleashClient; use crate::internal_backstage::EdgeStatus; use crate::middleware; diff --git a/server/src/main.rs b/server/src/main.rs index 641f8e0f..14b0f7d5 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -17,7 +17,7 @@ use unleash_edge::builder::build_caches_and_refreshers; use unleash_edge::cli::{CliArgs, EdgeMode}; use unleash_edge::feature_cache::FeatureCache; use unleash_edge::http::background_send_metrics::send_metrics_one_shot; -use unleash_edge::http::feature_refresher::FeatureRefresher; +use unleash_edge::http::refresher::feature_refresher::FeatureRefresher; use unleash_edge::metrics::client_metrics::MetricsCache; use unleash_edge::offline::offline_hotload; use unleash_edge::persistence::{persist_data, EdgePersistence}; diff --git a/server/src/middleware/client_token_from_frontend_token.rs b/server/src/middleware/client_token_from_frontend_token.rs index f3f2a9c9..ee85fd25 100644 --- a/server/src/middleware/client_token_from_frontend_token.rs +++ b/server/src/middleware/client_token_from_frontend_token.rs @@ -7,7 +7,7 @@ use dashmap::DashMap; use tracing::debug; use crate::{ - http::feature_refresher::FeatureRefresher, + http::refresher::feature_refresher::FeatureRefresher, tokens, types::{EdgeResult, EdgeToken, TokenValidationStatus}, }; @@ -64,7 +64,7 @@ mod tests { use crate::auth::token_validator::TokenValidator; use crate::feature_cache::FeatureCache; - use crate::http::feature_refresher::FeatureRefresher; + use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::{new_reqwest_client, UnleashClient}; use crate::tests::upstream_server; use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; From 5e01e91d8a3f244d29ed5d9c81314a6153b98e94 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Fri, 10 Jan 2025 11:12:34 +0200 Subject: [PATCH 16/16] Fix --- server/src/http/refresher/delta_refresher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index e803fb6e..312274c7 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -108,7 +108,7 @@ impl FeatureRefresher { } -// #[cfg(feature = "delta")] +#[cfg(feature = "delta")] #[cfg(test)] mod tests { use actix_http::header::IF_NONE_MATCH;