Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delta api implementation #626

Merged
merged 24 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ tracing-test = "0.2.5"

[build-dependencies]
shadow-rs = "0.37.0"

[features]
delta = []
2 changes: 1 addition & 1 deletion server/src/auth/token_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use dashmap::DashMap;
use tracing::trace;
use unleash_types::Upsert;

use crate::http::feature_refresher::FeatureRefresher;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::UnleashClient;
use crate::persistence::EdgePersistence;
use crate::types::{
Expand Down
6 changes: 4 additions & 2 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use unleash_yggdrasil::EngineState;

use crate::cli::RedisMode;
use crate::feature_cache::FeatureCache;
use crate::http::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode};
use crate::http::refresher::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode};
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation};
use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache};
use crate::persistence::file::FilePersister;
Expand All @@ -23,7 +23,7 @@ use crate::{
auth::token_validator::TokenValidator,
cli::{CliArgs, EdgeArgs, EdgeMode, OfflineArgs},
error::EdgeError,
http::{feature_refresher::FeatureRefresher, unleash_client::UnleashClient},
http::{refresher::feature_refresher::FeatureRefresher, unleash_client::UnleashClient},
types::{EdgeResult, EdgeToken, TokenType},
};

Expand Down Expand Up @@ -270,6 +270,7 @@ async fn build_edge(
Duration::seconds(args.features_refresh_interval_seconds as i64),
refresher_mode,
client_meta_information,
args.delta,
);
let feature_refresher = Arc::new(FeatureRefresher::new(
unleash_client,
Expand Down Expand Up @@ -385,6 +386,7 @@ mod tests {
prometheus_password: None,
prometheus_username: None,
streaming: false,
delta: false,
};

let result = build_edge(
Expand Down
4 changes: 4 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ pub struct EdgeArgs {
#[clap(long, env, default_value_t = false, requires = "strict")]
pub streaming: bool,

/// If set to true. Edge connects to upstream using delta polling instead of normal polling. This is experimental feature and might and change.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// If set to true. Edge connects to upstream using delta polling instead of normal polling. This is experimental feature and might and change.
/// If set to true. Edge connects to upstream using delta polling instead of normal polling. This is experimental feature and might change.

#[clap(long, env, default_value_t = false, conflicts_with = "streaming")]
pub delta: bool,

/// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream
#[clap(long, env)]
pub prometheus_remote_write_url: Option<String>,
Expand Down
3 changes: 2 additions & 1 deletion server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
};
use crate::http::broadcaster::Broadcaster;
use crate::http::feature_refresher::FeatureRefresher;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::tokens::cache_key;
use crate::types::{
Expand Down Expand Up @@ -1012,6 +1012,7 @@ mod tests {
strict: false,
streaming: false,
client_meta_information: ClientMetaInformation::test_config(),
delta: false,
});
let token_validator = Arc::new(TokenValidator {
unleash_client: unleash_client.clone(),
Expand Down
19 changes: 18 additions & 1 deletion server/src/feature_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use unleash_types::{
client_features::{ClientFeature, ClientFeatures, Segment},
Deduplicate,
};

use unleash_types::client_features::ClientFeaturesDelta;
use crate::types::EdgeToken;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -67,6 +67,23 @@ impl FeatureCache {
self.send_full_update(key);
}

pub fn apply_delta(&self, key: String, delta: &ClientFeaturesDelta) {
let client_features = ClientFeatures {
version : 2,
features : delta.updated.clone(),
segments: delta.segments.clone(),
query: None,
meta: None,
};
self.features
.entry(key.clone())
.and_modify(|existing_features| {
existing_features.modify_in_place(delta);
})
.or_insert(client_features);
self.send_full_update(key);
}

pub fn is_empty(&self) -> bool {
self.features.is_empty()
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
metrics::client_metrics::{size_of_batch, MetricsCache},
};

use super::feature_refresher::FeatureRefresher;
use super::refresher::feature_refresher::FeatureRefresher;

lazy_static! {
pub static ref METRICS_UPSTREAM_HTTP_ERRORS: IntGaugeVec = register_int_gauge_vec!(
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(not(tarpaulin_include))]
pub mod background_send_metrics;
pub mod broadcaster;
pub mod feature_refresher;
pub(crate) mod headers;
pub mod unleash_client;
pub mod refresher;
271 changes: 271 additions & 0 deletions server/src/http/refresher/delta_refresher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
use actix_web::http::header::EntityTag;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::{ClientFeaturesDelta};
use unleash_yggdrasil::EngineState;

use crate::error::{EdgeError, FeatureError};
use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh};
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::tokens::cache_key;

impl FeatureRefresher {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either add #[cfg(feature = "delta")] here, or remove it from the tests part.

async fn handle_client_features_delta_updated(
&self,
refresh_token: &EdgeToken,
delta: ClientFeaturesDelta,
etag: Option<EntityTag>,
) {
debug!("Got updated client features delta. Updating features with {etag:?}");
let key = cache_key(refresh_token);
self.features_cache.apply_delta(key.clone(), &delta);
self.update_last_refresh(
refresh_token,
etag,
self.features_cache.get(&key).unwrap().features.len(),
);
self.engine_cache
.entry(key.clone())
.and_modify(|engine| {
engine.take_delta(&delta);
})
.or_insert_with(|| {
let mut new_state = EngineState::default();

let warnings = new_state.take_delta(&delta);
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
};
new_state
});
}

pub async fn refresh_single_delta(&self, refresh: TokenRefresh) {
let features_result = self
.unleash_client
.get_client_features_delta(ClientFeaturesRequest {
api_key: refresh.token.token.clone(),
etag: refresh.etag,
})
.await;
match features_result {
Ok(delta_response) => match delta_response {
ClientFeaturesDeltaResponse::NoUpdate(tag) => {
debug!("No update needed. Will update last check time with {tag}");
self.update_last_check(&refresh.token.clone());
}
ClientFeaturesDeltaResponse::Updated(features, etag) => {
self.handle_client_features_delta_updated(&refresh.token, features, etag)
.await
}
},
Err(e) => {
match e {
EdgeError::ClientFeaturesFetchError(fe) => {
match fe {
FeatureError::Retriable(status_code) => match status_code {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
info!("Upstream is having some problems, increasing my waiting period");
self.backoff(&refresh.token);
}
StatusCode::TOO_MANY_REQUESTS => {
info!("Got told that upstream is receiving too many requests");
self.backoff(&refresh.token);
}
_ => {
info!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
}) {
let cache_key = cache_key(&refresh.token);
// No tokens left that access the environment of our current refresh. Deleting client features and engine cache
self.features_cache.remove(&cache_key);
self.engine_cache.remove(&cache_key);
}
}
FeatureError::NotFound => {
info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again");
self.backoff(&refresh.token);
}
}
}
EdgeError::ClientCacheError => {
info!("Couldn't refresh features, but will retry next go")
}
_ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
}
}


#[cfg(feature = "delta")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're always adding the impl part, I'm not sure if there's much value in using a feature flag just for tests.

#[cfg(test)]
mod tests {
use actix_http::header::IF_NONE_MATCH;
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::dev::AppConfig;
use actix_web::http::header::{ETag, EntityTag};
use actix_web::{web, App, HttpRequest, HttpResponse};
use chrono::Duration;
use dashmap::DashMap;
use std::sync::Arc;
use crate::feature_cache::FeatureCache;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::{ClientMetaInformation, UnleashClient};
use crate::types::EdgeToken;
use unleash_types::client_features::{
ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment,
};
use unleash_yggdrasil::EngineState;

#[actix_web::test]
#[tracing_test::traced_test]
async fn test_delta() {
let srv = test_features_server().await;
let unleash_client = Arc::new(UnleashClient::new(srv.url("/").as_str(), None).unwrap());
let features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());

let feature_refresher = Arc::new(FeatureRefresher {
unleash_client: unleash_client.clone(),
tokens_to_refresh: Arc::new(Default::default()),
features_cache: features_cache.clone(),
engine_cache: engine_cache.clone(),
refresh_interval: Duration::seconds(6000),
persistence: None,
strict: false,
streaming: false,
delta: true,
client_meta_information:ClientMetaInformation::test_config(),
});
let features = ClientFeatures {
version: 2,
features: vec![],
segments: None,
query: None,
meta: None,
};
let initial_features = features.modify_and_copy(&revision(1));
let final_features = initial_features.modify_and_copy(&revision(2));
let token =
EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
feature_refresher
.register_token_for_refresh(token.clone(), None)
.await;
feature_refresher.refresh_features().await;
let refreshed_features = features_cache
.get(&cache_key(&token))
.unwrap()
.value()
.clone();
assert_eq!(refreshed_features, initial_features);

let token_refresh = feature_refresher
.tokens_to_refresh
.get(&token.token)
.unwrap()
.clone();
feature_refresher.refresh_single_delta(token_refresh).await;
let refreshed_features = features_cache
.get(&cache_key(&token))
.unwrap()
.value()
.clone();
assert_eq!(refreshed_features, final_features);
}

fn cache_key(token: &EdgeToken) -> String {
token
.environment
.clone()
.unwrap_or_else(|| token.token.clone())
}

fn revision(revision_id: u32) -> ClientFeaturesDelta {
match revision_id {
1 => ClientFeaturesDelta {
updated: vec![
ClientFeature {
name: "test1".into(),
feature_type: Some("release".into()),
..Default::default()
},
ClientFeature {
name: "test2".into(),
feature_type: Some("release".into()),
..Default::default()
},
],
removed: vec![],
segments: Some(vec![Segment {
id: 1,
constraints: vec![Constraint {
context_name: "userId".into(),
operator: Operator::In,
case_insensitive: false,
inverted: false,
values: Some(vec!["7".into()]),
value: None,
}],
}]),
revision_id: 1,
},
_ => ClientFeaturesDelta {
updated: vec![ClientFeature {
name: "test1".into(),
feature_type: Some("release".into()),
..Default::default()
}],
removed: vec!["test2".to_string()],
segments: None,
revision_id: 2,
},
}
}

async fn return_client_features_delta(etag_header: Option<String>) -> HttpResponse {
match etag_header {
Some(value) => match value.as_str() {
"\"1\"" => HttpResponse::Ok()
.insert_header(ETag(EntityTag::new_strong("2".to_string())))
.json(revision(2)),
"\"2\"" => HttpResponse::NotModified().finish(),
_ => HttpResponse::NotModified().finish(),
},
None => HttpResponse::Ok()
.insert_header(ETag(EntityTag::new_strong("1".to_string())))
.json(revision(1)),
}
}

async fn test_features_server() -> TestServer {
test_server(move || {
HttpService::new(map_config(
App::new().service(web::resource("/api/client/delta").route(web::get().to(
|req: HttpRequest| {
let etag_header = req
.headers()
.get(IF_NONE_MATCH)
.and_then(|h| h.to_str().ok());
return_client_features_delta(etag_header.map(|s| s.to_string()))
},
))),
|_| AppConfig::default(),
))
.tcp()
})
.await
}
}
Loading
Loading