diff --git a/server/src/client_api.rs b/server/src/client_api.rs
index 58d134db..24f96ed4 100644
--- a/server/src/client_api.rs
+++ b/server/src/client_api.rs
@@ -48,7 +48,7 @@ pub async fn stream_features(
     let (validated_token, filter_set, query) =
         get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
     let features = resolve_features_2(
-        query,
+        query.clone(),
         validated_token.clone(),
         filter_set,
         features_cache,
@@ -56,7 +56,15 @@ pub async fn stream_features(
     )
     .await;
     match (req.app_data::<Data<FeatureRefresher>>(), features) {
-        (Some(refresher), Ok(features)) => Ok(refresher.broadcaster.new_client(features).await),
+        (Some(refresher), Ok(features)) => Ok(refresher
+            .broadcaster
+            .new_client(
+                validated_token,
+                filter_query.clone(),
+                query.clone(),
+                features,
+            )
+            .await),
         _ => todo!(),
     }
 }
@@ -330,6 +338,7 @@ pub fn configure_experimental_post_features(
 mod tests {
 
     use crate::http::broadcaster::Broadcaster;
+    use crate::internal_backstage::features;
     use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
     use crate::types::{TokenType, TokenValidationStatus};
     use std::collections::HashMap;
@@ -1053,7 +1062,7 @@ mod tests {
             persistence: None,
             strict: false,
             app_name: "test-app".into(),
-            broadcaster: Broadcaster::create(),
+            broadcaster: Broadcaster::new(features_cache.clone()),
         });
         let token_validator = Arc::new(TokenValidator {
             unleash_client: unleash_client.clone(),
diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs
index 19b68d5b..ae034728 100644
--- a/server/src/http/broadcaster.rs
+++ b/server/src/http/broadcaster.rs
@@ -1,21 +1,30 @@
 /// copied from https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
 
-use actix_web::{rt::time::interval, web::Json};
+use actix_web::{
+    rt::time::interval,
+    web::{Json, Query},
+};
 use actix_web_lab::{
     sse::{self, Event, Sse},
     util::InfallibleStream,
 };
+use dashmap::DashMap;
 use futures_util::future;
 use parking_lot::Mutex;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
-use unleash_types::client_features::{ClientFeatures, Query};
+use unleash_types::client_features::ClientFeatures;
 
-use crate::{filters::FeatureFilterSet, types::EdgeToken};
+use crate::{
+    filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
+    tokens::cache_key,
+    types::{EdgeResult, EdgeToken, FeatureFilters},
+};
 
 pub struct Broadcaster {
     inner: Mutex<BroadcasterInner>,
+    features_cache: Arc<DashMap<String, ClientFeatures>>,
 }
 
 // this doesn't work because filter_set isn't clone. However, we can probably
@@ -31,25 +40,37 @@ pub struct Broadcaster {
 // Then, when we drop clients, also drop their corresponding entries from the
 // map.
 
-// #[derive(Debug, Clone)]
+#[derive(Debug, Clone)]
 struct StreamClient {
     stream: mpsc::Sender<sse::Event>,
+    id: String,
+}
+
+struct QueryStuff {
     token: EdgeToken,
-    filter_set: FeatureFilterSet,
-    query: Query,
+    filter_set: Query<FeatureFilters>,
+    query: unleash_types::client_features::Query,
+}
+
+impl std::fmt::Debug for QueryStuff {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "QueryStuff")
+    }
 }
 
 #[derive(Debug, Default)]
 struct BroadcasterInner {
-    clients: Vec<mpsc::Sender<sse::Event>>,
+    clients: Vec<StreamClient>,
+    filters: HashMap<String, QueryStuff>,
 }
 
 impl Broadcaster {
     /// Constructs new broadcaster and spawns ping loop.
-    pub fn create() -> Arc<Self> {
+    pub fn new(features: Arc<DashMap<String, ClientFeatures>>) -> Arc<Self> {
         let this = Arc::new(Broadcaster {
             inner: Mutex::new(BroadcasterInner::default()),
+            features_cache: features,
         });
 
         Broadcaster::spawn_ping(Arc::clone(&this));
@@ -78,6 +99,7 @@ impl Broadcaster {
 
         for client in clients {
             if client
+                .stream
                 .send(sse::Event::Comment("keep-alive".into()))
                 .await
                 .is_ok()
@@ -97,13 +119,25 @@
     /// updates later.
     pub async fn new_client(
         &self,
-        // token: EdgeToken,
-        // filter_set: FeatureFilterSet,
-        // query: Query,
+        token: EdgeToken,
+        filter_set: Query<FeatureFilters>,
+        query: unleash_types::client_features::Query,
         features: Json<ClientFeatures>,
     ) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
         let (tx, rx) = mpsc::channel(10);
 
+        let token_string = token.token.clone();
+        let query_stuff = QueryStuff {
+            token,
+            filter_set,
+            query,
+        };
+
+        self.inner
+            .lock()
+            .filters
+            .insert(token_string.clone(), query_stuff);
+
         tx.send(
             sse::Data::new_json(features)
                 .unwrap()
@@ -113,7 +147,10 @@
         .await
         .unwrap();
 
-        self.inner.lock().clients.push(tx);
+        self.inner.lock().clients.push(StreamClient {
+            stream: tx,
+            id: token_string,
+        });
 
         Sse::from_infallible_receiver(rx)
         // we're already using remove_stale_clients to clean up disconnected
@@ -130,22 +167,51 @@
     pub async fn rebroadcast(&self, data: Event) {
         let clients = self.inner.lock().clients.clone();
 
-        let send_futures = clients.iter().map(|client| client.send(data.clone()));
+        let send_futures = clients
+            .iter()
+            .map(|client| client.stream.send(data.clone()));
 
         // try to send to all clients, ignoring failures
        // disconnected clients will get swept up by `remove_stale_clients`
         let _ = future::join_all(send_futures).await;
     }
 
+    fn get_query_filters(
+        filter_query: Query<FeatureFilters>,
+        token: EdgeToken,
+    ) -> FeatureFilterSet {
+        let query_filters = filter_query.into_inner();
+
+        let filter_set = if let Some(name_prefix) = query_filters.name_prefix {
+            FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix)))
+        } else {
+            FeatureFilterSet::default()
+        }
+        .with_filter(project_filter(&token));
+        filter_set
+    }
+
     /// Broadcasts `msg` to all clients.
     ///
     /// This is the example implementation of the broadcast function. It's not used anywhere today.
-    pub async fn broadcast(&self, msg: &str) {
+    pub async fn broadcast(&self) {
         let clients = self.inner.lock().clients.clone();
 
-        let send_futures = clients
-            .iter()
-            .map(|client| client.send(sse::Data::new(msg).into()));
+        let send_futures = clients.iter().map(|client| {
+            let binding = self.inner.lock();
+            let query_stuff = binding.filters.get(&client.id).unwrap();
+            let filter_set = Broadcaster::get_query_filters(
+                query_stuff.filter_set.clone(),
+                query_stuff.token.clone(),
+            );
+            let features = self
+                .features_cache
+                .get(&cache_key(&query_stuff.token))
+                .map(|client_features| filter_client_features(&client_features, &filter_set));
+            // let features = get_features_for_filter(query_stuff.token.clone(), &filter_set).unwrap();
+            let event = sse::Data::new_json(&features).unwrap().into();
+            client.stream.send(event)
+        });
 
         // try to send to all clients, ignoring failures
         // disconnected clients will get swept up by `remove_stale_clients`
diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs
index a64712d8..0c4363e3 100644
--- a/server/src/http/feature_refresher.rs
+++ b/server/src/http/feature_refresher.rs
@@ -120,7 +120,7 @@ impl Default for FeatureRefresher {
             persistence: None,
             strict: true,
             app_name: "unleash_edge".into(),
-            broadcaster: Broadcaster::create(),
+            broadcaster: Broadcaster::new(Default::default()),
         }
     }
 }
@@ -160,13 +160,13 @@ impl FeatureRefresher {
         FeatureRefresher {
             unleash_client,
             tokens_to_refresh: Arc::new(DashMap::default()),
-            features_cache: features,
+            features_cache: features.clone(),
             engine_cache: engines,
             refresh_interval: features_refresh_interval,
             persistence,
             strict,
             app_name: app_name.into(),
-            broadcaster: Broadcaster::create(),
+            broadcaster: Broadcaster::new(features.clone()),
         }
     }
 
@@ -378,8 +378,9 @@ impl FeatureRefresher {
                                // feature set OR even just a "filter flags"
                                // function. The broadcaster will take care
                                // of filtering the flags per listener.
-                                let data = Data::new(event.data).event("unleash-updated");
-                                broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
+                                // let data = Data::new(event.data).event("unleash-updated");
+                                // broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
+                                broadcaster.broadcast().await;
                             }
                             eventsource_client::SSE::Event(event) => {
                                 debug!(