From f51128744f0f3759d3a8a6f8456130946bc998dd Mon Sep 17 00:00:00 2001
From: Thomas Heartman
Date: Thu, 12 Dec 2024 10:20:05 +0100
Subject: [PATCH] experimental: sse client support (#592)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

> ⚠️ This is experimental and a work in progress

## Overview

tl;dr: This is a very experimental implementation of streaming in Edge: it both listens to the experimental Unleash streaming endpoint and pushes updates to any listeners subscribing to Edge (in effect mirroring the Unleash endpoint). All related code is behind the "streaming" feature gate.

More detail: I have added an event source client to the `feature_refresher` file. If the streaming flag is on, we spawn a task that listens to Unleash for events instead of starting the poll loop for flags. There is a new endpoint at `api/client/streaming` (mirroring the Unleash endpoint) that you can hit to start listening for updates (see the usage sketch at the bottom of this description). The updates are handled by a new `broadcaster` module (largely stolen from [this Actix example](https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs)). The broadcaster stores its clients in a hash map, using the flag query as the key and mapping it to a vec of the clients that share that query.

## Left to do

Regarding the implementation: I'm not very familiar with Actix, and I haven't done much async / multithreaded Rust before, so there are probably lots of things we can improve, from the overall architecture down to the specific data structures used.

~~Also, due to the very time-limited spike we did, we still need to do some real error handling. There are a good few places where we either ignore errors or would just panic if we ever encountered them.~~

Aside from that, there are a few other things to do:

- [x] Store the streaming URL in the urls struct
- [ ] Add fetch mode (streaming/polling) as a CLI option
- [ ] The broadcaster needs to store the token used with each client, not just the query (you can have multiple tokens with the same query, and those tokens can be invalidated separately)
- [ ] Find a more sensible way to use the query as a hash map key: serializing it and hashing the string seems roundabout and wonky
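Here's a rough sketch of what a subscriber to the new endpoint could look like, reusing the same `eventsource-client` crate the refresher uses to talk to Unleash. The host, port, and token are placeholders (adjust for your deployment); the event names are the ones the broadcaster actually emits:

```rust
// Hypothetical consumer of the new `/api/client/streaming` endpoint.
// Assumes tokio with the "macros" and "rt-multi-thread" features enabled.
use eventsource_client::{Client, ClientBuilder, SSE};
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = ClientBuilder::for_url("http://localhost:3063/api/client/streaming")?
        .header("Authorization", "<client-token>")?
        .build();

    let mut stream = client.stream();
    while let Some(event) = stream.try_next().await? {
        match event {
            // On connect, Edge sends the full (filtered) feature set...
            SSE::Event(ev) if ev.event_type == "unleash-connected" => {
                println!("connected, initial features: {}", ev.data);
            }
            // ...and pushes a fresh snapshot whenever its own cache changes.
            SSE::Event(ev) if ev.event_type == "unleash-updated" => {
                println!("features updated: {}", ev.data);
            }
            // Keep-alive comments and connection notices can be ignored here.
            _ => {}
        }
    }
    Ok(())
}
```

The shape deliberately mirrors the refresher's own SSE loop in `feature_refresher.rs`, just pointed at Edge instead of Unleash.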
---
 Cargo.lock                           | 189 +++++++++++++++++++++++-
 server/Cargo.toml                    |   6 +
 server/src/client_api.rs             |  80 ++++++++---
 server/src/error.rs                  |  12 ++
 server/src/http/broadcaster.rs       | 208 +++++++++++++++++++++++++++
 server/src/http/feature_refresher.rs | 164 +++++++++++++++++----
 server/src/http/mod.rs               |   2 +
 server/src/main.rs                   |  81 ++++++++---
 server/src/urls.rs                   |  10 ++
 9 files changed, 682 insertions(+), 70 deletions(-)
 create mode 100644 server/src/http/broadcaster.rs

diff --git a/Cargo.lock b/Cargo.lock
index fc8c4dd8..18563a80 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -27,7 +27,7 @@ checksum = "f9e772b3bcafe335042b5db010ab7c09013dad6eac4915c91d8d50902769f331"
 dependencies = [
  "actix-utils",
  "actix-web",
- "derive_more",
+ "derive_more 0.99.18",
  "futures-util",
  "log",
  "once_cell",
@@ -51,7 +51,7 @@ dependencies = [
  "brotli",
  "bytes",
  "bytestring",
- "derive_more",
+ "derive_more 0.99.18",
  "encoding_rs",
  "flate2",
  "futures-core",
@@ -229,7 +229,7 @@ dependencies = [
  "bytestring",
  "cfg-if",
  "cookie",
- "derive_more",
+ "derive_more 0.99.18",
  "encoding_rs",
  "futures-core",
  "futures-util",
@@ -263,6 +263,55 @@ dependencies = [
  "syn 2.0.82",
 ]
 
+[[package]]
+name = "actix-web-lab"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee75923689132fc5fb57ccc5bb98d25bb214796a29cd505844eb3b42daf11df0"
+dependencies = [
+ "actix-http",
+ "actix-router",
+ "actix-service",
+ "actix-utils",
+ "actix-web",
+ "actix-web-lab-derive",
+ "ahash",
+ "arc-swap",
+ "bytes",
+ "bytestring",
+ "csv",
+ "derive_more 1.0.0",
+ "form_urlencoded",
+ "futures-core",
+ "futures-util",
+ "http 0.2.12",
+ "impl-more",
+ "itertools 0.13.0",
+ "local-channel",
+ "mediatype",
+ "mime",
+ "pin-project-lite",
+ "regex",
+ "serde",
+ "serde_html_form",
+ "serde_json",
+ "serde_path_to_error",
+ "tokio",
+ "tokio-stream",
+ "tracing",
+]
+
+[[package]]
+name = "actix-web-lab-derive"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c221da13534b9352f3f79fcbbd6095f6d8aee63bdf1da8a73d36f9eeea17d5a"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.82",
+]
+
 [[package]]
 name = "addr2line"
 version = "0.24.2"
@@ -444,7 +493,7 @@ dependencies = [
  "base64 0.22.1",
  "bytes",
  "cfg-if",
- "derive_more",
+ "derive_more 0.99.18",
  "futures-core",
  "futures-util",
  "h2 0.3.26",
@@ -1281,6 +1330,27 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "csv"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
+dependencies = [
+ "csv-core",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "csv-core"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "darling"
 version = "0.20.10"
@@ -1405,6 +1475,27 @@ dependencies = [
  "syn 2.0.82",
 ]
 
+[[package]]
+name = "derive_more"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
+dependencies = [
+ "derive_more-impl",
+]
+
+[[package]]
+name = "derive_more-impl"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.82",
+ "unicode-xid",
+]
+
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -1535,6 +1626,22 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "eventsource-client"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43ddc25e1ad2cc0106d5e2d967397b4fb2068a66677ee9b0eea4600e5cfe8fb4"
+dependencies = [
+ "futures",
+ "hyper 0.14.31",
+ "hyper-rustls 0.24.2",
+ "hyper-timeout",
+ "log",
+ "pin-project",
+ "rand",
+ "tokio",
+]
+
 [[package]]
 name = "fastrand"
 version = "2.1.1"
@@ -2022,6 +2129,18 @@ dependencies = [
  "webpki-roots",
 ]
 
+[[package]]
+name = "hyper-timeout"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
+dependencies = [
+ "hyper 0.14.31",
+ "pin-project-lite",
+ "tokio",
+ "tokio-io-timeout",
+]
+
 [[package]]
 name = "hyper-tls"
 version = "0.6.0"
@@ -2350,6 +2469,12 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "mediatype"
+version = "0.19.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd"
+
 [[package]]
 name = "memchr"
 version = "2.7.4"
@@ -2721,6 +2846,26 @@ dependencies = [
  "sha2",
 ]
 
+[[package]]
+name = "pin-project"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.82",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.14"
@@ -3395,6 +3540,19 @@ dependencies = [
  "syn 2.0.82",
 ]
 
+[[package]]
+name = "serde_html_form"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5"
+dependencies = [
+ "form_urlencoded",
+ "indexmap 2.6.0",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
 [[package]]
 name = "serde_json"
 version = "1.0.132"
@@ -3407,6 +3565,16 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_path_to_error"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6"
+dependencies = [
+ "itoa",
+ "serde",
+]
+
 [[package]]
 name = "serde_qs"
 version = "0.13.0"
@@ -3876,6 +4044,16 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "tokio-io-timeout"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
+dependencies = [
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-macros"
 version = "2.4.0"
@@ -4169,6 +4347,7 @@ dependencies = [
  "actix-middleware-etag",
  "actix-service",
  "actix-web",
+ "actix-web-lab",
  "ahash",
  "anyhow",
  "async-trait",
@@ -4182,6 +4361,7 @@ dependencies = [
  "clap-markdown",
  "dashmap",
  "env_logger",
+ "eventsource-client",
  "futures",
  "futures-core",
  "iter_tools",
@@ -4211,6 +4391,7 @@ dependencies = [
  "testcontainers",
  "testcontainers-modules",
  "tokio",
+ "tokio-stream",
  "tracing",
  "tracing-subscriber",
  "tracing-test",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 24ae6573..446b2dc3 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -15,6 +15,9 @@ repository = "https://github.com/Unleash/unleash-edge"
 rust-version = "1.81.0"
 version = "19.6.3"
 
+[features]
+streaming = ["actix-web-lab", "eventsource-client", "tokio-stream"]
+
 [package.metadata.wix]
 upgrade-guid = "11E5D83A-3034-48BB-9A84-9F589EBD648C"
 path-guid = "6F606A3B-C7E9-43EC-8B6E-91D7B74F80FC"
@@ -30,6 +33,7 @@ actix-http = "3.9.0"
 actix-middleware-etag = "0.4.2"
 actix-service = "2.0.2"
 actix-web = { version = "4.9.0", features = ["rustls-0_23", "compress-zstd"] }
+actix-web-lab = { version = "0.23.0", optional = true }
 ahash = "0.8.11"
 anyhow = "1.0.91"
 async-trait = "0.1.83"
@@ -41,6 +45,7 @@ cidr = "0.3.0"
 clap = { version = "4.5.19", features = ["derive", "env"] }
 clap-markdown = "0.1.4"
 dashmap = "6.0.1"
+eventsource-client = { version = "0.13.0", optional = true }
 futures = "0.3.30"
 futures-core = "0.3.30"
 iter_tools = "0.24.0"
@@ -88,6 +93,7 @@ tokio = { version = "1.42.0", features = [
     "tracing",
     "fs",
 ] }
+tokio-stream = { version = "0.1.16", optional = true }
 tracing = { version = "0.1.40", features = ["log"] }
 tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
 ulid = "1.1.2"
diff --git a/server/src/client_api.rs b/server/src/client_api.rs
index 18e60133..9806e846 100644
--- a/server/src/client_api.rs
+++ b/server/src/client_api.rs
@@ -9,6 +9,8 @@ use crate::types::{
     self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters,
 };
 use actix_web::web::{self, Data, Json, Query};
+#[cfg(feature = "streaming")]
+use actix_web::Responder;
 use actix_web::{get, post, HttpRequest, HttpResponse};
 use dashmap::DashMap;
 use unleash_types::client_features::{ClientFeature, ClientFeatures};
@@ -36,6 +38,28 @@ pub async fn get_features(
 ) -> EdgeJsonResult<ClientFeatures> {
     resolve_features(edge_token, features_cache, token_cache, filter_query, req).await
 }
+
+#[cfg(feature = "streaming")]
+#[get("/streaming")]
+pub async fn stream_features(
+    edge_token: EdgeToken,
+    token_cache: Data<DashMap<String, EdgeToken>>,
+    filter_query: Query<FeatureFilters>,
+    req: HttpRequest,
+) -> EdgeResult<impl Responder> {
+    let (validated_token, _filter_set, query) =
+        get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
+    match req.app_data::<Data<FeatureRefresher>>() {
+        Some(refresher) => {
+            refresher
+                .broadcaster
+                .connect(validated_token, filter_query, query)
+                .await
+        }
+        _ => Err(EdgeError::ClientCacheError),
+    }
+}
+
 #[utoipa::path(
     context_path = "/api/client",
     params(FeatureFilters),
@@ -59,13 +83,15 @@ pub async fn post_features(
     resolve_features(edge_token, features_cache, token_cache, filter_query, req).await
 }
 
-async fn resolve_features(
-    edge_token: EdgeToken,
-    features_cache: Data<DashMap<String, ClientFeatures>>,
-    token_cache: Data<DashMap<String, EdgeToken>>,
+fn get_feature_filter(
+    edge_token: &EdgeToken,
+    token_cache: &Data<DashMap<String, EdgeToken>>,
     filter_query: Query<FeatureFilters>,
-    req: HttpRequest,
-) -> EdgeJsonResult<ClientFeatures> {
+) -> EdgeResult<(
+    EdgeToken,
+    FeatureFilterSet,
+    unleash_types::client_features::Query,
+)> {
     let validated_token = token_cache
         .get(&edge_token.token)
         .map(|e| e.value().clone())
@@ -87,6 +113,19 @@ fn get_feature_filter(
     }
     .with_filter(project_filter(&validated_token));
 
+    Ok((validated_token, filter_set, query))
+}
+
+async fn resolve_features(
+    edge_token: EdgeToken,
+    features_cache: Data<DashMap<String, ClientFeatures>>,
+    token_cache: Data<DashMap<String, EdgeToken>>,
+    filter_query: Query<FeatureFilters>,
+    req: HttpRequest,
+) -> EdgeJsonResult<ClientFeatures> {
+    let (validated_token, filter_set, query) =
+        get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
+
     let client_features = match req.app_data::<Data<FeatureRefresher>>() {
         Some(refresher) => {
             refresher
@@ -230,17 +269,20 @@ pub async fn post_bulk_metrics(
     Ok(HttpResponse::Accepted().finish())
 }
 
 pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
-    cfg.service(
-        web::scope("/client")
-            .wrap(crate::middleware::as_async_middleware::as_async_middleware(
-                crate::middleware::validate_token::validate_token,
-            ))
-            .service(get_features)
-            .service(get_feature)
-            .service(register)
-            .service(metrics)
-            .service(post_bulk_metrics),
-    );
+    let client_scope = web::scope("/client")
+        .wrap(crate::middleware::as_async_middleware::as_async_middleware(
+            crate::middleware::validate_token::validate_token,
+        ))
+        .service(get_features)
+        .service(get_feature)
+        .service(register)
+        .service(metrics)
+        .service(post_bulk_metrics);
+
+    #[cfg(feature = "streaming")]
+    let client_scope = client_scope.service(stream_features);
+
+    cfg.service(client_scope);
 }
 
 pub fn configure_experimental_post_features(
@@ -255,6 +297,8 @@
 
 #[cfg(test)]
 mod tests {
+    #[cfg(feature = "streaming")]
+    use crate::http::broadcaster::Broadcaster;
     use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
     use crate::types::{TokenType, TokenValidationStatus};
     use std::collections::HashMap;
@@ -978,6 +1022,8 @@
             persistence: None,
             strict: false,
             app_name: "test-app".into(),
+            #[cfg(feature = "streaming")]
+            broadcaster: Broadcaster::new(features_cache.clone()),
         });
         let token_validator = Arc::new(TokenValidator {
             unleash_client: unleash_client.clone(),
diff --git a/server/src/error.rs b/server/src/error.rs
index 09b2652a..2bbefc9b 100644
--- a/server/src/error.rs
+++ b/server/src/error.rs
@@ -2,8 +2,12 @@ use std::error::Error;
 use std::fmt::{Display, Formatter};
 
 use actix_web::{http::StatusCode, HttpResponseBuilder, ResponseError};
+#[cfg(feature = "streaming")]
+use actix_web_lab::sse::Event;
 use serde::Serialize;
 use serde_json::json;
+#[cfg(feature = "streaming")]
+use tokio::sync::mpsc::error::SendError;
 use tracing::debug;
 
 use crate::types::{EdgeToken, Status, UnleashBadRequest};
@@ -291,5 +295,13 @@ impl From for EdgeError {
     }
 }
 
+#[cfg(feature = "streaming")]
+impl From<SendError<Event>> for EdgeError {
+    // TODO: create a better enum representation; this is a placeholder.
+    fn from(_value: SendError<Event>) -> Self {
+        EdgeError::TlsError
+    }
+}
+
 #[cfg(test)]
 mod tests {}
diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs
new file mode 100644
index 00000000..0b4a4250
--- /dev/null
+++ b/server/src/http/broadcaster.rs
@@ -0,0 +1,208 @@
+use std::{
+    hash::{Hash, Hasher},
+    sync::Arc,
+    time::Duration,
+};
+
+use actix_web::{
+    rt::time::interval,
+    web::{Json, Query},
+};
+use actix_web_lab::{
+    sse::{self, Event, Sse},
+    util::InfallibleStream,
+};
+use dashmap::DashMap;
+use futures::future;
+use serde::Serialize;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::warn;
+use unleash_types::client_features::{ClientFeatures, Query as FlagQuery};
+
+use crate::{
+    error::EdgeError,
+    filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
+    tokens::cache_key,
+    types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters},
+};
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+struct QueryWrapper {
+    query: FlagQuery,
+}
+
+impl Hash for QueryWrapper {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        serde_json::to_string(&self.query).unwrap().hash(state);
+    }
+}
+
+#[derive(Clone, Debug)]
+struct ClientGroup {
+    clients: Vec<mpsc::Sender<sse::Event>>,
+    filter_set: Query<FeatureFilters>,
+    token: EdgeToken,
+}
+
+pub struct Broadcaster {
+    active_connections: DashMap<QueryWrapper, ClientGroup>,
+    features_cache: Arc<DashMap<String, ClientFeatures>>,
+}
+
+impl Broadcaster {
+    /// Constructs a new broadcaster and spawns the ping loop.
+    pub fn new(features: Arc<DashMap<String, ClientFeatures>>) -> Arc<Self> {
+        let this = Arc::new(Broadcaster {
+            active_connections: DashMap::new(),
+            features_cache: features,
+        });
+
+        Broadcaster::spawn_heartbeat(Arc::clone(&this));
+
+        this
+    }
+
+    /// Pings clients every 30 seconds to see if they are alive, and removes them from the
+    /// broadcast list if not.
+    fn spawn_heartbeat(this: Arc<Self>) {
+        tokio::spawn(async move {
+            let mut interval = interval(Duration::from_secs(30));
+
+            loop {
+                interval.tick().await;
+                this.heartbeat().await;
+            }
+        });
+    }
+
+    /// Removes all non-responsive clients from the broadcast list.
+    async fn heartbeat(&self) {
+        for mut group in self.active_connections.iter_mut() {
+            let mut ok_clients = Vec::new();
+
+            for client in &group.clients {
+                if client
+                    .send(sse::Event::Comment("keep-alive".into()))
+                    .await
+                    .is_ok()
+                {
+                    ok_clients.push(client.clone());
+                }
+            }
+
+            // validate tokens here?
+            // ok_clients.iter().filter(|client| client.token_is_valid())
+
+            group.clients = ok_clients;
+        }
+    }
+
+    /// Registers a client with the broadcaster, returning an SSE response body.
+    pub async fn connect(
+        &self,
+        token: EdgeToken,
+        filter_set: Query<FeatureFilters>,
+        query: unleash_types::client_features::Query,
+    ) -> EdgeResult<Sse<InfallibleStream<ReceiverStream<sse::Event>>>> {
+        let (tx, rx) = mpsc::channel(10);
+
+        let features = &self
+            .resolve_features(&token, filter_set.clone(), query.clone())
+            .await?;
+
+        tx.send(
+            sse::Data::new_json(features)?
+                .event("unleash-connected")
+                .into(),
+        )
+        .await?;
+
+        self.active_connections
+            .entry(QueryWrapper { query })
+            .and_modify(|group| {
+                group.clients.push(tx.clone());
+            })
+            .or_insert(ClientGroup {
+                clients: vec![tx.clone()],
+                filter_set,
+                token,
+            });
+
+        Ok(Sse::from_infallible_receiver(rx))
+    }
+
+    fn get_query_filters(
+        filter_query: Query<FeatureFilters>,
+        token: &EdgeToken,
+    ) -> FeatureFilterSet {
+        let query_filters = filter_query.into_inner();
+
+        let filter_set = if let Some(name_prefix) = query_filters.name_prefix {
+            FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix)))
+        } else {
+            FeatureFilterSet::default()
+        }
+        .with_filter(project_filter(token));
+        filter_set
+    }
+
+    async fn resolve_features(
+        &self,
+        validated_token: &EdgeToken,
+        filter_set: Query<FeatureFilters>,
+        query: FlagQuery,
+    ) -> EdgeJsonResult<ClientFeatures> {
+        let filter_set = Broadcaster::get_query_filters(filter_set.clone(), validated_token);
+
+        let features = self
+            .features_cache
+            .get(&cache_key(validated_token))
+            .map(|client_features| filter_client_features(&client_features, &filter_set));
+
+        match features {
+            Some(features) => Ok(Json(ClientFeatures {
+                query: Some(query),
+                ..features
+            })),
+            // Note: this is a simplification for now, based on the following assumptions:
+            // 1. We'll only allow streaming in strict mode.
+            // 2. We'll check whether the token is subsumed *before* trying to add it to the broadcaster.
+            // If both of these hold, we should never hit this case (if Thomas's understanding is correct).
+            None => Err(EdgeError::ClientCacheError),
+        }
+    }
+
+    /// Broadcast new features to all clients.
+    pub async fn broadcast(&self) {
+        let mut client_events = Vec::new();
+        for entry in self.active_connections.iter() {
+            let (query, group) = entry.pair();
+
+            let event_data = self
+                .resolve_features(&group.token, group.filter_set.clone(), query.query.clone())
+                .await
+                .and_then(|features| sse::Data::new_json(&features).map_err(|e| e.into()));
+
+            match event_data {
+                Ok(sse_data) => {
+                    let event: Event = sse_data.event("unleash-updated").into();
+
+                    for client in &group.clients {
+                        client_events.push((client.clone(), event.clone()));
+                    }
+                }
+                Err(e) => {
+                    warn!("Failed to broadcast features: {:?}", e);
+                }
+            }
+        }
+        // Try to send to all clients, ignoring failures; disconnected clients
+        // will be swept up by the `heartbeat` loop.
+        let send_events = client_events
+            .iter()
+            .map(|(client, event)| client.send(event.clone()));
+
+        let _ = future::join_all(send_events).await;
+    }
+}
diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs
index e62d7d38..3d6e526e 100644
--- a/server/src/http/feature_refresher.rs
+++ b/server/src/http/feature_refresher.rs
@@ -4,6 +4,10 @@ use std::{sync::Arc, time::Duration};
 use actix_web::http::header::EntityTag;
 use chrono::Utc;
 use dashmap::DashMap;
+#[cfg(feature = "streaming")]
+use eventsource_client::Client;
+#[cfg(feature = "streaming")]
+use futures::TryStreamExt;
 use reqwest::StatusCode;
 use tracing::{debug, info, warn};
 use unleash_types::client_features::Segment;
@@ -23,6 +27,8 @@ use crate::{
     types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh},
 };
 
+#[cfg(feature = "streaming")]
+use super::broadcaster::Broadcaster;
 use super::unleash_client::UnleashClient;
 
 fn frontend_token_is_covered_by_tokens(
@@ -102,6 +108,8 @@ pub struct FeatureRefresher {
     pub persistence: Option<Arc<dyn EdgePersistence>>,
     pub strict: bool,
     pub app_name: String,
+    #[cfg(feature = "streaming")]
+    pub broadcaster: Arc<Broadcaster>,
 }
 
 impl Default for FeatureRefresher {
@@ -115,6 +123,8 @@ impl Default for FeatureRefresher {
             persistence: None,
             strict: true,
             app_name: "unleash_edge".into(),
+            #[cfg(feature = "streaming")]
+            broadcaster: Broadcaster::new(Default::default()),
         }
     }
 }
@@ -154,6 +164,8 @@ impl FeatureRefresher {
         FeatureRefresher {
             unleash_client,
             tokens_to_refresh: Arc::new(DashMap::default()),
+            #[cfg(feature = "streaming")]
+            broadcaster: Broadcaster::new(features.clone()),
             features_cache: features,
             engine_cache: engines,
             refresh_interval: features_refresh_interval,
@@ -299,6 +311,92 @@ impl FeatureRefresher {
         }
     }
 
+    /// This is where we set up a listener per token.
+    #[cfg(feature = "streaming")]
+    pub async fn start_streaming_features_background_task(&self) -> anyhow::Result<()> {
+        use anyhow::Context;
+
+        let refreshes = self.get_tokens_due_for_refresh();
+        for refresh in refreshes {
+            let token = refresh.token.clone();
+            let streaming_url = self.unleash_client.urls.client_features_stream_url.as_str();
+
+            let es_client = eventsource_client::ClientBuilder::for_url(streaming_url)
+                .context("Failed to create EventSource client for streaming")?
+                .header("Authorization", &token.token)?
+                .reconnect(
+                    eventsource_client::ReconnectOptions::reconnect(true)
+                        .retry_initial(true)
+                        .delay(Duration::from_secs(5))
+                        .delay_max(Duration::from_secs(30))
+                        .backoff_factor(2)
+                        .build(),
+                )
+                .build();
+
+            let refresher = self.clone();
+
+            tokio::spawn(async move {
+                let mut stream = es_client
+                    .stream()
+                    .map_ok(move |sse| {
+                        let token = token.clone();
+                        let refresher = refresher.clone();
+                        async move {
+                            match sse {
+                                // The first time we're connecting to Unleash.
+                                eventsource_client::SSE::Event(event)
+                                    if event.event_type == "unleash-connected" =>
+                                {
+                                    debug!("Connected to unleash! Populating my flag cache now.");
+
+                                    match serde_json::from_str(&event.data) {
+                                        Ok(features) => {
+                                            refresher
+                                                .handle_client_features_updated(&token, features, None)
+                                                .await;
+                                        }
+                                        Err(e) => {
+                                            warn!("Could not parse features response to internal representation: {e:?}");
+                                        }
+                                    }
+                                }
+                                // Unleash has updated features for us.
+                                eventsource_client::SSE::Event(event)
+                                    if event.event_type == "unleash-updated" =>
+                                {
+                                    debug!("Got an unleash-updated event. Updating cache.");
+
+                                    match serde_json::from_str(&event.data) {
+                                        Ok(features) => {
+                                            refresher
+                                                .handle_client_features_updated(&token, features, None)
+                                                .await;
+                                        }
+                                        Err(e) => {
+                                            warn!("Could not parse features response to internal representation: {e:?}");
+                                        }
+                                    }
+                                }
+                                eventsource_client::SSE::Event(event) => {
+                                    info!("Got an SSE event that I wasn't expecting: {:#?}", event);
+                                }
+                                eventsource_client::SSE::Connected(_) => {
+                                    debug!("SSE connection established");
+                                }
+                                eventsource_client::SSE::Comment(_) => {
+                                    // Purposefully left blank.
+                                }
+                            }
+                        }
+                    })
+                    .map_err(|e| warn!("Error in SSE stream: {:?}", e));
+
+                while let Ok(Some(handler)) = stream.try_next().await {
+                    handler.await;
+                }
+            });
+        }
+        Ok(())
+    }
+
     pub async fn start_refresh_features_background_task(&self) {
         loop {
             tokio::select! {
@@ -322,34 +420,23 @@ impl FeatureRefresher {
         }
     }
 
-    pub async fn refresh_single(&self, refresh: TokenRefresh) {
-        let features_result = self
-            .unleash_client
-            .get_client_features(ClientFeaturesRequest {
-                api_key: refresh.token.token.clone(),
-                etag: refresh.etag,
+    async fn handle_client_features_updated(
+        &self,
+        refresh_token: &EdgeToken,
+        features: ClientFeatures,
+        etag: Option<EntityTag>,
+    ) {
+        debug!("Got updated client features. Updating features with {etag:?}");
+        let key = cache_key(refresh_token);
+        self.update_last_refresh(refresh_token, etag, features.features.len());
+        self.features_cache
+            .entry(key.clone())
+            .and_modify(|existing_data| {
+                let updated_data = update_client_features(refresh_token, existing_data, &features);
+                *existing_data = updated_data;
             })
-            .await;
-
-        match features_result {
-            Ok(feature_response) => match feature_response {
-                ClientFeaturesResponse::NoUpdate(tag) => {
-                    debug!("No update needed. Will update last check time with {tag}");
-                    self.update_last_check(&refresh.token.clone());
-                }
-                ClientFeaturesResponse::Updated(features, etag) => {
-                    debug!("Got updated client features. Updating features with {etag:?}");
-                    let key = cache_key(&refresh.token);
-                    self.update_last_refresh(&refresh.token, etag, features.features.len());
-                    self.features_cache
-                        .entry(key.clone())
-                        .and_modify(|existing_data| {
-                            let updated_data =
-                                update_client_features(&refresh.token, existing_data, &features);
-                            *existing_data = updated_data;
-                        })
-                        .or_insert_with(|| features.clone());
-                    self.engine_cache
+            .or_insert_with(|| features.clone());
+        self.engine_cache
             .entry(key.clone())
             .and_modify(|engine| {
                 if let Some(f) = self.features_cache.get(&key) {
@@ -371,6 +458,29 @@
             };
             new_state
         });
+
+        #[cfg(feature = "streaming")]
+        self.broadcaster.broadcast().await;
+    }
+
+    pub async fn refresh_single(&self, refresh: TokenRefresh) {
+        let features_result = self
+            .unleash_client
+            .get_client_features(ClientFeaturesRequest {
+                api_key: refresh.token.token.clone(),
+                etag: refresh.etag,
+            })
+            .await;
+
+        match features_result {
+            Ok(feature_response) => match feature_response {
+                ClientFeaturesResponse::NoUpdate(tag) => {
+                    debug!("No update needed. Will update last check time with {tag}");
+                    self.update_last_check(&refresh.token.clone());
+                }
+                ClientFeaturesResponse::Updated(features, etag) => {
+                    self.handle_client_features_updated(&refresh.token, features, etag)
+                        .await
+                }
             },
             Err(e) => {
diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs
index a550af88..a7849f6a 100644
--- a/server/src/http/mod.rs
+++ b/server/src/http/mod.rs
@@ -1,4 +1,6 @@
 #[cfg(not(tarpaulin_include))]
 pub mod background_send_metrics;
+#[cfg(feature = "streaming")]
+pub mod broadcaster;
 pub mod feature_refresher;
 pub mod unleash_client;
diff --git a/server/src/main.rs b/server/src/main.rs
index 85307cc8..811f8bbe 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -77,6 +77,7 @@ async fn main() -> Result<(), anyhow::Error> {
     let openapi = openapi::ApiDoc::openapi();
     let refresher_for_app_data = feature_refresher.clone();
     let prom_registry_for_write = metrics_handler.registry.clone();
+
     let server = HttpServer::new(move || {
         let qs_config =
             serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false));
@@ -149,31 +150,67 @@
     match schedule_args.mode {
         cli::EdgeMode::Edge(edge) => {
+            #[cfg(feature = "streaming")]
+            {
+                let refresher_for_background = feature_refresher.clone().unwrap();
+                tokio::spawn(async move {
+                    let _ = refresher_for_background
+                        .start_streaming_features_background_task()
+                        .await;
+                });
+            }
             let refresher = feature_refresher.clone().unwrap();
+
             let validator = token_validator_schedule.clone().unwrap();
-            tokio::select! {
-                _ = server.run() => {
-                    tracing::info!("Actix is shutting down. Persisting data");
Persisting data"); - clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; - tracing::info!("Actix was shutdown properly"); - }, - _ = refresher.start_refresh_features_background_task() => { - tracing::info!("Feature refresher unexpectedly shut down"); - } - _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { - tracing::info!("Metrics poster unexpectedly shut down"); - } - _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { - tracing::info!("Persister was unexpectedly shut down"); - } - _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { - tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); - } - _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { - tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + + if cfg!(feature = "streaming") { + tokio::select! { + _ = server.run() => { + tracing::info!("Actix is shutting down. Persisting data"); + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; + tracing::info!("Actix was shutdown properly"); + }, + _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { + tracing::info!("Metrics poster unexpectedly shut down"); + } + _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { + tracing::info!("Persister was unexpectedly shut down"); + } + _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { + tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); + } + _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { + tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + } + _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { + tracing::info!("Prometheus push unexpectedly shut down"); + } } - _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { - tracing::info!("Prometheus push unexpectedly shut down"); + } else { + tokio::select! { + _ = server.run() => { + tracing::info!("Actix is shutting down. 
Persisting data"); + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; + tracing::info!("Actix was shutdown properly"); + }, + _ = refresher.start_refresh_features_background_task() => { + tracing::info!("Feature refresher unexpectedly shut down"); + } + _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { + tracing::info!("Metrics poster unexpectedly shut down"); + } + _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { + tracing::info!("Persister was unexpectedly shut down"); + } + _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { + tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); + } + _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { + tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + } + _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { + tracing::info!("Prometheus push unexpectedly shut down"); + } } } } diff --git a/server/src/urls.rs b/server/src/urls.rs index 65b065da..0ae87209 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -17,6 +17,8 @@ pub struct UnleashUrls { pub edge_validate_url: Url, pub edge_metrics_url: Url, pub new_api_token_url: Url, + #[cfg(feature = "streaming")] + pub client_features_stream_url: Url, } impl FromStr for UnleashUrls { @@ -49,6 +51,12 @@ impl UnleashUrls { .path_segments_mut() .unwrap() .push("features"); + #[cfg(feature = "streaming")] + let mut client_features_stream_url = client_api_url.clone(); + client_features_stream_url + .path_segments_mut() + .unwrap() + .push("streaming"); let mut client_register_app_url = client_api_url.clone(); client_register_app_url .path_segments_mut() @@ -100,6 +108,8 @@ impl UnleashUrls { edge_validate_url, edge_metrics_url, new_api_token_url, + #[cfg(feature = "streaming")] + client_features_stream_url, } } }