Skip to content

Commit

Permalink
network-libp2p: Migrate AutoNAT v1 protocol to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Eligioo committed Oct 8, 2024
1 parent 4ed3515 commit 82e95d9
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 31 deletions.
97 changes: 97 additions & 0 deletions network-libp2p/src/autonat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::collections::{HashMap, HashSet};

use libp2p::Multiaddr;

#[derive(Clone, Debug, Default, PartialEq)]
pub(crate) enum NatStatus {
Public,
Private,
#[default]
Unknown,
}

#[derive(Default)]
pub(crate) struct NatState {
confirmed_addresses: HashSet<Multiaddr>,
listen_address_status: HashMap<Multiaddr, NatStatus>,
status: NatStatus,
}

impl NatState {
pub fn add_listen_address(&mut self, address: Multiaddr) {
self.listen_address_status
.insert(address, NatStatus::Unknown);
}

pub fn remove_listen_address(&mut self, address: &Multiaddr) {
self.listen_address_status.remove(address);
self.confirmed_addresses.remove(address);
self.update_state();
}

pub fn set_listen_address_nat_status(&mut self, address: Multiaddr, nat_status: NatStatus) {
if let Some(status) = self.listen_address_status.get_mut(&address) {
*status = nat_status.clone();

if nat_status == NatStatus::Public {
self.confirmed_addresses.insert(address);
} else {
self.confirmed_addresses.remove(&address);
}
self.update_state();
}
}

pub fn add_confirmed_address(&mut self, address: Multiaddr) {
if let Some(address_status) = self.listen_address_status.get_mut(&address) {
*address_status = NatStatus::Public;

self.confirmed_addresses.insert(address);
self.update_state();
}
}

pub fn remove_confirmed_address(&mut self, address: &Multiaddr) {
if let Some(address_status) = self.listen_address_status.get_mut(address) {
*address_status = NatStatus::Private;

self.confirmed_addresses.remove(address);
self.update_state();
}
}

fn update_state(&mut self) {
let old_nat_status = self.status.clone();

if !self.confirmed_addresses.is_empty() {
self.status = NatStatus::Public;
} else if self
.listen_address_status
.iter()
.all(|(_, status)| *status == NatStatus::Private)
{
self.status = NatStatus::Private;
} else {
self.status = NatStatus::Unknown
}

Self::handle_new_status(&old_nat_status, &self.status);
}

fn handle_new_status(old_nat_status: &NatStatus, new_nat_status: &NatStatus) {
if old_nat_status == new_nat_status {
return;
}

if *new_nat_status == NatStatus::Private {
log::warn!("Couldn't detect a public reachable address. Validator network operations won't be possible");
log::warn!("You may need to find a relay to enable validator network operations");
} else if *new_nat_status == NatStatus::Public {
log::info!(
?old_nat_status,
?new_nat_status,
"NAT status changed and detected public reachable address. Validator network operations will be possible"
);
}
}
}
21 changes: 12 additions & 9 deletions network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{iter, sync::Arc};

use libp2p::{
autonat, connection_limits, gossipsub,
autonat::v2::{self as autonat, client::Config as AutonatConfig},
connection_limits, gossipsub,
kad::{self, store::MemoryStore},
ping, request_response,
swarm::NetworkBehaviour,
Multiaddr, PeerId, StreamProtocol,
};
use parking_lot::RwLock;
use rand::rngs::OsRng;

use crate::{
connection_pool,
Expand All @@ -32,9 +34,10 @@ pub struct Behaviour {
pub connection_limits: connection_limits::Behaviour,
pub pool: connection_pool::Behaviour,
pub discovery: discovery::Behaviour,
pub autonat_server: autonat::server::Behaviour,
pub autonat_client: autonat::client::Behaviour,
pub dht: kad::Behaviour<MemoryStore>,
pub gossipsub: gossipsub::Behaviour,
pub autonat: autonat::Behaviour,
pub ping: ping::Behaviour,
pub request_response: request_response::Behaviour<MessageCodec>,
}
Expand Down Expand Up @@ -96,12 +99,11 @@ impl Behaviour {
req_res_config,
);

// Autonat behaviour
let mut autonat_config = autonat::Config::default();
if config.autonat_allow_non_global_ips {
autonat_config.only_global_ips = false;
}
let autonat = autonat::Behaviour::new(peer_id, autonat_config);
// AutoNAT server behaviour
let autonat_server = autonat::server::Behaviour::new(OsRng);

// AutoNAT client behaviour
let autonat_client = autonat::client::Behaviour::new(OsRng, AutonatConfig::default());

// Connection limits behaviour
let limits = connection_limits::ConnectionLimits::default()
Expand All @@ -119,7 +121,8 @@ impl Behaviour {
ping,
pool,
request_response,
autonat,
autonat_client,
autonat_server,
connection_limits,
}
}
Expand Down
9 changes: 5 additions & 4 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,15 +649,16 @@ impl Behaviour {
self.addresses.mark_failed(address.clone());
}

// Ignore connection if another connection to this peer already exists.
// Ignore connection if too many other connections to this peer already exists.
// Multiple connections with a peer are necessary as AutoNAT probes run over them.
// TODO Do we still want to subject it to the IP limit checks?
if other_established > 0 {
if other_established > 3 {
debug!(
%peer_id,
connections = other_established,
"Already have connections established to this peer",
"Already have too many connections established to this peer",
);
// We have more than one connection to the same peer. Deterministically
// We have more than three connections to the same peer. Deterministically
// choose which connection to close: close the connection only if the
// other peer ID is less than our own peer ID value.
// Note: We don't track all of the connection IDs and if the latest
Expand Down
27 changes: 26 additions & 1 deletion network-libp2p/src/discovery/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::{HashSet, VecDeque},
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
Expand All @@ -14,10 +15,11 @@ use libp2p::{
swarm::{
handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ProtocolSupport,
},
ConnectionHandler, ConnectionHandlerEvent, Stream, SubstreamProtocol,
},
Multiaddr, PeerId,
Multiaddr, PeerId, StreamProtocol,
};
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::peer_info::Services;
Expand All @@ -34,6 +36,7 @@ use super::{
peer_contacts::{PeerContactBook, SignedPeerContact},
protocol::{ChallengeNonce, DiscoveryMessage, DiscoveryProtocol},
};
use crate::{AUTONAT_DIAL_BACK_PROTOCOL, AUTONAT_DIAL_REQUEST_PROTOCOL};

#[derive(Debug)]
pub enum HandlerOutEvent {
Expand Down Expand Up @@ -163,6 +166,9 @@ pub struct Handler {

/// Waker used when opening a substream.
waker: Option<Waker>,

/// Events to inform its behaviour or all other ConnectionHandlers
events: VecDeque<ConnectionHandlerEvent<DiscoveryProtocol, (), HandlerOutEvent>>,
}

impl Handler {
Expand Down Expand Up @@ -196,6 +202,7 @@ impl Handler {
inbound: None,
outbound: None,
waker: None,
events: VecDeque::new(),
}
}

Expand Down Expand Up @@ -249,6 +256,18 @@ impl Handler {
.wake();
}
}

/// Report to all the ConnectionHandlers that the remote peer supports the AutoNAT V2 client and server protocols
fn report_remote_autonat_support(&mut self) {
let mut stream_protocols = HashSet::new();
stream_protocols.insert(StreamProtocol::new(AUTONAT_DIAL_REQUEST_PROTOCOL));
stream_protocols.insert(StreamProtocol::new(AUTONAT_DIAL_BACK_PROTOCOL));

self.events
.push_back(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Added(stream_protocols),
));
}
}

/// Extract the `/ip4/`,`/ip6/`, `/dns4/` or `/dns6/` protocol part from a `Multiaddr`
Expand Down Expand Up @@ -298,6 +317,7 @@ impl ConnectionHandler for Handler {
}
self.inbound = Some(protocol);
self.check_initialized();
self.report_remote_autonat_support();
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol, ..
Expand All @@ -310,6 +330,7 @@ impl ConnectionHandler for Handler {
}
self.outbound = Some(protocol);
self.check_initialized();
self.report_remote_autonat_support();
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
error!(%error, "inject_dial_upgrade_error");
Expand Down Expand Up @@ -749,6 +770,10 @@ impl ConnectionHandler for Handler {
}
}

if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}

// If we've left the loop, we're waiting on something.
Poll::Pending
}
Expand Down
3 changes: 3 additions & 0 deletions network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[macro_use]
extern crate log;

mod autonat;
mod behaviour;
mod config;
mod connection_pool;
Expand All @@ -18,6 +19,8 @@ mod utils;

pub const DISCOVERY_PROTOCOL: &str = "/nimiq/discovery/0.0.1";
pub const DHT_PROTOCOL: &str = "/nimiq/kad/0.0.1";
pub const AUTONAT_DIAL_REQUEST_PROTOCOL: &str = "/libp2p/autonat/2/dial-request";
pub const AUTONAT_DIAL_BACK_PROTOCOL: &str = "/libp2p/autonat/2/dial-back";

pub use config::{Config, TlsConfig};
pub use error::NetworkError;
Expand Down
3 changes: 3 additions & 0 deletions network-libp2p/src/network_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use thiserror::Error;
use tokio::sync::{mpsc, oneshot};

use crate::{
autonat::NatState,
dispatch::codecs::{IncomingRequest, OutgoingResponse},
rate_limiting::RequestRateLimitData,
NetworkError,
Expand Down Expand Up @@ -238,6 +239,8 @@ pub(crate) struct TaskState {
pub(crate) dht_bootstrap_state: DhtBootStrapState,
/// DHT (kad) is in server mode
pub(crate) dht_server_mode: bool,
/// The NAT status of the local peer
pub(crate) nat_status: NatState,
/// Senders per `OutboundRequestId` for request-response
pub(crate) requests: HashMap<OutboundRequestId, oneshot::Sender<Result<Bytes, RequestError>>>,
/// Time spent per `OutboundRequestId` for request-response
Expand Down
53 changes: 37 additions & 16 deletions network-libp2p/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use instant::Instant;
#[cfg(all(target_family = "wasm", not(feature = "tokio-websocket")))]
use libp2p::websocket_websys;
use libp2p::{
autonat::{self, OutboundFailure},
autonat::OutboundFailure,
core::{
self,
muxing::StreamMuxerBox,
Expand Down Expand Up @@ -42,6 +42,7 @@ use tokio::sync::{broadcast, mpsc};
#[cfg(feature = "metrics")]
use crate::network_metrics::NetworkMetrics;
use crate::{
autonat::NatStatus,
behaviour,
discovery::{behaviour::Event, peer_contacts::PeerContactBook},
network_types::{
Expand Down Expand Up @@ -395,26 +396,46 @@ fn handle_event(
swarm
.behaviour_mut()
.discovery
.add_own_addresses([address].to_vec());
.add_own_addresses([address.clone()].to_vec());
state.nat_status.add_listen_address(address);
}

SwarmEvent::ListenerClosed {
listener_id: _,
addresses,
reason: _,
} => {
addresses.iter().for_each(|address| {
state.nat_status.remove_listen_address(address);
});
}

SwarmEvent::ExternalAddrConfirmed { address } => {
log::trace!(%address, "Address is confirmed and externally reachable");
state.nat_status.add_confirmed_address(address);
}

SwarmEvent::ExternalAddrExpired { address } => {
log::trace!(%address, "External address is expired and no longer externally reachable");
state.nat_status.remove_confirmed_address(&address);
}

SwarmEvent::Behaviour(event) => {
match event {
behaviour::BehaviourEvent::Autonat(event) => match event {
autonat::Event::InboundProbe(event) => {
log::trace!(?event, "Autonat inbound probe");
}
autonat::Event::OutboundProbe(event) => {
log::trace!(?event, "Autonat outbound probe");
}
autonat::Event::StatusChanged { old, new } => {
log::debug!(?old, ?new, "Autonat status changed");
if new == autonat::NatStatus::Private {
log::warn!("Couldn't detect a public reachable address. Validator network operations won't be possible");
log::warn!("You may need to find a relay to enable validator network operations");
}
behaviour::BehaviourEvent::AutonatClient(event) => {
log::trace!(?event, "AutoNAT outbound probe");
match event.result {
Ok(_) => state
.nat_status
.set_listen_address_nat_status(event.tested_addr, NatStatus::Public),
Err(_) => state
.nat_status
.set_listen_address_nat_status(event.tested_addr, NatStatus::Private),
}
},
}
behaviour::BehaviourEvent::AutonatServer(event) => {
log::trace!(?event, "AutoNAT inbound probe");
}
behaviour::BehaviourEvent::ConnectionLimits(_) => {}
behaviour::BehaviourEvent::Dht(event) => {
match event {
Expand Down
2 changes: 1 addition & 1 deletion network-libp2p/tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ async fn create_network_with_n_peers(n_peers: usize) -> Vec<Network> {

#[test(tokio::test)]
async fn connections_stress_and_reconnect() {
let peers: usize = 10;
let peers: usize = 5;
let networks = create_network_with_n_peers(peers).await;

assert_eq!(peers, networks.len());
Expand Down

0 comments on commit 82e95d9

Please sign in to comment.