diff --git a/Cargo.lock b/Cargo.lock index fc8c4dd8..18563a80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,7 +27,7 @@ checksum = "f9e772b3bcafe335042b5db010ab7c09013dad6eac4915c91d8d50902769f331" dependencies = [ "actix-utils", "actix-web", - "derive_more", + "derive_more 0.99.18", "futures-util", "log", "once_cell", @@ -51,7 +51,7 @@ dependencies = [ "brotli", "bytes", "bytestring", - "derive_more", + "derive_more 0.99.18", "encoding_rs", "flate2", "futures-core", @@ -229,7 +229,7 @@ dependencies = [ "bytestring", "cfg-if", "cookie", - "derive_more", + "derive_more 0.99.18", "encoding_rs", "futures-core", "futures-util", @@ -263,6 +263,55 @@ dependencies = [ "syn 2.0.82", ] +[[package]] +name = "actix-web-lab" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee75923689132fc5fb57ccc5bb98d25bb214796a29cd505844eb3b42daf11df0" +dependencies = [ + "actix-http", + "actix-router", + "actix-service", + "actix-utils", + "actix-web", + "actix-web-lab-derive", + "ahash", + "arc-swap", + "bytes", + "bytestring", + "csv", + "derive_more 1.0.0", + "form_urlencoded", + "futures-core", + "futures-util", + "http 0.2.12", + "impl-more", + "itertools 0.13.0", + "local-channel", + "mediatype", + "mime", + "pin-project-lite", + "regex", + "serde", + "serde_html_form", + "serde_json", + "serde_path_to_error", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "actix-web-lab-derive" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c221da13534b9352f3f79fcbbd6095f6d8aee63bdf1da8a73d36f9eeea17d5a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -444,7 +493,7 @@ dependencies = [ "base64 0.22.1", "bytes", "cfg-if", - "derive_more", + "derive_more 0.99.18", "futures-core", "futures-util", "h2 0.3.26", @@ -1281,6 +1330,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "darling" version = "0.20.10" @@ -1405,6 +1475,27 @@ dependencies = [ "syn 2.0.82", ] +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" @@ -1535,6 +1626,22 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "eventsource-client" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43ddc25e1ad2cc0106d5e2d967397b4fb2068a66677ee9b0eea4600e5cfe8fb4" +dependencies = [ + "futures", + "hyper 0.14.31", + "hyper-rustls 0.24.2", + "hyper-timeout", + "log", + "pin-project", + "rand", + "tokio", +] + [[package]] name = "fastrand" version = "2.1.1" @@ -2022,6 +2129,18 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.31", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -2350,6 +2469,12 @@ dependencies = [ "digest", ] +[[package]] +name = "mediatype" +version = "0.19.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd" + [[package]] name = "memchr" version = "2.7.4" @@ -2721,6 +2846,26 @@ dependencies = [ "sha2", ] +[[package]] +name = "pin-project" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -3395,6 +3540,19 @@ dependencies = [ "syn 2.0.82", ] +[[package]] +name = "serde_html_form" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" +dependencies = [ + "form_urlencoded", + "indexmap 2.6.0", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.132" @@ -3407,6 +3565,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_qs" version = "0.13.0" @@ -3876,6 +4044,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.4.0" @@ -4169,6 +4347,7 @@ dependencies = [ "actix-middleware-etag", "actix-service", "actix-web", + "actix-web-lab", "ahash", "anyhow", "async-trait", @@ -4182,6 +4361,7 @@ dependencies = [ "clap-markdown", "dashmap", "env_logger", + "eventsource-client", "futures", "futures-core", "iter_tools", @@ -4211,6 +4391,7 @@ dependencies = [ "testcontainers", "testcontainers-modules", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "tracing-test", diff --git a/server/Cargo.toml b/server/Cargo.toml index 24ae6573..446b2dc3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -15,6 +15,9 @@ repository = "https://github.com/Unleash/unleash-edge" rust-version = "1.81.0" version = "19.6.3" +[features] +streaming = ["actix-web-lab", "eventsource-client", "tokio-stream"] + [package.metadata.wix] upgrade-guid = "11E5D83A-3034-48BB-9A84-9F589EBD648C" path-guid = "6F606A3B-C7E9-43EC-8B6E-91D7B74F80FC" @@ -30,6 +33,7 @@ actix-http = "3.9.0" actix-middleware-etag = "0.4.2" actix-service = "2.0.2" actix-web = { version = "4.9.0", features = ["rustls-0_23", "compress-zstd"] } +actix-web-lab = { version = "0.23.0", optional = true } ahash = "0.8.11" anyhow = "1.0.91" async-trait = "0.1.83" @@ -41,6 +45,7 @@ cidr = "0.3.0" clap = { version = "4.5.19", features = ["derive", "env"] } clap-markdown = "0.1.4" dashmap = "6.0.1" +eventsource-client = { version = "0.13.0", optional = true } futures = "0.3.30" futures-core = "0.3.30" iter_tools = "0.24.0" @@ -88,6 +93,7 @@ tokio = { version = "1.42.0", features = [ "tracing", "fs", ] } +tokio-stream = { version = "0.1.16", optional = true } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } ulid = "1.1.2" diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 18e60133..9806e846 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -9,6 +9,8 @@ use crate::types::{ self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters, }; use actix_web::web::{self, Data, Json, Query}; +#[cfg(feature = "streaming")] +use actix_web::Responder; use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; use unleash_types::client_features::{ClientFeature, ClientFeatures}; @@ -36,6 +38,28 @@ pub async fn get_features( ) -> EdgeJsonResult { resolve_features(edge_token, features_cache, token_cache, filter_query, req).await } + +#[cfg(feature = "streaming")] +#[get("/streaming")] +pub async fn stream_features( + edge_token: EdgeToken, + token_cache: Data>, + filter_query: Query, + req: HttpRequest, +) -> EdgeResult { + let (validated_token, _filter_set, query) = + get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; + match req.app_data::>() { + Some(refresher) => { + refresher + .broadcaster + .connect(validated_token, filter_query, query) + .await + } + _ => Err(EdgeError::ClientCacheError), + } +} + #[utoipa::path( context_path = "/api/client", params(FeatureFilters), @@ -59,13 +83,15 @@ pub async fn post_features( resolve_features(edge_token, features_cache, token_cache, filter_query, req).await } -async fn resolve_features( - edge_token: EdgeToken, - features_cache: Data>, - token_cache: Data>, +fn get_feature_filter( + edge_token: &EdgeToken, + token_cache: &Data>, filter_query: Query, - req: HttpRequest, -) -> EdgeJsonResult { +) -> EdgeResult<( + EdgeToken, + FeatureFilterSet, + unleash_types::client_features::Query, +)> { let validated_token = token_cache .get(&edge_token.token) .map(|e| e.value().clone()) @@ -87,6 +113,19 @@ async fn resolve_features( } .with_filter(project_filter(&validated_token)); + Ok((validated_token, filter_set, query)) +} + +async fn resolve_features( + edge_token: EdgeToken, + features_cache: Data>, + token_cache: Data>, + filter_query: Query, + req: HttpRequest, +) -> EdgeJsonResult { + let (validated_token, filter_set, query) = + get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; + let client_features = match req.app_data::>() { Some(refresher) => { refresher @@ -230,17 +269,20 @@ pub async fn post_bulk_metrics( Ok(HttpResponse::Accepted().finish()) } pub fn configure_client_api(cfg: &mut web::ServiceConfig) { - cfg.service( - web::scope("/client") - .wrap(crate::middleware::as_async_middleware::as_async_middleware( - crate::middleware::validate_token::validate_token, - )) - .service(get_features) - .service(get_feature) - .service(register) - .service(metrics) - .service(post_bulk_metrics), - ); + let client_scope = web::scope("/client") + .wrap(crate::middleware::as_async_middleware::as_async_middleware( + crate::middleware::validate_token::validate_token, + )) + .service(get_features) + .service(get_feature) + .service(register) + .service(metrics) + .service(post_bulk_metrics); + + #[cfg(feature = "streaming")] + let client_scope = client_scope.service(stream_features); + + cfg.service(client_scope); } pub fn configure_experimental_post_features( @@ -255,6 +297,8 @@ pub fn configure_experimental_post_features( #[cfg(test)] mod tests { + #[cfg(feature = "streaming")] + use crate::http::broadcaster::Broadcaster; use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey}; use crate::types::{TokenType, TokenValidationStatus}; use std::collections::HashMap; @@ -978,6 +1022,8 @@ mod tests { persistence: None, strict: false, app_name: "test-app".into(), + #[cfg(feature = "streaming")] + broadcaster: Broadcaster::new(features_cache.clone()), }); let token_validator = Arc::new(TokenValidator { unleash_client: unleash_client.clone(), diff --git a/server/src/error.rs b/server/src/error.rs index 09b2652a..2bbefc9b 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -2,8 +2,12 @@ use std::error::Error; use std::fmt::{Display, Formatter}; use actix_web::{http::StatusCode, HttpResponseBuilder, ResponseError}; +#[cfg(feature = "streaming")] +use actix_web_lab::sse::Event; use serde::Serialize; use serde_json::json; +#[cfg(feature = "streaming")] +use tokio::sync::mpsc::error::SendError; use tracing::debug; use crate::types::{EdgeToken, Status, UnleashBadRequest}; @@ -291,5 +295,13 @@ impl From for EdgeError { } } +#[cfg(feature = "streaming")] +impl From> for EdgeError { + // todo: create better enum representation. use this is placeholder + fn from(_value: SendError) -> Self { + EdgeError::TlsError + } +} + #[cfg(test)] mod tests {} diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs new file mode 100644 index 00000000..0b4a4250 --- /dev/null +++ b/server/src/http/broadcaster.rs @@ -0,0 +1,208 @@ +use std::{ + hash::{Hash, Hasher}, + sync::Arc, + time::Duration, +}; + +use actix_web::{ + rt::time::interval, + web::{Json, Query}, +}; +use actix_web_lab::{ + sse::{self, Event, Sse}, + util::InfallibleStream, +}; +use dashmap::DashMap; +use futures::future; +use serde::Serialize; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::warn; +use unleash_types::client_features::{ClientFeatures, Query as FlagQuery}; + +use crate::{ + error::EdgeError, + filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet}, + tokens::cache_key, + types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters}, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +struct QueryWrapper { + query: FlagQuery, +} + +impl Hash for QueryWrapper { + fn hash(&self, state: &mut H) { + serde_json::to_string(&self.query).unwrap().hash(state); + } +} + +#[derive(Clone, Debug)] +struct ClientGroup { + clients: Vec>, + filter_set: Query, + token: EdgeToken, +} + +pub struct Broadcaster { + active_connections: DashMap, + features_cache: Arc>, +} + +impl Broadcaster { + /// Constructs new broadcaster and spawns ping loop. + pub fn new(features: Arc>) -> Arc { + let this = Arc::new(Broadcaster { + active_connections: DashMap::new(), + features_cache: features, + }); + + Broadcaster::spawn_heartbeat(Arc::clone(&this)); + + this + } + + /// Pings clients every 30 seconds to see if they are alive and remove them from the broadcast + /// list if not. + fn spawn_heartbeat(this: Arc) { + tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(30)); + + loop { + interval.tick().await; + this.heartbeat().await; + } + }); + } + + /// Removes all non-responsive clients from broadcast list. + async fn heartbeat(&self) { + for mut group in self.active_connections.iter_mut() { + let mut ok_clients = Vec::new(); + + for client in &group.clients { + if client + .send(sse::Event::Comment("keep-alive".into())) + .await + .is_ok() + { + ok_clients.push(client.clone()); + } + } + + // validate tokens here? + // ok_clients.iter().filter(|client| client.token_is_valid()) + + group.clients = ok_clients; + } + } + + /// Registers client with broadcaster, returning an SSE response body. + pub async fn connect( + &self, + token: EdgeToken, + filter_set: Query, + query: unleash_types::client_features::Query, + ) -> EdgeResult>>> { + let (tx, rx) = mpsc::channel(10); + + let features = &self + .resolve_features(&token, filter_set.clone(), query.clone()) + .await?; + + tx.send( + sse::Data::new_json(features)? + .event("unleash-connected") + .into(), + ) + .await?; + + self.active_connections + .entry(QueryWrapper { query }) + .and_modify(|group| { + group.clients.push(tx.clone()); + }) + .or_insert(ClientGroup { + clients: vec![tx.clone()], + filter_set, + token, + }); + + Ok(Sse::from_infallible_receiver(rx)) + } + + fn get_query_filters( + filter_query: Query, + token: &EdgeToken, + ) -> FeatureFilterSet { + let query_filters = filter_query.into_inner(); + + let filter_set = if let Some(name_prefix) = query_filters.name_prefix { + FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix))) + } else { + FeatureFilterSet::default() + } + .with_filter(project_filter(token)); + filter_set + } + + async fn resolve_features( + &self, + validated_token: &EdgeToken, + filter_set: Query, + query: FlagQuery, + ) -> EdgeJsonResult { + let filter_set = Broadcaster::get_query_filters(filter_set.clone(), validated_token); + + let features = self + .features_cache + .get(&cache_key(validated_token)) + .map(|client_features| filter_client_features(&client_features, &filter_set)); + + match features { + Some(features) => Ok(Json(ClientFeatures { + query: Some(query), + ..features + })), + // Note: this is a simplification for now, using the following assumptions: + // 1. We'll only allow streaming in strict mode + // 2. We'll check whether the token is subsumed *before* trying to add it to the broadcaster + // If both of these are true, then we should never hit this case (if Thomas's understanding is correct). + None => Err(EdgeError::ClientCacheError), + } + } + + /// Broadcast new features to all clients. + pub async fn broadcast(&self) { + let mut client_events = Vec::new(); + for entry in self.active_connections.iter() { + let (query, group) = entry.pair(); + + let event_data = self + .resolve_features(&group.token, group.filter_set.clone(), query.query.clone()) + .await + .and_then(|features| sse::Data::new_json(&features).map_err(|e| e.into())); + + match event_data { + Ok(sse_data) => { + let event: Event = sse_data.event("unleash-updated").into(); + + for client in &group.clients { + client_events.push((client.clone(), event.clone())); + } + } + Err(e) => { + warn!("Failed to broadcast features: {:?}", e); + } + } + } + // try to send to all clients, ignoring failures + // disconnected clients will get swept up by `remove_stale_clients` + let send_events = client_events + .iter() + .map(|(client, event)| client.send(event.clone())); + + let _ = future::join_all(send_events).await; + } +} diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index e62d7d38..3d6e526e 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -4,6 +4,10 @@ use std::{sync::Arc, time::Duration}; use actix_web::http::header::EntityTag; use chrono::Utc; use dashmap::DashMap; +#[cfg(feature = "streaming")] +use eventsource_client::Client; +#[cfg(feature = "streaming")] +use futures::TryStreamExt; use reqwest::StatusCode; use tracing::{debug, info, warn}; use unleash_types::client_features::Segment; @@ -23,6 +27,8 @@ use crate::{ types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, }; +#[cfg(feature = "streaming")] +use super::broadcaster::Broadcaster; use super::unleash_client::UnleashClient; fn frontend_token_is_covered_by_tokens( @@ -102,6 +108,8 @@ pub struct FeatureRefresher { pub persistence: Option>, pub strict: bool, pub app_name: String, + #[cfg(feature = "streaming")] + pub broadcaster: Arc, } impl Default for FeatureRefresher { @@ -115,6 +123,8 @@ impl Default for FeatureRefresher { persistence: None, strict: true, app_name: "unleash_edge".into(), + #[cfg(feature = "streaming")] + broadcaster: Broadcaster::new(Default::default()), } } } @@ -154,6 +164,8 @@ impl FeatureRefresher { FeatureRefresher { unleash_client, tokens_to_refresh: Arc::new(DashMap::default()), + #[cfg(feature = "streaming")] + broadcaster: Broadcaster::new(features.clone()), features_cache: features, engine_cache: engines, refresh_interval: features_refresh_interval, @@ -299,6 +311,92 @@ impl FeatureRefresher { } } + /// This is where we set up a listener per token. + #[cfg(feature = "streaming")] + pub async fn start_streaming_features_background_task(&self) -> anyhow::Result<()> { + use anyhow::Context; + + let refreshes = self.get_tokens_due_for_refresh(); + for refresh in refreshes { + let token = refresh.token.clone(); + let streaming_url = self.unleash_client.urls.client_features_stream_url.as_str(); + + let es_client = eventsource_client::ClientBuilder::for_url(streaming_url) + .context("Failed to create EventSource client for streaming")? + .header("Authorization", &token.token)? + .reconnect( + eventsource_client::ReconnectOptions::reconnect(true) + .retry_initial(true) + .delay(Duration::from_secs(5)) + .delay_max(Duration::from_secs(30)) + .backoff_factor(2) + .build(), + ) + .build(); + + let refresher = self.clone(); + + tokio::spawn(async move { + let mut stream = es_client + .stream() + .map_ok(move |sse| { + let token = token.clone(); + let refresher = refresher.clone(); + async move { + match sse { + // The first time we're connecting to Unleash. + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-connected" => + { + debug!( + "Connected to unleash! Populating my flag cache now.", + ); + + match serde_json::from_str(&event.data) { + Ok(features) => { refresher.handle_client_features_updated(&token, features, None).await; } + Err(e) => { warn!("Could not parse features response to internal representation: {e:?}"); + } + } + } + // Unleash has updated features for us. + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-updated" => + { + debug!( + "Got an unleash updated event. Updating cache.", + ); + + match serde_json::from_str(&event.data) { + Ok(features) => { refresher.handle_client_features_updated(&token, features, None).await; } + Err(e) => { warn!("Could not parse features response to internal representation: {e:?}"); + } + } + } + eventsource_client::SSE::Event(event) => { + info!( + "Got an SSE event that I wasn't expecting: {:#?}", + event + ); + } + eventsource_client::SSE::Connected(_) => { + debug!("SSE Connection established"); + } + eventsource_client::SSE::Comment(_) => { + // purposefully left blank. + }, + } + } + }) + .map_err(|e| warn!("Error in SSE stream: {:?}", e)); + + while let Ok(Some(handler)) = stream.try_next().await { + handler.await; + } + }); + } + Ok(()) + } + pub async fn start_refresh_features_background_task(&self) { loop { tokio::select! { @@ -322,34 +420,23 @@ impl FeatureRefresher { } } - pub async fn refresh_single(&self, refresh: TokenRefresh) { - let features_result = self - .unleash_client - .get_client_features(ClientFeaturesRequest { - api_key: refresh.token.token.clone(), - etag: refresh.etag, + async fn handle_client_features_updated( + &self, + refresh_token: &EdgeToken, + features: ClientFeatures, + etag: Option, + ) { + debug!("Got updated client features. Updating features with {etag:?}"); + let key = cache_key(refresh_token); + self.update_last_refresh(refresh_token, etag, features.features.len()); + self.features_cache + .entry(key.clone()) + .and_modify(|existing_data| { + let updated_data = update_client_features(refresh_token, existing_data, &features); + *existing_data = updated_data; }) - .await; - - match features_result { - Ok(feature_response) => match feature_response { - ClientFeaturesResponse::NoUpdate(tag) => { - debug!("No update needed. Will update last check time with {tag}"); - self.update_last_check(&refresh.token.clone()); - } - ClientFeaturesResponse::Updated(features, etag) => { - debug!("Got updated client features. Updating features with {etag:?}"); - let key = cache_key(&refresh.token); - self.update_last_refresh(&refresh.token, etag, features.features.len()); - self.features_cache - .entry(key.clone()) - .and_modify(|existing_data| { - let updated_data = - update_client_features(&refresh.token, existing_data, &features); - *existing_data = updated_data; - }) - .or_insert_with(|| features.clone()); - self.engine_cache + .or_insert_with(|| features.clone()); + self.engine_cache .entry(key.clone()) .and_modify(|engine| { if let Some(f) = self.features_cache.get(&key) { @@ -371,6 +458,29 @@ impl FeatureRefresher { }; new_state }); + + #[cfg(feature = "streaming")] + self.broadcaster.broadcast().await; + } + + pub async fn refresh_single(&self, refresh: TokenRefresh) { + let features_result = self + .unleash_client + .get_client_features(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }) + .await; + + match features_result { + Ok(feature_response) => match feature_response { + ClientFeaturesResponse::NoUpdate(tag) => { + debug!("No update needed. Will update last check time with {tag}"); + self.update_last_check(&refresh.token.clone()); + } + ClientFeaturesResponse::Updated(features, etag) => { + self.handle_client_features_updated(&refresh.token, features, etag) + .await } }, Err(e) => { diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index a550af88..a7849f6a 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -1,4 +1,6 @@ #[cfg(not(tarpaulin_include))] pub mod background_send_metrics; +#[cfg(feature = "streaming")] +pub mod broadcaster; pub mod feature_refresher; pub mod unleash_client; diff --git a/server/src/main.rs b/server/src/main.rs index 85307cc8..811f8bbe 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -77,6 +77,7 @@ async fn main() -> Result<(), anyhow::Error> { let openapi = openapi::ApiDoc::openapi(); let refresher_for_app_data = feature_refresher.clone(); let prom_registry_for_write = metrics_handler.registry.clone(); + let server = HttpServer::new(move || { let qs_config = serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false)); @@ -149,31 +150,67 @@ async fn main() -> Result<(), anyhow::Error> { match schedule_args.mode { cli::EdgeMode::Edge(edge) => { + #[cfg(feature = "streaming")] + { + let refresher_for_background = feature_refresher.clone().unwrap(); + tokio::spawn(async move { + let _ = refresher_for_background + .start_streaming_features_background_task() + .await; + }); + } let refresher = feature_refresher.clone().unwrap(); + let validator = token_validator_schedule.clone().unwrap(); - tokio::select! { - _ = server.run() => { - tracing::info!("Actix is shutting down. Persisting data"); - clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; - tracing::info!("Actix was shutdown properly"); - }, - _ = refresher.start_refresh_features_background_task() => { - tracing::info!("Feature refresher unexpectedly shut down"); - } - _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { - tracing::info!("Metrics poster unexpectedly shut down"); - } - _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { - tracing::info!("Persister was unexpectedly shut down"); - } - _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { - tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); - } - _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { - tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + + if cfg!(feature = "streaming") { + tokio::select! { + _ = server.run() => { + tracing::info!("Actix is shutting down. Persisting data"); + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; + tracing::info!("Actix was shutdown properly"); + }, + _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { + tracing::info!("Metrics poster unexpectedly shut down"); + } + _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { + tracing::info!("Persister was unexpectedly shut down"); + } + _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { + tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); + } + _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { + tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + } + _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { + tracing::info!("Prometheus push unexpectedly shut down"); + } } - _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { - tracing::info!("Prometheus push unexpectedly shut down"); + } else { + tokio::select! { + _ = server.run() => { + tracing::info!("Actix is shutting down. Persisting data"); + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; + tracing::info!("Actix was shutdown properly"); + }, + _ = refresher.start_refresh_features_background_task() => { + tracing::info!("Feature refresher unexpectedly shut down"); + } + _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { + tracing::info!("Metrics poster unexpectedly shut down"); + } + _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { + tracing::info!("Persister was unexpectedly shut down"); + } + _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { + tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); + } + _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { + tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + } + _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { + tracing::info!("Prometheus push unexpectedly shut down"); + } } } } diff --git a/server/src/urls.rs b/server/src/urls.rs index 65b065da..0ae87209 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -17,6 +17,8 @@ pub struct UnleashUrls { pub edge_validate_url: Url, pub edge_metrics_url: Url, pub new_api_token_url: Url, + #[cfg(feature = "streaming")] + pub client_features_stream_url: Url, } impl FromStr for UnleashUrls { @@ -49,6 +51,12 @@ impl UnleashUrls { .path_segments_mut() .unwrap() .push("features"); + #[cfg(feature = "streaming")] + let mut client_features_stream_url = client_api_url.clone(); + client_features_stream_url + .path_segments_mut() + .unwrap() + .push("streaming"); let mut client_register_app_url = client_api_url.clone(); client_register_app_url .path_segments_mut() @@ -100,6 +108,8 @@ impl UnleashUrls { edge_validate_url, edge_metrics_url, new_api_token_url, + #[cfg(feature = "streaming")] + client_features_stream_url, } } }