Skip to content

Commit

Permalink
feat: update utp-rs dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev committed Feb 3, 2025
1 parent 5c4bbd1 commit f4bf5b2
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 155 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ tree_hash_derive = "0.8.0"
uds_windows = "1.0.1"
ureq = { version = "2.5.0", features = ["json"] }
url = "2.3.1"
utp-rs = { tag = "v0.1.0-alpha.15", git = "https://github.com/ethereum/utp" }
utp-rs = { git = "https://github.com/morph-dev/utp", rev= "da5ae3e46a3daa6489a98fd2c7d198e83eee86b4" }

# Trin workspace crates
e2store = { path = "crates/e2store" }
Expand Down
186 changes: 100 additions & 86 deletions crates/portalnet/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::{
fmt,
hash::{Hash, Hasher},
io,
fmt, io,
net::{Ipv4Addr, SocketAddr},
ops::Deref,
str::FromStr,
sync::Arc,
time::Duration,
Expand All @@ -26,7 +25,10 @@ use parking_lot::RwLock;
use tokio::sync::{mpsc, RwLock as TokioRwLock};
use tracing::{debug, info, warn};
use trin_validation::oracle::HeaderOracle;
use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket};
use utp_rs::{
peer::{ConnectionPeer, Peer},
udp::AsyncUdpSocket,
};

use super::config::PortalnetConfig;
use crate::socket;
Expand Down Expand Up @@ -357,113 +359,75 @@ impl Discv5UdpSocket {
header_oracle,
}
}

async fn find_enr(&mut self, node_id: &NodeId) -> io::Result<UtpEnr> {
if let Some(cached_enr) = self.enr_cache.write().await.get(node_id).cloned() {
return Ok(UtpEnr(cached_enr));
}

if let Some(enr) = self.discv5.find_enr(node_id) {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}

if let Some(enr) = self.discv5.cached_node_addr(node_id) {
self.enr_cache.write().await.put(*node_id, enr.enr.clone());
return Ok(UtpEnr(enr.enr));
}

let history_jsonrpc_tx = self.header_oracle.read().await.history_jsonrpc_tx();
if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}
}

let state_jsonrpc_tx = self.header_oracle.read().await.state_jsonrpc_tx();
if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}
}

let beacon_jsonrpc_tx = self.header_oracle.read().await.beacon_jsonrpc_tx();
if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}
}

debug!(node_id = %node_id, "uTP packet from unknown source");
Err(io::Error::new(
io::ErrorKind::Other,
"ENR not found for talk req destination",
))
}
}

/// A wrapper around `Enr` that implements `ConnectionPeer`.
#[derive(Clone)]
pub struct UtpEnr(pub Enr);
pub struct UtpPeer(pub Enr);

impl UtpEnr {
pub fn node_id(&self) -> NodeId {
self.0.node_id()
impl Deref for UtpPeer {
type Target = Enr;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl UtpPeer {
pub fn client(&self) -> Option<String> {
self.0
.get_decodable::<String>(ENR_PORTAL_CLIENT_KEY)
self.get_decodable::<String>(ENR_PORTAL_CLIENT_KEY)
.and_then(|v| v.ok())
}
}

impl std::fmt::Debug for UtpEnr {
impl std::fmt::Debug for UtpPeer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let peer_client_type = self.client().unwrap_or_else(|| "Unknown".to_string());
f.debug_struct("UtpEnr")
f.debug_struct("EnrPeer")
.field("enr", &self.0)
.field("Peer Client Type", &peer_client_type)
.finish()
}
}

// Why are we implementing Hash, PartialEq, Eq for UtpEnr?
// UtpEnr is used as an element of the key for a Connections HashTable in our uTP library.
// Enr's can change and are not stable, so if we initiate a ``connect_with_cid`` we are inserting
// our known Enr for the peer, but if the peer has a more upto date Enr, values will be different
// and the Hash for the old Enr and New Enr will be different, along with equating the two structs
// will return false. This leads us to a situation where our peer sends us a uTP messages back and
// our code thinks the same peer is instead 2 different peers causing uTP to ignore the messages. We
// fixed this by implementing Eq and Hash only using the NodeId of the Enr as it is the only stable
// non-updatable field in the Enr.
impl Hash for UtpEnr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.node_id().hash(state);
impl ConnectionPeer for UtpPeer {
type Id = NodeId;

fn id(&self) -> Self::Id {
self.node_id()
}
}

impl PartialEq for UtpEnr {
fn eq(&self, other: &Self) -> bool {
self.0.node_id() == other.0.node_id()
fn consolidate(a: Self, b: Self) -> Self {
assert!(a.id() == b.id());
if a.seq() >= b.seq() {
a
} else {
b
}
}
}

impl Eq for UtpEnr {}

impl ConnectionPeer for UtpEnr {}

#[async_trait]
impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
async fn send_to(&mut self, buf: &[u8], target: &UtpEnr) -> io::Result<usize> {
impl AsyncUdpSocket<UtpPeer> for Discv5UdpSocket {
async fn send_to(&mut self, buf: &[u8], peer: &Peer<UtpPeer>) -> io::Result<usize> {
let peer_id = *peer.id();
let peer_enr = peer.peer().cloned();
let discv5 = Arc::clone(&self.discv5);
let target = target.0.clone();
let enr_cache = Arc::clone(&self.enr_cache);
let header_oracle = Arc::clone(&self.header_oracle);
let data = buf.to_vec();
tokio::spawn(async move {
match discv5.send_talk_req(target, Subnetwork::Utp, data).await {
let enr = match peer_enr {
Some(enr) => enr.0,
None => match find_enr(&peer_id, &discv5, enr_cache, header_oracle).await {
Ok(enr) => enr,
Err(err) => {
warn!(%err, "unable to send uTP talk request, ENR not found");
return;
}
},
};
match discv5.send_talk_req(enr, Subnetwork::Utp, data).await {
// We drop the talk response because it is ignored in the uTP protocol.
Ok(..) => {}
Err(err) => match err {
Expand All @@ -476,11 +440,10 @@ impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
Ok(buf.len())
}

async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, UtpEnr)> {
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, Peer<UtpPeer>)> {
match self.talk_request_receiver.recv().await {
Some(talk_req) => {
let src_node_id = talk_req.node_id();
let enr = self.find_enr(src_node_id).await?;
let node_id = *talk_req.node_id();
let packet = talk_req.body();
let n = std::cmp::min(buf.len(), packet.len());
buf[..n].copy_from_slice(&packet[..n]);
Expand All @@ -490,9 +453,60 @@ impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
warn!(%err, "failed to respond to uTP talk request");
}

Ok((n, enr))
Ok((n, Peer::new_id(node_id)))
}
None => Err(io::Error::from(io::ErrorKind::NotConnected)),
}
}
}

async fn find_enr(
node_id: &NodeId,
discv5: &Arc<Discovery>,
enr_cache: Arc<TokioRwLock<LruCache<NodeId, Enr>>>,
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
) -> io::Result<Enr> {
if let Some(cached_enr) = enr_cache.write().await.get(node_id).cloned() {
return Ok(cached_enr);
}

if let Some(enr) = discv5.find_enr(node_id) {
enr_cache.write().await.put(*node_id, enr.clone());
return Ok(enr);
}

if let Some(enr) = discv5.cached_node_addr(node_id) {
enr_cache.write().await.put(*node_id, enr.enr.clone());
return Ok(enr.enr);
}

let history_jsonrpc_tx = header_oracle.read().await.history_jsonrpc_tx();
if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await {
enr_cache.write().await.put(*node_id, enr.clone());
return Ok(enr);
}
}

let state_jsonrpc_tx = header_oracle.read().await.state_jsonrpc_tx();
if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await {
enr_cache.write().await.put(*node_id, enr.clone());
return Ok(enr);
}
}

let beacon_jsonrpc_tx = header_oracle.read().await.beacon_jsonrpc_tx();
if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await {
enr_cache.write().await.put(*node_id, enr.clone());
return Ok(enr);
}
}

debug!(node_id = %node_id, "uTP packet to unknown target");
Err(io::Error::new(
io::ErrorKind::Other,
"ENR not found for talk req destination",
))
}
8 changes: 4 additions & 4 deletions crates/portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use utp_rs::socket::UtpSocket;
use super::{ping_extensions::PingExtension, service::OverlayService};
use crate::{
bootnodes::Bootnode,
discovery::{Discovery, UtpEnr},
discovery::{Discovery, UtpPeer},
events::EventEnvelope,
find::query_info::{FindContentResult, RecursiveFindContentResult},
overlay::{
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<
pub async fn new(
config: OverlayConfig,
discovery: Arc<Discovery>,
utp_socket: Arc<UtpSocket<UtpEnr>>,
utp_socket: Arc<UtpSocket<UtpPeer>>,
store: Arc<RwLock<TStore>>,
protocol: Subnetwork,
validator: Arc<TValidator>,
Expand Down Expand Up @@ -498,10 +498,10 @@ impl<
let cid = utp_rs::cid::ConnectionId {
recv: conn_id,
send: conn_id.wrapping_add(1),
peer: UtpEnr(enr),
peer_id: enr.node_id(),
};
self.utp_controller
.connect_inbound_stream(cid)
.connect_inbound_stream(cid, UtpPeer(enr))
.await
.map_err(|err| OverlayRequestError::ContentNotFound {
message: format!("Unable to locate content on the network: {err:?}"),
Expand Down
Loading

0 comments on commit f4bf5b2

Please sign in to comment.