Skip to content

Commit

Permalink
streaming-spike: minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 3, 2024
1 parent 4822888 commit dd43a4d
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 28 deletions.
3 changes: 1 addition & 2 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::http::header::EntityTag;
use chrono::Utc;
use dashmap::DashMap;
use eventsource_client::Client;
use futures::{Stream, TryStreamExt};
use futures::TryStreamExt;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::Segment;
Expand Down Expand Up @@ -305,7 +305,6 @@ impl FeatureRefresher {
let refreshes = self.get_tokens_due_for_refresh();
for refresh in refreshes {
let token = refresh.token.clone();
println!("{}/client/streaming", self.unleash_client.urls.api_url);
let streaming_url = format!("{}/client/streaming", self.unleash_client.urls.api_url);

let es_client = match eventsource_client::ClientBuilder::for_url(&streaming_url) {
Expand Down
26 changes: 0 additions & 26 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use actix_web::{web, App, HttpServer};
use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use futures::{Stream, TryStreamExt};
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use utoipa::OpenApi;
Expand All @@ -26,23 +25,6 @@ use unleash_edge::{cli, client_api, frontend_api, health_checker, openapi, ready
use unleash_edge::{edge_api, prom_metrics};
use unleash_edge::{internal_backstage, tls};

fn tail_events(client: impl eventsource_client::Client) -> impl Stream<Item = Result<(), ()>> {
client
.stream()
.map_ok(|event| match event {
eventsource_client::SSE::Connected(connection) => {
println!("got connected: \nstatus={}", connection.response().status())
}
eventsource_client::SSE::Event(ev) => {
println!("got an event: {}\n{}", ev.event_type, ev.data)
}
eventsource_client::SSE::Comment(comment) => {
println!("got a comment: \n{}", comment)
}
})
.map_err(|err| eprintln!("error streaming events: {:?}", err))
}

#[cfg(not(tarpaulin_include))]
#[actix_web::main]
async fn main() -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -165,14 +147,6 @@ async fn main() -> Result<(), anyhow::Error> {
.shutdown_timeout(5)
.client_request_timeout(std::time::Duration::from_secs(request_timeout));

// let es_client = eventsource_client::ClientBuilder::for_url("http://localhost:4242/streaming")?
// .header("authorization", "some-token")?
// .build();

// let mut stream = tail_events(es_client);

// tokio::spawn(async move { while stream.try_next().await.unwrap().is_some() {} });

match schedule_args.mode {
cli::EdgeMode::Edge(edge) => {
let refresher_for_background = feature_refresher.clone().unwrap();
Expand Down

0 comments on commit dd43a4d

Please sign in to comment.