From bb1d23bb8ea23a1b70ad81c5280763e3bda2fa4f Mon Sep 17 00:00:00 2001 From: Stefan Date: Fri, 4 Oct 2024 14:05:26 +0200 Subject: [PATCH] network-libp2p: Migrate AutoNAT v1 protocol to v2 --- network-libp2p/src/autonat.rs | 97 +++++++++++++++++++ network-libp2p/src/behaviour.rs | 21 ++-- .../src/connection_pool/behaviour.rs | 9 +- network-libp2p/src/discovery/handler.rs | 27 +++++- network-libp2p/src/lib.rs | 3 + network-libp2p/src/network_types.rs | 3 + network-libp2p/src/swarm.rs | 53 +++++++--- network-libp2p/tests/network.rs | 2 +- 8 files changed, 184 insertions(+), 31 deletions(-) create mode 100644 network-libp2p/src/autonat.rs diff --git a/network-libp2p/src/autonat.rs b/network-libp2p/src/autonat.rs new file mode 100644 index 0000000000..12e4b78f7f --- /dev/null +++ b/network-libp2p/src/autonat.rs @@ -0,0 +1,97 @@ +use std::collections::{HashMap, HashSet}; + +use libp2p::Multiaddr; + +#[derive(Copy, Clone, Debug, Default, PartialEq)] +pub(crate) enum NatStatus { + Public, + Private, + #[default] + Unknown, +} + +#[derive(Default)] +pub(crate) struct NatState { + confirmed_addresses: HashSet, + listen_address_status: HashMap, + status: NatStatus, +} + +impl NatState { + pub fn add_listen_address(&mut self, address: Multiaddr) { + self.listen_address_status + .insert(address, NatStatus::Unknown); + } + + pub fn remove_listen_address(&mut self, address: &Multiaddr) { + self.listen_address_status.remove(address); + self.confirmed_addresses.remove(address); + self.update_state(); + } + + pub fn set_listen_address_nat_status(&mut self, address: Multiaddr, nat_status: NatStatus) { + if let Some(status) = self.listen_address_status.get_mut(&address) { + *status = nat_status; + + if nat_status == NatStatus::Public { + self.confirmed_addresses.insert(address); + } else { + self.confirmed_addresses.remove(&address); + } + self.update_state(); + } + } + + pub fn add_confirmed_address(&mut self, address: Multiaddr) { + if let Some(address_status) = self.listen_address_status.get_mut(&address) { + *address_status = NatStatus::Public; + + self.confirmed_addresses.insert(address); + self.update_state(); + } + } + + pub fn remove_confirmed_address(&mut self, address: &Multiaddr) { + if let Some(address_status) = self.listen_address_status.get_mut(address) { + *address_status = NatStatus::Private; + + self.confirmed_addresses.remove(address); + self.update_state(); + } + } + + fn update_state(&mut self) { + let old_nat_status = self.status; + + if !self.confirmed_addresses.is_empty() { + self.status = NatStatus::Public; + } else if self + .listen_address_status + .iter() + .all(|(_, status)| *status == NatStatus::Private) + { + self.status = NatStatus::Private; + } else { + self.status = NatStatus::Unknown + } + + Self::handle_new_status(&old_nat_status, &self.status); + } + + fn handle_new_status(old_nat_status: &NatStatus, new_nat_status: &NatStatus) { + if old_nat_status == new_nat_status { + return; + } + + if *new_nat_status == NatStatus::Private { + log::warn!("Couldn't detect a public reachable address. Validator network operations won't be possible"); + log::warn!("You may need to find a relay to enable validator network operations"); + } else if *new_nat_status == NatStatus::Public { + log::info!( + ?old_nat_status, + ?new_nat_status, + "NAT status changed and detected public reachable address. Validator network operations will be possible" + ); + } + } +} diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index d83a9b7367..700db86291 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -1,13 +1,15 @@ use std::{iter, sync::Arc}; use libp2p::{ - autonat, connection_limits, gossipsub, + autonat::v2::{self as autonat, client::Config as AutonatConfig}, + connection_limits, gossipsub, kad::{self, store::MemoryStore}, ping, request_response, swarm::NetworkBehaviour, Multiaddr, PeerId, StreamProtocol, }; use parking_lot::RwLock; +use rand::rngs::OsRng; use crate::{ connection_pool, @@ -32,9 +34,10 @@ pub struct Behaviour { pub connection_limits: connection_limits::Behaviour, pub pool: connection_pool::Behaviour, pub discovery: discovery::Behaviour, + pub autonat_server: autonat::server::Behaviour, + pub autonat_client: autonat::client::Behaviour, pub dht: kad::Behaviour, pub gossipsub: gossipsub::Behaviour, - pub autonat: autonat::Behaviour, pub ping: ping::Behaviour, pub request_response: request_response::Behaviour, } @@ -96,12 +99,11 @@ impl Behaviour { req_res_config, ); - // Autonat behaviour - let mut autonat_config = autonat::Config::default(); - if config.autonat_allow_non_global_ips { - autonat_config.only_global_ips = false; - } - let autonat = autonat::Behaviour::new(peer_id, autonat_config); + // AutoNAT server behaviour + let autonat_server = autonat::server::Behaviour::new(OsRng); + + // AutoNAT client behaviour + let autonat_client = autonat::client::Behaviour::new(OsRng, AutonatConfig::default()); // Connection limits behaviour let limits = connection_limits::ConnectionLimits::default() @@ -119,7 +121,8 @@ impl Behaviour { ping, pool, request_response, - autonat, + autonat_client, + autonat_server, connection_limits, } } diff --git a/network-libp2p/src/connection_pool/behaviour.rs b/network-libp2p/src/connection_pool/behaviour.rs index 5c8f08a242..d716550fbd 100644 --- a/network-libp2p/src/connection_pool/behaviour.rs +++ b/network-libp2p/src/connection_pool/behaviour.rs @@ -649,15 +649,16 @@ impl Behaviour { self.addresses.mark_failed(address.clone()); } - // Ignore connection if another connection to this peer already exists. + // Ignore connection if too many other connections to this peer already exists. + // Multiple connections with a peer are necessary as AutoNAT probes run over them. // TODO Do we still want to subject it to the IP limit checks? - if other_established > 0 { + if other_established > 3 { debug!( %peer_id, connections = other_established, - "Already have connections established to this peer", + "Already have too many connections established to this peer", ); - // We have more than one connection to the same peer. Deterministically + // We have more than three connections to the same peer. Deterministically // choose which connection to close: close the connection only if the // other peer ID is less than our own peer ID value. // Note: We don't track all of the connection IDs and if the latest diff --git a/network-libp2p/src/discovery/handler.rs b/network-libp2p/src/discovery/handler.rs index 55b7320451..7a04d03c58 100644 --- a/network-libp2p/src/discovery/handler.rs +++ b/network-libp2p/src/discovery/handler.rs @@ -1,4 +1,5 @@ use std::{ + collections::{HashSet, VecDeque}, pin::Pin, sync::Arc, task::{Context, Poll, Waker}, @@ -14,10 +15,11 @@ use libp2p::{ swarm::{ handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ProtocolSupport, }, ConnectionHandler, ConnectionHandlerEvent, Stream, SubstreamProtocol, }, - Multiaddr, PeerId, + Multiaddr, PeerId, StreamProtocol, }; use nimiq_hash::Blake2bHash; use nimiq_network_interface::peer_info::Services; @@ -34,6 +36,7 @@ use super::{ peer_contacts::{PeerContactBook, SignedPeerContact}, protocol::{ChallengeNonce, DiscoveryMessage, DiscoveryProtocol}, }; +use crate::{AUTONAT_DIAL_BACK_PROTOCOL, AUTONAT_DIAL_REQUEST_PROTOCOL}; #[derive(Debug)] pub enum HandlerOutEvent { @@ -163,6 +166,9 @@ pub struct Handler { /// Waker used when opening a substream. waker: Option, + + /// Events to inform its behaviour or all other ConnectionHandlers + events: VecDeque>, } impl Handler { @@ -196,6 +202,7 @@ impl Handler { inbound: None, outbound: None, waker: None, + events: VecDeque::new(), } } @@ -249,6 +256,18 @@ impl Handler { .wake(); } } + + /// Report to all the ConnectionHandlers that the remote peer supports the AutoNAT V2 client and server protocols + fn report_remote_autonat_support(&mut self) { + let mut stream_protocols = HashSet::new(); + stream_protocols.insert(StreamProtocol::new(AUTONAT_DIAL_REQUEST_PROTOCOL)); + stream_protocols.insert(StreamProtocol::new(AUTONAT_DIAL_BACK_PROTOCOL)); + + self.events + .push_back(ConnectionHandlerEvent::ReportRemoteProtocols( + ProtocolSupport::Added(stream_protocols), + )); + } } /// Extract the `/ip4/`,`/ip6/`, `/dns4/` or `/dns6/` protocol part from a `Multiaddr` @@ -298,6 +317,7 @@ impl ConnectionHandler for Handler { } self.inbound = Some(protocol); self.check_initialized(); + self.report_remote_autonat_support(); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, .. @@ -310,6 +330,7 @@ impl ConnectionHandler for Handler { } self.outbound = Some(protocol); self.check_initialized(); + self.report_remote_autonat_support(); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { error!(%error, "inject_dial_upgrade_error"); @@ -749,6 +770,10 @@ impl ConnectionHandler for Handler { } } + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + // If we've left the loop, we're waiting on something. Poll::Pending } diff --git a/network-libp2p/src/lib.rs b/network-libp2p/src/lib.rs index edddbb76fb..6782864674 100644 --- a/network-libp2p/src/lib.rs +++ b/network-libp2p/src/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; +mod autonat; mod behaviour; mod config; mod connection_pool; @@ -18,6 +19,8 @@ mod utils; pub const DISCOVERY_PROTOCOL: &str = "/nimiq/discovery/0.0.1"; pub const DHT_PROTOCOL: &str = "/nimiq/kad/0.0.1"; +pub const AUTONAT_DIAL_REQUEST_PROTOCOL: &str = "/libp2p/autonat/2/dial-request"; +pub const AUTONAT_DIAL_BACK_PROTOCOL: &str = "/libp2p/autonat/2/dial-back"; pub use config::{Config, TlsConfig}; pub use error::NetworkError; diff --git a/network-libp2p/src/network_types.rs b/network-libp2p/src/network_types.rs index 54844c7f43..493c83e9a2 100644 --- a/network-libp2p/src/network_types.rs +++ b/network-libp2p/src/network_types.rs @@ -23,6 +23,7 @@ use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use crate::{ + autonat::NatState, dispatch::codecs::{IncomingRequest, OutgoingResponse}, rate_limiting::RequestRateLimitData, NetworkError, @@ -238,6 +239,8 @@ pub(crate) struct TaskState { pub(crate) dht_bootstrap_state: DhtBootStrapState, /// DHT (kad) is in server mode pub(crate) dht_server_mode: bool, + /// The NAT status of the local peer + pub(crate) nat_status: NatState, /// Senders per `OutboundRequestId` for request-response pub(crate) requests: HashMap>>, /// Time spent per `OutboundRequestId` for request-response diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index 161818155a..3e276d88c4 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -6,7 +6,7 @@ use instant::Instant; #[cfg(all(target_family = "wasm", not(feature = "tokio-websocket")))] use libp2p::websocket_websys; use libp2p::{ - autonat::{self, OutboundFailure}, + autonat::OutboundFailure, core::{ self, muxing::StreamMuxerBox, @@ -42,6 +42,7 @@ use tokio::sync::{broadcast, mpsc}; #[cfg(feature = "metrics")] use crate::network_metrics::NetworkMetrics; use crate::{ + autonat::NatStatus, behaviour, discovery::{behaviour::Event, peer_contacts::PeerContactBook}, network_types::{ @@ -395,26 +396,46 @@ fn handle_event( swarm .behaviour_mut() .discovery - .add_own_addresses([address].to_vec()); + .add_own_addresses([address.clone()].to_vec()); + state.nat_status.add_listen_address(address); + } + + SwarmEvent::ListenerClosed { + listener_id: _, + addresses, + reason: _, + } => { + addresses.iter().for_each(|address| { + state.nat_status.remove_listen_address(address); + }); + } + + SwarmEvent::ExternalAddrConfirmed { address } => { + log::trace!(%address, "Address is confirmed and externally reachable"); + state.nat_status.add_confirmed_address(address); + } + + SwarmEvent::ExternalAddrExpired { address } => { + log::trace!(%address, "External address is expired and no longer externally reachable"); + state.nat_status.remove_confirmed_address(&address); } SwarmEvent::Behaviour(event) => { match event { - behaviour::BehaviourEvent::Autonat(event) => match event { - autonat::Event::InboundProbe(event) => { - log::trace!(?event, "Autonat inbound probe"); - } - autonat::Event::OutboundProbe(event) => { - log::trace!(?event, "Autonat outbound probe"); - } - autonat::Event::StatusChanged { old, new } => { - log::debug!(?old, ?new, "Autonat status changed"); - if new == autonat::NatStatus::Private { - log::warn!("Couldn't detect a public reachable address. Validator network operations won't be possible"); - log::warn!("You may need to find a relay to enable validator network operations"); - } + behaviour::BehaviourEvent::AutonatClient(event) => { + log::trace!(?event, "AutoNAT outbound probe"); + match event.result { + Ok(_) => state + .nat_status + .set_listen_address_nat_status(event.tested_addr, NatStatus::Public), + Err(_) => state + .nat_status + .set_listen_address_nat_status(event.tested_addr, NatStatus::Private), } - }, + } + behaviour::BehaviourEvent::AutonatServer(event) => { + log::trace!(?event, "AutoNAT inbound probe"); + } behaviour::BehaviourEvent::ConnectionLimits(_) => {} behaviour::BehaviourEvent::Dht(event) => { match event { diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index 1481486d2d..c8986603e4 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -317,7 +317,7 @@ async fn create_network_with_n_peers(n_peers: usize) -> Vec { #[test(tokio::test)] async fn connections_stress_and_reconnect() { - let peers: usize = 10; + let peers: usize = 5; let networks = create_network_with_n_peers(peers).await; assert_eq!(peers, networks.len());