diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index bdf2aca9c778..54026070ec8b 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -35,11 +35,11 @@ use tracing::info; /// All blocks in a batch are fetched at the same time. #[must_use = "Stream does nothing unless polled"] #[derive(Debug)] -pub struct BodiesDownloader { +pub struct BodiesDownloader { /// The bodies client client: Arc, /// The consensus client - consensus: Arc>, + consensus: Arc>, /// The database handle provider: Provider, /// The maximum number of non-empty blocks per one request @@ -57,11 +57,11 @@ pub struct BodiesDownloader { /// The latest block number returned. latest_queued_block_number: Option, /// Requests in progress - in_progress_queue: BodiesRequestQueue, + in_progress_queue: BodiesRequestQueue, /// Buffered responses - buffered_responses: BinaryHeap>, + buffered_responses: BinaryHeap>, /// Queued body responses that can be returned for insertion into the database. - queued_bodies: Vec>, + queued_bodies: Vec>, /// The bodies downloader metrics. metrics: BodyDownloaderMetrics, } @@ -69,7 +69,7 @@ pub struct BodiesDownloader { impl BodiesDownloader where B: BodiesClient + 'static, - Provider: HeaderProvider + Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, { /// Returns the next contiguous request. fn next_headers_request(&self) -> DownloadResult>>> { @@ -193,14 +193,16 @@ where } /// Queues bodies and sets the latest queued block number - fn queue_bodies(&mut self, bodies: Vec>) { + fn queue_bodies(&mut self, bodies: Vec>) { self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number()); self.queued_bodies.extend(bodies); self.metrics.queued_blocks.set(self.queued_bodies.len() as f64); } /// Removes the next response from the buffer. - fn pop_buffered_response(&mut self) -> Option> { + fn pop_buffered_response( + &mut self, + ) -> Option> { let resp = self.buffered_responses.pop()?; self.metrics.buffered_responses.decrement(1.); self.buffered_blocks_size_bytes -= resp.size(); @@ -210,13 +212,10 @@ where } /// Adds a new response to the internal buffer - fn buffer_bodies_response( - &mut self, - response: Vec>, - ) { + fn buffer_bodies_response(&mut self, response: Vec>) { // take into account capacity let size = response.iter().map(BlockResponse::size).sum::() + - response.capacity() * mem::size_of::>(); + response.capacity() * mem::size_of::>(); let response = OrderedBodiesResponse { resp: response, size }; let response_len = response.len(); @@ -230,9 +229,7 @@ where } /// Returns a response if it's first block number matches the next expected. - fn try_next_buffered( - &mut self, - ) -> Option>> { + fn try_next_buffered(&mut self) -> Option>> { if let Some(next) = self.buffered_responses.peek() { let expected = self.next_expected_block_number(); let next_block_range = next.block_range(); @@ -258,9 +255,7 @@ where /// Returns the next batch of block bodies that can be returned if we have enough buffered /// bodies - fn try_split_next_batch( - &mut self, - ) -> Option>> { + fn try_split_next_batch(&mut self) -> Option>> { if self.queued_bodies.len() >= self.stream_batch_size { let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::>(); self.queued_bodies.shrink_to_fit(); @@ -292,12 +287,17 @@ where Self: BodyDownloader + 'static, { /// Spawns the downloader task via [`tokio::task::spawn`] - pub fn into_task(self) -> TaskDownloader<::Body> { + pub fn into_task( + self, + ) -> TaskDownloader<::Header, ::Body> { self.into_task_with(&TokioTaskExecutor::default()) } /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner. - pub fn into_task_with(self, spawner: &S) -> TaskDownloader<::Body> + pub fn into_task_with( + self, + spawner: &S, + ) -> TaskDownloader<::Header, ::Body> where S: TaskSpawner, { @@ -308,8 +308,9 @@ where impl BodyDownloader for BodiesDownloader where B: BodiesClient + 'static, - Provider: HeaderProvider
+ Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, { + type Header = Provider::Header; type Body = B::Body; /// Set a new download range (exclusive). @@ -358,9 +359,9 @@ where impl Stream for BodiesDownloader where B: BodiesClient + 'static, - Provider: HeaderProvider
+ Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, { - type Item = BodyDownloaderResult; + type Item = BodyDownloaderResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -442,13 +443,28 @@ where } #[derive(Debug)] -struct OrderedBodiesResponse { - resp: Vec>, +struct OrderedBodiesResponse { + resp: Vec>, /// The total size of the response in bytes size: usize, } -impl OrderedBodiesResponse { +impl OrderedBodiesResponse { + #[inline] + fn len(&self) -> usize { + self.resp.len() + } + + /// Returns the size of the response in bytes + /// + /// See [`BlockResponse::size`] + #[inline] + const fn size(&self) -> usize { + self.size + } +} + +impl OrderedBodiesResponse { /// Returns the block number of the first element /// /// # Panics @@ -464,36 +480,23 @@ impl OrderedBodiesResponse { fn block_range(&self) -> RangeInclusive { self.first_block_number()..=self.resp.last().expect("is not empty").block_number() } - - #[inline] - fn len(&self) -> usize { - self.resp.len() - } - - /// Returns the size of the response in bytes - /// - /// See [`BlockResponse::size`] - #[inline] - const fn size(&self) -> usize { - self.size - } } -impl PartialEq for OrderedBodiesResponse { +impl PartialEq for OrderedBodiesResponse { fn eq(&self, other: &Self) -> bool { self.first_block_number() == other.first_block_number() } } -impl Eq for OrderedBodiesResponse {} +impl Eq for OrderedBodiesResponse {} -impl PartialOrd for OrderedBodiesResponse { +impl PartialOrd for OrderedBodiesResponse { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for OrderedBodiesResponse { +impl Ord for OrderedBodiesResponse { fn cmp(&self, other: &Self) -> Ordering { self.first_block_number().cmp(&other.first_block_number()).reverse() } @@ -573,7 +576,7 @@ impl BodiesDownloaderBuilder { pub fn build( self, client: B, - consensus: Arc>, + consensus: Arc>, provider: Provider, ) -> BodiesDownloader where diff --git a/crates/net/downloaders/src/bodies/noop.rs b/crates/net/downloaders/src/bodies/noop.rs index dd3e6e9691b9..b7a9431a4d7b 100644 --- a/crates/net/downloaders/src/bodies/noop.rs +++ b/crates/net/downloaders/src/bodies/noop.rs @@ -9,18 +9,24 @@ use std::{fmt::Debug, ops::RangeInclusive}; /// A [`BodyDownloader`] implementation that does nothing. #[derive(Debug, Default)] #[non_exhaustive] -pub struct NoopBodiesDownloader(std::marker::PhantomData); +pub struct NoopBodiesDownloader { + _header: std::marker::PhantomData, + _body: std::marker::PhantomData, +} -impl BodyDownloader for NoopBodiesDownloader { +impl + BodyDownloader for NoopBodiesDownloader +{ type Body = B; + type Header = H; fn set_download_range(&mut self, _: RangeInclusive) -> DownloadResult<()> { Ok(()) } } -impl Stream for NoopBodiesDownloader { - type Item = Result>, DownloadError>; +impl Stream for NoopBodiesDownloader { + type Item = Result>, DownloadError>; fn poll_next( self: std::pin::Pin<&mut Self>, diff --git a/crates/net/downloaders/src/bodies/queue.rs b/crates/net/downloaders/src/bodies/queue.rs index 5f1e8b059cf8..ed8c425e6114 100644 --- a/crates/net/downloaders/src/bodies/queue.rs +++ b/crates/net/downloaders/src/bodies/queue.rs @@ -1,5 +1,6 @@ use super::request::BodiesRequestFuture; use crate::metrics::BodyDownloaderMetrics; +use alloy_consensus::BlockHeader; use alloy_primitives::BlockNumber; use futures::{stream::FuturesUnordered, Stream}; use futures_util::StreamExt; @@ -19,18 +20,19 @@ use std::{ /// The wrapper around [`FuturesUnordered`] that keeps information /// about the blocks currently being requested. #[derive(Debug)] -pub(crate) struct BodiesRequestQueue { +pub(crate) struct BodiesRequestQueue { /// Inner body request queue. - inner: FuturesUnordered>, + inner: FuturesUnordered>, /// The downloader metrics. metrics: BodyDownloaderMetrics, /// Last requested block number. pub(crate) last_requested_block_number: Option, } -impl BodiesRequestQueue +impl BodiesRequestQueue where B: BodiesClient + 'static, + H: BlockHeader, { /// Create new instance of request queue. pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self { @@ -58,15 +60,15 @@ where pub(crate) fn push_new_request( &mut self, client: Arc, - consensus: Arc>, - request: Vec, + consensus: Arc>, + request: Vec>, ) { // Set last max requested block number self.last_requested_block_number = request .last() .map(|last| match self.last_requested_block_number { - Some(num) => last.number.max(num), - None => last.number, + Some(num) => last.number().max(num), + None => last.number(), }) .or(self.last_requested_block_number); // Create request and push into the queue. @@ -76,11 +78,12 @@ where } } -impl Stream for BodiesRequestQueue +impl Stream for BodiesRequestQueue where + H: BlockHeader + Send + Sync + Unpin + 'static, B: BodiesClient + 'static, { - type Item = DownloadResult>>; + type Item = DownloadResult>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().inner.poll_next_unpin(cx) diff --git a/crates/net/downloaders/src/bodies/request.rs b/crates/net/downloaders/src/bodies/request.rs index 28cfdb61b7cd..92f46fa6fdd6 100644 --- a/crates/net/downloaders/src/bodies/request.rs +++ b/crates/net/downloaders/src/bodies/request.rs @@ -38,30 +38,31 @@ use std::{ /// All errors regarding the response cause the peer to get penalized, meaning that adversaries /// that try to give us bodies that do not match the requested order are going to be penalized /// and eventually disconnected. -pub(crate) struct BodiesRequestFuture { +pub(crate) struct BodiesRequestFuture { client: Arc, - consensus: Arc>, + consensus: Arc>, metrics: BodyDownloaderMetrics, /// Metrics for individual responses. This can be used to observe how the size (in bytes) of /// responses change while bodies are being downloaded. response_metrics: ResponseMetrics, // Headers to download. The collection is shrunk as responses are buffered. - pending_headers: VecDeque, + pending_headers: VecDeque>, /// Internal buffer for all blocks - buffer: Vec>, + buffer: Vec>, fut: Option, /// Tracks how many bodies we requested in the last request. last_request_len: Option, } -impl BodiesRequestFuture +impl BodiesRequestFuture where B: BodiesClient + 'static, + H: BlockHeader, { /// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request. pub(crate) fn new( client: Arc, - consensus: Arc>, + consensus: Arc>, metrics: BodyDownloaderMetrics, ) -> Self { Self { @@ -76,7 +77,7 @@ where } } - pub(crate) fn with_headers(mut self, headers: Vec) -> Self { + pub(crate) fn with_headers(mut self, headers: Vec>) -> Self { self.buffer.reserve_exact(headers.len()); self.pending_headers = VecDeque::from(headers); // Submit the request only if there are any headers to download. @@ -192,7 +193,7 @@ where if let Err(error) = self.consensus.validate_block_pre_execution(&block) { // Body is invalid, put the header back and return an error let hash = block.hash(); - let number = block.number; + let number = block.number(); self.pending_headers.push_front(block.header); return Err(DownloadError::BodyValidation { hash, @@ -213,11 +214,12 @@ where } } -impl Future for BodiesRequestFuture +impl Future for BodiesRequestFuture where + H: BlockHeader + Unpin + Send + Sync + 'static, B: BodiesClient + 'static, { - type Output = DownloadResult>>; + type Output = DownloadResult>>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index 89af9813e3cc..9377be78676c 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -24,15 +24,15 @@ pub const BODIES_TASK_BUFFER_SIZE: usize = 4; /// A [BodyDownloader] that drives a spawned [BodyDownloader] on a spawned task. #[derive(Debug)] #[pin_project] -pub struct TaskDownloader { +pub struct TaskDownloader { #[pin] - from_downloader: ReceiverStream>, + from_downloader: ReceiverStream>, to_downloader: UnboundedSender>, } // === impl TaskDownloader === -impl TaskDownloader { +impl TaskDownloader { /// Spawns the given `downloader` via [`tokio::task::spawn`] returns a [`TaskDownloader`] that's /// connected to that task. /// @@ -64,7 +64,7 @@ impl TaskDownloader { /// ``` pub fn spawn(downloader: T) -> Self where - T: BodyDownloader + 'static, + T: BodyDownloader
+ 'static, { Self::spawn_with(downloader, &TokioTaskExecutor::default()) } @@ -73,7 +73,7 @@ impl TaskDownloader { /// that's connected to that task. pub fn spawn_with(downloader: T, spawner: &S) -> Self where - T: BodyDownloader + 'static, + T: BodyDownloader
+ 'static, S: TaskSpawner, { let (bodies_tx, bodies_rx) = mpsc::channel(BODIES_TASK_BUFFER_SIZE); @@ -91,7 +91,10 @@ impl TaskDownloader { } } -impl BodyDownloader for TaskDownloader { +impl + BodyDownloader for TaskDownloader +{ + type Header = H; type Body = B; fn set_download_range(&mut self, range: RangeInclusive) -> DownloadResult<()> { @@ -100,8 +103,8 @@ impl BodyDownloader for TaskDownloader } } -impl Stream for TaskDownloader { - type Item = BodyDownloaderResult; +impl Stream for TaskDownloader { + type Item = BodyDownloaderResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().from_downloader.poll_next(cx) @@ -111,7 +114,7 @@ impl Stream for TaskDownloader { /// A [`BodyDownloader`] that runs on its own task struct SpawnedDownloader { updates: UnboundedReceiverStream>, - bodies_tx: PollSender>, + bodies_tx: PollSender>, downloader: T, } diff --git a/crates/net/p2p/src/bodies/downloader.rs b/crates/net/p2p/src/bodies/downloader.rs index 06f35fc9bd69..b80a308d8a18 100644 --- a/crates/net/p2p/src/bodies/downloader.rs +++ b/crates/net/p2p/src/bodies/downloader.rs @@ -5,7 +5,7 @@ use futures::Stream; use std::{fmt::Debug, ops::RangeInclusive}; /// Body downloader return type. -pub type BodyDownloaderResult = DownloadResult>>; +pub type BodyDownloaderResult = DownloadResult>>; /// A downloader capable of fetching and yielding block bodies from block headers. /// @@ -13,8 +13,11 @@ pub type BodyDownloaderResult = DownloadResult> + Unpin + Send + Sync + Stream> + Unpin { + /// The type of header that can be returned in a blck + type Header: Debug + Send + Sync + Unpin + 'static; + /// The type of the body that is being downloaded. type Body: Debug + Send + Sync + Unpin + 'static; diff --git a/crates/net/p2p/src/bodies/response.rs b/crates/net/p2p/src/bodies/response.rs index 02534ea09637..1b415246f544 100644 --- a/crates/net/p2p/src/bodies/response.rs +++ b/crates/net/p2p/src/bodies/response.rs @@ -1,10 +1,11 @@ +use alloy_consensus::BlockHeader; use alloy_primitives::{BlockNumber, U256}; use reth_primitives::{BlockBody, SealedBlock, SealedHeader}; -use reth_primitives_traits::{BlockHeader, InMemorySize}; +use reth_primitives_traits::InMemorySize; /// The block response #[derive(PartialEq, Eq, Debug, Clone)] -pub enum BlockResponse { +pub enum BlockResponse { /// Full block response (with transactions or ommers) Full(SealedBlock), /// The empty block response diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 0a0e4f10dbc9..6dff28bd39b7 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -87,7 +87,7 @@ pub fn build_pipeline( where N: ProviderNodeTypes, H: HeaderDownloader
> + 'static, - B: BodyDownloader> + 'static, + B: BodyDownloader
, Body = BodyTy> + 'static, Executor: BlockExecutorProvider, N::Primitives: NodePrimitives, { diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 88a1b96e249e..0f311b1bc9e0 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -5,7 +5,7 @@ use reth_db::{tables, transaction::DbTx}; use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut}; use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_primitives::StaticFileSegment; -use reth_primitives_traits::{Block, BlockBody}; +use reth_primitives_traits::{Block, BlockBody, BlockHeader}; use reth_provider::{ providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader, StorageLocation, @@ -56,7 +56,7 @@ pub struct BodyStage { /// The body downloader. downloader: D, /// Block response buffer. - buffer: Option>>, + buffer: Option>>, } impl BodyStage { @@ -72,9 +72,7 @@ impl BodyStage { unwind_block: Option, ) -> Result<(), StageError> where - Provider: DBProvider - + BlockReader
- + StaticFileProviderFactory, + Provider: DBProvider + BlockReader + StaticFileProviderFactory, { // Get id for the next tx_num of zero if there are no transactions. let next_tx_num = provider @@ -151,9 +149,9 @@ where Provider: DBProvider + StaticFileProviderFactory + StatsReader - + BlockReader
- + BlockWriter>, - D: BodyDownloader>, + + BlockReader + + BlockWriter>, + D: BodyDownloader>, { /// Return the id of the stage fn id(&self) -> StageId { @@ -764,6 +762,7 @@ mod tests { } impl BodyDownloader for TestBodyDownloader { + type Header = Header; type Body = BlockBody; fn set_download_range( @@ -786,7 +785,7 @@ mod tests { } impl Stream for TestBodyDownloader { - type Item = BodyDownloaderResult; + type Item = BodyDownloaderResult; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index c2a7c6ede02f..169d556348b2 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -258,7 +258,7 @@ pub(crate) fn missing_static_data_error( segment: StaticFileSegment, ) -> Result where - Provider: BlockReader
+ StaticFileProviderFactory, + Provider: BlockReader + StaticFileProviderFactory, { let mut last_block = static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();