Skip to content

Commit

Permalink
Additional networking metrics (sigp#2549)
Browse files Browse the repository at this point in the history
Adds additional metrics for network monitoring and evaluation.


Co-authored-by: Mark Mackey <[email protected]>
  • Loading branch information
AgeManning and ethDreamer committed Dec 22, 2021
1 parent 60d917d commit 81c667b
Show file tree
Hide file tree
Showing 29 changed files with 849 additions and 1,130 deletions.
773 changes: 414 additions & 359 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ genesis = { path = "../genesis" }
int_to_bytes = { path = "../../consensus/int_to_bytes" }
rand = "0.7.3"
proto_array = { path = "../../consensus/proto_array" }
lru = "0.6.0"
lru = "0.7.1"
tempfile = "3.1.0"
bitvec = "0.19.3"
bls = { path = "../../crypto/bls" }
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ task_executor = { path = "../../common/task_executor" }
environment = { path = "../../lighthouse/environment" }
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
time = "0.3.3"
time = "0.3.5"
directory = {path = "../../common/directory"}
http_api = { path = "../http_api" }
http_metrics = { path = "../http_metrics" }
Expand Down
31 changes: 24 additions & 7 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use eth2::{
};
use execution_layer::ExecutionLayer;
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::NetworkGlobals;
use lighthouse_network::{open_metrics_client::registry::Registry, NetworkGlobals};
use monitoring_api::{MonitoringHttpClient, ProcessType};
use network::{NetworkConfig, NetworkMessage, NetworkService};
use slasher::Slasher;
Expand Down Expand Up @@ -65,6 +65,7 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
eth1_service: Option<Eth1Service>,
network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
network_send: Option<UnboundedSender<NetworkMessage<T::EthSpec>>>,
gossipsub_registry: Option<Registry>,
db_path: Option<PathBuf>,
freezer_db_path: Option<PathBuf>,
http_api_config: http_api::Config,
Expand Down Expand Up @@ -96,6 +97,7 @@ where
eth1_service: None,
network_globals: None,
network_send: None,
gossipsub_registry: None,
db_path: None,
freezer_db_path: None,
http_api_config: <_>::default(),
Expand Down Expand Up @@ -448,13 +450,27 @@ where
.ok_or("network requires a runtime_context")?
.clone();

let (network_globals, network_send) =
NetworkService::start(beacon_chain, config, context.executor)
.await
.map_err(|e| format!("Failed to start network: {:?}", e))?;
// If gossipsub metrics are required we build a registry to record them
let mut gossipsub_registry = if config.metrics_enabled {
Some(Registry::default())
} else {
None
};

let (network_globals, network_send) = NetworkService::start(
beacon_chain,
config,
context.executor,
gossipsub_registry
.as_mut()
.map(|registry| registry.sub_registry_with_prefix("gossipsub")),
)
.await
.map_err(|e| format!("Failed to start network: {:?}", e))?;

self.network_globals = Some(network_globals);
self.network_send = Some(network_send);
self.gossipsub_registry = gossipsub_registry;

Ok(self)
}
Expand Down Expand Up @@ -562,13 +578,13 @@ where
Ok(self)
}

/// Consumers the builder, returning a `Client` if all necessary components have been
/// Consumes the builder, returning a `Client` if all necessary components have been
/// specified.
///
/// If type inference errors are being raised, see the comment on the definition of `Self`.
#[allow(clippy::type_complexity)]
pub fn build(
self,
mut self,
) -> Result<Client<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>, String>
{
let runtime_context = self
Expand Down Expand Up @@ -615,6 +631,7 @@ where
chain: self.beacon_chain.clone(),
db_path: self.db_path.clone(),
freezer_db_path: self.freezer_db_path.clone(),
gossipsub_registry: self.gossipsub_registry.take().map(std::sync::Mutex::new),
log: log.clone(),
});

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bytes = "1.1.0"
task_executor = { path = "../../common/task_executor" }
hex = "0.4.2"
eth2_ssz_types = "0.2.2"
lru = "0.6.0"
lru = "0.7.1"
exit-future = "0.2.0"
tree_hash = "0.4.1"
tree_hash_derive = { path = "../../consensus/tree_hash_derive"}
Expand Down
17 changes: 17 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,22 @@ pub fn serve<T: BeaconChainTypes>(
})
});

// GET lighthouse/nat
let get_lighthouse_nat = warp::path("lighthouse")
.and(warp::path("nat"))
.and(warp::path::end())
.and_then(|| {
blocking_json_task(move || {
Ok(api_types::GenericResponse::from(
lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0,
))
})
});

// GET lighthouse/peers
let get_lighthouse_peers = warp::path("lighthouse")
.and(warp::path("peers"))
Expand Down Expand Up @@ -2622,6 +2638,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_validator_sync_committee_contribution.boxed())
.or(get_lighthouse_health.boxed())
.or(get_lighthouse_syncing.boxed())
.or(get_lighthouse_nat.boxed())
.or(get_lighthouse_peers.boxed())
.or(get_lighthouse_peers_connected.boxed())
.or(get_lighthouse_proto_array.boxed())
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/http_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod metrics;

use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::open_metrics_client::registry::Registry;
use lighthouse_version::version_with_platform;
use serde::{Deserialize, Serialize};
use slog::{crit, info, Logger};
Expand Down Expand Up @@ -39,6 +40,7 @@ pub struct Context<T: BeaconChainTypes> {
pub chain: Option<Arc<BeaconChain<T>>>,
pub db_path: Option<PathBuf>,
pub freezer_db_path: Option<PathBuf>,
pub gossipsub_registry: Option<std::sync::Mutex<Registry>>,
pub log: Logger,
}

Expand Down
7 changes: 7 additions & 0 deletions beacon_node/http_metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::Context;
use beacon_chain::BeaconChainTypes;
use lighthouse_metrics::{Encoder, TextEncoder};
use lighthouse_network::open_metrics_client::encoding::text::encode;
use malloc_utils::scrape_allocator_metrics;

pub use lighthouse_metrics::*;
Expand Down Expand Up @@ -51,6 +52,12 @@ pub fn gather_prometheus_metrics<T: BeaconChainTypes>(
encoder
.encode(&lighthouse_metrics::gather(), &mut buffer)
.unwrap();
// Also encode the gossipsub metrics, if a gossipsub registry exists
if let Some(registry) = ctx.gossipsub_registry.as_ref() {
if let Ok(registry_locked) = registry.lock() {
let _ = encode(&mut buffer, &registry_locked);
}
}

String::from_utf8(buffer).map_err(|e| format!("Failed to encode prometheus info: {:?}", e))
}
1 change: 1 addition & 0 deletions beacon_node/http_metrics/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async fn returns_200_ok() {
chain: None,
db_path: None,
freezer_db_path: None,
gossipsub_registry: None,
log,
});

Expand Down
13 changes: 8 additions & 5 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Sigma Prime <[email protected]>"]
edition = "2018"

[dependencies]
discv5 = { version = "0.1.0-beta.11", features = ["libp2p"] }
discv5 = { version = "0.1.0-beta.13", features = ["libp2p"] }
unsigned-varint = { version = "0.6.0", features = ["codec"] }
types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" }
Expand All @@ -25,7 +25,7 @@ lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
smallvec = "1.6.1"
tokio-io-timeout = "1.1.1"
lru = "0.6.0"
lru = "0.7.1"
parking_lot = "0.11.0"
sha2 = "0.9.1"
snap = "1.0.1"
Expand All @@ -38,18 +38,21 @@ directory = { path = "../../common/directory" }
regex = "1.3.9"
strum = { version = "0.21.0", features = ["derive"] }
superstruct = "0.3.0"
open-metrics-client = "0.13.0"

[dependencies.libp2p]
version = "0.41.0"
# version = "0.41.0"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio"]
git = "https://github.com/libp2p/rust-libp2p"
# Latest libp2p master
rev = "17861d9cac121f7e448585a7f052d5eab4618826"
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext"]

[dev-dependencies]
slog-term = "2.6.0"
slog-async = "2.5.0"
tempfile = "3.1.0"
exit-future = "0.2.0"
libp2p = { version = "0.41.0", default-features = false, features = ["plaintext"] }
void = "1"

[features]
Expand Down
53 changes: 36 additions & 17 deletions beacon_node/lighthouse_network/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ use crate::peer_manager::{
ConnectionDirection, PeerManager, PeerManagerEvent,
};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
use crate::service::{Context as ServiceContext, METADATA_FILENAME};
use crate::types::{
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet,
SubnetDiscovery,
};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use libp2p::{
core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
},
gossipsub::{
metrics::Config as GossipsubMetricsConfig,
subscription_filter::{MaxCountSubscriptionFilter, WhitelistSubscriptionFilter},
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
MessageAuthenticity, MessageId,
Expand All @@ -45,7 +46,7 @@ use std::{
task::{Context, Poll},
};
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, ChainSpec, EnrForkId, EthSpec, ForkContext,
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext,
SignedBeaconBlock, Slot, SubnetId, SyncSubnetId,
};

Expand Down Expand Up @@ -182,14 +183,14 @@ pub struct Behaviour<TSpec: EthSpec> {
impl<TSpec: EthSpec> Behaviour<TSpec> {
pub async fn new(
local_key: &Keypair,
mut config: NetworkConfig,
ctx: ServiceContext<'_>,
network_globals: Arc<NetworkGlobals<TSpec>>,
log: &slog::Logger,
fork_context: Arc<ForkContext>,
chain_spec: &ChainSpec,
) -> error::Result<Self> {
let behaviour_log = log.new(o!());

let mut config = ctx.config.clone();

// Set up the Identify Behaviour
let identify_config = if config.private {
IdentifyConfig::new(
Expand All @@ -215,25 +216,29 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.eth2()
.expect("Local ENR must have a fork id");

let possible_fork_digests = fork_context.all_fork_digests();
let possible_fork_digests = ctx.fork_context.all_fork_digests();
let filter = MaxCountSubscriptionFilter {
filter: Self::create_whitelist_filter(
possible_fork_digests,
chain_spec.attestation_subnet_count,
ctx.chain_spec.attestation_subnet_count,
SYNC_COMMITTEE_SUBNET_COUNT,
),
max_subscribed_topics: 200,
max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2
};

config.gs_config = gossipsub_config(fork_context.clone());
config.gs_config = gossipsub_config(ctx.fork_context.clone());

// If metrics are enabled for gossipsub, build the metrics configuration
let gossipsub_metrics = ctx
.gossipsub_registry
.map(|registry| (registry, GossipsubMetricsConfig::default()));

// Build and configure the Gossipsub behaviour
let snappy_transform = SnappyTransform::new(config.gs_config.max_transmit_size());
let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous,
config.gs_config.clone(),
None, // No metrics for the time being
gossipsub_metrics,
filter,
snappy_transform,
)
Expand All @@ -246,7 +251,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

let thresholds = lighthouse_gossip_thresholds();

let score_settings = PeerScoreSettings::new(chain_spec, &config.gs_config);
let score_settings = PeerScoreSettings::new(ctx.chain_spec, &config.gs_config);

// Prepare scoring parameters
let params = score_settings.get_peer_score_params(
Expand All @@ -267,14 +272,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

let peer_manager_cfg = PeerManagerCfg {
discovery_enabled: !config.disable_discovery,
metrics_enabled: config.metrics_enabled,
target_peer_count: config.target_peers,
..Default::default()
};

Ok(Behaviour {
// Sub-behaviours
gossipsub,
eth2_rpc: RPC::new(fork_context.clone(), log.clone()),
eth2_rpc: RPC::new(ctx.fork_context.clone(), log.clone()),
discovery,
identify: Identify::new(identify_config),
// Auxiliary fields
Expand All @@ -287,7 +293,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
network_dir: config.network_dir.clone(),
log: behaviour_log,
score_settings,
fork_context,
fork_context: ctx.fork_context,
update_gossipsub_scores,
})
}
Expand Down Expand Up @@ -393,14 +399,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.remove(&topic);

// unsubscribe from the topic
let topic: Topic = topic.into();
let libp2p_topic: Topic = topic.clone().into();

match self.gossipsub.unsubscribe(&topic) {
match self.gossipsub.unsubscribe(&libp2p_topic) {
Err(_) => {
warn!(self.log, "Failed to unsubscribe from topic"; "topic" => %topic);
warn!(self.log, "Failed to unsubscribe from topic"; "topic" => %libp2p_topic);
false
}
Ok(v) => {
// Inform the network
debug!(self.log, "Unsubscribed to topic"; "topic" => %topic);
v
}
Expand Down Expand Up @@ -732,6 +739,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

/// Convenience function to propagate a request.
fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) {
// Increment metrics
match &request {
Request::Status(_) => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["status"])
}
Request::BlocksByRange { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_range"])
}
Request::BlocksByRoot { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"])
}
}
self.add_event(BehaviourEvent::RequestReceived {
peer_id,
id,
Expand Down
Loading

0 comments on commit 81c667b

Please sign in to comment.