Skip to content

Commit

Permalink
Merge branch 'main' into release-plz-2025-01-16T03-27-38Z
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 16, 2025
2 parents 6a57e8d + 9abcfee commit 6b8e2e7
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 10 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ futures = "0.3.31"
futures-core = "0.3.31"
iter_tools = "0.24.0"
itertools = "0.14.0"
json-structural-diff = "0.2.0"
lazy_static = "1.5.0"
num_cpus = "1.16.0"
opentelemetry = { version = "0.27.1", features = ["trace", "metrics"] }
Expand Down Expand Up @@ -94,8 +95,8 @@ tokio-stream = { version = "0.1.17" }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] }
ulid = "1.1.4"
unleash-types = { version = "0.15.3", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.14.5" }
unleash-types = { version = "0.15.4", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.14.6" }
utoipa = { version = "5.3.1", features = ["actix_extras", "chrono"] }
utoipa-swagger-ui = { version = "8.1.1", features = ["actix-web"] }
[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ async fn build_edge(
refresher_mode,
client_meta_information,
args.delta,
args.delta_diff
);
let feature_refresher = Arc::new(FeatureRefresher::new(
unleash_client,
Expand Down Expand Up @@ -387,6 +388,7 @@ mod tests {
prometheus_username: None,
streaming: false,
delta: false,
delta_diff: false,
};

let result = build_edge(
Expand Down
4 changes: 4 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ pub struct EdgeArgs {
#[clap(long, env, default_value_t = false, requires = "strict")]
pub delta: bool,

/// If set to true, it compares features payload with delta payload and logs diff. This is experimental feature and might change. Requires strict mode
#[clap(long, env, default_value_t = false, requires = "strict")]
pub delta_diff: bool,

/// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream
#[clap(long, env)]
pub prometheus_remote_write_url: Option<String>,
Expand Down
1 change: 1 addition & 0 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ mod tests {
streaming: false,
client_meta_information: ClientMetaInformation::test_config(),
delta: false,
delta_diff: false,
});
let token_validator = Arc::new(TokenValidator {
unleash_client: unleash_client.clone(),
Expand Down
7 changes: 4 additions & 3 deletions server/src/http/refresher/delta_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ impl FeatureRefresher {
}

pub async fn refresh_single_delta(&self, refresh: TokenRefresh) {
let features_result = self
let delta_result = self
.unleash_client
.get_client_features_delta(ClientFeaturesRequest {
api_key: refresh.token.token.clone(),
etag: refresh.etag,
})
.await;
match features_result {
match delta_result {
Ok(delta_response) => match delta_response {
ClientFeaturesDeltaResponse::NoUpdate(tag) => {
debug!("No update needed. Will update last check time with {tag}");
Expand Down Expand Up @@ -154,7 +154,8 @@ mod tests {
strict: false,
streaming: false,
delta: true,
client_meta_information:ClientMetaInformation::test_config(),
delta_diff : false,
client_meta_information: ClientMetaInformation::test_config(),
});
let features = ClientFeatures {
version: 2,
Expand Down
52 changes: 47 additions & 5 deletions server/src/http/refresher/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use chrono::Utc;
use dashmap::DashMap;
use eventsource_client::Client;
use futures::TryStreamExt;
use json_structural_diff::JsonDiff;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::{ClientFeatures};
Expand All @@ -18,9 +19,7 @@ use crate::filters::{filter_client_features, FeatureFilterSet};
use crate::http::headers::{
UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
};
use crate::types::{
build, EdgeResult, TokenType, TokenValidationStatus,
};
use crate::types::{build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus};
use crate::{
persistence::EdgePersistence,
tokens::{cache_key, simplify},
Expand Down Expand Up @@ -52,6 +51,7 @@ pub struct FeatureRefresher {
pub streaming: bool,
pub client_meta_information: ClientMetaInformation,
pub delta: bool,
pub delta_diff: bool,
}

impl Default for FeatureRefresher {
Expand All @@ -67,6 +67,7 @@ impl Default for FeatureRefresher {
streaming: false,
client_meta_information: Default::default(),
delta: false,
delta_diff: false,
}
}
}
Expand Down Expand Up @@ -105,6 +106,7 @@ pub struct FeatureRefreshConfig {
mode: FeatureRefresherMode,
client_meta_information: ClientMetaInformation,
delta: bool,
delta_diff: bool,
}

impl FeatureRefreshConfig {
Expand All @@ -113,12 +115,14 @@ impl FeatureRefreshConfig {
mode: FeatureRefresherMode,
client_meta_information: ClientMetaInformation,
delta: bool,
delta_diff: bool,
) -> Self {
Self {
features_refresh_interval,
mode,
client_meta_information,
delta,
delta_diff
}
}
}
Expand All @@ -142,6 +146,7 @@ impl FeatureRefresher {
streaming: config.mode == FeatureRefresherMode::Streaming,
client_meta_information: config.client_meta_information,
delta: config.delta,
delta_diff: config.delta_diff,
}
}

Expand Down Expand Up @@ -393,6 +398,40 @@ impl FeatureRefresher {
Ok(())
}

async fn compare_delta_cache(&self, refresh: &TokenRefresh) {
let delta_result = self
.unleash_client
.get_client_features_delta(ClientFeaturesRequest {
api_key: refresh.token.token.clone(),
etag: refresh.etag.clone(),
})
.await;

let key = cache_key(&refresh.token);
if let Some(client_features) = self.features_cache.get(&key).as_ref() {
if let Ok(ClientFeaturesDeltaResponse::Updated(delta_features, _etag)) = delta_result {
let c_features = &client_features.features;
let d_features = delta_features.updated;

let delta_json = serde_json::to_value(d_features).unwrap();
let client_json = serde_json::to_value(c_features).unwrap();

let delta_json_len = delta_json.to_string().len();
let client_json_len = client_json.to_string().len();

if delta_json_len == client_json_len {
info!("The JSON structure lengths are identical.");
} else {
info!("Structural differences found:");
info!("Length of delta_json: {}", delta_json_len);
info!("Length of old_json: {}", client_json_len);
let diff = JsonDiff::diff(&delta_json, &client_json, false);
debug!("{:?}", diff.diff.unwrap());
}
}
}
}

pub async fn start_refresh_features_background_task(&self) {
if self.streaming {
loop {
Expand Down Expand Up @@ -470,7 +509,7 @@ impl FeatureRefresher {
.unleash_client
.get_client_features(ClientFeaturesRequest {
api_key: refresh.token.token.clone(),
etag: refresh.etag,
etag: refresh.etag.clone(),
})
.await;

Expand All @@ -482,7 +521,10 @@ impl FeatureRefresher {
}
ClientFeaturesResponse::Updated(features, etag) => {
self.handle_client_features_updated(&refresh.token, features, etag)
.await
.await;
if cfg!(feature = "delta") && self.delta_diff {
self.compare_delta_cache(&refresh).await;
}
}
},
Err(e) => {
Expand Down
1 change: 1 addition & 0 deletions server/tests/streaming_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ mod streaming_test {
strict: true,
dynamic: false,
delta: false,
delta_diff:false,
prometheus_remote_write_url: None,
prometheus_push_interval: 60,
prometheus_username: None,
Expand Down

0 comments on commit 6b8e2e7

Please sign in to comment.