diff --git a/consensus/src/sync/live/state_queue/chunk_request_component.rs b/consensus/src/sync/live/state_queue/chunk_request_component.rs index 4b5d4a4124..5f6362d19d 100644 --- a/consensus/src/sync/live/state_queue/chunk_request_component.rs +++ b/consensus/src/sync/live/state_queue/chunk_request_component.rs @@ -8,6 +8,7 @@ use futures::{FutureExt, Stream, StreamExt}; use nimiq_network_interface::{network::Network, request::RequestError}; use nimiq_primitives::key_nibbles::KeyNibbles; use parking_lot::RwLock; +use tokio::sync::Notify; use super::{RequestChunk, ResponseChunk}; use crate::sync::{peer_list::PeerList, sync_queue::SyncQueue}; @@ -72,6 +73,11 @@ impl ChunkRequestComponent { ) -> Result { network.request::(request, peer_id).await } + + pub fn notify_nonempty_peers(&self) -> Option> { + let peers = self.peers.read(); + peers.is_empty().then(|| peers.notify_nonempty()) + } } impl Stream for ChunkRequestComponent { diff --git a/consensus/src/sync/live/state_queue/mod.rs b/consensus/src/sync/live/state_queue/mod.rs index 629d05d7f5..5a3d9a17eb 100644 --- a/consensus/src/sync/live/state_queue/mod.rs +++ b/consensus/src/sync/live/state_queue/mod.rs @@ -9,7 +9,7 @@ use std::{ task::{Context, Poll}, }; -use futures::{stream::BoxStream, Stream, StreamExt}; +use futures::{future::BoxFuture, stream::BoxStream, Stream, StreamExt}; use nimiq_block::Block; use nimiq_blockchain::Blockchain; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; use self::chunk_request_component::ChunkRequestComponent; use super::{ block_queue::BlockAndId, - queue::{ChunkAndId, LiveSyncQueue, QueueConfig}, + queue::{ChunkAndId, QueueConfig}, }; use crate::sync::live::diff_queue::{DiffQueue, QueuedDiff}; @@ -173,6 +173,8 @@ pub struct StateQueue { /// The blockchain event stream. blockchain_rx: BoxStream<'static, BlockchainEvent>, + + nonempty_peers: Option>, } impl StateQueue { @@ -211,6 +213,7 @@ impl StateQueue { current_macro_height, start_key, blockchain_rx, + nonempty_peers: None, } } @@ -568,9 +571,24 @@ impl Stream for StateQueue { } } + // Check if we have peers. + if self.nonempty_peers.is_none() { + self.nonempty_peers = self + .chunk_request_component + .notify_nonempty_peers() + .map(|notify| Box::pin(async move { notify.notified().await }) as BoxFuture<()>); + } + if let Some(nonempty_peers) = &mut self.nonempty_peers { + if nonempty_peers.as_mut().poll(cx).is_ready() { + self.nonempty_peers = None; + } + } + // Obvious TOCTOU, but it would otherwise need to lock the + // `chunk_request_component`'s peer list. + // // Request chunks via ChunkRequestComponent. - if !self.chunk_request_component.has_pending_requests() && self.num_peers() > 0 { - self.request_chunk(); + if self.nonempty_peers.is_none() && !self.chunk_request_component.has_pending_requests() { + println!("{}", self.request_chunk()); } // Receive blocks with diffs from DiffQueue. diff --git a/consensus/src/sync/peer_list.rs b/consensus/src/sync/peer_list.rs index 23e697f8ed..2896065f28 100644 --- a/consensus/src/sync/peer_list.rs +++ b/consensus/src/sync/peer_list.rs @@ -1,6 +1,7 @@ -use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex}; +use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex, sync::Arc}; use nimiq_network_interface::network::Network; +use tokio::sync::Notify; /// A list of peers to be used while syncing. /// This contains an ordered list of peers as well as a hashmap. @@ -9,6 +10,7 @@ use nimiq_network_interface::network::Network; pub struct PeerList { peers_set: HashSet, peers: Vec, + notify_nonempty: Arc, } /// Stores an index into a [`PeerList`]. @@ -58,15 +60,17 @@ impl Default for PeerList { Self { peers_set: Default::default(), peers: Default::default(), + notify_nonempty: Default::default(), } } } impl Clone for PeerList { - fn clone(&self) -> Self { - Self { + fn clone(&self) -> PeerList { + PeerList { peers_set: self.peers_set.clone(), peers: self.peers.clone(), + notify_nonempty: Default::default(), } } } @@ -74,6 +78,9 @@ impl Clone for PeerList { impl PeerList { pub fn add_peer(&mut self, peer_id: N::PeerId) -> bool { if self.peers_set.insert(peer_id) { + if self.peers.is_empty() { + self.notify_nonempty.notify_waiters(); + } self.peers.push(peer_id); return true; } @@ -130,6 +137,10 @@ impl PeerList { peer_index.index = peer_index.index.wrapping_add(1) % self.peers.len(); Some(self.peers[peer_index.index]) } + + pub fn notify_nonempty(&self) -> Arc { + self.notify_nonempty.clone() + } } impl> Index for PeerList { diff --git a/consensus/src/sync/sync_queue.rs b/consensus/src/sync/sync_queue.rs index 6a442c41df..bd2204e278 100644 --- a/consensus/src/sync/sync_queue.rs +++ b/consensus/src/sync/sync_queue.rs @@ -6,12 +6,12 @@ use std::{ future::Future, pin::Pin, sync::Arc, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; use futures::{future, future::BoxFuture, FutureExt, Stream, StreamExt}; use nimiq_network_interface::network::{Network, PubsubId}; -use nimiq_utils::stream::FuturesUnordered; +use nimiq_utils::{stream::FuturesUnordered, WakerExt as _}; use parking_lot::RwLock; use pin_project::pin_project; @@ -104,6 +104,7 @@ pub struct SyncQueue< request_fn: RequestFn, verify_fn: VerifyFn, verify_state: TVerifyState, + waker: Option, } impl SyncQueue @@ -172,6 +173,7 @@ where request_fn, verify_fn, verify_state: initial_verify_state, + waker: None, } } @@ -255,6 +257,8 @@ where self.queued_outputs.len(), self.peers.read().len(), ); + + self.waker.wake(); } } @@ -319,6 +323,9 @@ where for id in ids { self.ids_to_request.push_back(id); } + + // Adding new ids needs to wake the task that is polling the SyncQueue. + self.waker.wake(); } /// Truncates the stored ids, retaining only the first `len` elements. @@ -357,6 +364,8 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.waker.store_waker(cx); + // Try to request more objects. self.try_push_futures();