Skip to content

Commit

Permalink
feat: add retries to bridge gossip, with rfc network checks
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Nov 10, 2023
1 parent d258efd commit 202d25a
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 78 deletions.
34 changes: 6 additions & 28 deletions portal-bridge/src/beacon_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{info, warn};

use crate::consensus_api::ConsensusApi;
use crate::constants::BEACON_GENESIS_TIME;
use crate::gossip::gossip_beacon_content;
use crate::mode::BridgeMode;
use crate::stats::{BeaconSlotStats, StatsReporter};
use crate::utils::{
Expand All @@ -31,7 +32,6 @@ use ethportal_api::types::content_value::beacon::{
ForkVersionedLightClientUpdate, LightClientUpdatesByRange,
};
use ethportal_api::utils::bytes::hex_decode;
use ethportal_api::BeaconNetworkApiClient;
use ethportal_api::{
BeaconContentKey, BeaconContentValue, LightClientBootstrapKey, LightClientUpdatesByRangeKey,
};
Expand Down Expand Up @@ -75,7 +75,7 @@ impl BeaconBridge {
// test files have no slot number data, so report all gossiped content at height 0.
let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(0)));
for asset in assets.0.into_iter() {
BeaconBridge::gossip_beacon_content(
gossip_beacon_content(
Arc::clone(&self.portal_clients),
asset.content_key,
asset.content_value,
Expand Down Expand Up @@ -265,7 +265,7 @@ impl BeaconBridge {
});

// Return the latest finalized block root if we successfully gossiped the latest bootstrap.
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats)
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats)
.await
.map(|_| latest_finalized_block_root)
}
Expand Down Expand Up @@ -327,7 +327,7 @@ impl BeaconBridge {
);

// Update the current known period if we successfully gossiped the latest data.
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;

Ok(expected_current_period)
}
Expand All @@ -350,7 +350,7 @@ impl BeaconBridge {
LightClientOptimisticUpdateKey::new(update.signature_slot),
);
let content_value = BeaconContentValue::LightClientOptimisticUpdate(update.into());
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await
}

async fn serve_light_client_finality_update(
Expand Down Expand Up @@ -389,30 +389,8 @@ impl BeaconBridge {
);
let content_value = BeaconContentValue::LightClientFinalityUpdate(update.into());

Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;

Ok(new_finalized_slot)
}

/// Gossip any given content key / value to the history network.
async fn gossip_beacon_content(
portal_clients: Arc<Vec<HttpClient>>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<()> {
let mut results = vec![];
for client in portal_clients.as_ref() {
let result = client
.trace_gossip(content_key.clone(), content_value.clone())
.await;
results.push(result);
}
if let Ok(mut data) = slot_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating beacon gossip stats. Unable to acquire lock.");
}
Ok(())
}
}
37 changes: 10 additions & 27 deletions portal-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tracing::{debug, info, warn};

use crate::execution_api::ExecutionApi;
use crate::full_header::FullHeader;
use crate::gossip::gossip_history_content;
use crate::mode::{BridgeMode, ModeType};
use crate::stats::{HistoryBlockStats, StatsReporter};
use crate::utils::{read_test_assets_from_file, TestAssets};
Expand All @@ -31,7 +32,7 @@ use ethportal_api::types::execution::{
use ethportal_api::utils::bytes::hex_encode;
use ethportal_api::{
BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, EpochAccumulatorKey, HistoryContentKey,
HistoryContentValue, HistoryNetworkApiClient,
HistoryContentValue,
};
use trin_validation::{
accumulator::MasterAccumulator,
Expand Down Expand Up @@ -91,7 +92,7 @@ impl Bridge {
// test files have no block number data, so we report all gossiped content at height 0.
let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(0)));
for asset in assets.0.into_iter() {
Bridge::gossip_content(
let _ = gossip_history_content(
&self.portal_clients,
asset.content_key.clone(),
asset.content_value,
Expand Down Expand Up @@ -303,7 +304,8 @@ impl Bridge {
"Gossip: Block #{:?} HeaderWithProof",
full_header.header.number
);
Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await;
let _ =
gossip_history_content(portal_clients, content_key, content_value, block_stats).await;
Ok(())
}

Expand All @@ -330,7 +332,7 @@ impl Bridge {
let content_value = HistoryContentValue::EpochAccumulator(local_epoch_acc.clone());
// create unique stats for epoch accumulator, since it's rarely gossiped
let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(epoch_index * EPOCH_SIZE)));
Bridge::gossip_content(
let _ = gossip_history_content(
&self.portal_clients,
content_key,
content_value,
Expand Down Expand Up @@ -371,7 +373,8 @@ impl Bridge {
});
let content_value = HistoryContentValue::Receipts(receipts);
debug!("Gossip: Block #{:?} Receipts", full_header.header.number,);
Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await;
let _ =
gossip_history_content(portal_clients, content_key, content_value, block_stats).await;
Ok(())
}

Expand Down Expand Up @@ -414,7 +417,8 @@ impl Bridge {
});
let content_value = HistoryContentValue::BlockBody(block_body);
debug!("Gossip: Block #{:?} BlockBody", full_header.header.number);
Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await;
let _ =
gossip_history_content(portal_clients, content_key, content_value, block_stats).await;
Ok(())
}

Expand All @@ -427,27 +431,6 @@ impl Bridge {
let proof = BlockHeaderProof::AccumulatorProof(AccumulatorProof { proof });
Ok(HeaderWithProof { header, proof })
}

/// Gossip any given content key / value to the history network.
async fn gossip_content(
portal_clients: &Vec<HttpClient>,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
block_stats: Arc<Mutex<HistoryBlockStats>>,
) {
let mut results = vec![];
for client in portal_clients {
let result = client
.trace_gossip(content_key.clone(), content_value.clone())
.await;
results.push(result);
}
if let Ok(mut data) = block_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating history gossip stats. Unable to acquire lock.");
}
}
}

#[derive(Debug)]
Expand Down
146 changes: 146 additions & 0 deletions portal-bridge/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use crate::stats::{BeaconSlotStats, HistoryBlockStats, StatsReporter};
use ethportal_api::jsonrpsee::core::Error;
use ethportal_api::types::portal::{ContentInfo, TraceGossipInfo};
use ethportal_api::{
BeaconContentKey, BeaconContentValue, BeaconNetworkApiClient, HistoryContentKey,
HistoryContentValue, HistoryNetworkApiClient, OverlayContentKey, PossibleBeaconContentValue,
PossibleHistoryContentValue,
};
use jsonrpsee::http_client::HttpClient;
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
use tracing::{debug, warn};

const GOSSIP_RETRY_COUNT: u64 = 3;
const RETRY_AFTER: Duration = Duration::from_secs(15);

/// Gossip any given content key / value to the history network.
pub async fn gossip_beacon_content(
portal_clients: Arc<Vec<HttpClient>>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<()> {
let mut results: Vec<Result<(Vec<TraceGossipInfo>, u64), Error>> = vec![];
for client in portal_clients.as_ref() {
let client = client.clone();
let content_key = content_key.clone();
let content_value = content_value.clone();
let result = tokio::spawn(beacon_trace_gossip(client, content_key, content_value)).await?;
results.push(result);
}
if let Ok(mut data) = slot_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating beacon gossip stats. Unable to acquire lock.");
}
Ok(())
}

async fn beacon_trace_gossip(
client: HttpClient,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
) -> Result<(Vec<TraceGossipInfo>, u64), Error> {
let mut retry_count = 0;
let mut traces = vec![];
while retry_count < GOSSIP_RETRY_COUNT {
let result = BeaconNetworkApiClient::trace_gossip(
&client,
content_key.clone(),
content_value.clone(),
)
.await;
// check if content was successfully transferred to at least one peer on network
if let Ok(trace) = result {
traces.push(trace.clone());
if !trace.transferred.is_empty() {
return Ok((traces, retry_count));
}
}
// if not, make rfc request to see if data is available on network
let result =
BeaconNetworkApiClient::recursive_find_content(&client, content_key.clone()).await;
if let Ok(PossibleBeaconContentValue::ContentPresent(_)) = result {
debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex());
return Ok((traces, retry_count));
}
retry_count += 1;
debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. content key={:?}", RETRY_AFTER, content_key.to_hex());
sleep(RETRY_AFTER).await;
}
warn!(
"Failed to gossip beacon content, without succesfully locating data on network, after {} attempts: content key={:?}",
GOSSIP_RETRY_COUNT,
content_key.to_hex(),
);
Ok((traces, retry_count))
}

/// Gossip any given content key / value to the history network.
pub async fn gossip_history_content(
portal_clients: &Vec<HttpClient>,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
block_stats: Arc<Mutex<HistoryBlockStats>>,
) -> anyhow::Result<()> {
let mut results: Vec<Result<(Vec<TraceGossipInfo>, u64), Error>> = vec![];
for client in portal_clients {
let client = client.clone();
let content_key = content_key.clone();
let content_value = content_value.clone();
let result = tokio::spawn(history_trace_gossip(client, content_key, content_value)).await?;
results.push(result);
}
if let Ok(mut data) = block_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating history gossip stats. Unable to acquire lock.");
}
Ok(())
}

// todo why doesn't history return PossibleHistoryContentValue?
async fn history_trace_gossip(
client: HttpClient,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
) -> Result<(Vec<TraceGossipInfo>, u64), Error> {
let mut retry_count = 0;
let mut traces = vec![];
while retry_count < GOSSIP_RETRY_COUNT {
let result = HistoryNetworkApiClient::trace_gossip(
&client,
content_key.clone(),
content_value.clone(),
)
.await;
// check if content was successfully transferred to at least one peer on network
if let Ok(trace) = result {
traces.push(trace.clone());
if !trace.transferred.is_empty() {
return Ok((traces, retry_count));
}
}
// if not, make rfc request to see if data is available on network
let result =
HistoryNetworkApiClient::recursive_find_content(&client, content_key.clone()).await;
if let Ok(ContentInfo::Content {
content: PossibleHistoryContentValue::ContentPresent(val),
..
}) = result
{
debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex());
return Ok((traces, retry_count));
}
retry_count += 1;
debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. content key={:?}", RETRY_AFTER, content_key.to_hex());
sleep(RETRY_AFTER).await;
}
warn!(
"Failed to gossip history content, without succesfully locating data on network, after {} attempts: content key={:?}",
GOSSIP_RETRY_COUNT,
content_key.to_hex(),
);
Ok((traces, retry_count))
}
1 change: 1 addition & 0 deletions portal-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod consensus_api;
pub mod constants;
pub mod execution_api;
pub mod full_header;
pub mod gossip;
pub mod mode;
pub mod pandaops;
pub mod stats;
Expand Down
Loading

0 comments on commit 202d25a

Please sign in to comment.