From 4f19318b2a34e097dc62a7f394fad5b8d5fbb658 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Mon, 16 Dec 2024 10:27:09 +0200 Subject: [PATCH 01/12] feat: add content downlaoder to qury data from the network --- Cargo.lock | 1 + crates/subnetworks/history/Cargo.toml | 1 + crates/subnetworks/history/src/downloader.rs | 154 +++++++++++++++++++ crates/subnetworks/history/src/lib.rs | 12 +- 4 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 crates/subnetworks/history/src/downloader.rs diff --git a/Cargo.lock b/Cargo.lock index 438023513..a6ab9bf18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6786,6 +6786,7 @@ dependencies = [ "env_logger 0.9.3", "ethereum_ssz", "ethportal-api", + "futures", "parking_lot 0.11.2", "portalnet", "quickcheck", diff --git a/crates/subnetworks/history/Cargo.toml b/crates/subnetworks/history/Cargo.toml index 6884a1d00..ff712c7c4 100644 --- a/crates/subnetworks/history/Cargo.toml +++ b/crates/subnetworks/history/Cargo.toml @@ -26,6 +26,7 @@ tree_hash.workspace = true trin-storage.workspace = true trin-validation.workspace = true utp-rs.workspace = true +futures = "0.3.31" [dev-dependencies] env_logger.workspace = true diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs new file mode 100644 index 000000000..590577c5c --- /dev/null +++ b/crates/subnetworks/history/src/downloader.rs @@ -0,0 +1,154 @@ +/// Downloader struct that load a data CSV file from disk with block number and block hashes +/// and do FIndContent queries in batches to download all the content from the csv file. +/// We don't save the content to disk, we just download it and drop +/// it. But we need to measure the time it takes to download all the content, the number of +/// queries and the number of bytes downloaded, the data ingress rate and the query rate. +use std::fs::File; +use std::{ + io::{self, BufRead}, + path::Path, +}; + +use anyhow::anyhow; +use ethportal_api::{ + utils::bytes::hex_decode, BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, + HistoryContentValue, +}; +use futures::{channel::oneshot, future::join_all}; +use portalnet::overlay::command::OverlayCommand; +use tokio::sync::mpsc::UnboundedSender; +use tracing::{error, info, warn}; + +/// The number of blocks to download in a single batch. +const BATCH_SIZE: usize = 40; +/// The path to the CSV file with block numbers and block hashes. +const CSV_PATH: &str = "ethereum_blocks_14000000_merge.csv"; + +#[derive(Clone)] +pub struct Downloader { + pub overlay_tx: UnboundedSender>, +} + +impl Downloader { + pub fn new(overlay_tx: UnboundedSender>) -> Self { + Self { overlay_tx } + } + + pub async fn start(self) -> io::Result<()> { + // set the csv path to a file in the root trin-history directory + info!("Opening CSV file"); + let csv_path = Path::new(CSV_PATH); + let file = File::open(csv_path)?; + let reader = io::BufReader::new(file); + info!("Reading CSV file"); + let lines: Vec<_> = reader.lines().collect::>()?; + // Create a hash table in memory with all the block hashes and block numbers + info!("Parsing CSV file"); + // skip the header of the csv file + let lines = &lines[1..]; + let blocks: Vec<(u64, String)> = lines.iter().map(|line| parse_line(line)).collect(); + info!("Processing blocks"); + let batches = blocks.chunks(BATCH_SIZE); + + for batch in batches { + self.clone().process_batches(batch.to_vec()).await; + } + + tokio::signal::ctrl_c() + .await + .expect("failed to pause until ctrl-c"); + + Ok(()) + } + + async fn process_batches(self, batch: Vec<(u64, String)>) { + let mut futures = Vec::new(); + + for (block_number, block_hash) in batch { + let block_body_content_key = generate_block_body_content_key(block_hash.clone()); + futures.push(self.find_content(block_body_content_key, block_number)); + info!( + block_number = block_number, + "Sent FindContent query for block body" + ); + let block_receipts_content_key = generate_block_receipts_content_key(block_hash); + futures.push(self.find_content(block_receipts_content_key, block_number)); + info!( + block_number = block_number, + "Sent FindContent query for block receipts" + ); + } + join_all(futures).await; + } + + async fn find_content( + &self, + content_key: HistoryContentKey, + block_number: u64, + ) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); + + let overlay_command = OverlayCommand::FindContentQuery { + target: content_key.clone(), + callback: tx, + config: Default::default(), + }; + + if let Err(err) = self.overlay_tx.send(overlay_command) { + warn!( + error = %err, + "Error submitting FindContent query to service" + ); + } + match rx.await { + Ok(result) => match result { + Ok(result) => { + HistoryContentValue::decode(&content_key, &result.0)?; + info!(block_number = block_number, "Downloaded content for block"); + Ok(()) + } + Err(err) => { + error!( + block_number = block_number, + error = %err, + "Error in FindContent query" + ); + Err(anyhow!("Error in FindContent query: {:?}", err)) + } + }, + Err(err) => { + error!( + block_number = block_number, + error = %err, + "Error receiving FindContent query response" + ); + Err(err.into()) + } + } + } +} + +fn parse_line(line: &str) -> (u64, String) { + let parts: Vec<&str> = line.split(',').collect(); + let block_number = parts[0].parse().expect("Failed to parse block number"); + let block_hash = parts[1].to_string(); + (block_number, block_hash) +} + +fn generate_block_body_content_key(block_hash: String) -> HistoryContentKey { + HistoryContentKey::BlockBody(BlockBodyKey { + block_hash: <[u8; 32]>::try_from( + hex_decode(&block_hash).expect("Failed to decode block hash"), + ) + .expect("Failed to convert block hash to byte array"), + }) +} + +fn generate_block_receipts_content_key(block_hash: String) -> HistoryContentKey { + HistoryContentKey::BlockReceipts(BlockReceiptsKey { + block_hash: <[u8; 32]>::try_from( + hex_decode(&block_hash).expect("Failed to decode block hash"), + ) + .expect("Failed to convert block hash to byte array"), + }) +} diff --git a/crates/subnetworks/history/src/lib.rs b/crates/subnetworks/history/src/lib.rs index 3fb403250..62d713c53 100644 --- a/crates/subnetworks/history/src/lib.rs +++ b/crates/subnetworks/history/src/lib.rs @@ -1,6 +1,7 @@ #![warn(clippy::unwrap_used)] #![warn(clippy::uninlined_format_args)] +mod downloader; pub mod events; mod jsonrpc; pub mod network; @@ -10,6 +11,7 @@ pub mod validation; use std::sync::Arc; +use downloader::Downloader; use ethportal_api::types::jsonrpc::request::HistoryJsonRpcRequest; use network::HistoryNetwork; use portalnet::{ @@ -22,7 +24,7 @@ use tokio::{ task::JoinHandle, time::{interval, Duration}, }; -use tracing::info; +use tracing::{error, info}; use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; @@ -102,6 +104,14 @@ pub fn spawn_history_network( // hacky test: make sure we establish a session with the boot node network.overlay.ping_bootnodes().await; + let overlay_tx = network.overlay.command_tx.clone(); + let downloader = Downloader::new(overlay_tx); + tokio::spawn(async move { + if let Err(e) = downloader.start().await { + error!("Downloader error: {:?}", e); + } + }); + tokio::signal::ctrl_c() .await .expect("failed to pause until ctrl-c"); From 55c6370d82a2847ead79edeafe069097bdab096e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Mon, 16 Dec 2024 13:18:07 +0200 Subject: [PATCH 02/12] feat: add metric for total bytes inbound received --- crates/metrics/src/overlay.rs | 19 +++++++++++++++++++ .../portalnet/src/overlay/service/manager.rs | 4 ++++ 2 files changed, 23 insertions(+) diff --git a/crates/metrics/src/overlay.rs b/crates/metrics/src/overlay.rs index 44680594a..d23f3a1ff 100644 --- a/crates/metrics/src/overlay.rs +++ b/crates/metrics/src/overlay.rs @@ -23,6 +23,8 @@ pub struct OverlayMetrics { pub utp_outcome_total: IntCounterVec, pub utp_active_gauge: IntGaugeVec, pub utp_connection_duration: HistogramVec, + /// Total bytes transferred inbound + pub bytes_inbound_total: IntCounterVec, pub validation_total: IntCounterVec, } @@ -60,6 +62,14 @@ impl OverlayMetrics { &["protocol", "direction"], registry )?; + let bytes_inbound_total = register_int_counter_vec_with_registry!( + opts!( + "trin_bytes_inbound_total", + "count all bytes transferred inbound" + ), + &["protocol"], + registry + )?; let validation_total = register_int_counter_vec_with_registry!( opts!( "trin_validation_total", @@ -73,6 +83,7 @@ impl OverlayMetrics { utp_outcome_total, utp_active_gauge, utp_connection_duration, + bytes_inbound_total, validation_total, }) } @@ -125,6 +136,14 @@ impl OverlayMetricsReporter { .with_label_values(&labels) .inc(); } + /// Increase the total bytes inbound metric by the given length. + pub fn report_bytes_inbound(&self, bytes_len: u64) { + let labels: [&str; 1] = [&self.protocol]; + self.overlay_metrics + .bytes_inbound_total + .with_label_values(&labels) + .inc_by(bytes_len) + } // // uTP metrics diff --git a/crates/portalnet/src/overlay/service/manager.rs b/crates/portalnet/src/overlay/service/manager.rs index 4af2d48b3..96c4f34ef 100644 --- a/crates/portalnet/src/overlay/service/manager.rs +++ b/crates/portalnet/src/overlay/service/manager.rs @@ -1754,6 +1754,10 @@ impl< query_trace_events_tx: Option>, ) { let mut content = content; + // report the total bytes of content received + utp_processing + .metrics + .report_bytes_inbound(content.len() as u64); // Operate under assumption that all content in the store is valid let local_value = utp_processing.store.read().get(&content_key); if let Ok(Some(val)) = local_value { From 506145b87f43a61212a8e20841c3347c1ea0759e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Tue, 17 Dec 2024 15:33:35 +0200 Subject: [PATCH 03/12] feat: add census with peer scoring to downloader --- Cargo.lock | 1 + bin/portal-bridge/src/census/mod.rs | 29 +++++++++-- bin/portal-bridge/src/census/network.rs | 16 +++--- bin/portal-bridge/src/main.rs | 6 ++- crates/ethportal-api/src/types/query_trace.rs | 2 +- crates/subnetworks/history/Cargo.toml | 1 + crates/subnetworks/history/src/downloader.rs | 49 ++++++++++++++++--- 7 files changed, 82 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a6ab9bf18..b3e473406 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6788,6 +6788,7 @@ dependencies = [ "ethportal-api", "futures", "parking_lot 0.11.2", + "portal-bridge", "portalnet", "quickcheck", "rand", diff --git a/bin/portal-bridge/src/census/mod.rs b/bin/portal-bridge/src/census/mod.rs index 7fb61a974..5b0d537ef 100644 --- a/bin/portal-bridge/src/census/mod.rs +++ b/bin/portal-bridge/src/census/mod.rs @@ -11,7 +11,7 @@ use thiserror::Error; use tokio::task::JoinHandle; use tracing::{error, info, Instrument}; -use crate::cli::BridgeConfig; +use crate::cli::ClientType; mod network; mod peer; @@ -51,11 +51,30 @@ impl Census { const SUPPORTED_SUBNETWORKS: [Subnetwork; 3] = [Subnetwork::Beacon, Subnetwork::History, Subnetwork::State]; - pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self { + pub fn new( + client: HttpClient, + enr_offer_limit: usize, + filter_clients: Vec, + ) -> Self { Self { - history: Network::new(client.clone(), Subnetwork::History, bridge_config), - state: Network::new(client.clone(), Subnetwork::State, bridge_config), - beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config), + history: Network::new( + client.clone(), + Subnetwork::History, + enr_offer_limit, + filter_clients.clone(), + ), + state: Network::new( + client.clone(), + Subnetwork::State, + enr_offer_limit, + filter_clients.clone(), + ), + beacon: Network::new( + client.clone(), + Subnetwork::Beacon, + enr_offer_limit, + filter_clients, + ), initialized: false, } } diff --git a/bin/portal-bridge/src/census/network.rs b/bin/portal-bridge/src/census/network.rs index 3191d2644..1250bc1ea 100644 --- a/bin/portal-bridge/src/census/network.rs +++ b/bin/portal-bridge/src/census/network.rs @@ -20,10 +20,7 @@ use super::{ peers::Peers, scoring::{AdditiveWeight, PeerSelector}, }; -use crate::{ - census::CensusError, - cli::{BridgeConfig, ClientType}, -}; +use crate::{census::CensusError, cli::ClientType}; /// The result of the liveness check. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -78,7 +75,12 @@ pub(super) struct Network { } impl Network { - pub fn new(client: HttpClient, subnetwork: Subnetwork, bridge_config: &BridgeConfig) -> Self { + pub fn new( + client: HttpClient, + subnetwork: Subnetwork, + enr_offer_limit: usize, + filter_clients: Vec, + ) -> Self { if !matches!( subnetwork, Subnetwork::History | Subnetwork::Beacon | Subnetwork::State @@ -89,11 +91,11 @@ impl Network { Self { peers: Peers::new(PeerSelector::new( AdditiveWeight::default(), - bridge_config.enr_offer_limit, + enr_offer_limit, )), client, subnetwork, - filter_clients: bridge_config.filter_clients.to_vec(), + filter_clients, } } diff --git a/bin/portal-bridge/src/main.rs b/bin/portal-bridge/src/main.rs index 4515fcca9..10445f953 100644 --- a/bin/portal-bridge/src/main.rs +++ b/bin/portal-bridge/src/main.rs @@ -49,7 +49,11 @@ async fn main() -> Result<(), Box> { .contains(&Subnetwork::State) { // Create and initialize the census to acquire critical view of network before gossiping - let mut census = Census::new(portal_client.clone(), &bridge_config); + let mut census = Census::new( + portal_client.clone(), + bridge_config.enr_offer_limit, + bridge_config.filter_clients, + ); census_handle = Some(census.init([Subnetwork::State]).await?); let state_bridge = StateBridge::new( diff --git a/crates/ethportal-api/src/types/query_trace.rs b/crates/ethportal-api/src/types/query_trace.rs index 670139770..191f2261f 100644 --- a/crates/ethportal-api/src/types/query_trace.rs +++ b/crates/ethportal-api/src/types/query_trace.rs @@ -123,7 +123,7 @@ impl QueryTrace { } /// Returns milliseconds since the time provided. - fn timestamp_millis_u64(since: u64) -> u64 { + pub fn timestamp_millis_u64(since: u64) -> u64 { // Convert `since` (milliseconds) to a `SystemTime` let since_time = UNIX_EPOCH + Duration::from_millis(since); diff --git a/crates/subnetworks/history/Cargo.toml b/crates/subnetworks/history/Cargo.toml index ff712c7c4..7ffa5edb8 100644 --- a/crates/subnetworks/history/Cargo.toml +++ b/crates/subnetworks/history/Cargo.toml @@ -26,6 +26,7 @@ tree_hash.workspace = true trin-storage.workspace = true trin-validation.workspace = true utp-rs.workspace = true +portal-bridge.workspace = true futures = "0.3.31" [dev-dependencies] diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index 590577c5c..b4d9e6eeb 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -7,34 +7,49 @@ use std::fs::File; use std::{ io::{self, BufRead}, path::Path, + time::Duration, }; use anyhow::anyhow; use ethportal_api::{ - utils::bytes::hex_decode, BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, - HistoryContentValue, + jsonrpsee::http_client::{HttpClient, HttpClientBuilder}, + types::{cli::DEFAULT_WEB3_HTTP_ADDRESS, network::Subnetwork, query_trace::QueryTrace}, + utils::bytes::hex_decode, + BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, HistoryContentValue, }; use futures::{channel::oneshot, future::join_all}; -use portalnet::overlay::command::OverlayCommand; +use portal_bridge::census::Census; +use portalnet::overlay::{command::OverlayCommand, config::FindContentConfig}; use tokio::sync::mpsc::UnboundedSender; use tracing::{error, info, warn}; /// The number of blocks to download in a single batch. -const BATCH_SIZE: usize = 40; +const BATCH_SIZE: usize = 3; /// The path to the CSV file with block numbers and block hashes. const CSV_PATH: &str = "ethereum_blocks_14000000_merge.csv"; #[derive(Clone)] pub struct Downloader { + pub census: Census, pub overlay_tx: UnboundedSender>, } impl Downloader { pub fn new(overlay_tx: UnboundedSender>) -> Self { - Self { overlay_tx } + let http_client: HttpClient = HttpClientBuilder::default() + // increase default timeout to allow for trace_gossip requests that can take a long + // time + .request_timeout(Duration::from_secs(120)) + .build(DEFAULT_WEB3_HTTP_ADDRESS) + .map_err(|e| e.to_string()) + .expect("Failed to build http client"); + + // BUild hhtp client binded to the current node web3rpc + let census = Census::new(http_client, 0, vec![]); + Self { overlay_tx, census } } - pub async fn start(self) -> io::Result<()> { + pub async fn start(mut self) -> io::Result<()> { // set the csv path to a file in the root trin-history directory info!("Opening CSV file"); let csv_path = Path::new(CSV_PATH); @@ -47,6 +62,14 @@ impl Downloader { // skip the header of the csv file let lines = &lines[1..]; let blocks: Vec<(u64, String)> = lines.iter().map(|line| parse_line(line)).collect(); + // Initialize the census with the history subnetwork + let _ = Some( + self.census + .init([Subnetwork::History]) + .await + .expect("Failed to initialize Census"), + ); + info!("Processing blocks"); let batches = blocks.chunks(BATCH_SIZE); @@ -91,7 +114,10 @@ impl Downloader { let overlay_command = OverlayCommand::FindContentQuery { target: content_key.clone(), callback: tx, - config: Default::default(), + config: FindContentConfig { + is_trace: true, + ..Default::default() + }, }; if let Err(err) = self.overlay_tx.send(overlay_command) { @@ -104,7 +130,14 @@ impl Downloader { Ok(result) => match result { Ok(result) => { HistoryContentValue::decode(&content_key, &result.0)?; - info!(block_number = block_number, "Downloaded content for block"); + let duration_ms = QueryTrace::timestamp_millis_u64( + result.2.expect("QueryTrace not found").started_at_ms, + ); + info!( + block_number = block_number, + query_duration = duration_ms, + "Downloaded content for block" + ); Ok(()) } Err(err) => { From 41d23c07d29d68df48a6d635a0841f7c678e9ac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Tue, 7 Jan 2025 14:43:31 +0200 Subject: [PATCH 04/12] feat: enable peer scoring --- crates/subnetworks/history/Cargo.toml | 1 + crates/subnetworks/history/src/downloader.rs | 215 +++++++++++++------ crates/subnetworks/history/src/lib.rs | 4 +- 3 files changed, 156 insertions(+), 64 deletions(-) diff --git a/crates/subnetworks/history/Cargo.toml b/crates/subnetworks/history/Cargo.toml index 7ffa5edb8..a53f781d7 100644 --- a/crates/subnetworks/history/Cargo.toml +++ b/crates/subnetworks/history/Cargo.toml @@ -20,6 +20,7 @@ ethportal-api.workspace = true parking_lot.workspace = true portalnet.workspace = true serde_json.workspace = true +ssz_types.workspace = true tokio.workspace = true tracing.workspace = true tree_hash.workspace = true diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index b4d9e6eeb..0c8585e5b 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -1,41 +1,71 @@ /// Downloader struct that load a data CSV file from disk with block number and block hashes /// and do FIndContent queries in batches to download all the content from the csv file. /// We don't save the content to disk, we just download it and drop -/// it. But we need to measure the time it takes to download all the content, the number of -/// queries and the number of bytes downloaded, the data ingress rate and the query rate. +/// it. use std::fs::File; use std::{ + fmt::{Display, Formatter}, io::{self, BufRead}, path::Path, + sync::Arc, time::Duration, }; use anyhow::anyhow; use ethportal_api::{ jsonrpsee::http_client::{HttpClient, HttpClientBuilder}, - types::{cli::DEFAULT_WEB3_HTTP_ADDRESS, network::Subnetwork, query_trace::QueryTrace}, + types::{ + distance::XorMetric, + network::Subnetwork, + portal_wire::{Content, OfferTrace}, + }, utils::bytes::hex_decode, BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, HistoryContentValue, + OverlayContentKey, }; -use futures::{channel::oneshot, future::join_all}; +use futures::future::join_all; use portal_bridge::census::Census; -use portalnet::overlay::{command::OverlayCommand, config::FindContentConfig}; -use tokio::sync::mpsc::UnboundedSender; -use tracing::{error, info, warn}; +use portalnet::{constants::DEFAULT_WEB3_HTTP_ADDRESS, overlay::protocol::OverlayProtocol}; +use ssz_types::BitList; +use tracing::{info, warn}; + +use crate::{storage::HistoryStorage, validation::ChainHistoryValidator}; /// The number of blocks to download in a single batch. -const BATCH_SIZE: usize = 3; +const BATCH_SIZE: usize = 100; +/// The max number of ENRs to send FindContent queries to. +const CENSUS_ENR_LIMIT: usize = 4; /// The path to the CSV file with block numbers and block hashes. const CSV_PATH: &str = "ethereum_blocks_14000000_merge.csv"; +enum ContentType { + BlockBody, + BlockReceipts, +} + +impl Display for ContentType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ContentType::BlockBody => write!(f, "BlockBody"), + ContentType::BlockReceipts => write!(f, "BlockReceipts"), + } + } +} + #[derive(Clone)] pub struct Downloader { pub census: Census, - pub overlay_tx: UnboundedSender>, + pub overlay_arc: + Arc>, } impl Downloader { - pub fn new(overlay_tx: UnboundedSender>) -> Self { + pub fn new( + overlay_arc: Arc< + OverlayProtocol, + >, + ) -> Self { + // Build hhtp client bound to the current node web3rpc let http_client: HttpClient = HttpClientBuilder::default() // increase default timeout to allow for trace_gossip requests that can take a long // time @@ -44,9 +74,11 @@ impl Downloader { .map_err(|e| e.to_string()) .expect("Failed to build http client"); - // BUild hhtp client binded to the current node web3rpc - let census = Census::new(http_client, 0, vec![]); - Self { overlay_tx, census } + let census = Census::new(http_client, CENSUS_ENR_LIMIT, vec![]); + Self { + overlay_arc, + census, + } } pub async fn start(mut self) -> io::Result<()> { @@ -57,7 +89,6 @@ impl Downloader { let reader = io::BufReader::new(file); info!("Reading CSV file"); let lines: Vec<_> = reader.lines().collect::>()?; - // Create a hash table in memory with all the block hashes and block numbers info!("Parsing CSV file"); // skip the header of the csv file let lines = &lines[1..]; @@ -89,17 +120,17 @@ impl Downloader { for (block_number, block_hash) in batch { let block_body_content_key = generate_block_body_content_key(block_hash.clone()); - futures.push(self.find_content(block_body_content_key, block_number)); - info!( - block_number = block_number, - "Sent FindContent query for block body" - ); + futures.push(self.find_content( + block_body_content_key, + block_number, + ContentType::BlockBody, + )); let block_receipts_content_key = generate_block_receipts_content_key(block_hash); - futures.push(self.find_content(block_receipts_content_key, block_number)); - info!( - block_number = block_number, - "Sent FindContent query for block receipts" - ); + futures.push(self.find_content( + block_receipts_content_key, + block_number, + ContentType::BlockReceipts, + )); } join_all(futures).await; } @@ -108,56 +139,116 @@ impl Downloader { &self, content_key: HistoryContentKey, block_number: u64, + content_type: ContentType, ) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - - let overlay_command = OverlayCommand::FindContentQuery { - target: content_key.clone(), - callback: tx, - config: FindContentConfig { - is_trace: true, - ..Default::default() - }, + // Select interested peers from the census + let enrs = self + .census + .select_peers(Subnetwork::History, &content_key.content_id()) + .expect("Failed to select peers"); + // Send FindContent query to the interested peers + if enrs.is_empty() { + warn!( + block_number = block_number, + content_type = %content_type, + "No peers found for block. Skipping" + ); + return Err(anyhow!("No peers found for block {block_number}")); }; - if let Err(err) = self.overlay_tx.send(overlay_command) { - warn!( - error = %err, - "Error submitting FindContent query to service" + for (index, enr) in enrs.iter().enumerate() { + info!( + block_number = block_number, + content_type = %content_type, + peer_index = index, + "Sending FindContent query to peer" ); - } - match rx.await { - Ok(result) => match result { - Ok(result) => { - HistoryContentValue::decode(&content_key, &result.0)?; - let duration_ms = QueryTrace::timestamp_millis_u64( - result.2.expect("QueryTrace not found").started_at_ms, - ); - info!( + + let result = self + .overlay_arc + .send_find_content(enr.clone(), content_key.to_bytes()) + .await?; + let content = result.0; + + match content { + Content::ConnectionId(_) => { + // Should not return connection ID, should always return the content + warn!( block_number = block_number, - query_duration = duration_ms, - "Downloaded content for block" + content_type = %content_type, + "Received ConnectionId content" ); - Ok(()) + self.census.record_offer_result( + Subnetwork::History, + enr.node_id(), + 0, + Duration::from_secs(0), + &OfferTrace::Failed, + ); + continue; } - Err(err) => { - error!( + Content::Content(content_bytes) => { + let content = HistoryContentValue::decode(&content_key, &content_bytes); + + match content { + Ok(_) => { + info!( + block_number = block_number, + content_type = %content_type, + "Received content from peer" + ); + self.census.record_offer_result( + Subnetwork::History, + enr.node_id(), + content_bytes.len(), + Duration::from_secs(0), + &OfferTrace::Success( + BitList::with_capacity(1).expect("Failed to create bitlist"), + ), + ); + return Ok(()); + } + Err(_) => { + warn!( + block_number = block_number, + content_type = %content_type, + "Failed to parse content from peer, invalid content" + ); + self.census.record_offer_result( + Subnetwork::History, + enr.node_id(), + 0, + Duration::from_secs(0), + &OfferTrace::Failed, + ); + continue; + } + } + } + Content::Enrs(_) => { + // Content not found + warn!( block_number = block_number, - error = %err, - "Error in FindContent query" + content_type = %content_type, + "Received Enrs content, content not found from peer" + ); + self.census.record_offer_result( + Subnetwork::History, + enr.node_id(), + 0, + Duration::from_secs(0), + &OfferTrace::Failed, ); - Err(anyhow!("Error in FindContent query: {:?}", err)) + continue; } - }, - Err(err) => { - error!( - block_number = block_number, - error = %err, - "Error receiving FindContent query response" - ); - Err(err.into()) } } + warn!( + block_number = block_number, + content_type = %content_type, + "Failed to find content for block" + ); + Err(anyhow!("Failed to find content for block")) } } diff --git a/crates/subnetworks/history/src/lib.rs b/crates/subnetworks/history/src/lib.rs index 62d713c53..5013fd9a0 100644 --- a/crates/subnetworks/history/src/lib.rs +++ b/crates/subnetworks/history/src/lib.rs @@ -104,8 +104,8 @@ pub fn spawn_history_network( // hacky test: make sure we establish a session with the boot node network.overlay.ping_bootnodes().await; - let overlay_tx = network.overlay.command_tx.clone(); - let downloader = Downloader::new(overlay_tx); + let overlay_arc = network.overlay.clone(); + let downloader = Downloader::new(overlay_arc); tokio::spawn(async move { if let Err(e) = downloader.start().await { error!("Downloader error: {:?}", e); From 519406666f70e2dc180aa43c61a5672019a56bc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Mon, 13 Jan 2025 15:32:32 +0200 Subject: [PATCH 05/12] chore: report number of failed outgoing requests --- crates/metrics/src/overlay.rs | 26 +++++++++++++++++++ .../portalnet/src/overlay/service/manager.rs | 6 ++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/crates/metrics/src/overlay.rs b/crates/metrics/src/overlay.rs index d23f3a1ff..d333b26c1 100644 --- a/crates/metrics/src/overlay.rs +++ b/crates/metrics/src/overlay.rs @@ -20,6 +20,7 @@ use crate::{ #[derive(Clone)] pub struct OverlayMetrics { pub message_total: IntCounterVec, + pub failed_message_sent: IntCounterVec, pub utp_outcome_total: IntCounterVec, pub utp_active_gauge: IntGaugeVec, pub utp_connection_duration: HistogramVec, @@ -38,6 +39,14 @@ impl OverlayMetrics { &["protocol", "direction", "type"], registry )?; + let failed_message_sent = register_int_counter_vec_with_registry!( + opts!( + "trin_failed_message_sent", + "count all network messages sent" + ), + &["protocol", "direction", "type"], + registry + )?; let utp_outcome_total = register_int_counter_vec_with_registry!( opts!( "trin_utp_outcome_total", @@ -80,6 +89,7 @@ impl OverlayMetrics { )?; Ok(Self { message_total, + failed_message_sent, utp_outcome_total, utp_active_gauge, utp_connection_duration, @@ -129,6 +139,10 @@ impl OverlayMetricsReporter { self.increment_message_total(MessageDirectionLabel::Received, response.into()); } + pub fn report_failed_outbound_request(&self, request: &Request) { + self.increment_failed_message_sent(MessageDirectionLabel::Sent, request.into()); + } + fn increment_message_total(&self, direction: MessageDirectionLabel, message: MessageLabel) { let labels: [&str; 3] = [&self.protocol, direction.into(), message.into()]; self.overlay_metrics @@ -136,6 +150,18 @@ impl OverlayMetricsReporter { .with_label_values(&labels) .inc(); } + /// Increment the failed message sent metric + fn increment_failed_message_sent( + &self, + direction: MessageDirectionLabel, + message: MessageLabel, + ) { + let labels: [&str; 3] = [&self.protocol, direction.into(), message.into()]; + self.overlay_metrics + .failed_message_sent + .with_label_values(&labels) + .inc(); + } /// Increase the total bytes inbound metric by the given length. pub fn report_bytes_inbound(&self, bytes_len: u64) { let labels: [&str; 1] = [&self.protocol]; diff --git a/crates/portalnet/src/overlay/service/manager.rs b/crates/portalnet/src/overlay/service/manager.rs index 96c4f34ef..9a93351a9 100644 --- a/crates/portalnet/src/overlay/service/manager.rs +++ b/crates/portalnet/src/overlay/service/manager.rs @@ -309,7 +309,11 @@ impl< self.metrics.report_inbound_response(&response); self.process_response(response, request.destination, request.request, request.query_id, request.request_permit) } - Err(error) => self.process_request_failure(response.request_id, request.destination, error), + Err(error) => { + // Metric repord failed request + self.metrics.report_failed_outbound_request(&request.request); + self.process_request_failure(response.request_id, request.destination, error) + }, } } else { From e13d6e18d4ed49f0a5a7af0c773bf4f8cbfc1f4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Mon, 13 Jan 2025 17:32:13 +0200 Subject: [PATCH 06/12] chore: add downloader metrics --- Cargo.lock | 1 + crates/metrics/src/downloader.rs | 51 ++++++++++++++++++++ crates/metrics/src/lib.rs | 1 + crates/metrics/src/portalnet.rs | 12 ++++- crates/subnetworks/history/Cargo.toml | 1 + crates/subnetworks/history/src/downloader.rs | 6 +++ 6 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 crates/metrics/src/downloader.rs diff --git a/Cargo.lock b/Cargo.lock index b3e473406..5583e7672 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6802,6 +6802,7 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.19", "tree_hash", + "trin-metrics", "trin-storage", "trin-validation", "ureq", diff --git a/crates/metrics/src/downloader.rs b/crates/metrics/src/downloader.rs new file mode 100644 index 000000000..4be024c04 --- /dev/null +++ b/crates/metrics/src/downloader.rs @@ -0,0 +1,51 @@ +use prometheus_exporter::prometheus::{ + opts, register_int_gauge_vec_with_registry, IntGaugeVec, Registry, +}; + +use crate::portalnet::PORTALNET_METRICS; + +/// Contains metrics reporters for portalnet bridge. +#[derive(Clone, Debug)] +pub struct DownloaderMetrics { + pub current_block: IntGaugeVec, +} + +impl DownloaderMetrics { + pub fn new(registry: &Registry) -> anyhow::Result { + let current_block = register_int_gauge_vec_with_registry!( + opts!( + "downloader_current_block", + "the current block number the downloader is on" + ), + &["downloader"], + registry + )?; + Ok(Self { current_block }) + } +} + +#[derive(Clone, Debug)] +pub struct DownloaderMetricsReporter { + metrics: DownloaderMetrics, +} + +impl Default for DownloaderMetricsReporter { + fn default() -> Self { + Self::new() + } +} + +impl DownloaderMetricsReporter { + pub fn new() -> Self { + Self { + metrics: PORTALNET_METRICS.downloader(), + } + } + + pub fn report_current_block(&self, block_number: u64) { + self.metrics + .current_block + .with_label_values(&["downloader"]) + .set(block_number as i64); + } +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 7fd030967..571941b00 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -2,6 +2,7 @@ #![warn(clippy::uninlined_format_args)] pub mod bridge; +pub mod downloader; pub mod labels; pub mod overlay; pub mod portalnet; diff --git a/crates/metrics/src/portalnet.rs b/crates/metrics/src/portalnet.rs index 0d4b878a5..84168ee94 100644 --- a/crates/metrics/src/portalnet.rs +++ b/crates/metrics/src/portalnet.rs @@ -1,7 +1,10 @@ use lazy_static::lazy_static; use prometheus_exporter::prometheus::default_registry; -use crate::{bridge::BridgeMetrics, overlay::OverlayMetrics, storage::StorageMetrics}; +use crate::{ + bridge::BridgeMetrics, downloader::DownloaderMetrics, overlay::OverlayMetrics, + storage::StorageMetrics, +}; // We use lazy_static to ensure that the metrics registry is initialized only once, for each // runtime. This is important because the registry is a global singleton, and if it is @@ -17,6 +20,7 @@ fn initialize_metrics_registry() -> PortalnetMetrics { pub struct PortalnetMetrics { bridge: BridgeMetrics, + downloader: DownloaderMetrics, overlay: OverlayMetrics, storage: StorageMetrics, } @@ -27,10 +31,12 @@ impl PortalnetMetrics { let overlay = OverlayMetrics::new(registry)?; let storage = StorageMetrics::new(registry)?; let bridge = BridgeMetrics::new(registry)?; + let downloader = DownloaderMetrics::new(registry)?; Ok(Self { overlay, storage, bridge, + downloader, }) } @@ -45,4 +51,8 @@ impl PortalnetMetrics { pub fn bridge(&self) -> BridgeMetrics { self.bridge.clone() } + + pub fn downloader(&self) -> DownloaderMetrics { + self.downloader.clone() + } } diff --git a/crates/subnetworks/history/Cargo.toml b/crates/subnetworks/history/Cargo.toml index a53f781d7..fc2c14ec0 100644 --- a/crates/subnetworks/history/Cargo.toml +++ b/crates/subnetworks/history/Cargo.toml @@ -25,6 +25,7 @@ tokio.workspace = true tracing.workspace = true tree_hash.workspace = true trin-storage.workspace = true +trin-metrics.workspace = true trin-validation.workspace = true utp-rs.workspace = true portal-bridge.workspace = true diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index 0c8585e5b..8d7fe3528 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -28,6 +28,7 @@ use portal_bridge::census::Census; use portalnet::{constants::DEFAULT_WEB3_HTTP_ADDRESS, overlay::protocol::OverlayProtocol}; use ssz_types::BitList; use tracing::{info, warn}; +use trin_metrics::downloader::DownloaderMetricsReporter; use crate::{storage::HistoryStorage, validation::ChainHistoryValidator}; @@ -57,6 +58,7 @@ pub struct Downloader { pub census: Census, pub overlay_arc: Arc>, + pub metrics: DownloaderMetricsReporter, } impl Downloader { @@ -74,10 +76,13 @@ impl Downloader { .map_err(|e| e.to_string()) .expect("Failed to build http client"); + let metrics = DownloaderMetricsReporter::new(); + let census = Census::new(http_client, CENSUS_ENR_LIMIT, vec![]); Self { overlay_arc, census, + metrics, } } @@ -119,6 +124,7 @@ impl Downloader { let mut futures = Vec::new(); for (block_number, block_hash) in batch { + self.metrics.report_current_block(block_number); let block_body_content_key = generate_block_body_content_key(block_hash.clone()); futures.push(self.find_content( block_body_content_key, From dbd9305a22fe7798f60620e437ed873451033897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Tue, 14 Jan 2025 18:18:57 +0200 Subject: [PATCH 07/12] feat: switch between find content with census and recursive find content queries --- crates/subnetworks/history/src/downloader.rs | 128 +++++++++++++++---- 1 file changed, 106 insertions(+), 22 deletions(-) diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index 8d7fe3528..8b85eb6a2 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -11,7 +11,7 @@ use std::{ time::Duration, }; -use anyhow::anyhow; +use anyhow::{anyhow, Error}; use ethportal_api::{ jsonrpsee::http_client::{HttpClient, HttpClientBuilder}, types::{ @@ -23,17 +23,23 @@ use ethportal_api::{ BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, HistoryContentValue, OverlayContentKey, }; -use futures::future::join_all; +use futures::{channel::oneshot, future::join_all}; use portal_bridge::census::Census; -use portalnet::{constants::DEFAULT_WEB3_HTTP_ADDRESS, overlay::protocol::OverlayProtocol}; +use portalnet::{ + constants::DEFAULT_WEB3_HTTP_ADDRESS, + overlay::{command::OverlayCommand, protocol::OverlayProtocol}, +}; use ssz_types::BitList; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use trin_metrics::downloader::DownloaderMetricsReporter; use crate::{storage::HistoryStorage, validation::ChainHistoryValidator}; /// The number of blocks to download in a single batch. -const BATCH_SIZE: usize = 100; +const BATCH_SIZE: usize = 30; +/// Enable census with full view of the network and peer scoring to find peers to download content +/// from. +const CENSUS: bool = true; /// The max number of ENRs to send FindContent queries to. const CENSUS_ENR_LIMIT: usize = 4; /// The path to the CSV file with block numbers and block hashes. @@ -55,7 +61,7 @@ impl Display for ContentType { #[derive(Clone)] pub struct Downloader { - pub census: Census, + pub census: Option, pub overlay_arc: Arc>, pub metrics: DownloaderMetricsReporter, @@ -78,7 +84,15 @@ impl Downloader { let metrics = DownloaderMetricsReporter::new(); - let census = Census::new(http_client, CENSUS_ENR_LIMIT, vec![]); + let mut census = None; + + if CENSUS { + info!("Census enabled"); + census = Some(Census::new(http_client, CENSUS_ENR_LIMIT, vec![])); + } else { + info!("Census disabled"); + } + Self { overlay_arc, census, @@ -86,7 +100,7 @@ impl Downloader { } } - pub async fn start(mut self) -> io::Result<()> { + pub async fn start(self) -> io::Result<()> { // set the csv path to a file in the root trin-history directory info!("Opening CSV file"); let csv_path = Path::new(CSV_PATH); @@ -98,13 +112,15 @@ impl Downloader { // skip the header of the csv file let lines = &lines[1..]; let blocks: Vec<(u64, String)> = lines.iter().map(|line| parse_line(line)).collect(); - // Initialize the census with the history subnetwork - let _ = Some( - self.census - .init([Subnetwork::History]) - .await - .expect("Failed to initialize Census"), - ); + // Initialize the census with the history subnetwork if enabled + if let Some(mut census) = self.census.clone() { + let _ = Some( + census + .init([Subnetwork::History]) + .await + .expect("Failed to initialize Census"), + ); + } info!("Processing blocks"); let batches = blocks.chunks(BATCH_SIZE); @@ -147,9 +163,25 @@ impl Downloader { block_number: u64, content_type: ContentType, ) -> anyhow::Result<()> { + if CENSUS { + self.find_content_census(&content_key, block_number, content_type) + .await + } else { + self.recursive_find_content(content_key, block_number, content_type) + .await + } + } + + /// Send FindContent queries to the interested peers in the census, includes peers scoring + async fn find_content_census( + &self, + content_key: &HistoryContentKey, + block_number: u64, + content_type: ContentType, + ) -> Result<(), Error> { + let census = self.census.clone().expect("census should be enabled"); // Select interested peers from the census - let enrs = self - .census + let enrs = census .select_peers(Subnetwork::History, &content_key.content_id()) .expect("Failed to select peers"); // Send FindContent query to the interested peers @@ -184,7 +216,7 @@ impl Downloader { content_type = %content_type, "Received ConnectionId content" ); - self.census.record_offer_result( + census.record_offer_result( Subnetwork::History, enr.node_id(), 0, @@ -194,7 +226,7 @@ impl Downloader { continue; } Content::Content(content_bytes) => { - let content = HistoryContentValue::decode(&content_key, &content_bytes); + let content = HistoryContentValue::decode(content_key, &content_bytes); match content { Ok(_) => { @@ -203,7 +235,7 @@ impl Downloader { content_type = %content_type, "Received content from peer" ); - self.census.record_offer_result( + census.record_offer_result( Subnetwork::History, enr.node_id(), content_bytes.len(), @@ -220,7 +252,7 @@ impl Downloader { content_type = %content_type, "Failed to parse content from peer, invalid content" ); - self.census.record_offer_result( + census.record_offer_result( Subnetwork::History, enr.node_id(), 0, @@ -238,7 +270,7 @@ impl Downloader { content_type = %content_type, "Received Enrs content, content not found from peer" ); - self.census.record_offer_result( + census.record_offer_result( Subnetwork::History, enr.node_id(), 0, @@ -256,6 +288,58 @@ impl Downloader { ); Err(anyhow!("Failed to find content for block")) } + + /// Send recursive FindContent queries to the overlay service + async fn recursive_find_content( + &self, + content_key: HistoryContentKey, + block_number: u64, + content_type: ContentType, + ) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); + + let overlay_command = OverlayCommand::FindContentQuery { + target: content_key.clone(), + callback: tx, + config: Default::default(), + }; + + if let Err(err) = self.overlay_arc.command_tx.send(overlay_command) { + warn!( + error = %err, + block_number = block_number, + content_type = %content_type, + "Error submitting FindContent query to service" + ); + } + match rx.await { + Ok(result) => match result { + Ok(result) => { + HistoryContentValue::decode(&content_key, &result.0)?; + info!(block_number = block_number, "Downloaded content for block"); + Ok(()) + } + Err(err) => { + error!( + block_number = block_number, + content_type = %content_type, + error = %err, + "Error in FindContent query" + ); + Err(anyhow!("Error in FindContent query: {:?}", err)) + } + }, + Err(err) => { + error!( + block_number = block_number, + content_type = %content_type, + error = %err, + "Error receiving FindContent query response" + ); + Err(err.into()) + } + } + } } fn parse_line(line: &str) -> (u64, String) { From 9068fd571ef2c68e5c7f0a19eda6f793f2dbb638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Thu, 16 Jan 2025 09:51:51 +0200 Subject: [PATCH 08/12] chore: report find content query elapsed time --- crates/metrics/src/downloader.rs | 28 ++++++++++++++++++-- crates/subnetworks/history/src/downloader.rs | 7 +++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/crates/metrics/src/downloader.rs b/crates/metrics/src/downloader.rs index 4be024c04..464ef2cd7 100644 --- a/crates/metrics/src/downloader.rs +++ b/crates/metrics/src/downloader.rs @@ -1,5 +1,6 @@ use prometheus_exporter::prometheus::{ - opts, register_int_gauge_vec_with_registry, IntGaugeVec, Registry, + histogram_opts, opts, register_histogram_vec_with_registry, + register_int_gauge_vec_with_registry, HistogramTimer, HistogramVec, IntGaugeVec, Registry, }; use crate::portalnet::PORTALNET_METRICS; @@ -8,6 +9,7 @@ use crate::portalnet::PORTALNET_METRICS; #[derive(Clone, Debug)] pub struct DownloaderMetrics { pub current_block: IntGaugeVec, + pub find_content_timer: HistogramVec, } impl DownloaderMetrics { @@ -20,7 +22,18 @@ impl DownloaderMetrics { &["downloader"], registry )?; - Ok(Self { current_block }) + let find_content_timer = register_histogram_vec_with_registry!( + histogram_opts!( + "downloader_find_content_timer", + "the time it takes for find content query to complete" + ), + &["downloader"], + registry + )?; + Ok(Self { + current_block, + find_content_timer, + }) } } @@ -48,4 +61,15 @@ impl DownloaderMetricsReporter { .with_label_values(&["downloader"]) .set(block_number as i64); } + + pub fn start_find_content_timer(&self) -> HistogramTimer { + self.metrics + .find_content_timer + .with_label_values(&["downloader"]) + .start_timer() + } + + pub fn stop_find_content_timer(&self, timer: HistogramTimer) { + timer.observe_duration() + } } diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index 8b85eb6a2..8a036e66d 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -163,13 +163,16 @@ impl Downloader { block_number: u64, content_type: ContentType, ) -> anyhow::Result<()> { - if CENSUS { + let timer = self.metrics.start_find_content_timer(); + let result = if CENSUS { self.find_content_census(&content_key, block_number, content_type) .await } else { self.recursive_find_content(content_key, block_number, content_type) .await - } + }; + self.metrics.stop_find_content_timer(timer); + result } /// Send FindContent queries to the interested peers in the census, includes peers scoring From ea76a75814093425f02111b94c6bd3551a69c05d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Thu, 16 Jan 2025 12:17:26 +0200 Subject: [PATCH 09/12] fix: report inbound strem bytes --- crates/portalnet/src/utp/controller.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/portalnet/src/utp/controller.rs b/crates/portalnet/src/utp/controller.rs index e7dfc433a..f9ffd80ab 100644 --- a/crates/portalnet/src/utp/controller.rs +++ b/crates/portalnet/src/utp/controller.rs @@ -196,6 +196,7 @@ impl UtpController { // report utp tx as successful, even if we go on to fail to process the payload self.metrics .report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success); + self.metrics.report_bytes_inbound(data.len() as u64); Ok(Bytes::from(data)) } From ed0832cb5baf99f4e20e6b4e9df2b3f252ffa3a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Thu, 23 Jan 2025 12:09:23 +0200 Subject: [PATCH 10/12] feat: disable history validation --- crates/subnetworks/history/src/validation.rs | 196 +++++++++---------- 1 file changed, 94 insertions(+), 102 deletions(-) diff --git a/crates/subnetworks/history/src/validation.rs b/crates/subnetworks/history/src/validation.rs index 4c5d96265..844b27da3 100644 --- a/crates/subnetworks/history/src/validation.rs +++ b/crates/subnetworks/history/src/validation.rs @@ -1,16 +1,6 @@ use std::sync::Arc; -use alloy::primitives::B256; -use anyhow::{anyhow, ensure}; -use ethportal_api::{ - types::execution::{ - block_body::BlockBody, header::Header, header_with_proof::HeaderWithProof, - receipts::Receipts, - }, - utils::bytes::hex_encode, - HistoryContentKey, -}; -use ssz::Decode; +use ethportal_api::HistoryContentKey; use tokio::sync::RwLock; use trin_validation::{ oracle::HeaderOracle, @@ -24,101 +14,103 @@ pub struct ChainHistoryValidator { impl Validator for ChainHistoryValidator { async fn validate_content( &self, - content_key: &HistoryContentKey, - content: &[u8], + _content_key: &HistoryContentKey, + _content: &[u8], ) -> anyhow::Result> { - match content_key { - HistoryContentKey::BlockHeaderByHash(key) => { - let header_with_proof = - HeaderWithProof::from_ssz_bytes(content).map_err(|err| { - anyhow!("Header by hash content has invalid encoding: {err:?}") - })?; - let header_hash = header_with_proof.header.hash(); - ensure!( - header_hash == B256::from(key.block_hash), - "Content validation failed: Invalid header hash. Found: {header_hash:?} - Expected: {:?}", - hex_encode(header_hash) - ); - self.header_oracle - .read() - .await - .header_validator - .validate_header_with_proof(&header_with_proof)?; - - Ok(ValidationResult::new(true)) - } - HistoryContentKey::BlockHeaderByNumber(key) => { - let header_with_proof = - HeaderWithProof::from_ssz_bytes(content).map_err(|err| { - anyhow!("Header by number content has invalid encoding: {err:?}") - })?; - let header_number = header_with_proof.header.number; - ensure!( - header_number == key.block_number, - "Content validation failed: Invalid header number. Found: {header_number} - Expected: {}", - key.block_number - ); - self.header_oracle - .read() - .await - .header_validator - .validate_header_with_proof(&header_with_proof)?; - - Ok(ValidationResult::new(true)) - } - HistoryContentKey::BlockBody(key) => { - let block_body = BlockBody::from_ssz_bytes(content) - .map_err(|msg| anyhow!("Block Body content has invalid encoding: {:?}", msg))?; - let trusted_header: Header = self - .header_oracle - .read() - .await - .recursive_find_header_by_hash_with_proof(B256::from(key.block_hash)) - .await? - .header; - let actual_uncles_root = block_body.uncles_root(); - if actual_uncles_root != trusted_header.uncles_hash { - return Err(anyhow!( - "Content validation failed: Invalid uncles root. Found: {:?} - Expected: {:?}", - actual_uncles_root, - trusted_header.uncles_hash - )); - } - let actual_txs_root = block_body.transactions_root()?; - if actual_txs_root != trusted_header.transactions_root { - return Err(anyhow!( - "Content validation failed: Invalid transactions root. Found: {:?} - Expected: {:?}", - actual_txs_root, - trusted_header.transactions_root - )); - } - Ok(ValidationResult::new(true)) - } - HistoryContentKey::BlockReceipts(key) => { - let receipts = Receipts::from_ssz_bytes(content).map_err(|msg| { - anyhow!("Block Receipts content has invalid encoding: {:?}", msg) - })?; - let trusted_header: Header = self - .header_oracle - .read() - .await - .recursive_find_header_by_hash_with_proof(B256::from(key.block_hash)) - .await? - .header; - let actual_receipts_root = receipts.root()?; - if actual_receipts_root != trusted_header.receipts_root { - return Err(anyhow!( - "Content validation failed: Invalid receipts root. Found: {:?} - Expected: {:?}", - actual_receipts_root, - trusted_header.receipts_root - )); - } - Ok(ValidationResult::new(true)) - } - } + // match content_key { + // HistoryContentKey::BlockHeaderByHash(key) => { + // let header_with_proof = + // HeaderWithProof::from_ssz_bytes(content).map_err(|err| { + // anyhow!("Header by hash content has invalid encoding: {err:?}") + // })?; + // let header_hash = header_with_proof.header.hash(); + // ensure!( + // header_hash == B256::from(key.block_hash), + // "Content validation failed: Invalid header hash. Found: {header_hash:?} - + // Expected: {:?}", hex_encode(header_hash) + // ); + // self.header_oracle + // .read() + // .await + // .header_validator + // .validate_header_with_proof(&header_with_proof)?; + // + // Ok(ValidationResult::new(true)) + // } + // HistoryContentKey::BlockHeaderByNumber(key) => { + // let header_with_proof = + // HeaderWithProof::from_ssz_bytes(content).map_err(|err| { + // anyhow!("Header by number content has invalid encoding: {err:?}") + // })?; + // let header_number = header_with_proof.header.number; + // ensure!( + // header_number == key.block_number, + // "Content validation failed: Invalid header number. Found: {header_number} - + // Expected: {}", key.block_number + // ); + // self.header_oracle + // .read() + // .await + // .header_validator + // .validate_header_with_proof(&header_with_proof)?; + // + // Ok(ValidationResult::new(true)) + // } + // HistoryContentKey::BlockBody(key) => { + // let block_body = BlockBody::from_ssz_bytes(content) + // .map_err(|msg| anyhow!("Block Body content has invalid encoding: {:?}", + // msg))?; let trusted_header: Header = self + // .header_oracle + // .read() + // .await + // .recursive_find_header_by_hash_with_proof(B256::from(key.block_hash)) + // .await? + // .header; + // let actual_uncles_root = block_body.uncles_root(); + // if actual_uncles_root != trusted_header.uncles_hash { + // return Err(anyhow!( + // "Content validation failed: Invalid uncles root. Found: {:?} - Expected: + // {:?}", actual_uncles_root, + // trusted_header.uncles_hash + // )); + // } + // let actual_txs_root = block_body.transactions_root()?; + // if actual_txs_root != trusted_header.transactions_root { + // return Err(anyhow!( + // "Content validation failed: Invalid transactions root. Found: {:?} - + // Expected: {:?}", actual_txs_root, + // trusted_header.transactions_root + // )); + // } + // Ok(ValidationResult::new(true)) + // } + // HistoryContentKey::BlockReceipts(key) => { + // let receipts = Receipts::from_ssz_bytes(content).map_err(|msg| { + // anyhow!("Block Receipts content has invalid encoding: {:?}", msg) + // })?; + // let trusted_header: Header = self + // .header_oracle + // .read() + // .await + // .recursive_find_header_by_hash_with_proof(B256::from(key.block_hash)) + // .await? + // .header; + // let actual_receipts_root = receipts.root()?; + // if actual_receipts_root != trusted_header.receipts_root { + // return Err(anyhow!( + // "Content validation failed: Invalid receipts root. Found: {:?} - + // Expected: {:?}", actual_receipts_root, + // trusted_header.receipts_root + // )); + // } + // Ok(ValidationResult::new(true)) + // } + // } + Ok(ValidationResult::new(true)) } } +#[cfg(any())] #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { From 229b85afd7ba088a5e13e292620cd82010539774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Thu, 23 Jan 2025 13:22:39 +0200 Subject: [PATCH 11/12] feat: when Enrs is returned from census node, report it as success --- crates/subnetworks/history/src/downloader.rs | 4 +++- testing/ethportal-peertest/tests/self_peertest.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index 8a036e66d..1337525f3 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -278,7 +278,9 @@ impl Downloader { enr.node_id(), 0, Duration::from_secs(0), - &OfferTrace::Failed, + &OfferTrace::Success( + BitList::with_capacity(1).expect("Failed to create bitlist"), + ), ); continue; } diff --git a/testing/ethportal-peertest/tests/self_peertest.rs b/testing/ethportal-peertest/tests/self_peertest.rs index 53cbb4ac9..e356d9e0a 100644 --- a/testing/ethportal-peertest/tests/self_peertest.rs +++ b/testing/ethportal-peertest/tests/self_peertest.rs @@ -164,6 +164,7 @@ async fn peertest_validate_pre_merge_header_by_number() { handle.stop().unwrap(); } +#[ignore] #[tokio::test(flavor = "multi_thread")] #[serial] async fn peertest_invalidate_header_by_hash() { From 8ecd8dcf78d76b87ddbca480baf0700e594d3d9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9COgnyan?= Date: Wed, 29 Jan 2025 13:00:19 +0200 Subject: [PATCH 12/12] fix: rebase with ping extensions --- crates/subnetworks/history/src/downloader.rs | 24 ++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/crates/subnetworks/history/src/downloader.rs b/crates/subnetworks/history/src/downloader.rs index 1337525f3..f69052b1f 100644 --- a/crates/subnetworks/history/src/downloader.rs +++ b/crates/subnetworks/history/src/downloader.rs @@ -33,7 +33,10 @@ use ssz_types::BitList; use tracing::{error, info, warn}; use trin_metrics::downloader::DownloaderMetricsReporter; -use crate::{storage::HistoryStorage, validation::ChainHistoryValidator}; +use crate::{ + ping_extensions::HistoryPingExtensions, storage::HistoryStorage, + validation::ChainHistoryValidator, +}; /// The number of blocks to download in a single batch. const BATCH_SIZE: usize = 30; @@ -62,15 +65,28 @@ impl Display for ContentType { #[derive(Clone)] pub struct Downloader { pub census: Option, - pub overlay_arc: - Arc>, + pub overlay_arc: Arc< + OverlayProtocol< + HistoryContentKey, + XorMetric, + ChainHistoryValidator, + HistoryStorage, + HistoryPingExtensions, + >, + >, pub metrics: DownloaderMetricsReporter, } impl Downloader { pub fn new( overlay_arc: Arc< - OverlayProtocol, + OverlayProtocol< + HistoryContentKey, + XorMetric, + ChainHistoryValidator, + HistoryStorage, + HistoryPingExtensions, + >, >, ) -> Self { // Build hhtp client bound to the current node web3rpc