Skip to content

Commit

Permalink
wip: (broken) use active_connections hash map
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 6, 2024
1 parent e097a90 commit 0d65761
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 45 deletions.
2 changes: 1 addition & 1 deletion server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn stream_features(
match (req.app_data::<Data<FeatureRefresher>>(), features) {
(Some(refresher), Ok(features)) => Ok(refresher
.broadcaster
.new_client(
.connect(
validated_token,
filter_query.clone(),
query.clone(),
Expand Down
138 changes: 100 additions & 38 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
/// copied from https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::HashMap,
hash::{Hash, Hasher},
sync::Arc,
time::Duration,
};

use actix_web::{
rt::time::interval,
Expand All @@ -12,21 +17,18 @@ use actix_web_lab::{
use dashmap::DashMap;
use futures_util::future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use serde::Serialize;
use tokio::{net::unix::pipe::Sender, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_features::{ClientFeatures, Query as FlagQuery};

use crate::{
cli,
filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
tokens::cache_key,
types::{EdgeResult, EdgeToken, FeatureFilters},
};

/// Fans out SSE feature updates to connected streaming clients.
pub struct Broadcaster {
// Registry of connected clients, guarded by a parking_lot mutex.
inner: Mutex<BroadcasterInner>,
// Latest client features keyed by token cache key (see `cache_key` usage in
// `broadcast`); shared with the feature refresher via `Arc`.
features_cache: Arc<DashMap<String, ClientFeatures>>,
}

// this doesn't work because filter_set isn't clone. However, we can probably
// find a way around that. For instance, we can create a hash map / dash map of
// some client identifier to each filter set, so that we don't need to clone the
Expand All @@ -50,7 +52,7 @@ struct StreamClient {
struct QueryStuff {
token: EdgeToken,
filter_set: Query<FeatureFilters>,
query: unleash_types::client_features::Query,
query: FlagQuery,
}

impl std::fmt::Debug for QueryStuff {
Expand All @@ -59,12 +61,36 @@ impl std::fmt::Debug for QueryStuff {
}
}

#[derive(Debug, Default)]
/// Newtype around the flag query so it can serve as a `HashMap` key for
/// `active_connections` — presumably because `FlagQuery` itself does not
/// implement `Hash` (hence the manual `Hash` impl) — TODO confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct QueryWrapper {
query: FlagQuery,
}

impl Hash for QueryWrapper {
// Hashes the query via its serialized JSON string.
// NOTE(review): `unwrap()` will panic if serialization ever fails — confirm
// that serializing `FlagQuery` is infallible.
// NOTE(review): Eq/Hash consistency requires equal queries to produce
// byte-identical JSON; this holds only if serialization is deterministic
// (e.g. no map fields with unstable ordering) — verify against `FlagQuery`.
fn hash<H: Hasher>(&self, state: &mut H) {
serde_json::to_string(&self.query).unwrap().hash(state);
}
}

/// All connected clients that share the same flag query, together with the
/// filter set and token needed to build their filtered feature payload.
#[derive(Clone, Debug)]
struct ClientGroup {
// One SSE event sender per connected client in this group.
clients: Vec<mpsc::Sender<sse::Event>>,
// Filters applied to the feature cache before sending to this group.
filter_set: Query<FeatureFilters>,
// Token used to look up this group's features in the cache.
token: EdgeToken,
}

/// Mutable broadcaster state, held behind `Broadcaster::inner`'s mutex.
#[derive(Default)]
struct BroadcasterInner {
// Clients grouped by their (hashable) flag query; each group shares one
// filter set and token.
active_connections: HashMap<QueryWrapper, ClientGroup>,
// NOTE(review): `clients` and `filters` appear superseded by
// `active_connections` in this change (their uses in `connect`/`broadcast`
// are commented out) — confirm whether these fields can be removed.
clients: Vec<StreamClient>,
filters: HashMap<String, QueryStuff>,
}

/// Fans out SSE feature updates to connected streaming clients.
pub struct Broadcaster {
// Registry of connected clients, guarded by a parking_lot mutex.
inner: Mutex<BroadcasterInner>,
// Latest client features keyed by token cache key (see `cache_key` usage in
// `broadcast`); shared with the feature refresher via `Arc`.
features_cache: Arc<DashMap<String, ClientFeatures>>,
}

impl Broadcaster {
/// Constructs new broadcaster and spawns ping loop.
pub fn new(features: Arc<DashMap<String, ClientFeatures>>) -> Arc<Self> {
Expand Down Expand Up @@ -117,7 +143,7 @@ impl Broadcaster {
/// The commented-out arguments are what we'll need to store per client so
/// that we can properly filter / format the feature response when they get
/// updates later.
pub async fn new_client(
pub async fn connect(
&self,
token: EdgeToken,
filter_set: Query<FeatureFilters>,
Expand All @@ -126,17 +152,32 @@ impl Broadcaster {
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10);

let token_string = token.token.clone();
let query_stuff = QueryStuff {
token,
filter_set,
query,
};
// let token_string = token.token.clone();
// let query_stuff = QueryStuff {
// token: token.clone(),
// filter_set: filter_set.clone(),
// query: query.clone(),
// };

self.inner
.lock()
.filters
.insert(token_string.clone(), query_stuff);
.active_connections
.entry(QueryWrapper {
query: query.clone(),
})
.and_modify(|group| {
group.clients.push(tx.clone());
})
.or_insert(ClientGroup {
clients: vec![tx.clone()],
filter_set,
token,
});

// self.inner
// .lock()
// .filters
// .insert(token_string.clone(), query_stuff);

tx.send(
sse::Data::new_json(features)
Expand All @@ -147,10 +188,10 @@ impl Broadcaster {
.await
.unwrap();

self.inner.lock().clients.push(StreamClient {
stream: tx,
id: token_string,
});
// self.inner.lock().clients.push(StreamClient {
// stream: tx,
// id: token_string,
// });

Sse::from_infallible_receiver(rx)
// we're already using remove_stale_clients to clean up disconnected
Expand Down Expand Up @@ -191,30 +232,51 @@ impl Broadcaster {
filter_set
}

/// Broadcasts `msg` to all clients.
///
/// This is the example implementation of the broadcast function. It's not used anywhere today.
/// Broadcast new features to all clients.
pub async fn broadcast(&self) {
let clients = self.inner.lock().clients.clone();
let active_connections = self.inner.lock().active_connections.clone();

let send_futures = clients.iter().map(|client| {
let binding = self.inner.lock();
let query_stuff = binding.filters.get(&client.id).unwrap();
let filter_set = Broadcaster::get_query_filters(
query_stuff.filter_set.clone(),
query_stuff.token.clone(),
);
let send_futures = Vec::new();
for (query, group) in active_connections {
let filter_set =
Broadcaster::get_query_filters(group.filter_set.clone(), group.token.clone());
let features = self
.features_cache
.get(&cache_key(&query_stuff.token))
.get(&cache_key(&group.token))
.map(|client_features| filter_client_features(&client_features, &filter_set));
// let features = get_features_for_filter(query_stuff.token.clone(), &filter_set).unwrap();
let event = sse::Data::new_json(&features).unwrap().into();
client.stream.send(event)
});
let event: Event = sse::Data::new_json(&features).unwrap().into();

for client in group.clients {
send_futures.push(client.stream.send(event.clone()));
}
// let send_futures = group
// .clients
// .iter()
// .map(|client| client.send(event.clone()));
}
// try to send to all clients, ignoring failures
// disconnected clients will get swept up by `remove_stale_clients`
let _ = future::join_all(send_futures).await;

// let send_futures = clients.iter().map(|client| {
// let binding = self.inner.lock();
// let query_stuff = binding.filters.get(&client.id).unwrap();
// let filter_set = Broadcaster::get_query_filters(
// query_stuff.filter_set.clone(),
// query_stuff.token.clone(),
// );
// let features = self
// .features_cache
// .get(&cache_key(&query_stuff.token))
// .map(|client_features| filter_client_features(&client_features, &filter_set));
// // let features = get_features_for_filter(query_stuff.token.clone(), &filter_set).unwrap();
// let event = sse::Data::new_json(&features).unwrap().into();
// client.stream.send(event)
// });

// // try to send to all clients, ignoring failures
// // disconnected clients will get swept up by `remove_stale_clients`
// let _ = future::join_all(send_futures).await;
}
}
10 changes: 5 additions & 5 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl FeatureRefresher {
}

/// This is where we set up a listener per token.
pub async fn start_streaming_features_background_task(&self) -> Result<(), anyhow::Error> {
pub async fn start_streaming_features_background_task(&self) -> anyhow::Result<()> {
let refreshes = self.get_tokens_due_for_refresh();
for refresh in refreshes {
let token = refresh.token.clone();
Expand All @@ -326,15 +326,15 @@ impl FeatureRefresher {
.build();

let refresher = self.clone();
let broadcaster = self.broadcaster.clone();
// let broadcaster = self.broadcaster.clone();

tokio::spawn(async move {
let mut stream = es_client
.stream()
.map_ok(move |sse| {
let token = token.clone();
let refresher = refresher.clone();
let broadcaster = broadcaster.clone();
// let broadcaster = broadcaster.clone();
async move {
match sse {
// The first time we're connecting to Unleash. Just store the data.
Expand Down Expand Up @@ -369,7 +369,7 @@ impl FeatureRefresher {
// of filtering the flags per listener.
// let data = Data::new(event.data).event("unleash-updated");
// broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
broadcaster.broadcast().await;
refresher.broadcaster.broadcast().await;
}
eventsource_client::SSE::Event(event) => {
debug!(
Expand All @@ -378,7 +378,7 @@ impl FeatureRefresher {
);
}
eventsource_client::SSE::Connected(_) => {
debug!("SSE Connectection established");
debug!("SSE Connection established");
}
eventsource_client::SSE::Comment(_) => {
// purposefully left blank.
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn main() -> Result<(), anyhow::Error> {
let refresher_for_background = feature_refresher.clone().unwrap();

tokio::spawn(async move {
refresher_for_background
let _ = refresher_for_background
.start_streaming_features_background_task()
.await;
});
Expand Down

0 comments on commit 0d65761

Please sign in to comment.