diff --git a/Cargo.lock b/Cargo.lock index cf1100a8a4cf..a3ca6acc5630 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4588,9 +4588,9 @@ dependencies = [ [[package]] name = "nybbles" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47dddada2357f8e7786f4f4d837db7bdddec02c7c3e5da7840d92c70390f6dc0" +checksum = "836816c354fb2c09622b54545a6f98416147346b13cc7eba5f92fab6b3042c93" dependencies = [ "alloy-rlp", "arbitrary", @@ -5852,6 +5852,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "reth-discv5" +version = "0.1.0-alpha.13" +dependencies = [ + "discv5", + "enr", + "futures-util", + "k256", + "parking_lot 0.12.1", + "secp256k1 0.27.0", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "reth-dns-discovery" version = "0.1.0-alpha.13" @@ -6124,6 +6139,7 @@ dependencies = [ "fnv", "futures", "humantime-serde", + "k256", "linked-hash-map", "linked_hash_set", "metrics", @@ -6132,6 +6148,7 @@ dependencies = [ "pprof", "rand 0.8.5", "reth-discv4", + "reth-discv5", "reth-dns-discovery", "reth-ecies", "reth-eth-wire", diff --git a/Cargo.toml b/Cargo.toml index 2d18c023d662..11cf48526f8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ reth-config = { path = "crates/config" } reth-consensus-common = { path = "crates/consensus/common" } reth-db = { path = "crates/storage/db" } reth-discv4 = { path = "crates/net/discv4" } +reth-discv5 = {path = "crates/net/discv5"} reth-dns-discovery = { path = "crates/net/dns" } reth-downloaders = { path = "crates/net/downloaders" } reth-ecies = { path = "crates/net/ecies" } diff --git a/crates/net/discv5/Cargo.toml b/crates/net/discv5/Cargo.toml new file mode 100644 index 000000000000..755a2a8306c0 --- /dev/null +++ b/crates/net/discv5/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "reth-discv5" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +discv5.workspace = true +tokio-stream.workspace = true +tokio.workspace = true +futures-util = "*" +thiserror.workspace = true +parking_lot = "0.12.1" +secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] } +k256 = "0.13.2" +enr = { workspace = true, features = ["rust-secp256k1"], optional = true } diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs new file mode 100644 index 000000000000..397548f0f15c --- /dev/null +++ b/crates/net/discv5/src/lib.rs @@ -0,0 +1,137 @@ +pub use discv5::{ + enr, enr::CombinedKey, service::Service, Config as Discv5Config, + ConfigBuilder as Discv5ConfigBuilder, Discv5, Enr, Event, +}; +use futures_util::StreamExt; +use k256::ecdsa::SigningKey; +use secp256k1::SecretKey; +use std::{ + default::Default, + fmt, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; + +// Wrapper struct for Discv5 +pub struct Discv5Handle { + inner: Discv5, +} + +impl Discv5Handle { + // Constructor to create a new Discv5Handle + pub fn new(discv5: Discv5) -> Self { + Discv5Handle { inner: discv5 } + } + + pub fn from_secret_key( + secret_key: SecretKey, + discv5_config: Discv5Config, + ) -> Result { + let secret_key_bytes = secret_key.as_ref(); + let signing_key = SigningKey::from_slice(secret_key_bytes) + .map_err(|_e| Discv5Error::SecretKeyDecode.into())?; + let enr_key = CombinedKey::Secp256k1(signing_key); + let enr = enr::EnrBuilder::new("v4") + .build(&enr_key) + .map_err(|_e| Discv5Error::EnrBuilderConstruct.into())?; + Ok(Discv5Handle::new( + Discv5::new(enr, enr_key, discv5_config) + .map_err(|_e| Discv5Error::Discv5Construct.into())?, + )) + } + + pub fn convert_to_discv5(&self) -> &Discv5 { + &self.inner + } + + pub async fn start_service(&mut self) -> Result<(), Discv5Error> { + self.inner.start().await.map_err(|_| Discv5Error::Discv5Construct.into()) + } + + pub async fn create_event_stream( + &mut self, + ) -> Result, Discv5Error> { + self.inner.event_stream().await.map_err(|_| Discv5Error::Discv5EventStreamStart.into()) + } +} + +impl fmt::Debug for Discv5Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Discv5Handle()") + } +} + +/// The default table filter that results in all nodes being accepted into the local routing table. +const fn allow_all_enrs(_enr: &Enr) -> bool { + true +} + +#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)] +#[allow(missing_docs)] +pub enum Discv5Error { + /// Failed to decode a key from a table. + #[error("failed to parse secret key to Signing Key")] + SecretKeyDecode, + /// Failed to construct a new EnrBuilder + #[error("failed to constuct a new EnrBuilder")] + EnrBuilderConstruct, + /// Failed to construct Discv5 instance + #[error("failed to construct a new Discv5 instance")] + Discv5Construct, + /// Failed to create a event stream + #[error("failed to create event stream")] + Discv5EventStream, + /// Failed to start Discv5 event stream + #[error("failed to start event stream")] + Discv5EventStreamStart, +} + +pub fn default_discv5_config() -> Discv5Config { + Discv5Config { + enable_packet_filter: Default::default(), + request_timeout: Default::default(), + vote_duration: Default::default(), + query_peer_timeout: Default::default(), + query_timeout: Default::default(), + request_retries: Default::default(), + session_timeout: Default::default(), + session_cache_capacity: Default::default(), + enr_update: Default::default(), + max_nodes_response: Default::default(), + enr_peer_update_min: Default::default(), + query_parallelism: Default::default(), + ip_limit: Default::default(), + incoming_bucket_limit: Default::default(), + table_filter: allow_all_enrs, + ping_interval: Default::default(), + report_discovered_peers: Default::default(), + filter_rate_limiter: Default::default(), + filter_max_nodes_per_ip: Default::default(), + filter_max_bans_per_ip: Default::default(), + permit_ban_list: Default::default(), + ban_duration: Default::default(), + executor: Default::default(), + listen_config: Default::default(), + } +} +pub struct Discv5Service { + inner: ReceiverStream, +} + +impl Discv5Service { + // A constructor to create a new Discv5Service + pub fn new(event_receiver: mpsc::Receiver) -> Self { + Discv5Service { inner: ReceiverStream::new(event_receiver) } + } +} + +impl Stream for Discv5Service { + type Item = Event; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let receiver = self.get_mut().inner.poll_next_unpin(cx); + receiver + } +} diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index d861b2cfe836..5f811f04015f 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -21,6 +21,7 @@ reth-primitives.workspace = true reth-net-common.workspace = true reth-network-api.workspace = true reth-discv4.workspace = true +reth-discv5.workspace = true reth-dns-discovery.workspace = true reth-eth-wire.workspace = true reth-ecies.workspace = true @@ -31,7 +32,7 @@ reth-rpc-types.workspace = true reth-tokio-util.workspace = true alloy-rlp.workspace = true - +k256 = "0.13.2" # async/futures futures.workspace = true pin-project.workspace = true @@ -67,6 +68,7 @@ tempfile = { workspace = true, optional = true } [dev-dependencies] # reth reth-discv4 = { workspace = true, features = ["test-utils"] } +reth-discv5.workspace = true reth-interfaces = { workspace = true, features = ["test-utils"] } reth-primitives = { workspace = true, features = ["test-utils"] } diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 5e229dbdbbd0..d817b15f94b9 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -8,6 +8,7 @@ use crate::{ NetworkHandle, NetworkManager, }; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS}; +use reth_discv5::{Discv5Config, Discv5ConfigBuilder}; use reth_dns_discovery::DnsDiscoveryConfig; use reth_ecies::util::pk2id; use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status}; @@ -43,6 +44,8 @@ pub struct NetworkConfig { pub dns_discovery_config: Option, /// How to set up discovery. pub discovery_v4_config: Option, + /// How to set up discovery + pub discovery_v5_config: Option, /// Address to use for discovery pub discovery_addr: SocketAddr, /// Address to listen for incoming connections @@ -108,6 +111,12 @@ impl NetworkConfig { self } + /// Sets the config to use for the discovery v5 protocol. + pub fn set_discovery_v5(mut self, discovery_config: Discv5Config) -> Self { + self.discovery_v5_config = Some(discovery_config); + self + } + /// Sets the address for the incoming connection listener. pub fn set_listener_addr(mut self, listener_addr: SocketAddr) -> Self { self.listener_addr = listener_addr; @@ -143,6 +152,8 @@ pub struct NetworkConfigBuilder { dns_discovery_config: Option, /// How to set up discovery. discovery_v4_builder: Option, + #[serde(skip)] + discovery_v5_builder: Option, /// All boot nodes to start network discovery with. boot_nodes: HashSet, /// Address to use for discovery @@ -195,6 +206,7 @@ impl NetworkConfigBuilder { secret_key, dns_discovery_config: Some(Default::default()), discovery_v4_builder: Some(Default::default()), + discovery_v5_builder: Default::default(), boot_nodes: Default::default(), discovery_addr: None, listener_addr: None, @@ -425,6 +437,7 @@ impl NetworkConfigBuilder { secret_key, mut dns_discovery_config, discovery_v4_builder, + discovery_v5_builder, boot_nodes, discovery_addr, listener_addr, @@ -479,6 +492,7 @@ impl NetworkConfigBuilder { boot_nodes, dns_discovery_config, discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()), + discovery_v5_config: discovery_v5_builder.map(|mut builder| builder.build()), discovery_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS), listener_addr, peers_config: peers_config.unwrap_or_default(), diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 72583f01bb2d..a43d1caeba31 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -4,12 +4,14 @@ use crate::{ error::{NetworkError, ServiceKind}, manager::DiscoveredEvent, }; -use futures::StreamExt; +use futures::{stream::Stream, StreamExt}; +// use k256::ecdsa::SigningKey; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, EnrForkIdEntry}; +use reth_discv5::{Discv5Config, Discv5Handle, Event}; use reth_dns_discovery::{ DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, }; -use reth_primitives::{ForkId, NodeRecord, PeerId}; +use reth_primitives::{ForkId, NodeRecord, PeerId, B512}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, @@ -19,7 +21,7 @@ use std::{ task::{ready, Context, Poll}, }; use tokio::{sync::mpsc, task::JoinHandle}; -use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_stream::wrappers::ReceiverStream; /// An abstraction over the configured discovery protocol. /// @@ -35,6 +37,10 @@ pub struct Discovery { local_enr: NodeRecord, /// Handler to interact with the Discovery v4 service discv4: Option, + /// Handler to interact with the Discovery v5 service + discv5: Option, + /// Event stream for Receiving Event from Discovery v5 + discv5_event_stream: Option>, /// All KAD table updates from the discv4 service. discv4_updates: Option>, /// The handle to the spawned discv4 service @@ -60,6 +66,7 @@ impl Discovery { discovery_addr: SocketAddr, sk: SecretKey, discv4_config: Option, + discv5_config: Option, dns_discovery_config: Option, ) -> Result { // setup discv4 @@ -77,6 +84,16 @@ impl Discovery { (None, None, None) }; + // setup discv5 + let (discv5, _discv5_event_stream) = if let Some(discv5_config) = discv5_config { + let mut discv5 = Discv5Handle::from_secret_key(sk, discv5_config)?; + discv5.start_service().await.unwrap(); + let discv5_event_stream = discv5.create_event_stream().await.unwrap(); + (Some(discv5), Some(discv5_event_stream)) + } else { + (None, None) + }; + // setup DNS discovery let (_dns_discovery, dns_discovery_updates, _dns_disc_service) = if let Some(dns_config) = dns_discovery_config { @@ -95,6 +112,8 @@ impl Discovery { discovery_listeners: Default::default(), local_enr, discv4, + discv5, + discv5_event_stream: Default::default(), discv4_updates, _discv4_service, discovered_nodes: Default::default(), @@ -188,6 +207,75 @@ impl Discovery { } } + fn on_discv5_update(&mut self, update: Event) { + match update { + Event::Discovered(enr) => { + let ip_addr: Option = + enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from)); + let tcp_port = enr.tcp4().or_else(|| enr.tcp6()); + let udp_port = enr.udp4().or_else(|| enr.udp6()); + let node_id = B512::from_slice(&enr.node_id().raw()[..]); + let node_record = NodeRecord { + address: ip_addr.unwrap(), + tcp_port: tcp_port.unwrap(), + udp_port: udp_port.unwrap(), + id: node_id, + }; + self.on_node_record_update(node_record, None) + } + Event::EnrAdded { enr, replaced } => { + if let Some(discv5) = &self.discv5 { + let discv5 = Discv5Handle::convert_to_discv5(discv5); + let _ = discv5.add_enr(enr); + if let Some(replaced_enr) = replaced { + let node_id = replaced_enr.node_id(); + discv5.remove_node(&node_id); + } + } + } + Event::NodeInserted { node_id, replaced } => { + if let Some(discv5) = &self.discv5 { + let discv5 = Discv5Handle::convert_to_discv5(discv5); + let enr = discv5.find_enr(&node_id); + discv5.add_enr(enr.unwrap()).unwrap(); // TODO # 5576 handle unwrap in the + // end properly + if let Some(replaced_enr) = replaced { + discv5.remove_node(&replaced_enr); + } + } + } + Event::SessionEstablished(enr, socket_address) => { + let tcp_port = enr.tcp4().or_else(|| enr.tcp6()); + let udp_port = enr.udp4().or_else(|| enr.udp6()); + let node_id = B512::from_slice(&enr.node_id().raw()[..]); + let node_record = NodeRecord { + address: socket_address.ip(), + tcp_port: tcp_port.unwrap(), + udp_port: udp_port.unwrap(), + id: node_id, + }; + self.on_node_record_update(node_record, None) + } + Event::SocketUpdated(socket_address) => { + if let Some(discv5) = &self.discv5 { + let discv5 = Discv5Handle::convert_to_discv5(discv5); + let mut local_enr = discv5.local_enr(); + // local_enr.set_ip(socket_address.ip(),); #5576 figure out how to get signing + } + } + Event::TalkRequest(talk_request) => { + if let Some(discv5) = &self.discv5 { + let discv5 = Discv5Handle::convert_to_discv5(discv5); + let node_id = talk_request.node_id(); + let enr = discv5.find_enr(&node_id); + let protocol = talk_request.protocol(); + let request = talk_request.body(); + let _ = discv5.talk_req(enr.unwrap(), protocol.to_vec(), request.to_vec()); + } + } + } + } + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { loop { // Drain all buffered events first @@ -203,6 +291,12 @@ impl Discovery { self.on_discv4_update(update) } + while let Some(Poll::Ready(Some(update))) = + self.discv5_event_stream.as_mut().map(|updates| (updates).poll_next_unpin(cx)) + { + self.on_discv5_update(update) + } + while let Some(Poll::Ready(Some(update))) = self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx)) { @@ -243,6 +337,8 @@ impl Discovery { id: PeerId::random(), }, discv4: Default::default(), + discv5: Default::default(), + discv5_event_stream: Default::default(), discv4_updates: Default::default(), queued_events: Default::default(), _discv4_service: Default::default(), @@ -275,9 +371,14 @@ mod tests { let mut rng = thread_rng(); let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); - let _discovery = - Discovery::new(discovery_addr, secret_key, Default::default(), Default::default()) - .await - .unwrap(); + let _discovery = Discovery::new( + discovery_addr, + secret_key, + Default::default(), + Default::default(), + Default::default(), + ) + .await + .unwrap(); } } diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index ae8b485cad2a..de59cec7dbf7 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -1,13 +1,13 @@ //! Possible errors when interacting with the network. use crate::session::PendingSessionHandshakeError; +use reth_discv5::Discv5Error; use reth_dns_discovery::resolver::ResolveError; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError}, DisconnectReason, }; use std::{fmt, io, io::ErrorKind, net::SocketAddr}; - /// Service kind. #[derive(Debug, PartialEq)] pub enum ServiceKind { @@ -43,6 +43,9 @@ pub enum NetworkError { /// IO error when creating the discovery service #[error("failed to launch discovery service: {0}")] Discovery(io::Error), + /// Discv5 error + #[error("Discv5 error: {0}")] + Discv5(#[from] Discv5Error), /// Error when setting up the DNS resolver failed /// /// See also [DnsResolver](reth_dns_discovery::DnsResolver::from_system_conf) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 88cb8bdcbcdf..3138f3b31fdd 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -161,6 +161,7 @@ where client, secret_key, mut discovery_v4_config, + discovery_v5_config, discovery_addr, listener_addr, peers_config, @@ -195,9 +196,14 @@ where disc_config }); - let discovery = - Discovery::new(discovery_addr, secret_key, discovery_v4_config, dns_discovery_config) - .await?; + let discovery = Discovery::new( + discovery_addr, + secret_key, + discovery_v4_config, + discovery_v5_config, + dns_discovery_config, + ) + .await?; // need to retrieve the addr here since provided port could be `0` let local_peer_id = discovery.local_id(); diff --git a/crates/net/network/tests/it/startup.rs b/crates/net/network/tests/it/startup.rs index 774c6c0af622..599e2d02d1a2 100644 --- a/crates/net/network/tests/it/startup.rs +++ b/crates/net/network/tests/it/startup.rs @@ -1,4 +1,5 @@ use reth_discv4::Discv4Config; +use reth_discv5::default_discv5_config; use reth_network::{ error::{NetworkError, ServiceKind}, Discovery, NetworkConfigBuilder, NetworkManager, @@ -7,6 +8,7 @@ use reth_network_api::NetworkInfo; use reth_provider::test_utils::NoopProvider; use secp256k1::SecretKey; use std::{ + default::Default, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, }; @@ -55,12 +57,17 @@ async fn test_listener_addr_in_use() { async fn test_discovery_addr_in_use() { let secret_key = SecretKey::new(&mut rand::thread_rng()); let disc_config = Discv4Config::default(); + let discv5_config = default_discv5_config(); let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); let any_port_listener = TcpListener::bind(addr).await.unwrap(); let port = any_port_listener.local_addr().unwrap().port(); let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)); - let _discovery = Discovery::new(addr, secret_key, Some(disc_config), None).await.unwrap(); + let _discovery = Discovery::new(addr, secret_key, Some(disc_config), Some(discv5_config), None) + .await + .unwrap(); let disc_config = Discv4Config::default(); - let result = Discovery::new(addr, secret_key, Some(disc_config), None).await; + let discv5_config = default_discv5_config(); + let result = + Discovery::new(addr, secret_key, Some(disc_config), Some(discv5_config), None).await; assert!(is_addr_in_use_kind(&result.err().unwrap(), ServiceKind::Discovery(addr))); }