Skip to content

Commit

Permalink
Allow waking on non-empty peer list
Browse files Browse the repository at this point in the history
This fixes a waking bug in the `Stream` implementation of `StateQueue`,
as it checked for a nonempty peer list but did not register a waker for
it.

CC #2550
  • Loading branch information
hrxi committed Aug 14, 2024
1 parent 093ef18 commit e2f0ae0
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::{FutureExt, Stream, StreamExt};
use nimiq_network_interface::{network::Network, request::RequestError};
use nimiq_primitives::key_nibbles::KeyNibbles;
use parking_lot::RwLock;
use tokio::sync::Notify;

use super::{RequestChunk, ResponseChunk};
use crate::sync::{peer_list::PeerList, sync_queue::SyncQueue};
Expand Down Expand Up @@ -72,6 +73,11 @@ impl<N: Network> ChunkRequestComponent<N> {
) -> Result<ResponseChunk, RequestError> {
network.request::<RequestChunk>(request, peer_id).await
}

pub fn notify_nonempty_peers(&self) -> Option<Arc<Notify>> {
let peers = self.peers.read();
peers.is_empty().then(|| peers.notify_nonempty())
}
}

impl<N: Network> Stream for ChunkRequestComponent<N> {
Expand Down
26 changes: 22 additions & 4 deletions consensus/src/sync/live/state_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
task::{Context, Poll},
};

use futures::{stream::BoxStream, Stream, StreamExt};
use futures::{future::BoxFuture, stream::BoxStream, Stream, StreamExt};
use nimiq_block::Block;
use nimiq_blockchain::Blockchain;
use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent};
Expand All @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use self::chunk_request_component::ChunkRequestComponent;
use super::{
block_queue::BlockAndId,
queue::{ChunkAndId, LiveSyncQueue, QueueConfig},
queue::{ChunkAndId, QueueConfig},
};
use crate::sync::live::diff_queue::{DiffQueue, QueuedDiff};

Expand Down Expand Up @@ -173,6 +173,8 @@ pub struct StateQueue<N: Network> {

/// The blockchain event stream.
blockchain_rx: BoxStream<'static, BlockchainEvent>,

nonempty_peers: Option<BoxFuture<'static, ()>>,
}

impl<N: Network> StateQueue<N> {
Expand Down Expand Up @@ -211,6 +213,7 @@ impl<N: Network> StateQueue<N> {
current_macro_height,
start_key,
blockchain_rx,
nonempty_peers: None,
}
}

Expand Down Expand Up @@ -568,9 +571,24 @@ impl<N: Network> Stream for StateQueue<N> {
}
}

// Check if we have peers.
if self.nonempty_peers.is_none() {
self.nonempty_peers = self
.chunk_request_component
.notify_nonempty_peers()
.map(|notify| Box::pin(async move { notify.notified().await }) as BoxFuture<()>);
}
if let Some(nonempty_peers) = &mut self.nonempty_peers {
if nonempty_peers.as_mut().poll(cx).is_ready() {
self.nonempty_peers = None;
}
}
// Obvious TOCTOU, but it would otherwise need to lock the
// `chunk_request_component`'s peer list.
//
// Request chunks via ChunkRequestComponent.
if !self.chunk_request_component.has_pending_requests() && self.num_peers() > 0 {
self.request_chunk();
if self.nonempty_peers.is_none() && !self.chunk_request_component.has_pending_requests() {
println!("{}", self.request_chunk());
}

// Receive blocks with diffs from DiffQueue.
Expand Down
17 changes: 14 additions & 3 deletions consensus/src/sync/peer_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex};
use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex, sync::Arc};

use nimiq_network_interface::network::Network;
use tokio::sync::Notify;

/// A list of peers to be used while syncing.
/// This contains an ordered list of peers as well as a hashmap.
Expand All @@ -9,6 +10,7 @@ use nimiq_network_interface::network::Network;
pub struct PeerList<N: Network> {
peers_set: HashSet<N::PeerId>,
peers: Vec<N::PeerId>,
notify_nonempty: Arc<Notify>,
}

/// Stores an index into a [`PeerList`].
Expand Down Expand Up @@ -58,22 +60,27 @@ impl<N: Network> Default for PeerList<N> {
Self {
peers_set: Default::default(),
peers: Default::default(),
notify_nonempty: Default::default(),
}
}
}

impl<N: Network> Clone for PeerList<N> {
fn clone(&self) -> Self {
Self {
fn clone(&self) -> PeerList<N> {
PeerList {
peers_set: self.peers_set.clone(),
peers: self.peers.clone(),
notify_nonempty: Default::default(),
}
}
}

impl<N: Network> PeerList<N> {
pub fn add_peer(&mut self, peer_id: N::PeerId) -> bool {
if self.peers_set.insert(peer_id) {
if self.peers.is_empty() {
self.notify_nonempty.notify_waiters();
}
self.peers.push(peer_id);
return true;
}
Expand Down Expand Up @@ -130,6 +137,10 @@ impl<N: Network> PeerList<N> {
peer_index.index = peer_index.index.wrapping_add(1) % self.peers.len();
Some(self.peers[peer_index.index])
}

pub fn notify_nonempty(&self) -> Arc<Notify> {
self.notify_nonempty.clone()
}
}

impl<N: Network, I: SliceIndex<[N::PeerId]>> Index<I> for PeerList<N> {
Expand Down
13 changes: 11 additions & 2 deletions consensus/src/sync/sync_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
task::{Context, Poll, Waker},
};

use futures::{future, future::BoxFuture, FutureExt, Stream, StreamExt};
use nimiq_network_interface::network::{Network, PubsubId};
use nimiq_utils::stream::FuturesUnordered;
use nimiq_utils::{stream::FuturesUnordered, WakerExt as _};
use parking_lot::RwLock;
use pin_project::pin_project;

Expand Down Expand Up @@ -104,6 +104,7 @@ pub struct SyncQueue<
request_fn: RequestFn<TId, TNetwork, TOutput, TError>,
verify_fn: VerifyFn<TId, TOutput, TVerifyState>,
verify_state: TVerifyState,
waker: Option<Waker>,
}

impl<TNetwork, TId, TOutput, TError> SyncQueue<TNetwork, TId, TOutput, TError, ()>
Expand Down Expand Up @@ -172,6 +173,7 @@ where
request_fn,
verify_fn,
verify_state: initial_verify_state,
waker: None,
}
}

Expand Down Expand Up @@ -255,6 +257,8 @@ where
self.queued_outputs.len(),
self.peers.read().len(),
);

self.waker.wake();
}
}

Expand Down Expand Up @@ -319,6 +323,9 @@ where
for id in ids {
self.ids_to_request.push_back(id);
}

// Adding new ids needs to wake the task that is polling the SyncQueue.
self.waker.wake();
}

/// Truncates the stored ids, retaining only the first `len` elements.
Expand Down Expand Up @@ -357,6 +364,8 @@ where
type Item = Result<TOutput, TId>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.waker.store_waker(cx);

// Try to request more objects.
self.try_push_futures();

Expand Down

0 comments on commit e2f0ae0

Please sign in to comment.