Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add our own versions of FuturesUnordered and FuturesOrdered #2798

Merged
merged 6 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ reason = "use `nimiq_utils::spawn_local` instead, it is also supported in WASM e
[[disallowed-methods]]
path = "wasm_bindgen_futures::spawn_local"
reason = "use `nimiq_utils::spawn` or `nimq_utils::spawn_local` instead, it is also supported in non-WASM environments"

[[disallowed-types]]
path = "futures_util::stream::FuturesUnordered"
reason = "use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s"

[[disallowed-types]]
path = "futures_util::stream::FuturesOrdered"
reason = "use `nimiq_utils::stream::FuturesOrdered` instead, it does not need manual `Waker`s"
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ nimiq-primitives = { workspace = true, features = ["policy", "trie"] }
nimiq-serde = { workspace = true }
nimiq-time = { workspace = true }
nimiq-transaction = { workspace = true }
nimiq-utils = { workspace = true, features = ["merkle", "spawn", "time"] }
nimiq-utils = { workspace = true, features = ["futures", "merkle", "spawn", "time"] }
nimiq-zkp-component = { workspace = true }

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/consensus/head_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use std::{
task::{Context, Poll},
};

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use nimiq_block::Block;
use nimiq_blockchain_interface::AbstractBlockchain;
use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::{network::Network, request::RequestError};
use nimiq_utils::stream::FuturesUnordered;

use crate::messages::{BlockError, RequestBlock, RequestHead, ResponseHead};

Expand Down
8 changes: 2 additions & 6 deletions consensus/src/sync/history/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::{
task::Waker,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt};
use futures::{future::BoxFuture, FutureExt};
use nimiq_blockchain::Blockchain;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::network::{Network, SubscribeEvents};
use nimiq_utils::WakerExt as _;
use nimiq_utils::stream::FuturesUnordered;
use parking_lot::RwLock;

use crate::{
Expand Down Expand Up @@ -117,9 +117,5 @@ impl<TNetwork: Network> MacroSync<TNetwork::PeerId> for HistoryMacroSync<TNetwor
)
.boxed();
self.epoch_ids_stream.push(future);

// Pushing the future to FuturesUnordered above does not wake the task that
// polls `epoch_ids_stream`. Therefore, we need to wake the task manually.
self.waker.wake();
}
}
3 changes: 1 addition & 2 deletions consensus/src/sync/history/sync_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ impl<TNetwork: Network> Stream for HistoryMacroSync<TNetwork> {
type Item = MacroSyncReturn<TNetwork::PeerId>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.waker.store_waker(cx);

if let Poll::Ready(o) = self.poll_network_events(cx) {
return Poll::Ready(o);
}
Expand All @@ -251,6 +249,7 @@ impl<TNetwork: Network> Stream for HistoryMacroSync<TNetwork> {

self.poll_job_queue(cx);

self.waker.store_waker(cx);
Poll::Pending
}
}
Expand Down
12 changes: 2 additions & 10 deletions consensus/src/sync/light/sync.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
task::Waker,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt};
use futures::{future::BoxFuture, FutureExt};
use nimiq_block::Block;
use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::{
network::{CloseReason, Network, SubscribeEvents},
request::RequestError,
};
use nimiq_utils::{spawn, WakerExt as _};
use nimiq_utils::{spawn, stream::FuturesUnordered};
use nimiq_zkp_component::{
types::{Error, ZKPRequestEvent},
zkp_component::ZKPComponentProxy,
Expand Down Expand Up @@ -185,8 +184,6 @@ pub struct LightMacroSync<TNetwork: Network> {
pub(crate) synced_validity_peers: Vec<TNetwork::PeerId>,
/// Minimum distance to light sync in #blocks from the peers head.
pub(crate) full_sync_threshold: u32,
/// Waker used for the poll next function
pub(crate) waker: Option<Waker>,
}

impl<TNetwork: Network> LightMacroSync<TNetwork> {
Expand Down Expand Up @@ -230,7 +227,6 @@ impl<TNetwork: Network> LightMacroSync<TNetwork> {
epoch_ids_stream: FuturesUnordered::new(),
zkp_component_proxy,
zkp_requests: FuturesUnordered::new(),
waker: None,
full_sync_threshold,
block_headers: Default::default(),
validity_requests: None,
Expand Down Expand Up @@ -266,9 +262,5 @@ impl<TNetwork: Network> MacroSync<TNetwork::PeerId> for LightMacroSync<TNetwork>

self.zkp_requests
.push(Self::request_zkps(self.zkp_component_proxy.clone(), peer_id).boxed());

// Pushing the future to FuturesUnordered above does not wake the task that
// polls `epoch_ids_stream`. Therefore, we need to wake the task manually.
self.waker.wake();
}
}
11 changes: 0 additions & 11 deletions consensus/src/sync/light/sync_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_light_blockchain::LightBlockchain;
use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent};
use nimiq_primitives::policy::Policy;
use nimiq_utils::WakerExt as _;
use nimiq_zkp_component::types::ZKPRequestEvent::{OutdatedProof, Proof};

use crate::sync::{
Expand Down Expand Up @@ -372,10 +371,6 @@ impl<TNetwork: Network> LightMacroSync<TNetwork> {
)
.boxed();
self.epoch_ids_stream.push(future);

// Pushing the future to FuturesUnordered above does not wake the task that
// polls `epoch_ids_stream`. Therefore, we need to wake the task manually.
self.waker.wake();
}
} else {
// If we don't have any pending requests from this peer, we proceed requesting epoch ids
Expand All @@ -386,10 +381,6 @@ impl<TNetwork: Network> LightMacroSync<TNetwork> {
)
.boxed();
self.epoch_ids_stream.push(future);

// Pushing the future to FuturesUnordered above does not wake the task that
// polls `epoch_ids_stream`. Therefore, we need to wake the task manually.
self.waker.wake();
}
}
(Ok(Err(error)), peer_id) => {
Expand All @@ -413,8 +404,6 @@ impl<TNetwork: Network> Stream for LightMacroSync<TNetwork> {
type Item = MacroSyncReturn<TNetwork::PeerId>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.waker.store_waker(cx);

if let Poll::Ready(o) = self.poll_network_events(cx) {
return Poll::Ready(o);
}
Expand Down
7 changes: 2 additions & 5 deletions consensus/src/sync/live/diff_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ use std::{
task::{Context, Poll},
};

use futures::{
future::BoxFuture,
stream::{FuturesOrdered, FuturesUnordered},
Stream, StreamExt, TryStreamExt,
};
use futures::{future::BoxFuture, Stream, StreamExt, TryStreamExt};
use nimiq_block::Block;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::{
Expand All @@ -19,6 +15,7 @@ use nimiq_network_interface::{
};
use nimiq_primitives::trie::trie_diff::TrieDiff;
use nimiq_serde::{Deserialize, Serialize};
use nimiq_utils::stream::{FuturesOrdered, FuturesUnordered};
use parking_lot::RwLock;

use self::diff_request_component::DiffRequestComponent;
Expand Down
10 changes: 9 additions & 1 deletion consensus/src/sync/live/state_queue/chunk_request_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
task::{Context, Poll},
};

use futures::{FutureExt, Stream, StreamExt};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use nimiq_network_interface::{network::Network, request::RequestError};
use nimiq_primitives::key_nibbles::KeyNibbles;
use parking_lot::RwLock;
Expand Down Expand Up @@ -72,6 +72,14 @@ impl<N: Network> ChunkRequestComponent<N> {
) -> Result<ResponseChunk, RequestError> {
network.request::<RequestChunk>(request, peer_id).await
}

/// Returns a future that resolves when the peer list of the chunk request
/// component becomes nonempty.
///
/// Returns `None` is the chunk request component already has peers.
pub fn wait_for_peers(&self) -> Option<BoxFuture<'static, ()>> {
self.peers.read().wait_for_peers()
}
}

impl<N: Network> Stream for ChunkRequestComponent<N> {
Expand Down
29 changes: 26 additions & 3 deletions consensus/src/sync/live/state_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
task::{Context, Poll},
};

use futures::{stream::BoxStream, Stream, StreamExt};
use futures::{future::BoxFuture, stream::BoxStream, Stream, StreamExt};
use nimiq_block::Block;
use nimiq_blockchain::Blockchain;
use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent};
Expand All @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use self::chunk_request_component::ChunkRequestComponent;
use super::{
block_queue::BlockAndId,
queue::{ChunkAndId, LiveSyncQueue, QueueConfig},
queue::{ChunkAndId, QueueConfig},
};
use crate::sync::live::diff_queue::{DiffQueue, QueuedDiff};

Expand Down Expand Up @@ -173,6 +173,14 @@ pub struct StateQueue<N: Network> {

/// The blockchain event stream.
blockchain_rx: BoxStream<'static, BlockchainEvent>,

/// Waiter for the peer list to become nonempty.
///
/// Since we only want to dispatch requests from the
/// `ChunkRequestComponent` when its peer list is nonempty, we need some
/// notification mechanism to wake us up once the list becomes nonempty if
/// we find it empty.
peers_became_nonempty: Option<BoxFuture<'static, ()>>,
}

impl<N: Network> StateQueue<N> {
Expand Down Expand Up @@ -211,6 +219,7 @@ impl<N: Network> StateQueue<N> {
current_macro_height,
start_key,
blockchain_rx,
peers_became_nonempty: None,
}
}

Expand Down Expand Up @@ -574,8 +583,22 @@ impl<N: Network> Stream for StateQueue<N> {
}
}

// Check if we have peers.
if self.peers_became_nonempty.is_none() {
self.peers_became_nonempty = self.chunk_request_component.wait_for_peers();
}
if let Some(peers_became_nonempty) = &mut self.peers_became_nonempty {
if peers_became_nonempty.as_mut().poll(cx).is_ready() {
self.peers_became_nonempty = None;
}
}
// Obvious TOCTOU, but it would otherwise need to lock the
// `chunk_request_component`'s peer list.
//
// Request chunks via ChunkRequestComponent.
if !self.chunk_request_component.has_pending_requests() && self.num_peers() > 0 {
if self.peers_became_nonempty.is_none()
&& !self.chunk_request_component.has_pending_requests()
{
self.request_chunk();
}

Expand Down
21 changes: 20 additions & 1 deletion consensus/src/sync/peer_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex};
use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex, sync::Arc};

use futures::future::BoxFuture;
use nimiq_network_interface::network::Network;
use tokio::sync::Notify;

/// A list of peers to be used while syncing.
/// This contains an ordered list of peers as well as a hashmap.
Expand All @@ -9,6 +11,8 @@ use nimiq_network_interface::network::Network;
pub struct PeerList<N: Network> {
peers_set: HashSet<N::PeerId>,
peers: Vec<N::PeerId>,
/// Used to notify listeners when the peer list becomes nonempty.
notify_nonempty: Arc<Notify>,
}
nibhar marked this conversation as resolved.
Show resolved Hide resolved

/// Stores an index into a [`PeerList`].
Expand Down Expand Up @@ -58,6 +62,7 @@ impl<N: Network> Default for PeerList<N> {
Self {
peers_set: Default::default(),
peers: Default::default(),
notify_nonempty: Default::default(),
}
}
}
Expand All @@ -67,13 +72,17 @@ impl<N: Network> Clone for PeerList<N> {
Self {
peers_set: self.peers_set.clone(),
peers: self.peers.clone(),
notify_nonempty: Default::default(),
}
}
}

impl<N: Network> PeerList<N> {
pub fn add_peer(&mut self, peer_id: N::PeerId) -> bool {
if self.peers_set.insert(peer_id) {
if self.peers.is_empty() {
self.notify_nonempty.notify_waiters();
hrxi marked this conversation as resolved.
Show resolved Hide resolved
}
self.peers.push(peer_id);
return true;
}
Expand Down Expand Up @@ -130,6 +139,16 @@ impl<N: Network> PeerList<N> {
peer_index.index = peer_index.index.wrapping_add(1) % self.peers.len();
Some(self.peers[peer_index.index])
}

/// Returns a future that resolves when the list becomes nonempty.
///
/// Returns `None` is the list has peers already.
pub fn wait_for_peers(&self) -> Option<BoxFuture<'static, ()>> {
self.is_empty().then(|| {
let notify = self.notify_nonempty.clone();
Box::pin(async move { notify.notified().await }) as _
})
}
}

impl<N: Network, I: SliceIndex<[N::PeerId]>> Index<I> for PeerList<N> {
Expand Down
Loading
Loading