diff --git a/portal-bridge/src/beacon_bridge.rs b/portal-bridge/src/beacon_bridge.rs index 71bf0fc0d..082d629e6 100644 --- a/portal-bridge/src/beacon_bridge.rs +++ b/portal-bridge/src/beacon_bridge.rs @@ -12,6 +12,7 @@ use tracing::{info, warn}; use crate::consensus_api::ConsensusApi; use crate::constants::BEACON_GENESIS_TIME; +use crate::gossip::gossip_beacon_content; use crate::mode::BridgeMode; use crate::stats::{BeaconSlotStats, StatsReporter}; use crate::utils::{ @@ -31,7 +32,6 @@ use ethportal_api::types::content_value::beacon::{ ForkVersionedLightClientUpdate, LightClientUpdatesByRange, }; use ethportal_api::utils::bytes::hex_decode; -use ethportal_api::BeaconNetworkApiClient; use ethportal_api::{ BeaconContentKey, BeaconContentValue, LightClientBootstrapKey, LightClientUpdatesByRangeKey, }; @@ -75,7 +75,7 @@ impl BeaconBridge { // test files have no slot number data, so report all gossiped content at height 0. let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(0))); for asset in assets.0.into_iter() { - BeaconBridge::gossip_beacon_content( + gossip_beacon_content( Arc::clone(&self.portal_clients), asset.content_key, asset.content_value, @@ -265,7 +265,7 @@ impl BeaconBridge { }); // Return the latest finalized block root if we successfully gossiped the latest bootstrap. - Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats) + gossip_beacon_content(portal_clients, content_key, content_value, slot_stats) .await .map(|_| latest_finalized_block_root) } @@ -327,7 +327,7 @@ impl BeaconBridge { ); // Update the current known period if we successfully gossiped the latest data. 
- Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?; + gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?; Ok(expected_current_period) } @@ -350,7 +350,7 @@ impl BeaconBridge { LightClientOptimisticUpdateKey::new(update.signature_slot), ); let content_value = BeaconContentValue::LightClientOptimisticUpdate(update.into()); - Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await + gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await } async fn serve_light_client_finality_update( @@ -389,30 +389,8 @@ impl BeaconBridge { ); let content_value = BeaconContentValue::LightClientFinalityUpdate(update.into()); - Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?; + gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?; Ok(new_finalized_slot) } - - /// Gossip any given content key / value to the history network. - async fn gossip_beacon_content( - portal_clients: Arc>, - content_key: BeaconContentKey, - content_value: BeaconContentValue, - slot_stats: Arc>, - ) -> anyhow::Result<()> { - let mut results = vec![]; - for client in portal_clients.as_ref() { - let result = client - .trace_gossip(content_key.clone(), content_value.clone()) - .await; - results.push(result); - } - if let Ok(mut data) = slot_stats.lock() { - data.update(content_key, results.into()); - } else { - warn!("Error updating beacon gossip stats. 
Unable to acquire lock."); - } - Ok(()) - } } diff --git a/portal-bridge/src/bridge.rs b/portal-bridge/src/bridge.rs index 73bfaddf3..8c3a74e97 100644 --- a/portal-bridge/src/bridge.rs +++ b/portal-bridge/src/bridge.rs @@ -15,6 +15,7 @@ use tracing::{debug, info, warn}; use crate::execution_api::ExecutionApi; use crate::full_header::FullHeader; +use crate::gossip::gossip_history_content; use crate::mode::{BridgeMode, ModeType}; use crate::stats::{HistoryBlockStats, StatsReporter}; use crate::utils::{read_test_assets_from_file, TestAssets}; @@ -31,7 +32,7 @@ use ethportal_api::types::execution::{ use ethportal_api::utils::bytes::hex_encode; use ethportal_api::{ BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, EpochAccumulatorKey, HistoryContentKey, - HistoryContentValue, HistoryNetworkApiClient, + HistoryContentValue, }; use trin_validation::{ accumulator::MasterAccumulator, @@ -91,7 +92,7 @@ impl Bridge { // test files have no block number data, so we report all gossiped content at height 0. 
let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(0))); for asset in assets.0.into_iter() { - Bridge::gossip_content( + let _ = gossip_history_content( &self.portal_clients, asset.content_key.clone(), asset.content_value, @@ -303,7 +304,8 @@ impl Bridge { "Gossip: Block #{:?} HeaderWithProof", full_header.header.number ); - Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await; + let _ = + gossip_history_content(portal_clients, content_key, content_value, block_stats).await; Ok(()) } @@ -330,7 +332,7 @@ impl Bridge { let content_value = HistoryContentValue::EpochAccumulator(local_epoch_acc.clone()); // create unique stats for epoch accumulator, since it's rarely gossiped let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(epoch_index * EPOCH_SIZE))); - Bridge::gossip_content( + let _ = gossip_history_content( &self.portal_clients, content_key, content_value, @@ -371,7 +373,8 @@ impl Bridge { }); let content_value = HistoryContentValue::Receipts(receipts); debug!("Gossip: Block #{:?} Receipts", full_header.header.number,); - Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await; + let _ = + gossip_history_content(portal_clients, content_key, content_value, block_stats).await; Ok(()) } @@ -414,7 +417,8 @@ impl Bridge { }); let content_value = HistoryContentValue::BlockBody(block_body); debug!("Gossip: Block #{:?} BlockBody", full_header.header.number); - Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await; + let _ = + gossip_history_content(portal_clients, content_key, content_value, block_stats).await; Ok(()) } @@ -427,27 +431,6 @@ impl Bridge { let proof = BlockHeaderProof::AccumulatorProof(AccumulatorProof { proof }); Ok(HeaderWithProof { header, proof }) } - - /// Gossip any given content key / value to the history network. 
- async fn gossip_content( - portal_clients: &Vec, - content_key: HistoryContentKey, - content_value: HistoryContentValue, - block_stats: Arc>, - ) { - let mut results = vec![]; - for client in portal_clients { - let result = client - .trace_gossip(content_key.clone(), content_value.clone()) - .await; - results.push(result); - } - if let Ok(mut data) = block_stats.lock() { - data.update(content_key, results.into()); - } else { - warn!("Error updating history gossip stats. Unable to acquire lock."); - } - } } #[derive(Debug)] diff --git a/portal-bridge/src/gossip.rs b/portal-bridge/src/gossip.rs new file mode 100644 index 000000000..96ba838cd --- /dev/null +++ b/portal-bridge/src/gossip.rs @@ -0,0 +1,146 @@ +use crate::stats::{BeaconSlotStats, HistoryBlockStats, StatsReporter}; +use ethportal_api::jsonrpsee::core::Error; +use ethportal_api::types::portal::{ContentInfo, TraceGossipInfo}; +use ethportal_api::{ + BeaconContentKey, BeaconContentValue, BeaconNetworkApiClient, HistoryContentKey, + HistoryContentValue, HistoryNetworkApiClient, OverlayContentKey, PossibleBeaconContentValue, + PossibleHistoryContentValue, +}; +use jsonrpsee::http_client::HttpClient; +use std::sync::{Arc, Mutex}; +use tokio::time::{sleep, Duration}; +use tracing::{debug, warn}; + +const GOSSIP_RETRY_COUNT: u64 = 3; +const RETRY_AFTER: Duration = Duration::from_secs(15); + +/// Gossip any given content key / value to the beacon network.
+pub async fn gossip_beacon_content( + portal_clients: Arc>, + content_key: BeaconContentKey, + content_value: BeaconContentValue, + slot_stats: Arc>, +) -> anyhow::Result<()> { + let mut results: Vec, u64), Error>> = vec![]; + for client in portal_clients.as_ref() { + let client = client.clone(); + let content_key = content_key.clone(); + let content_value = content_value.clone(); + let result = tokio::spawn(beacon_trace_gossip(client, content_key, content_value)).await?; + results.push(result); + } + if let Ok(mut data) = slot_stats.lock() { + data.update(content_key, results.into()); + } else { + warn!("Error updating beacon gossip stats. Unable to acquire lock."); + } + Ok(()) +} + +async fn beacon_trace_gossip( + client: HttpClient, + content_key: BeaconContentKey, + content_value: BeaconContentValue, +) -> Result<(Vec, u64), Error> { + let mut retry_count = 0; + let mut traces = vec![]; + while retry_count < GOSSIP_RETRY_COUNT { + let result = BeaconNetworkApiClient::trace_gossip( + &client, + content_key.clone(), + content_value.clone(), + ) + .await; + // check if content was successfully transferred to at least one peer on network + if let Ok(trace) = result { + traces.push(trace.clone()); + if !trace.transferred.is_empty() { + return Ok((traces, retry_count)); + } + } + // if not, make rfc request to see if data is available on network + let result = + BeaconNetworkApiClient::recursive_find_content(&client, content_key.clone()).await; + if let Ok(PossibleBeaconContentValue::ContentPresent(_)) = result { + debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex()); + return Ok((traces, retry_count)); + } + retry_count += 1; + debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. 
content key={:?}", RETRY_AFTER, content_key.to_hex()); + sleep(RETRY_AFTER).await; + } + warn!( + "Failed to gossip beacon content, without succesfully locating data on network, after {} attempts: content key={:?}", + GOSSIP_RETRY_COUNT, + content_key.to_hex(), + ); + Ok((traces, retry_count)) +} + +/// Gossip any given content key / value to the history network. +pub async fn gossip_history_content( + portal_clients: &Vec, + content_key: HistoryContentKey, + content_value: HistoryContentValue, + block_stats: Arc>, +) -> anyhow::Result<()> { + let mut results: Vec, u64), Error>> = vec![]; + for client in portal_clients { + let client = client.clone(); + let content_key = content_key.clone(); + let content_value = content_value.clone(); + let result = tokio::spawn(history_trace_gossip(client, content_key, content_value)).await?; + results.push(result); + } + if let Ok(mut data) = block_stats.lock() { + data.update(content_key, results.into()); + } else { + warn!("Error updating history gossip stats. Unable to acquire lock."); + } + Ok(()) +} + +// todo why doesn't history return PossibleHistoryContentValue? 
+async fn history_trace_gossip( + client: HttpClient, + content_key: HistoryContentKey, + content_value: HistoryContentValue, +) -> Result<(Vec, u64), Error> { + let mut retry_count = 0; + let mut traces = vec![]; + while retry_count < GOSSIP_RETRY_COUNT { + let result = HistoryNetworkApiClient::trace_gossip( + &client, + content_key.clone(), + content_value.clone(), + ) + .await; + // check if content was successfully transferred to at least one peer on network + if let Ok(trace) = result { + traces.push(trace.clone()); + if !trace.transferred.is_empty() { + return Ok((traces, retry_count)); + } + } + // if not, make rfc request to see if data is available on network + let result = + HistoryNetworkApiClient::recursive_find_content(&client, content_key.clone()).await; + if let Ok(ContentInfo::Content { + content: PossibleHistoryContentValue::ContentPresent(_), + .. + }) = result + { + debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex()); + return Ok((traces, retry_count)); + } + retry_count += 1; + debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. 
content key={:?}", RETRY_AFTER, content_key.to_hex()); + sleep(RETRY_AFTER).await; + } + warn!( + "Failed to gossip history content, without succesfully locating data on network, after {} attempts: content key={:?}", + GOSSIP_RETRY_COUNT, + content_key.to_hex(), + ); + Ok((traces, retry_count)) +} diff --git a/portal-bridge/src/lib.rs b/portal-bridge/src/lib.rs index e12309c01..0900d53e5 100644 --- a/portal-bridge/src/lib.rs +++ b/portal-bridge/src/lib.rs @@ -8,6 +8,7 @@ pub mod consensus_api; pub mod constants; pub mod execution_api; pub mod full_header; +pub mod gossip; pub mod mode; pub mod pandaops; pub mod stats; diff --git a/portal-bridge/src/stats.rs b/portal-bridge/src/stats.rs index de6dd6e1b..b946f6f1e 100644 --- a/portal-bridge/src/stats.rs +++ b/portal-bridge/src/stats.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::str::FromStr; use tracing::{info, trace}; @@ -272,55 +273,59 @@ impl StatsReporter for HistoryBlockStats { // Struct to record the gossip stats for a single piece of content (eg key/value pair), // consolidating results from jsonrpc requests to 1/many clients. +// +// After implementing retries on gossip, it's often the case that we offer the same piece +// of content to the same peer multiple times, however, we only record unique enrs in these sets. +// Currently, this is just to simplify, but if there's a use case where it makes sense to record +// duplicate offers to the same peer, we can change this, by replacing the `HashSet` fields below +// with a collection that keeps duplicates (e.g. `Vec`).
#[derive(Debug, Clone, Default)] pub struct ContentStats { - pub offered: Vec, - pub accepted: Vec, - pub transferred: Vec, + // use hashset so peers aren't double-counted across clients + pub offered: HashSet, + pub accepted: HashSet, + pub transferred: HashSet, + pub retries: u64, pub failures: u64, } impl ContentStats { pub fn report(&self) -> String { format!( - "offered: {}, accepted: {}, transferred: {}, failures: {}", + "offered: {}, accepted: {}, transferred: {}, retries: {}, failures: {}", self.offered.len(), self.accepted.len(), self.transferred.len(), + self.retries, self.failures, ) } } -impl From>> for ContentStats { - fn from(results: Vec>) -> Self { +impl From, u64), Error>>> for ContentStats { + fn from(results: Vec, u64), Error>>) -> Self { let mut content_stats = ContentStats::default(); for trace_gossip_info in results.iter() { match trace_gossip_info { - Ok(info) => { - for enr in info.offered.iter() { - let enr = Enr::from_str(enr) - .expect("ENR from trace gossip response to succesfully decode."); - // don't double count an enr if multiple clients offered the same content - // to a peer - if !content_stats.offered.contains(&enr) { - content_stats.offered.push(enr); + Ok((traces, retries)) => { + content_stats.retries += retries; + for trace in traces { + for enr in trace.offered.iter() { + let enr = Enr::from_str(enr) + .expect("ENR from trace gossip response to succesfully decode."); + content_stats.offered.insert(enr); } - } - for enr in info.accepted.iter() { - let enr = Enr::from_str(enr) - .expect("ENR from trace gossip response to succesfully decode."); - if !content_stats.accepted.contains(&enr) { - content_stats.accepted.push(enr); + for enr in trace.accepted.iter() { + let enr = Enr::from_str(enr) + .expect("ENR from trace gossip response to succesfully decode."); + content_stats.accepted.insert(enr); } - } - for enr in info.transferred.iter() { - let enr = Enr::from_str(enr) - .expect("ENR from trace gossip response to succesfully 
decode."); - if !content_stats.transferred.contains(&enr) { - content_stats.transferred.push(enr); + for enr in trace.transferred.iter() { + let enr = Enr::from_str(enr) + .expect("ENR from trace gossip response to succesfully decode."); + content_stats.transferred.insert(enr); } } }