Skip to content

Commit

Permalink
feat(client): request range of headers from persistence
Browse files Browse the repository at this point in the history
`RangeBounds` is not object safe, so it may not be used
to request a range of heights.
  • Loading branch information
rustaceanrob committed Jan 7, 2025
1 parent 33bb4a9 commit f6c24f4
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 3 deletions.
18 changes: 18 additions & 0 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate alloc;
use std::{
collections::{BTreeMap, HashSet},
ops::Range,
sync::Arc,
};

Expand Down Expand Up @@ -894,6 +895,23 @@ impl<H: HeaderStore> Chain<H> {
}
}

pub(crate) async fn fetch_header_range(
&self,
range: Range<u32>,
) -> Result<BTreeMap<u32, Header>, HeaderPersistenceError<H::Error>> {
let mut db = self.db.lock().await;
let range_opt = db.load(range).await;
if range_opt.is_err() {
self.dialog
.send_warning(Warning::FailedPersistance {
warning: "Unexpected error fetching a range of headers from the header store"
.to_string(),
})
.await;
}
range_opt.map_err(HeaderPersistenceError::Database)
}

// Reset the compact filter queue because we received a new block
pub(crate) fn clear_compact_filter_queue(&mut self) {
self.cf_header_chain.clear_queue();
Expand Down
23 changes: 21 additions & 2 deletions src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bitcoin::BlockHash;
#[cfg(not(feature = "filter-control"))]
use bitcoin::ScriptBuf;
use bitcoin::Transaction;
use std::time::Duration;
use std::{collections::BTreeMap, ops::Range, time::Duration};
use tokio::sync::mpsc;
pub use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
Expand All @@ -16,7 +16,7 @@ use crate::{Event, Log, TrustedPeer, TxBroadcast};
use super::{error::FetchBlockError, messages::BlockRequest, BlockReceiver, IndexedBlock};
use super::{
error::{ClientError, FetchHeaderError},
messages::{ClientMessage, HeaderRequest},
messages::{BatchHeaderRequest, ClientMessage, HeaderRequest},
};

/// A [`Client`] allows for communication with a running node.
Expand Down Expand Up @@ -208,6 +208,25 @@ impl EventSender {
.map_err(|_| FetchHeaderError::RecvError)?
}

/// Get a range of headers by the specified range.
///
/// # Errors
///
/// If the node has stopped running.
pub async fn get_header_range(
&self,
range: Range<u32>,
) -> Result<BTreeMap<u32, Header>, FetchHeaderError> {
let (tx, rx) =
tokio::sync::oneshot::channel::<Result<BTreeMap<u32, Header>, FetchHeaderError>>();
let message = BatchHeaderRequest::new(tx, range);
self.ntx
.send(ClientMessage::GetHeaderBatch(message))
.await
.map_err(|_| FetchHeaderError::SendError)?;
rx.await.map_err(|_| FetchHeaderError::RecvError)?
}

/// Request a block be fetched. Note that this method will request a block
/// from a connected peer's inventory, and may take an indefinite amount of
/// time, until a peer responds.
Expand Down
19 changes: 18 additions & 1 deletion src/core/messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, ops::Range, time::Duration};

#[cfg(feature = "filter-control")]
use bitcoin::BlockHash;
Expand Down Expand Up @@ -158,6 +158,8 @@ pub(crate) enum ClientMessage {
AddPeer(TrustedPeer),
/// Request a header from a specified height.
GetHeader(HeaderRequest),
/// Request a range of headers.
GetHeaderBatch(BatchHeaderRequest),
}

type HeaderSender = tokio::sync::oneshot::Sender<Result<Header, FetchHeaderError>>;
Expand All @@ -174,6 +176,21 @@ impl HeaderRequest {
}
}

type BatchHeaderSender =
tokio::sync::oneshot::Sender<Result<BTreeMap<u32, Header>, FetchHeaderError>>;

#[derive(Debug)]
pub(crate) struct BatchHeaderRequest {
pub(crate) oneshot: BatchHeaderSender,
pub(crate) range: Range<u32>,
}

impl BatchHeaderRequest {
pub(crate) fn new(oneshot: BatchHeaderSender, range: Range<u32>) -> Self {
Self { oneshot, range }
}
}

pub(crate) type BlockSender = tokio::sync::oneshot::Sender<Result<IndexedBlock, FetchBlockError>>;

#[cfg(feature = "filter-control")]
Expand Down
8 changes: 8 additions & 0 deletions src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
if send_result.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await
};
},
ClientMessage::GetHeaderBatch(request) => {
let chain = self.chain.lock().await;
let range_opt = chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() });
let send_result = request.oneshot.send(range_opt);
if send_result.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await
};
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ async fn test_long_chain() {
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
let batch = sender.get_header_range(10_000..10_002).await.unwrap();
assert!(batch.is_empty());
sender.shutdown().await.unwrap();
rpc.stop().unwrap();
}
Expand Down Expand Up @@ -304,6 +306,8 @@ async fn test_sql_reorg() {
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
let batch = sender.get_header_range(0..10).await.unwrap();
assert!(!batch.is_empty());
sender.shutdown().await.unwrap();
// Reorganize the blocks
let old_best = best;
Expand Down

0 comments on commit f6c24f4

Please sign in to comment.