Skip to content

Commit

Permalink
streaming-spike: prepare to send message on connection
Browse files Browse the repository at this point in the history
A;
  • Loading branch information
thomasheartman committed Dec 5, 2024
1 parent 9aeaa53 commit 41beb29
Showing 1 changed file with 16 additions and 50 deletions.
66 changes: 16 additions & 50 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::types::{
};
use actix_web::web::{self, Data, Json, Query};
use actix_web::{get, post, HttpRequest, HttpResponse, Responder};
use aws_sdk_s3::config::endpoint::ResolveEndpoint;
use dashmap::DashMap;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};
Expand Down Expand Up @@ -39,59 +40,24 @@ pub async fn get_features(

#[get("/streaming")]
pub async fn stream_features(
_edge_token: EdgeToken,
_features_cache: Data<DashMap<String, ClientFeatures>>,
_token_cache: Data<DashMap<String, EdgeToken>>,
_filter_query: Query<FeatureFilters>,
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> impl Responder {
// .map(|refresher| refresher.broadcaster.new_client())
match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => refresher.broadcaster.new_client().await,

None => todo!(),
let features = resolve_features(
edge_token,
features_cache,
token_cache,
filter_query,
req.clone(),
)
.await;
match (req.app_data::<Data<FeatureRefresher>>(), features) {
(Some(refresher), Ok(features)) => refresher.broadcaster.new_client().await,
_ => todo!(),
}

// let broadcaster = match req.app_data::<Data<FeatureRefresher>>() {
// Some(refresher) => {
// refresher.broadcaster.new_client().await
// }
// None => features_cache
// .get(&cache_key(&validated_token))
// .map(|client_features| filter_client_features(&client_features, &filter_set))
// .ok_or(EdgeError::ClientCacheError),
// }?;

// let (sender, receiver) = tokio::sync::mpsc::channel(2);

// actix_web::rt::spawn(async move {
// loop {
// let data = resolve_features(
// edge_token.clone(),
// features_cache.clone(),
// token_cache.clone(),
// filter_query.clone(),
// req.clone(),
// )
// .await;

// if let Ok(data) = data {
// let msg = sse::Data::new_json(data).unwrap().event("update");

// if sender.send(msg.into()).await.is_err() {
// tracing::warn!("client disconnected; could not send SSE message");
// break;
// }
// } else {
// tracing::warn!("whoops; data is err");
// break;
// }

// sleep(Duration::from_secs(10)).await;
// }
// });

// sse::Sse::from_infallible_receiver(receiver).with_keep_alive(Duration::from_secs(3))
}

#[utoipa::path(
Expand Down

0 comments on commit 41beb29

Please sign in to comment.