Small network adjustments (sigp#1884)
## Issue Addressed

- Asymmetric pings - With symmetric ping intervals, Lighthouse nodes race each other to ping and often end up pinging each other simultaneously. This shifts the ping interval to be asymmetric based on the connection direction (inbound/outbound); see the sketch below.
- Correct inbound/outbound peer-db registration - It appears we were recording inbound connections as outbound and vice versa in the peer-db; this has been corrected.
- Improved logging

There is likely more to come - I'll leave this open as we investigate further testnets.
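
For illustration, here is a minimal, self-contained sketch (not Lighthouse code) of why the asymmetric intervals help: when both peers ping on the same 30-second period their timers tend to fire together, whereas with the values introduced here the side that dialled out pings every 30 seconds and the side that accepted the connection pings every 35 seconds, so the timers drift apart. The `ConnectionDirection` stand-in and the use of `tokio` are assumptions made for the sketch; only the two interval values come from the diff below.

```rust
use std::time::Duration;
use tokio::time::interval;

// Values mirroring the constants added in this commit.
const PING_INTERVAL_OUTBOUND: Duration = Duration::from_secs(30);
const PING_INTERVAL_INBOUND: Duration = Duration::from_secs(35);

// Hypothetical stand-in for the connection direction the peer manager tracks.
#[derive(Clone, Copy)]
enum ConnectionDirection {
    Incoming,
    Outgoing,
}

/// Choose the ping period from who initiated the connection, so the two ends
/// of a connection never share the same period.
fn ping_period(direction: ConnectionDirection) -> Duration {
    match direction {
        ConnectionDirection::Outgoing => PING_INTERVAL_OUTBOUND,
        ConnectionDirection::Incoming => PING_INTERVAL_INBOUND,
    }
}

#[tokio::main]
async fn main() {
    // We accepted this connection, so we ping on the slower (inbound) timer.
    let mut ticker = interval(ping_period(ConnectionDirection::Incoming));
    ticker.tick().await; // the first tick completes immediately
    ticker.tick().await; // ...then one tick every 35 seconds
    println!("time to ping the peer");
}
```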
AgeManning committed Nov 13, 2020
1 parent 8772c02 commit c00e6c2
Showing 11 changed files with 139 additions and 40 deletions.
6 changes: 5 additions & 1 deletion beacon_node/eth2_libp2p/src/config.rs
@@ -81,14 +81,17 @@ pub struct Config {
/// Attempt to construct external port mappings with UPnP.
pub upnp_enabled: bool,

/// Subscribe to all subnets for the duration of the runtime.
pub subscribe_all_subnets: bool,

/// List of extra topics to initially subscribe to as strings.
pub topics: Vec<GossipKind>,
}

impl Default for Config {
/// Generate a default network configuration.
fn default() -> Self {
// WARNING: this directory default should be always overrided with parameters
// WARNING: this directory default should be always overwritten with parameters
// from cli for specific networks.
let network_dir = dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
@@ -181,6 +184,7 @@ impl Default for Config {
client_version: lighthouse_version::version_with_platform(),
disable_discovery: false,
upnp_enabled: true,
subscribe_all_subnets: false,
topics: Vec::new(),
}
}
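
The new `subscribe_all_subnets` flag defaults to `false`. As a rough, hypothetical sketch (not part of this diff), a local test configuration could enable it alongside the existing `disable_discovery` field using struct-update syntax; the attestation-service changes further down show how the flag is consumed.

```rust
use eth2_libp2p::NetworkConfig;

fn local_test_config() -> NetworkConfig {
    // Hypothetical override for a local test node; the field names are the
    // ones shown in this diff, the values are purely illustrative.
    NetworkConfig {
        subscribe_all_subnets: true,
        disable_discovery: true,
        ..NetworkConfig::default()
    }
}
```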
71 changes: 56 additions & 15 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
@@ -11,7 +11,7 @@ use futures::Stream;
use hashset_delay::HashSetDelay;
use libp2p::core::multiaddr::Protocol as MProtocol;
use libp2p::identify::IdentifyInfo;
use slog::{crit, debug, error};
use slog::{crit, debug, error, warn};
use smallvec::SmallVec;
use std::{
net::SocketAddr,
@@ -40,7 +40,11 @@ use std::collections::HashMap;
const STATUS_INTERVAL: u64 = 300;
/// The time in seconds between PING events. We do not send a ping if the other peer has PING'd us
/// within this time frame (Seconds)
const PING_INTERVAL: u64 = 30;
/// This is asymmetric to avoid simultaneous pings.
/// The interval for outbound connections.
const PING_INTERVAL_OUTBOUND: u64 = 30;
/// The interval for inbound connections.
const PING_INTERVAL_INBOUND: u64 = 35;

/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
@@ -61,8 +65,10 @@ pub struct PeerManager<TSpec: EthSpec> {
network_globals: Arc<NetworkGlobals<TSpec>>,
/// A queue of events that the `PeerManager` is waiting to produce.
events: SmallVec<[PeerManagerEvent; 16]>,
/// A collection of peers awaiting to be Ping'd.
ping_peers: HashSetDelay<PeerId>,
/// A collection of inbound-connected peers awaiting to be Ping'd.
inbound_ping_peers: HashSetDelay<PeerId>,
/// A collection of outbound-connected peers awaiting to be Ping'd.
outbound_ping_peers: HashSetDelay<PeerId>,
/// A collection of peers awaiting to be Status'd.
status_peers: HashSetDelay<PeerId>,
/// The target number of peers we would like to connect to.
@@ -112,7 +118,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Ok(PeerManager {
network_globals,
events: SmallVec::new(),
ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)),
inbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_INBOUND)),
outbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_OUTBOUND)),
status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)),
target_peers: config.target_peers,
max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize,
@@ -203,6 +210,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

/// A request to find peers on a given subnet.
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
// If discovery is not started or disabled, ignore the request
if !self.discovery.started {
return;
}

let filtered: Vec<SubnetDiscovery> = subnets_to_discover
.into_iter()
.filter(|s| {
@@ -263,7 +275,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.notify_disconnect(peer_id);

// remove the ping and status timer for the peer
self.ping_peers.remove(peer_id);
self.inbound_ping_peers.remove(peer_id);
self.outbound_ping_peers.remove(peer_id);
self.status_peers.remove(peer_id);
}

@@ -410,7 +423,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// received a ping
// reset the to-ping timer for this peer
debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq);
self.ping_peers.insert(peer_id.clone());
match peer_info.connection_direction {
Some(ConnectionDirection::Incoming) => {
self.inbound_ping_peers.insert(peer_id.clone());
}
Some(ConnectionDirection::Outgoing) => {
self.outbound_ping_peers.insert(peer_id.clone());
}
None => {
warn!(self.log, "Received a ping from a peer with an unknown connection direction"; "peer_id" => %peer_id);
}
}

// if the sequence number is unknown send an update the meta data of the peer.
if let Some(meta_data) = &peer_info.meta_data {
@@ -656,16 +679,19 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
return true;
}
ConnectingType::IngoingConnected { multiaddr } => {
peerdb.connect_outgoing(peer_id, multiaddr, enr)
peerdb.connect_ingoing(peer_id, multiaddr, enr);
// start a timer to ping inbound peers.
self.inbound_ping_peers.insert(peer_id.clone());
}
ConnectingType::OutgoingConnected { multiaddr } => {
peerdb.connect_ingoing(peer_id, multiaddr, enr)
peerdb.connect_outgoing(peer_id, multiaddr, enr);
// start a timer for to ping outbound peers.
self.outbound_ping_peers.insert(peer_id.clone());
}
}
}

// start a ping and status timer for the peer
self.ping_peers.insert(peer_id.clone());
self.status_peers.insert(peer_id.clone());

// increment prometheus metrics
@@ -833,8 +859,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let peer_count = self.network_globals.connected_or_dialing_peers();
if peer_count < self.target_peers {
// If we need more peers, queue a discovery lookup.
debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
self.discovery.discover_peers();
if self.discovery.started {
debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
self.discovery.discover_peers();
}
}

// Updates peer's scores.
@@ -892,13 +920,26 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {

// poll the timeouts for pings and status'
loop {
match self.ping_peers.poll_next_unpin(cx) {
match self.inbound_ping_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.inbound_ping_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Ping(peer_id));
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for inbound peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
}

loop {
match self.outbound_ping_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.ping_peers.insert(peer_id.clone());
self.outbound_ping_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Ping(peer_id));
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
error!(self.log, "Failed to check for outbound peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
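
The split poll loops above follow a simple poll-and-re-arm pattern: when a peer's timer fires, the peer is pinged and immediately re-inserted, so it keeps being pinged on its direction-specific interval until it disconnects and is removed. A self-contained stand-in for that pattern (using a plain `BinaryHeap` instead of Lighthouse's `HashSetDelay`, with illustrative names) might look like this:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

type PeerId = String;

/// A toy recurring-ping schedule: peers are ordered by their next deadline
/// and re-armed as soon as they fire.
struct PingSchedule {
    interval: Duration,
    queue: BinaryHeap<Reverse<(Instant, PeerId)>>,
}

impl PingSchedule {
    fn new(interval: Duration) -> Self {
        Self { interval, queue: BinaryHeap::new() }
    }

    fn insert(&mut self, peer: PeerId) {
        self.queue.push(Reverse((Instant::now() + self.interval, peer)));
    }

    /// If a peer's ping is due, return it and re-arm its timer.
    fn pop_due(&mut self) -> Option<PeerId> {
        let due = matches!(
            self.queue.peek(),
            Some(Reverse((deadline, _))) if *deadline <= Instant::now()
        );
        if due {
            let Reverse((_, peer)) = self.queue.pop().expect("peeked above");
            self.insert(peer.clone());
            Some(peer)
        } else {
            None
        }
    }
}

fn main() {
    // One schedule per connection direction, mirroring the two new intervals.
    let mut inbound = PingSchedule::new(Duration::from_secs(35));
    let mut outbound = PingSchedule::new(Duration::from_secs(30));
    inbound.insert("peer-a".into());
    outbound.insert("peer-b".into());

    // Nothing is due yet; in the real service this check runs inside
    // `Stream::poll_next`, which is woken when a deadline expires.
    assert!(inbound.pop_due().is_none());
    assert!(outbound.pop_due().is_none());
}
```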
4 changes: 2 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs
@@ -462,7 +462,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
});

// Ban the peer if the score is not already low enough.
match info.score().state() {
match info.score_state() {
ScoreState::Banned => {}
_ => {
// If score isn't low enough to ban, this function has been called incorrectly.
@@ -522,7 +522,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
return Err("Unbanning peer that is not banned");
}

if let ScoreState::Banned = info.score().state() {
if let ScoreState::Banned = info.score_state() {
return Err("Attempted to unban (connection status) a banned peer");
}

4 changes: 2 additions & 2 deletions beacon_node/eth2_libp2p/src/rpc/handler.rs
@@ -283,8 +283,8 @@ where
let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) {
info
} else {
warn!(self.log, "Stream has expired. Response not sent";
"response" => response.to_string(), "id" => inbound_id);
warn!(self.log, "Inbound stream has expired, response not sent";
"response" => response.to_string(), "id" => inbound_id, "msg" => "Likely too many resources, reduce peer count");
return;
};

2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/rpc/mod.rs
@@ -119,7 +119,7 @@ impl<TSpec: EthSpec> RPC<TSpec> {
Duration::from_secs(10),
)
.build()
.unwrap();
.expect("Configuration parameters are valid");
RPC {
limiter,
events: Vec::new(),
8 changes: 5 additions & 3 deletions beacon_node/eth2_libp2p/src/service.rs
@@ -16,7 +16,7 @@ use libp2p::{
swarm::{SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use slog::{crit, debug, info, o, trace, warn};
use slog::{crit, debug, info, o, trace, warn, Logger};
use ssz::Decode;
use std::fs::File;
use std::io::prelude::*;
@@ -53,15 +53,15 @@ pub struct Service<TSpec: EthSpec> {
pub local_peer_id: PeerId,

/// The libp2p logger handle.
pub log: slog::Logger,
pub log: Logger,
}

impl<TSpec: EthSpec> Service<TSpec> {
pub async fn new(
executor: task_executor::TaskExecutor,
config: &NetworkConfig,
enr_fork_id: EnrForkId,
log: &slog::Logger,
log: &Logger,
chain_spec: &ChainSpec,
) -> error::Result<(Arc<NetworkGlobals<TSpec>>, Self)> {
let log = log.new(o!("service"=> "libp2p"));
@@ -206,13 +206,15 @@ impl<TSpec: EthSpec> Service<TSpec> {
}

let mut subscribed_topics: Vec<GossipKind> = vec![];

for topic_kind in &config.topics {
if swarm.subscribe_kind(topic_kind.clone()) {
subscribed_topics.push(topic_kind.clone());
} else {
warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind));
}
}

if !subscribed_topics.is_empty() {
info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics));
}
46 changes: 33 additions & 13 deletions beacon_node/network/src/attestation_service/mod.rs
@@ -13,7 +13,7 @@ use rand::seq::SliceRandom;
use slog::{debug, error, o, trace, warn};

use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::SubnetDiscovery;
use eth2_libp2p::{NetworkConfig, SubnetDiscovery};
use hashset_delay::HashSetDelay;
use slot_clock::SlotClock;
use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
@@ -89,14 +89,24 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// The waker for the current thread.
waker: Option<std::task::Waker>,

/// The discovery mechanism of lighthouse is disabled.
discovery_disabled: bool,

/// We are always subscribed to all subnets.
subscribe_all_subnets: bool,

/// The logger for the attestation service.
log: slog::Logger,
}

impl<T: BeaconChainTypes> AttestationService<T> {
/* Public functions */

pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig,
log: &slog::Logger,
) -> Self {
let log = log.new(o!("service" => "attestation_service"));

// calculate the random subnet duration from the spec constants
@@ -124,14 +134,20 @@ impl<T: BeaconChainTypes> AttestationService<T> {
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
known_validators: HashSetDelay::new(last_seen_val_timeout),
waker: None,
subscribe_all_subnets: config.subscribe_all_subnets,
discovery_disabled: config.disable_discovery,
log,
}
}

/// Return count of all currently subscribed subnets (long-lived **and** short-lived).
#[cfg(test)]
pub fn subscription_count(&self) -> usize {
self.subscriptions.len()
if self.subscribe_all_subnets {
self.beacon_chain.spec.attestation_subnet_count as usize
} else {
self.subscriptions.len()
}
}

/// Processes a list of validator subscriptions.
@@ -186,7 +202,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if subscription.slot > *slot {
subnets_to_discover.insert(subnet_id, subscription.slot);
}
} else {
} else if !self.discovery_disabled {
subnets_to_discover.insert(subnet_id, subscription.slot);
}

@@ -218,13 +234,17 @@ impl<T: BeaconChainTypes> AttestationService<T> {
}
}

if let Err(e) = self.discover_peers_request(
subnets_to_discover
.into_iter()
.map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }),
) {
warn!(self.log, "Discovery lookup request error"; "error" => e);
};
// If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
// required subnets.
if !self.discovery_disabled {
if let Err(e) = self.discover_peers_request(
subnets_to_discover
.into_iter()
.map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }),
) {
warn!(self.log, "Discovery lookup request error"; "error" => e);
};
}

// pre-emptively wake the thread to check for new events
if let Some(waker) = &self.waker {
@@ -343,7 +363,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// in-active. This case is checked on the subscription event (see `handle_subscriptions`).

// Return if we already have a subscription for this subnet_id and slot
if self.unsubscriptions.contains(&exact_subnet) {
if self.unsubscriptions.contains(&exact_subnet) || self.subscribe_all_subnets {
return Ok(());
}

@@ -366,7 +386,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
///
/// This also updates the ENR to indicate our long-lived subscription to the subnet
fn add_known_validator(&mut self, validator_index: u64) {
if self.known_validators.get(&validator_index).is_none() {
if self.known_validators.get(&validator_index).is_none() && !self.subscribe_all_subnets {
// New validator has subscribed
// Subscribe to random topics and update the ENR if needed.

3 changes: 2 additions & 1 deletion beacon_node/network/src/attestation_service/tests/mod.rs
@@ -92,10 +92,11 @@ mod tests {

fn get_attestation_service() -> AttestationService<TestBeaconChainType> {
let log = get_logger();
let config = NetworkConfig::default();

let beacon_chain = CHAIN.chain.clone();

AttestationService::new(beacon_chain, &log)
AttestationService::new(beacon_chain, &config, &log)
}

fn get_subscription(