diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 24f96ed4..c646d92e 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -58,7 +58,7 @@ pub async fn stream_features( match (req.app_data::>(), features) { (Some(refresher), Ok(features)) => Ok(refresher .broadcaster - .new_client( + .connect( validated_token, filter_query.clone(), query.clone(), diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs index ae034728..a956cec1 100644 --- a/server/src/http/broadcaster.rs +++ b/server/src/http/broadcaster.rs @@ -1,5 +1,10 @@ /// copied from https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + sync::Arc, + time::Duration, +}; use actix_web::{ rt::time::interval, @@ -12,21 +17,18 @@ use actix_web_lab::{ use dashmap::DashMap; use futures_util::future; use parking_lot::Mutex; -use tokio::sync::mpsc; +use serde::Serialize; +use tokio::{net::unix::pipe::Sender, sync::mpsc}; use tokio_stream::wrappers::ReceiverStream; -use unleash_types::client_features::ClientFeatures; +use unleash_types::client_features::{ClientFeatures, Query as FlagQuery}; use crate::{ + cli, filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet}, tokens::cache_key, types::{EdgeResult, EdgeToken, FeatureFilters}, }; -pub struct Broadcaster { - inner: Mutex, - features_cache: Arc>, -} - // this doesn't work because filter_set isn't clone. However, we can probably // find a way around that. For instance, we can create a hash map / dash map of // some client identifier to each filter set, so that we don't need to clone the @@ -50,7 +52,7 @@ struct StreamClient { struct QueryStuff { token: EdgeToken, filter_set: Query, - query: unleash_types::client_features::Query, + query: FlagQuery, } impl std::fmt::Debug for QueryStuff { @@ -59,12 +61,36 @@ impl std::fmt::Debug for QueryStuff { } } -#[derive(Debug, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +struct QueryWrapper { + query: FlagQuery, +} + +impl Hash for QueryWrapper { + fn hash(&self, state: &mut H) { + serde_json::to_string(&self.query).unwrap().hash(state); + } +} + +#[derive(Clone, Debug)] +struct ClientGroup { + clients: Vec>, + filter_set: Query, + token: EdgeToken, +} + +#[derive(Default)] struct BroadcasterInner { + active_connections: HashMap, clients: Vec, filters: HashMap, } +pub struct Broadcaster { + inner: Mutex, + features_cache: Arc>, +} + impl Broadcaster { /// Constructs new broadcaster and spawns ping loop. pub fn new(features: Arc>) -> Arc { @@ -117,7 +143,7 @@ impl Broadcaster { /// The commented-out arguments are what we'll need to store per client so /// that we can properly filter / format the feature response when they get /// updates later. - pub async fn new_client( + pub async fn connect( &self, token: EdgeToken, filter_set: Query, @@ -126,17 +152,32 @@ impl Broadcaster { ) -> Sse>> { let (tx, rx) = mpsc::channel(10); - let token_string = token.token.clone(); - let query_stuff = QueryStuff { - token, - filter_set, - query, - }; + // let token_string = token.token.clone(); + // let query_stuff = QueryStuff { + // token: token.clone(), + // filter_set: filter_set.clone(), + // query: query.clone(), + // }; self.inner .lock() - .filters - .insert(token_string.clone(), query_stuff); + .active_connections + .entry(QueryWrapper { + query: query.clone(), + }) + .and_modify(|group| { + group.clients.push(tx.clone()); + }) + .or_insert(ClientGroup { + clients: vec![tx.clone()], + filter_set, + token, + }); + + // self.inner + // .lock() + // .filters + // .insert(token_string.clone(), query_stuff); tx.send( sse::Data::new_json(features) @@ -147,10 +188,10 @@ impl Broadcaster { .await .unwrap(); - self.inner.lock().clients.push(StreamClient { - stream: tx, - id: token_string, - }); + // self.inner.lock().clients.push(StreamClient { + // stream: tx, + // id: token_string, + // }); Sse::from_infallible_receiver(rx) // we're already using remove_stale_clients to clean up disconnected @@ -191,30 +232,51 @@ impl Broadcaster { filter_set } - /// Broadcasts `msg` to all clients. - /// - /// This is the example implementation of the broadcast function. It's not used anywhere today. + /// Broadcast new features to all clients. pub async fn broadcast(&self) { let clients = self.inner.lock().clients.clone(); + let active_connections = self.inner.lock().active_connections.clone(); - let send_futures = clients.iter().map(|client| { - let binding = self.inner.lock(); - let query_stuff = binding.filters.get(&client.id).unwrap(); - let filter_set = Broadcaster::get_query_filters( - query_stuff.filter_set.clone(), - query_stuff.token.clone(), - ); + let send_futures = Vec::new(); + for (query, group) in active_connections { + let filter_set = + Broadcaster::get_query_filters(group.filter_set.clone(), group.token.clone()); let features = self .features_cache - .get(&cache_key(&query_stuff.token)) + .get(&cache_key(&group.token)) .map(|client_features| filter_client_features(&client_features, &filter_set)); - // let features = get_features_for_filter(query_stuff.token.clone(), &filter_set).unwrap(); - let event = sse::Data::new_json(&features).unwrap().into(); - client.stream.send(event) - }); + let event: Event = sse::Data::new_json(&features).unwrap().into(); + for client in group.clients { + send_futures.push(client.stream.send(event.clone())); + } + // let send_futures = group + // .clients + // .iter() + // .map(|client| client.send(event.clone())); + } // try to send to all clients, ignoring failures // disconnected clients will get swept up by `remove_stale_clients` let _ = future::join_all(send_futures).await; + + // let send_futures = clients.iter().map(|client| { + // let binding = self.inner.lock(); + // let query_stuff = binding.filters.get(&client.id).unwrap(); + // let filter_set = Broadcaster::get_query_filters( + // query_stuff.filter_set.clone(), + // query_stuff.token.clone(), + // ); + // let features = self + // .features_cache + // .get(&cache_key(&query_stuff.token)) + // .map(|client_features| filter_client_features(&client_features, &filter_set)); + // // let features = get_features_for_filter(query_stuff.token.clone(), &filter_set).unwrap(); + // let event = sse::Data::new_json(&features).unwrap().into(); + // client.stream.send(event) + // }); + + // // try to send to all clients, ignoring failures + // // disconnected clients will get swept up by `remove_stale_clients` + // let _ = future::join_all(send_futures).await; } } diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index ae6b117f..516ab36a 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -307,7 +307,7 @@ impl FeatureRefresher { } /// This is where we set up a listener per token. - pub async fn start_streaming_features_background_task(&self) -> Result<(), anyhow::Error> { + pub async fn start_streaming_features_background_task(&self) -> anyhow::Result<()> { let refreshes = self.get_tokens_due_for_refresh(); for refresh in refreshes { let token = refresh.token.clone(); @@ -326,7 +326,7 @@ impl FeatureRefresher { .build(); let refresher = self.clone(); - let broadcaster = self.broadcaster.clone(); + // let broadcaster = self.broadcaster.clone(); tokio::spawn(async move { let mut stream = es_client @@ -334,7 +334,7 @@ impl FeatureRefresher { .map_ok(move |sse| { let token = token.clone(); let refresher = refresher.clone(); - let broadcaster = broadcaster.clone(); + // let broadcaster = broadcaster.clone(); async move { match sse { // The first time we're connecting to Unleash. Just store the data. @@ -369,7 +369,7 @@ impl FeatureRefresher { // of filtering the flags per listener. // let data = Data::new(event.data).event("unleash-updated"); // broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await; - broadcaster.broadcast().await; + refresher.broadcaster.broadcast().await; } eventsource_client::SSE::Event(event) => { debug!( @@ -378,7 +378,7 @@ impl FeatureRefresher { ); } eventsource_client::SSE::Connected(_) => { - debug!("SSE Connectection established"); + debug!("SSE Connection established"); } eventsource_client::SSE::Comment(_) => { // purposefully left blank. diff --git a/server/src/main.rs b/server/src/main.rs index af488c29..b10732d9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -153,7 +153,7 @@ async fn main() -> Result<(), anyhow::Error> { let refresher_for_background = feature_refresher.clone().unwrap(); tokio::spawn(async move { - refresher_for_background + let _ = refresher_for_background .start_streaming_features_background_task() .await; });