-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add our own versions of FuturesUnordered
and FuturesOrdered
#2798
Conversation
e0ee318
to
3948e91
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we're still using the futures
's version in consensus/src/sync/syncer.rs
.
57c33ba
to
e2f0ae0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, some minor feedback points.
CC #2832 to use this new abstraction once this PR is in.
2fd2b5a
to
f2441da
Compare
@@ -568,8 +577,23 @@ impl<N: Network> Stream for StateQueue<N> { | |||
} | |||
} | |||
|
|||
// Check if we have peers. | |||
if self.nonempty_peers.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it better to use get_or_insert
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only inserts something if self.chunk_request_component.notify_nonempty_peers()
returns a non-None
value. This doesn't fit Option::get_or_insert
's function signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notify_nonempty_peers()
can return None
thus the following assignment can be to None
again, which get_or_insert
does not allow for.
.map(|notify| Box::pin(async move { notify.notified().await }) as BoxFuture<()>); | ||
} | ||
if let Some(nonempty_peers) = &mut self.nonempty_peers { | ||
if nonempty_peers.as_mut().poll(cx).is_ready() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just take
it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only want to remove the future from the member variable when it resolves. Option::take
can't do that.
f2441da
to
f3744bd
Compare
@@ -568,8 +577,23 @@ impl<N: Network> Stream for StateQueue<N> { | |||
} | |||
} | |||
|
|||
// Check if we have peers. | |||
if self.nonempty_peers.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notify_nonempty_peers()
can return None
thus the following assignment can be to None
again, which get_or_insert
does not allow for.
6c66ab0
to
86b7fe0
Compare
Rebasing branch to merge it |
Unlike their `futures_util::stream::*` counterparts, they wake themselves when some future is pushed.
This gets rid of a couple of manual waker instances. It also fixes some waking-related bugs (CC #2550).
This fixes a waking bug in the `Stream` implementation of `StateQueue`, as it checked for a nonempty peer list but did not register a waker for it. CC #2550
86b7fe0
to
8171e0c
Compare
Unlike their
futures_util::stream::*
counterparts, they wake themselves when some future is pushed.This gets rid of a couple of manual waker instances.
It also fixes some waking-related bugs (CC #2550).
Use clippy to warn for usages of the old types.