Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add retries to bridge gossip, with rfc network checks #1032

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 6 additions & 28 deletions portal-bridge/src/beacon_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{info, warn};

use crate::consensus_api::ConsensusApi;
use crate::constants::BEACON_GENESIS_TIME;
use crate::gossip::gossip_beacon_content;
use crate::mode::BridgeMode;
use crate::stats::{BeaconSlotStats, StatsReporter};
use crate::utils::{
Expand All @@ -31,7 +32,6 @@ use ethportal_api::types::content_value::beacon::{
ForkVersionedLightClientUpdate, LightClientUpdatesByRange,
};
use ethportal_api::utils::bytes::hex_decode;
use ethportal_api::BeaconNetworkApiClient;
use ethportal_api::{
BeaconContentKey, BeaconContentValue, LightClientBootstrapKey, LightClientUpdatesByRangeKey,
};
Expand Down Expand Up @@ -75,7 +75,7 @@ impl BeaconBridge {
// test files have no slot number data, so report all gossiped content at height 0.
let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(0)));
for asset in assets.0.into_iter() {
BeaconBridge::gossip_beacon_content(
gossip_beacon_content(
Arc::clone(&self.portal_clients),
asset.content_key,
asset.content_value,
Expand Down Expand Up @@ -265,7 +265,7 @@ impl BeaconBridge {
});

// Return the latest finalized block root if we successfully gossiped the latest bootstrap.
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats)
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats)
.await
.map(|_| latest_finalized_block_root)
}
Expand Down Expand Up @@ -327,7 +327,7 @@ impl BeaconBridge {
);

// Update the current known period if we successfully gossiped the latest data.
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;

Ok(expected_current_period)
}
Expand All @@ -350,7 +350,7 @@ impl BeaconBridge {
LightClientOptimisticUpdateKey::new(update.signature_slot),
);
let content_value = BeaconContentValue::LightClientOptimisticUpdate(update.into());
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await
}

async fn serve_light_client_finality_update(
Expand Down Expand Up @@ -389,30 +389,8 @@ impl BeaconBridge {
);
let content_value = BeaconContentValue::LightClientFinalityUpdate(update.into());

Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;
gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;

Ok(new_finalized_slot)
}

/// Gossip any given content key / value to the history network.
async fn gossip_beacon_content(
portal_clients: Arc<Vec<HttpClient>>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<()> {
let mut results = vec![];
for client in portal_clients.as_ref() {
let result = client
.trace_gossip(content_key.clone(), content_value.clone())
.await;
results.push(result);
}
if let Ok(mut data) = slot_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating beacon gossip stats. Unable to acquire lock.");
}
Ok(())
}
}
37 changes: 10 additions & 27 deletions portal-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tracing::{debug, info, warn};

use crate::execution_api::ExecutionApi;
use crate::full_header::FullHeader;
use crate::gossip::gossip_history_content;
use crate::mode::{BridgeMode, ModeType};
use crate::stats::{HistoryBlockStats, StatsReporter};
use crate::utils::{read_test_assets_from_file, TestAssets};
Expand All @@ -31,7 +32,7 @@ use ethportal_api::types::execution::{
use ethportal_api::utils::bytes::hex_encode;
use ethportal_api::{
BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, EpochAccumulatorKey, HistoryContentKey,
HistoryContentValue, HistoryNetworkApiClient,
HistoryContentValue,
};
use trin_validation::{
accumulator::MasterAccumulator,
Expand Down Expand Up @@ -91,7 +92,7 @@ impl Bridge {
// test files have no block number data, so we report all gossiped content at height 0.
let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(0)));
for asset in assets.0.into_iter() {
Bridge::gossip_content(
let _ = gossip_history_content(
&self.portal_clients,
asset.content_key.clone(),
asset.content_value,
Expand Down Expand Up @@ -303,7 +304,8 @@ impl Bridge {
"Gossip: Block #{:?} HeaderWithProof",
full_header.header.number
);
Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await;
let _ =
gossip_history_content(portal_clients, content_key, content_value, block_stats).await;
Ok(())
}

Expand All @@ -330,7 +332,7 @@ impl Bridge {
let content_value = HistoryContentValue::EpochAccumulator(local_epoch_acc.clone());
// create unique stats for epoch accumulator, since it's rarely gossiped
let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(epoch_index * EPOCH_SIZE)));
Bridge::gossip_content(
let _ = gossip_history_content(
&self.portal_clients,
content_key,
content_value,
Expand Down Expand Up @@ -371,7 +373,8 @@ impl Bridge {
});
let content_value = HistoryContentValue::Receipts(receipts);
debug!("Gossip: Block #{:?} Receipts", full_header.header.number,);
Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await;
let _ =
gossip_history_content(portal_clients, content_key, content_value, block_stats).await;
Ok(())
}

Expand Down Expand Up @@ -414,7 +417,8 @@ impl Bridge {
});
let content_value = HistoryContentValue::BlockBody(block_body);
debug!("Gossip: Block #{:?} BlockBody", full_header.header.number);
Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await;
let _ =
gossip_history_content(portal_clients, content_key, content_value, block_stats).await;
Ok(())
}

Expand All @@ -427,27 +431,6 @@ impl Bridge {
let proof = BlockHeaderProof::AccumulatorProof(AccumulatorProof { proof });
Ok(HeaderWithProof { header, proof })
}

/// Gossip any given content key / value to the history network.
async fn gossip_content(
portal_clients: &Vec<HttpClient>,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
block_stats: Arc<Mutex<HistoryBlockStats>>,
) {
let mut results = vec![];
for client in portal_clients {
let result = client
.trace_gossip(content_key.clone(), content_value.clone())
.await;
results.push(result);
}
if let Ok(mut data) = block_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating history gossip stats. Unable to acquire lock.");
}
}
}

#[derive(Debug)]
Expand Down
146 changes: 146 additions & 0 deletions portal-bridge/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use crate::stats::{BeaconSlotStats, HistoryBlockStats, StatsReporter};
use ethportal_api::jsonrpsee::core::Error;
use ethportal_api::types::portal::{ContentInfo, TraceGossipInfo};
use ethportal_api::{
BeaconContentKey, BeaconContentValue, BeaconNetworkApiClient, HistoryContentKey,
HistoryContentValue, HistoryNetworkApiClient, OverlayContentKey, PossibleBeaconContentValue,
PossibleHistoryContentValue,
};
use jsonrpsee::http_client::HttpClient;
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
use tracing::{debug, warn};

const GOSSIP_RETRY_COUNT: u64 = 3;
const RETRY_AFTER: Duration = Duration::from_secs(15);

/// Gossip any given content key / value to the history network.
pub async fn gossip_beacon_content(
portal_clients: Arc<Vec<HttpClient>>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<()> {
let mut results: Vec<Result<(Vec<TraceGossipInfo>, u64), Error>> = vec![];
for client in portal_clients.as_ref() {
let client = client.clone();
let content_key = content_key.clone();
let content_value = content_value.clone();
let result = tokio::spawn(beacon_trace_gossip(client, content_key, content_value)).await?;
results.push(result);
}
if let Ok(mut data) = slot_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating beacon gossip stats. Unable to acquire lock.");
}
Ok(())
}

async fn beacon_trace_gossip(
client: HttpClient,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
) -> Result<(Vec<TraceGossipInfo>, u64), Error> {
let mut retry_count = 0;
let mut traces = vec![];
while retry_count < GOSSIP_RETRY_COUNT {
let result = BeaconNetworkApiClient::trace_gossip(
&client,
content_key.clone(),
content_value.clone(),
)
.await;
// check if content was successfully transferred to at least one peer on network
if let Ok(trace) = result {
traces.push(trace.clone());
if !trace.transferred.is_empty() {
return Ok((traces, retry_count));
}
}
// if not, make rfc request to see if data is available on network
let result =
BeaconNetworkApiClient::recursive_find_content(&client, content_key.clone()).await;
if let Ok(PossibleBeaconContentValue::ContentPresent(_)) = result {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beacon should return ContentInfo::Content (same as the history below). They have identical jsonr-rpc implementations: beacon | history.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that the subnetwork returns the same type, but the rpc deserialization of the response are different for the two endpoints: beacon | history

debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex());
return Ok((traces, retry_count));
}
retry_count += 1;
debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. content key={:?}", RETRY_AFTER, content_key.to_hex());
sleep(RETRY_AFTER).await;
}
warn!(
"Failed to gossip beacon content, without succesfully locating data on network, after {} attempts: content key={:?}",
GOSSIP_RETRY_COUNT,
content_key.to_hex(),
);
Ok((traces, retry_count))
}

/// Gossip any given content key / value to the history network.
pub async fn gossip_history_content(
portal_clients: &Vec<HttpClient>,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
block_stats: Arc<Mutex<HistoryBlockStats>>,
) -> anyhow::Result<()> {
let mut results: Vec<Result<(Vec<TraceGossipInfo>, u64), Error>> = vec![];
for client in portal_clients {
let client = client.clone();
let content_key = content_key.clone();
let content_value = content_value.clone();
let result = tokio::spawn(history_trace_gossip(client, content_key, content_value)).await?;
results.push(result);
}
if let Ok(mut data) = block_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating history gossip stats. Unable to acquire lock.");
}
Ok(())
}

// todo why doesn't history return PossibleHistoryContentValue?
async fn history_trace_gossip(
client: HttpClient,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
) -> Result<(Vec<TraceGossipInfo>, u64), Error> {
let mut retry_count = 0;
let mut traces = vec![];
while retry_count < GOSSIP_RETRY_COUNT {
let result = HistoryNetworkApiClient::trace_gossip(
&client,
content_key.clone(),
content_value.clone(),
)
.await;
// check if content was successfully transferred to at least one peer on network
if let Ok(trace) = result {
traces.push(trace.clone());
if !trace.transferred.is_empty() {
return Ok((traces, retry_count));
}
}
// if not, make rfc request to see if data is available on network
let result =
HistoryNetworkApiClient::recursive_find_content(&client, content_key.clone()).await;
if let Ok(ContentInfo::Content {
content: PossibleHistoryContentValue::ContentPresent(_),
..
}) = result
{
debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex());
return Ok((traces, retry_count));
}
retry_count += 1;
debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. content key={:?}", RETRY_AFTER, content_key.to_hex());
sleep(RETRY_AFTER).await;
}
warn!(
"Failed to gossip history content, without succesfully locating data on network, after {} attempts: content key={:?}",
GOSSIP_RETRY_COUNT,
content_key.to_hex(),
);
Ok((traces, retry_count))
}
1 change: 1 addition & 0 deletions portal-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod consensus_api;
pub mod constants;
pub mod execution_api;
pub mod full_header;
pub mod gossip;
pub mod mode;
pub mod pandaops;
pub mod stats;
Expand Down
Loading