Skip to content

Commit

Permalink
streaming-spike: send data on connection
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 5, 2024
1 parent 41beb29 commit 0227a51
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
2 changes: 1 addition & 1 deletion server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn stream_features(
)
.await;
match (req.app_data::<Data<FeatureRefresher>>(), features) {
(Some(refresher), Ok(features)) => refresher.broadcaster.new_client().await,
(Some(refresher), Ok(features)) => refresher.broadcaster.new_client(features).await,
_ => todo!(),
}
}
Expand Down
19 changes: 14 additions & 5 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use actix_web::rt::time::interval;
use actix_web::{rt::time::interval, web::Json};
use actix_web_lab::{
sse::{self, Event, Sse},
util::InfallibleStream,
Expand All @@ -9,6 +9,7 @@ use futures_util::future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use unleash_types::client_features::ClientFeatures;

pub struct Broadcaster {
inner: Mutex<BroadcasterInner>,
Expand Down Expand Up @@ -65,12 +66,20 @@ impl Broadcaster {

/// Registers client with broadcaster, returning an SSE response body.
/// should take the current feature set as input and send it to the client.
pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
pub async fn new_client(
&self,
features: Json<ClientFeatures>,
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10);

tx.send(sse::Data::new("").event("unleash-connected").into())
.await
.unwrap();
tx.send(
sse::Data::new_json(features)
.unwrap()
.event("unleash-connected")
.into(),
)
.await
.unwrap();

self.inner.lock().clients.push(tx);

Expand Down
2 changes: 1 addition & 1 deletion server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl FeatureRefresher {
serde_json::from_str(&event.data).unwrap();
refresher.handle_client_features_updated(TokenRefresh::new(token, None), features);

let data = Data::new_json(event.data).unwrap().event("unleash-updated");
let data = Data::new(event.data).event("unleash-updated");
broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
// self.broadcaster.broadcast("got an update".clone).await;
}
Expand Down

0 comments on commit 0227a51

Please sign in to comment.