Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental download code #1627

Open
wants to merge 12 commits into
base: download-experiment
Choose a base branch
from
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 24 additions & 5 deletions bin/portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use thiserror::Error;
use tokio::task::JoinHandle;
use tracing::{error, info, Instrument};

use crate::cli::BridgeConfig;
use crate::cli::ClientType;

mod network;
mod peer;
Expand Down Expand Up @@ -51,11 +51,30 @@ impl Census {
const SUPPORTED_SUBNETWORKS: [Subnetwork; 3] =
[Subnetwork::Beacon, Subnetwork::History, Subnetwork::State];

pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self {
pub fn new(
client: HttpClient,
enr_offer_limit: usize,
filter_clients: Vec<ClientType>,
) -> Self {
Self {
history: Network::new(client.clone(), Subnetwork::History, bridge_config),
state: Network::new(client.clone(), Subnetwork::State, bridge_config),
beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config),
history: Network::new(
client.clone(),
Subnetwork::History,
enr_offer_limit,
filter_clients.clone(),
),
state: Network::new(
client.clone(),
Subnetwork::State,
enr_offer_limit,
filter_clients.clone(),
),
beacon: Network::new(
client.clone(),
Subnetwork::Beacon,
enr_offer_limit,
filter_clients,
),
initialized: false,
}
}
Expand Down
16 changes: 9 additions & 7 deletions bin/portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use super::{
peers::Peers,
scoring::{AdditiveWeight, PeerSelector},
};
use crate::{
census::CensusError,
cli::{BridgeConfig, ClientType},
};
use crate::{census::CensusError, cli::ClientType};

/// The result of the liveness check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -78,7 +75,12 @@ pub(super) struct Network {
}

impl Network {
pub fn new(client: HttpClient, subnetwork: Subnetwork, bridge_config: &BridgeConfig) -> Self {
pub fn new(
client: HttpClient,
subnetwork: Subnetwork,
enr_offer_limit: usize,
filter_clients: Vec<ClientType>,
) -> Self {
if !matches!(
subnetwork,
Subnetwork::History | Subnetwork::Beacon | Subnetwork::State
Expand All @@ -89,11 +91,11 @@ impl Network {
Self {
peers: Peers::new(PeerSelector::new(
AdditiveWeight::default(),
bridge_config.enr_offer_limit,
enr_offer_limit,
)),
client,
subnetwork,
filter_clients: bridge_config.filter_clients.to_vec(),
filter_clients,
}
}

Expand Down
6 changes: 5 additions & 1 deletion bin/portal-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.contains(&Subnetwork::State)
{
// Create and initialize the census to acquire critical view of network before gossiping
let mut census = Census::new(portal_client.clone(), &bridge_config);
let mut census = Census::new(
portal_client.clone(),
bridge_config.enr_offer_limit,
bridge_config.filter_clients,
);
census_handle = Some(census.init([Subnetwork::State]).await?);

let state_bridge = StateBridge::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/ethportal-api/src/types/query_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl QueryTrace {
}

/// Returns milliseconds since the time provided.
fn timestamp_millis_u64(since: u64) -> u64 {
pub fn timestamp_millis_u64(since: u64) -> u64 {
// Convert `since` (milliseconds) to a `SystemTime`
let since_time = UNIX_EPOCH + Duration::from_millis(since);

Expand Down
75 changes: 75 additions & 0 deletions crates/metrics/src/downloader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use prometheus_exporter::prometheus::{
histogram_opts, opts, register_histogram_vec_with_registry,
register_int_gauge_vec_with_registry, HistogramTimer, HistogramVec, IntGaugeVec, Registry,
};

use crate::portalnet::PORTALNET_METRICS;

/// Prometheus metrics for the downloader.
///
/// Both metrics are registered with a single `downloader` label name.
#[derive(Clone, Debug)]
pub struct DownloaderMetrics {
/// Gauge registered as `downloader_current_block`: the current block number the downloader is on.
pub current_block: IntGaugeVec,
/// Histogram registered as `downloader_find_content_timer`: the time it takes for a find
/// content query to complete.
pub find_content_timer: HistogramVec,
}

impl DownloaderMetrics {
    /// Registers the downloader metrics with `registry` and returns handles to them.
    ///
    /// # Errors
    ///
    /// Propagates the error from either registration macro; registration happens in field
    /// order, so a failure on the gauge means the histogram is never registered.
    pub fn new(registry: &Registry) -> anyhow::Result<Self> {
        Ok(Self {
            current_block: register_int_gauge_vec_with_registry!(
                opts!(
                    "downloader_current_block",
                    "the current block number the downloader is on"
                ),
                &["downloader"],
                registry
            )?,
            find_content_timer: register_histogram_vec_with_registry!(
                histogram_opts!(
                    "downloader_find_content_timer",
                    "the time it takes for find content query to complete"
                ),
                &["downloader"],
                registry
            )?,
        })
    }
}

/// Reporting handle over [`DownloaderMetrics`].
///
/// Wraps the metric vectors so callers can update them without spelling out label values.
#[derive(Clone, Debug)]
pub struct DownloaderMetricsReporter {
// Metric handles obtained from the global `PORTALNET_METRICS` in `new()`.
metrics: DownloaderMetrics,
}

impl Default for DownloaderMetricsReporter {
fn default() -> Self {
Self::new()
}
}

impl DownloaderMetricsReporter {
    /// Label values attached to every sample reported through this type.
    const LABEL_VALUES: [&'static str; 1] = ["downloader"];

    /// Creates a reporter backed by the downloader metrics of the global `PORTALNET_METRICS`.
    pub fn new() -> Self {
        Self {
            metrics: PORTALNET_METRICS.downloader(),
        }
    }

    /// Sets the current-block gauge to `block_number`.
    ///
    /// The gauge stores an `i64`, so values above `i64::MAX` are clamped to `i64::MAX`
    /// instead of wrapping around to a negative number.
    pub fn report_current_block(&self, block_number: u64) {
        // Clamp first so the cast below is lossless.
        let gauge_value = block_number.min(i64::MAX as u64) as i64;
        self.metrics
            .current_block
            .with_label_values(&Self::LABEL_VALUES)
            .set(gauge_value);
    }

    /// Starts a timer on the find-content histogram and returns it to the caller.
    ///
    /// Pass the returned timer to [`Self::stop_find_content_timer`] to record the elapsed time.
    pub fn start_find_content_timer(&self) -> HistogramTimer {
        self.metrics
            .find_content_timer
            .with_label_values(&Self::LABEL_VALUES)
            .start_timer()
    }

    /// Consumes `timer` and records its elapsed duration in the histogram it was started on.
    pub fn stop_find_content_timer(&self, timer: HistogramTimer) {
        timer.observe_duration();
    }
}
1 change: 1 addition & 0 deletions crates/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![warn(clippy::uninlined_format_args)]

pub mod bridge;
pub mod downloader;
pub mod labels;
pub mod overlay;
pub mod portalnet;
Expand Down
45 changes: 45 additions & 0 deletions crates/metrics/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ use crate::{
/// Prometheus metric vectors reported by the overlay.
#[derive(Clone)]
pub struct OverlayMetrics {
/// Message counter, incremented per protocol, direction and message type.
pub message_total: IntCounterVec,
/// Counter registered as `trin_failed_message_sent`; incremented when an outbound request is
/// reported as failed, with the same label set as `message_total`.
pub failed_message_sent: IntCounterVec,
pub utp_outcome_total: IntCounterVec,
pub utp_active_gauge: IntGaugeVec,
pub utp_connection_duration: HistogramVec,
/// Total bytes transferred inbound
pub bytes_inbound_total: IntCounterVec,
pub validation_total: IntCounterVec,
}

Expand All @@ -36,6 +39,14 @@ impl OverlayMetrics {
&["protocol", "direction", "type"],
registry
)?;
// Failure counter for outbound requests (incremented via `report_failed_outbound_request`).
// The help text must describe failures, not all sent messages.
let failed_message_sent = register_int_counter_vec_with_registry!(
    opts!(
        "trin_failed_message_sent",
        "count all outbound network messages that failed"
    ),
    &["protocol", "direction", "type"],
    registry
)?;
let utp_outcome_total = register_int_counter_vec_with_registry!(
opts!(
"trin_utp_outcome_total",
Expand All @@ -60,6 +71,14 @@ impl OverlayMetrics {
&["protocol", "direction"],
registry
)?;
let bytes_inbound_total = register_int_counter_vec_with_registry!(
opts!(
"trin_bytes_inbound_total",
"count all bytes transferred inbound"
),
&["protocol"],
registry
)?;
let validation_total = register_int_counter_vec_with_registry!(
opts!(
"trin_validation_total",
Expand All @@ -70,9 +89,11 @@ impl OverlayMetrics {
)?;
Ok(Self {
message_total,
failed_message_sent,
utp_outcome_total,
utp_active_gauge,
utp_connection_duration,
bytes_inbound_total,
validation_total,
})
}
Expand Down Expand Up @@ -118,13 +139,37 @@ impl OverlayMetricsReporter {
self.increment_message_total(MessageDirectionLabel::Received, response.into());
}

pub fn report_failed_outbound_request(&self, request: &Request) {
self.increment_failed_message_sent(MessageDirectionLabel::Sent, request.into());
}

/// Increments the message counter for this reporter's protocol and the given
/// direction / message type.
fn increment_message_total(&self, direction: MessageDirectionLabel, message: MessageLabel) {
    let protocol: &str = &self.protocol;
    let direction_label: &str = direction.into();
    let message_label: &str = message.into();
    let counter = self
        .overlay_metrics
        .message_total
        .with_label_values(&[protocol, direction_label, message_label]);
    counter.inc();
}
/// Increments the failed-message counter for this reporter's protocol and the given
/// direction / message type.
fn increment_failed_message_sent(
    &self,
    direction: MessageDirectionLabel,
    message: MessageLabel,
) {
    let protocol: &str = &self.protocol;
    let direction_label: &str = direction.into();
    let message_label: &str = message.into();
    let counter = self
        .overlay_metrics
        .failed_message_sent
        .with_label_values(&[protocol, direction_label, message_label]);
    counter.inc();
}
/// Adds `bytes_len` to the inbound byte counter for this reporter's protocol.
pub fn report_bytes_inbound(&self, bytes_len: u64) {
    let protocol: &str = &self.protocol;
    let counter = self
        .overlay_metrics
        .bytes_inbound_total
        .with_label_values(&[protocol]);
    counter.inc_by(bytes_len);
}

//
// uTP metrics
Expand Down
12 changes: 11 additions & 1 deletion crates/metrics/src/portalnet.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use lazy_static::lazy_static;
use prometheus_exporter::prometheus::default_registry;

use crate::{bridge::BridgeMetrics, overlay::OverlayMetrics, storage::StorageMetrics};
use crate::{
bridge::BridgeMetrics, downloader::DownloaderMetrics, overlay::OverlayMetrics,
storage::StorageMetrics,
};

// We use lazy_static to ensure that the metrics registry is initialized only once, for each
// runtime. This is important because the registry is a global singleton, and if it is
Expand All @@ -17,6 +20,7 @@ fn initialize_metrics_registry() -> PortalnetMetrics {

/// Groups the per-component metric sets (bridge, downloader, overlay, storage) so they can be
/// handed out through the accessor methods on this type.
pub struct PortalnetMetrics {
bridge: BridgeMetrics,
downloader: DownloaderMetrics,
overlay: OverlayMetrics,
storage: StorageMetrics,
}
Expand All @@ -27,10 +31,12 @@ impl PortalnetMetrics {
let overlay = OverlayMetrics::new(registry)?;
let storage = StorageMetrics::new(registry)?;
let bridge = BridgeMetrics::new(registry)?;
let downloader = DownloaderMetrics::new(registry)?;
Ok(Self {
overlay,
storage,
bridge,
downloader,
})
}

Expand All @@ -45,4 +51,8 @@ impl PortalnetMetrics {
pub fn bridge(&self) -> BridgeMetrics {
self.bridge.clone()
}

pub fn downloader(&self) -> DownloaderMetrics {
self.downloader.clone()
}
}
10 changes: 9 additions & 1 deletion crates/portalnet/src/overlay/service/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,11 @@ impl<
self.metrics.report_inbound_response(&response);
self.process_response(response, request.destination, request.request, request.query_id, request.request_permit)
}
Err(error) => self.process_request_failure(response.request_id, request.destination, error),
Err(error) => {
// Report the failed outbound request to metrics
self.metrics.report_failed_outbound_request(&request.request);
self.process_request_failure(response.request_id, request.destination, error)
},
}

} else {
Expand Down Expand Up @@ -1752,6 +1756,10 @@ impl<
query_trace_events_tx: Option<UnboundedSender<QueryTraceEvent>>,
) {
let mut content = content;
// report the total bytes of content received
utp_processing
.metrics
.report_bytes_inbound(content.len() as u64);
// Operate under assumption that all content in the store is valid
let local_value = utp_processing.store.read().get(&content_key);
if let Ok(Some(val)) = local_value {
Expand Down
1 change: 1 addition & 0 deletions crates/portalnet/src/utp/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl UtpController {
// report utp tx as successful, even if we go on to fail to process the payload
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success);
self.metrics.report_bytes_inbound(data.len() as u64);
Ok(Bytes::from(data))
}

Expand Down
4 changes: 4 additions & 0 deletions crates/subnetworks/history/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ ethportal-api.workspace = true
parking_lot.workspace = true
portalnet.workspace = true
serde_json.workspace = true
ssz_types.workspace = true
tokio.workspace = true
tracing.workspace = true
tree_hash.workspace = true
trin-storage.workspace = true
trin-metrics.workspace = true
trin-validation.workspace = true
utp-rs.workspace = true
portal-bridge.workspace = true
futures = "0.3.31"

[dev-dependencies]
env_logger.workspace = true
Expand Down
Loading
Loading