Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discv5 support #5576

Closed
wants to merge 17 commits into from
21 changes: 19 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ reth-config = { path = "crates/config" }
reth-consensus-common = { path = "crates/consensus/common" }
reth-db = { path = "crates/storage/db" }
reth-discv4 = { path = "crates/net/discv4" }
reth-discv5 = {path = "crates/net/discv5"}
reth-dns-discovery = { path = "crates/net/dns" }
reth-downloaders = { path = "crates/net/downloaders" }
reth-ecies = { path = "crates/net/ecies" }
Expand Down
22 changes: 22 additions & 0 deletions crates/net/discv5/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "reth-discv5"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
exclude.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
discv5.workspace = true
tokio-stream.workspace = true
tokio.workspace = true
futures-util = "*"
thiserror.workspace = true
parking_lot = "0.12.1"
secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] }
k256 = "0.13.2"
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
157 changes: 157 additions & 0 deletions crates/net/discv5/src/lib.rs
supernovahs marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use core::future::Future;
pub use discv5::{
enr, enr::CombinedKey, service::Service, Config as Discv5Config,
ConfigBuilder as Discv5ConfigBuilder, Discv5, Enr, Event,
};
use futures_util::{StreamExt, TryFutureExt};
use k256::ecdsa::SigningKey;
use parking_lot::Mutex;
use secp256k1::SecretKey;
use std::{
default::Default,
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{sync::mpsc, task};
use tokio_stream::{wrappers::ReceiverStream, Stream};

// Wrapper struct for Discv5
#[derive(Clone)]
pub struct Discv5Handle {
inner: Arc<Mutex<Discv5>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a mutex here?

}

impl Discv5Handle {
// Constructor to create a new Discv5Handle
pub fn new(discv5: Discv5) -> Self {
Discv5Handle { inner: Arc::new(Mutex::new(discv5)) }
}

pub fn from_secret_key(
secret_key: SecretKey,
discv5_config: Discv5Config,
) -> Result<Self, Discv5Error> {
let secret_key_bytes = secret_key.as_ref();
let signing_key = SigningKey::from_slice(secret_key_bytes)
.map_err(|_e| Discv5Error::SecretKeyDecode.into())?;
let enr_key = CombinedKey::Secp256k1(signing_key);
let enr = enr::EnrBuilder::new("v4")
.build(&enr_key)
.map_err(|_e| Discv5Error::EnrBuilderConstruct.into())?;
Ok(Discv5Handle::new(
Discv5::new(enr, enr_key, discv5_config)
.map_err(|_e| Discv5Error::Discv5Construct.into())?,
))
}

pub fn convert_to_discv5(&self) -> Arc<Mutex<Discv5>> {
self.inner.clone()
}

pub async fn start_service(&self) -> Result<(), Discv5Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this not call start directly on the wrapped discv5?

let discv5 = Arc::clone(&self.inner);

tokio::task::spawn_blocking(move || {
let mut discv5_guard = discv5.lock();

let _ = discv5_guard.start();
drop(discv5_guard);
})
.await
.map_err(|_| Discv5Error::Discv5Construct.into())
}

pub async fn create_event_stream(
&self,
) -> Result<tokio::sync::mpsc::Receiver<Event>, Discv5Error> {
let discv5 = Arc::clone(&self.inner);
let discv5_guard = discv5.lock();
let res = discv5_guard
.event_stream()
.await
.map_err(|_| Discv5Error::Discv5EventStreamStart.into());
drop(discv5_guard);
return res;
}
}

impl fmt::Debug for Discv5Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Discv5Handle(<Discv5>)")
}
}

/// The default table filter that results in all nodes being accepted into the local routing table.
const fn allow_all_enrs(_enr: &Enr) -> bool {
true
}

#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum Discv5Error {
/// Failed to decode a key from a table.
#[error("failed to parse secret key to Signing Key")]
SecretKeyDecode,
/// Failed to construct a new EnrBuilder
#[error("failed to constuct a new EnrBuilder")]
EnrBuilderConstruct,
/// Failed to construct Discv5 instance
#[error("failed to construct a new Discv5 instance")]
Discv5Construct,
/// Failed to create a event stream
#[error("failed to create event stream")]
Discv5EventStream,
/// Failed to start Discv5 event stream
#[error("failed to start event stream")]
Discv5EventStreamStart,
}

pub fn default_discv5_config() -> Discv5Config {
Discv5Config {
enable_packet_filter: Default::default(),
request_timeout: Default::default(),
vote_duration: Default::default(),
query_peer_timeout: Default::default(),
query_timeout: Default::default(),
request_retries: Default::default(),
session_timeout: Default::default(),
session_cache_capacity: Default::default(),
enr_update: Default::default(),
max_nodes_response: Default::default(),
enr_peer_update_min: Default::default(),
query_parallelism: Default::default(),
ip_limit: Default::default(),
incoming_bucket_limit: Default::default(),
table_filter: allow_all_enrs,
ping_interval: Default::default(),
report_discovered_peers: Default::default(),
filter_rate_limiter: Default::default(),
filter_max_nodes_per_ip: Default::default(),
filter_max_bans_per_ip: Default::default(),
permit_ban_list: Default::default(),
ban_duration: Default::default(),
executor: Default::default(),
listen_config: Default::default(),
}
}
pub struct Discv5Service {
inner: ReceiverStream<Event>,
}

impl Discv5Service {
// A constructor to create a new Discv5Service
pub fn new(event_receiver: mpsc::Receiver<Event>) -> Self {
Discv5Service { inner: ReceiverStream::new(event_receiver) }
}
}

impl Stream for Discv5Service {
type Item = Event;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let receiver = self.get_mut().inner.poll_next_unpin(cx);
mattsse marked this conversation as resolved.
Show resolved Hide resolved
receiver
}
}
4 changes: 3 additions & 1 deletion crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reth-primitives.workspace = true
reth-net-common.workspace = true
reth-network-api.workspace = true
reth-discv4.workspace = true
reth-discv5.workspace = true
reth-dns-discovery.workspace = true
reth-eth-wire.workspace = true
reth-ecies.workspace = true
Expand All @@ -31,7 +32,7 @@ reth-rpc-types.workspace = true
reth-tokio-util.workspace = true

alloy-rlp.workspace = true

k256 = "0.13.2"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# async/futures
futures.workspace = true
pin-project.workspace = true
Expand Down Expand Up @@ -67,6 +68,7 @@ tempfile = { workspace = true, optional = true }
[dev-dependencies]
# reth
reth-discv4 = { workspace = true, features = ["test-utils"] }
reth-discv5.workspace = true
reth-interfaces = { workspace = true, features = ["test-utils"] }
reth-primitives = { workspace = true, features = ["test-utils"] }

Expand Down
14 changes: 14 additions & 0 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
NetworkHandle, NetworkManager,
};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::{Discv5Config, Discv5ConfigBuilder};
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_ecies::util::pk2id;
use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status};
Expand Down Expand Up @@ -43,6 +44,8 @@ pub struct NetworkConfig<C> {
pub dns_discovery_config: Option<DnsDiscoveryConfig>,
/// How to set up discovery.
pub discovery_v4_config: Option<Discv4Config>,
/// How to set up discovery
pub discovery_v5_config: Option<Discv5Config>,
/// Address to use for discovery
pub discovery_addr: SocketAddr,
/// Address to listen for incoming connections
Expand Down Expand Up @@ -108,6 +111,12 @@ impl<C> NetworkConfig<C> {
self
}

/// Sets the config to use for the discovery v5 protocol.
pub fn set_discovery_v5(mut self, discovery_config: Discv5Config) -> Self {
self.discovery_v5_config = Some(discovery_config);
self
}

/// Sets the address for the incoming connection listener.
pub fn set_listener_addr(mut self, listener_addr: SocketAddr) -> Self {
self.listener_addr = listener_addr;
Expand Down Expand Up @@ -143,6 +152,8 @@ pub struct NetworkConfigBuilder {
dns_discovery_config: Option<DnsDiscoveryConfig>,
/// How to set up discovery.
discovery_v4_builder: Option<Discv4ConfigBuilder>,
#[serde(skip)]
discovery_v5_builder: Option<Discv5ConfigBuilder>,
/// All boot nodes to start network discovery with.
boot_nodes: HashSet<NodeRecord>,
/// Address to use for discovery
Expand Down Expand Up @@ -195,6 +206,7 @@ impl NetworkConfigBuilder {
secret_key,
dns_discovery_config: Some(Default::default()),
discovery_v4_builder: Some(Default::default()),
discovery_v5_builder: Default::default(),
boot_nodes: Default::default(),
discovery_addr: None,
listener_addr: None,
Expand Down Expand Up @@ -425,6 +437,7 @@ impl NetworkConfigBuilder {
secret_key,
mut dns_discovery_config,
discovery_v4_builder,
discovery_v5_builder,
boot_nodes,
discovery_addr,
listener_addr,
Expand Down Expand Up @@ -479,6 +492,7 @@ impl NetworkConfigBuilder {
boot_nodes,
dns_discovery_config,
discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()),
discovery_v5_config: discovery_v5_builder.map(|mut builder| builder.build()),
discovery_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS),
listener_addr,
peers_config: peers_config.unwrap_or_default(),
Expand Down
Loading
Loading