Commit
Stop requesting missing blocks if there are already pending missing block requests.

Buffer blocks if no missing block requests should be induced
ii-cruz committed Nov 6, 2024
1 parent 84b322e commit ace44ad
Showing 4 changed files with 268 additions and 61 deletions.
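
For orientation, the heart of this change (in queue.rs below) is a gate on when an announced block with an unknown parent may trigger a new missing-blocks request. The following is a minimal, self-contained sketch of that decision only; the types (PendingRequests, Action), the BLOCKS_PER_BATCH constant, and the last_macro_block helper are simplified stand-ins and assumptions, not the repository's actual API.

use std::collections::HashSet;

/// Stand-in for BlockRequestComponent's bookkeeping of in-flight requests.
struct PendingRequests {
    targets: HashSet<[u8; 32]>,
}

impl PendingRequests {
    fn has_no_pending_requests(&self) -> bool {
        self.targets.is_empty()
    }
}

/// What to do with an announced block whose parent we do not know yet.
enum Action {
    /// Buffer the block and induce a missing-blocks request for the gap.
    BufferAndRequestMissing,
    /// Only buffer it; an in-flight request (or a later sync) fills the gap.
    BufferOnly,
}

/// Assumed batch length; the real value comes from the chain's Policy.
const BLOCKS_PER_BATCH: u32 = 60;

/// Height of the last macro block at or below `block_number` (simplified).
fn last_macro_block(block_number: u32) -> u32 {
    (block_number / BLOCKS_PER_BATCH) * BLOCKS_PER_BATCH
}

/// The gate introduced by this commit: only start a new missing-blocks
/// request if nothing is in flight, or if the block belongs to the batch
/// our macro head is currently in.
fn decide(pending: &PendingRequests, current_macro_height: u32, block_number: u32) -> Action {
    if pending.has_no_pending_requests()
        || current_macro_height == last_macro_block(block_number)
    {
        Action::BufferAndRequestMissing
    } else {
        Action::BufferOnly
    }
}

fn main() {
    let idle = PendingRequests { targets: HashSet::new() };
    // Nothing in flight: a gap may trigger a new request.
    assert!(matches!(decide(&idle, 120, 185), Action::BufferAndRequestMissing));

    let mut busy = PendingRequests { targets: HashSet::new() };
    busy.targets.insert([0u8; 32]);
    // A request is already running and the block is from a later batch: buffer only.
    assert!(matches!(decide(&busy, 120, 245), Action::BufferOnly));
    // Same batch as our macro head: a new request is still allowed.
    assert!(matches!(decide(&busy, 120, 145), Action::BufferAndRequestMissing));
}

As the diff's own comments note, blocks that are buffered without a request are reconciled once the queue syncs up to their batch.
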
2 changes: 1 addition & 1 deletion consensus/src/messages/handlers.rs
@@ -206,7 +206,7 @@ impl RequestMissingBlocks {
blockchain: &BlockchainProxy,
) -> Result<ResponseBlocks, ResponseBlocksError> {
// TODO We might want to do a sanity check on the locator hashes and reject the request if
        // they don't match up with the given target hash.
        // they don't match up with the given target hash.

// Build a HashSet from the given locator hashes.
let locators = HashSet::<Blake2bHash>::from_iter(self.locators.iter().cloned());
@@ -318,6 +318,10 @@ impl<N: Network> BlockRequestComponent<N> {
self.pending_requests.contains(target_block_hash)
}

pub fn has_no_pending_requests(&self) -> bool {
self.pending_requests.is_empty()
}

pub fn add_peer(&self, peer_id: N::PeerId) {
self.peers.write().add_peer(peer_id);
}
79 changes: 50 additions & 29 deletions consensus/src/sync/live/block_queue/queue.rs
@@ -177,15 +177,6 @@ impl<N: Network> BlockQueue<N> {
let parent_known = blockchain.contains(block.parent_hash(), true);
drop(blockchain);

// Check if a macro block boundary was passed.
// If so prune the block buffer as well as pending requests.
let macro_height = Policy::last_macro_block(head_height);
if macro_height > self.current_macro_height {
self.current_macro_height = macro_height;
self.prune_pending_requests();
self.prune_buffer();
}

if block_number < head_height.saturating_sub(self.config.tolerate_past_max) {
block_source.ignore_block(&self.network);

@@ -216,17 +207,32 @@
);
return Some(QueuedBlock::TooFarAhead(peer_id));
}
} else if block_number <= macro_height {
} else if block_number <= self.current_macro_height {
// Block is from a previous batch/epoch, discard it.
log::warn!(
"Discarding block {} - we're already at macro block #{}",
block,
macro_height
self.current_macro_height
);
block_source.ignore_block(&self.network);
} else {
// Block is inside the buffer window, put it in the buffer.
} else if self.request_component.has_no_pending_requests()
|| self.current_macro_height == Policy::last_macro_block(block.block_number())
{
            // We only allow a new missing-blocks request to start if the block is from the
            // current batch or if there are no ongoing requests.
self.buffer_and_request_missing_blocks(block, block_source);
} else {
            // If we are not within the same batch or we are already requesting blocks,
            // we just buffer it without requesting blocks.
// Any potential gaps will be filled after we sync up to the batch.
if self.insert_block_into_buffer(block, block_source.clone()) {
log::trace!(block_number, "Buffering block");
} else {
log::trace!(
block_number,
"Not buffering block - already known or exceeded the per peer limit",
);
}
}

None
@@ -622,6 +628,18 @@ impl<N: Network> BlockQueue<N> {
});
}

fn check_and_prune(&mut self) {
let block_macro_height = self.blockchain.read().macro_head().block_number();

// Check if a macro block boundary was passed.
// If so prune the block buffer as well as pending requests.
if self.current_macro_height < block_macro_height {
self.current_macro_height = block_macro_height;
self.prune_pending_requests();
self.prune_buffer();
}
}

/// Cleans up buffered blocks and removes blocks that precede the current macro block.
fn prune_buffer(&mut self) {
self.buffer.retain(|&block_number, blocks| {
@@ -801,22 +819,8 @@ impl<N: Network> Stream for BlockQueue<N> {
}
}

// Get as many blocks from the gossipsub stream as possible.
loop {
match self.block_stream.poll_next_unpin(cx) {
Poll::Ready(Some((block, block_source))) => {
if self.num_peers() > 0 {
log::debug!(%block, peer_id = %block_source.peer_id(), "Received block via gossipsub");
if let Some(block) = self.check_announced_block(block, block_source) {
return Poll::Ready(Some(block));
}
}
}
// If the block_stream is exhausted, we quit as well.
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => break,
}
}
// Prune anything that is no longer relevant before adding more requests and blocks to our structs.
self.check_and_prune();

// Read all the responses we got for our missing blocks requests.
loop {
Expand Down Expand Up @@ -859,6 +863,23 @@ impl<N: Network> Stream for BlockQueue<N> {
}
}

// Get as many blocks from the gossipsub stream as possible.
loop {
match self.block_stream.poll_next_unpin(cx) {
Poll::Ready(Some((block, block_source))) => {
if self.num_peers() > 0 {
log::debug!(%block, peer_id = %block_source.peer_id(), "Received block via gossipsub");
if let Some(block) = self.check_announced_block(block, block_source) {
return Poll::Ready(Some(block));
}
}
}
// If the block_stream is exhausted, we quit as well.
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => break,
}
}

self.waker.store_waker(cx);
Poll::Pending
}
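
The other moving piece above is check_and_prune, which poll_next now runs before draining missing-block responses and gossipsub announcements. Below is a hypothetical, stripped-down model of that pruning step; the field names and retain conditions are assumptions for illustration, not the real BlockQueue.

use std::collections::BTreeMap;

/// Hypothetical queue state: just the two structures that get pruned when
/// the chain's macro head advances.
struct MiniQueue {
    current_macro_height: u32,
    /// block number -> buffered block labels (stand-in for the real buffer).
    buffer: BTreeMap<u32, Vec<&'static str>>,
    /// block number -> target-hash label of an in-flight missing-blocks request.
    pending_requests: BTreeMap<u32, &'static str>,
}

impl MiniQueue {
    /// Mirrors the intent of check_and_prune: once the macro head read from
    /// the blockchain passes our recorded boundary, buffered blocks and
    /// pending requests at or below the new boundary are stale and dropped.
    fn check_and_prune(&mut self, chain_macro_head: u32) {
        if self.current_macro_height < chain_macro_head {
            self.current_macro_height = chain_macro_head;
            let boundary = self.current_macro_height;
            self.pending_requests.retain(|&height, _| height > boundary);
            self.buffer.retain(|&height, _| height > boundary);
        }
    }
}

fn main() {
    let mut q = MiniQueue {
        current_macro_height: 60,
        buffer: BTreeMap::from([(55, vec!["a"]), (125, vec!["b"])]),
        pending_requests: BTreeMap::from([(58, "h1"), (130, "h2")]),
    };
    // The macro head moved to 120; everything at or below it is obsolete.
    q.check_and_prune(120);
    assert_eq!(q.buffer.keys().copied().collect::<Vec<_>>(), vec![125]);
    assert_eq!(q.pending_requests.keys().copied().collect::<Vec<_>>(), vec![130]);
}

Running this first matches the in-diff comment: stale entries are pruned before the poll adds new requests and blocks to these structures.
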