-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
experimental: sse client support #592
Conversation
Dependency ReviewThe following issues were found:
License IssuesCargo.lock
server/Cargo.toml
Denied Licenses: GPL-1.0, GPL-2.0, GPL-3.0, LGPL-2.1, LGPL-3.0, AGPL-3.0 OpenSSF ScorecardScorecard details
Scanned Files
|
b3122da
to
ae70a59
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you did an amazing job with this! 🥇
|
||
#[cfg(feature = "streaming")] | ||
#[get("/streaming")] | ||
pub async fn stream_features( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this would be slightly more consistent with Unleash's counterpart?
pub async fn stream_features( | |
pub async fn connect( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. But as the API endpoint, maybe it should be more explicit? Feels like stream_features
is clearer to me. But then again, it's a separate controller in Unleash. I don't care much either way, but this feels more explicit in this context.
impl From<SendError<Event>> for EdgeError { | ||
// todo: create better enum representation. use this is placeholder | ||
fn from(_value: SendError<Event>) -> Self { | ||
EdgeError::TlsError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be weird seeing a TlsError
here, but I'm guessing we're okay with this for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree. Hence the comment. I just didn't want to start implementing a new error enum member when cleaning this up. We should handle this as part of testing, because I don't think it is the right error.
server/src/http/feature_refresher.rs
Outdated
engine_cache: engines, | ||
refresh_interval: features_refresh_interval, | ||
persistence, | ||
strict, | ||
app_name: app_name.into(), | ||
#[cfg(feature = "streaming")] | ||
broadcaster: Broadcaster::new(features.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we move this above features_cache? That way we can clone here if needed, but leave features_cache: features
intact as the owner of the features.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Smart 🧠
} | ||
_ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { | ||
tracing::info!("Prometheus push unexpectedly shut down"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wish we could simplify this and reduce the duplicated code but I don't have any better ideas atm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I know. I hate this. But we can refactor it later. I don't think this'll be the final version (I hope not!).
Overview
tl;dr: This is a very experimental implementation of streaming in Edge: it both listens to the experimental Unleash streaming endpoint and pushes updates to any listeners subscribing to Edge (in effect mirroring the Unleash endpoint). All related code is behind the "streaming" feature gate.
More detail:
I have added an event source client to the
features_refresher
file. If the streaming flag is on, we'll spawn a task that takes care of listening to Unleash for events instead of starting the poll loop for flags.There is a new endpoint at
api/client/streaming
(mirroring the Unleash endpoint) that you can hit to start listening for updates.The updates are handled by a new
broadcaster
module (largely stolen from this Actix example).The broadcaster stores its client in a hash map that uses the flag query as the key and maps it to a vec of clients that use the same query.
Left to do
Regarding the implementation: I'm not very familiar with Actix, and I haven't done a whole lot of async / multithreaded rust before, so there's probably gonna be a whole lot of things that we can improve, from the architecture level to the specific data structures used.
Also, due to the very time-limited spike we did, we need to actually do some real error handling. There's a good few places where we either ignore errors or would just panic if we ever encountered them.But aside from that, there's a few other things to do too: