diff --git a/Cargo.lock b/Cargo.lock index 60f85cceb..6b896f2d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7088,7 +7088,7 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "utp-rs" version = "0.1.0-alpha.8" -source = "git+https://github.com/ethereum/utp?tag=v0.1.0-alpha.15#ef62dd992cf67662319ee69d6fe8caa2af15aeae" +source = "git+https://github.com/ethereum/utp?tag=v0.1.0-alpha.16#a4da8630ba0f56eb04a4ead6c374a850724445ac" dependencies = [ "async-trait", "delay_map 0.3.0", diff --git a/Cargo.toml b/Cargo.toml index 01fd48c14..50ed2edb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,7 @@ tree_hash_derive = "0.8.0" uds_windows = "1.0.1" ureq = { version = "2.5.0", features = ["json"] } url = "2.3.1" -utp-rs = { tag = "v0.1.0-alpha.15", git = "https://github.com/ethereum/utp" } +utp-rs = { git = "https://github.com/ethereum/utp", tag = "v0.1.0-alpha.16" } # Trin workspace crates e2store = { path = "crates/e2store" } diff --git a/crates/portalnet/src/discovery.rs b/crates/portalnet/src/discovery.rs index fa5679616..01d6df432 100644 --- a/crates/portalnet/src/discovery.rs +++ b/crates/portalnet/src/discovery.rs @@ -1,8 +1,7 @@ use std::{ - fmt, - hash::{Hash, Hasher}, - io, + fmt, io, net::{Ipv4Addr, SocketAddr}, + ops::Deref, str::FromStr, sync::Arc, time::Duration, @@ -26,7 +25,10 @@ use parking_lot::RwLock; use tokio::sync::{mpsc, RwLock as TokioRwLock}; use tracing::{debug, info, warn}; use trin_validation::oracle::HeaderOracle; -use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket}; +use utp_rs::{ + peer::{ConnectionPeer, Peer}, + udp::AsyncUdpSocket, +}; use super::config::PortalnetConfig; use crate::socket; @@ -357,113 +359,75 @@ impl Discv5UdpSocket { header_oracle, } } - - async fn find_enr(&mut self, node_id: &NodeId) -> io::Result { - if let Some(cached_enr) = self.enr_cache.write().await.get(node_id).cloned() { - return Ok(UtpEnr(cached_enr)); - } - - if let Some(enr) = self.discv5.find_enr(node_id) { - self.enr_cache.write().await.put(*node_id, enr.clone()); - return Ok(UtpEnr(enr)); - } - - if let Some(enr) = self.discv5.cached_node_addr(node_id) { - self.enr_cache.write().await.put(*node_id, enr.enr.clone()); - return Ok(UtpEnr(enr.enr)); - } - - let history_jsonrpc_tx = self.header_oracle.read().await.history_jsonrpc_tx(); - if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx { - if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await { - self.enr_cache.write().await.put(*node_id, enr.clone()); - return Ok(UtpEnr(enr)); - } - } - - let state_jsonrpc_tx = self.header_oracle.read().await.state_jsonrpc_tx(); - if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx { - if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await { - self.enr_cache.write().await.put(*node_id, enr.clone()); - return Ok(UtpEnr(enr)); - } - } - - let beacon_jsonrpc_tx = self.header_oracle.read().await.beacon_jsonrpc_tx(); - if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx { - if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await { - self.enr_cache.write().await.put(*node_id, enr.clone()); - return Ok(UtpEnr(enr)); - } - } - - debug!(node_id = %node_id, "uTP packet from unknown source"); - Err(io::Error::new( - io::ErrorKind::Other, - "ENR not found for talk req destination", - )) - } } /// A wrapper around `Enr` that implements `ConnectionPeer`. #[derive(Clone)] -pub struct UtpEnr(pub Enr); +pub struct UtpPeer(pub Enr); -impl UtpEnr { - pub fn node_id(&self) -> NodeId { - self.0.node_id() +impl Deref for UtpPeer { + type Target = Enr; + + fn deref(&self) -> &Self::Target { + &self.0 } +} +impl UtpPeer { pub fn client(&self) -> Option { - self.0 - .get_decodable::(ENR_PORTAL_CLIENT_KEY) + self.get_decodable::(ENR_PORTAL_CLIENT_KEY) .and_then(|v| v.ok()) } } -impl std::fmt::Debug for UtpEnr { +impl std::fmt::Debug for UtpPeer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let peer_client_type = self.client().unwrap_or_else(|| "Unknown".to_string()); - f.debug_struct("UtpEnr") + f.debug_struct("EnrPeer") .field("enr", &self.0) .field("Peer Client Type", &peer_client_type) .finish() } } -// Why are we implementing Hash, PartialEq, Eq for UtpEnr? -// UtpEnr is used as an element of the key for a Connections HashTable in our uTP library. -// Enr's can change and are not stable, so if we initiate a ``connect_with_cid`` we are inserting -// our known Enr for the peer, but if the peer has a more upto date Enr, values will be different -// and the Hash for the old Enr and New Enr will be different, along with equating the two structs -// will return false. This leads us to a situation where our peer sends us a uTP messages back and -// our code thinks the same peer is instead 2 different peers causing uTP to ignore the messages. We -// fixed this by implementing Eq and Hash only using the NodeId of the Enr as it is the only stable -// non-updatable field in the Enr. -impl Hash for UtpEnr { - fn hash(&self, state: &mut H) { - self.0.node_id().hash(state); +impl ConnectionPeer for UtpPeer { + type Id = NodeId; + + fn id(&self) -> Self::Id { + self.node_id() } -} -impl PartialEq for UtpEnr { - fn eq(&self, other: &Self) -> bool { - self.0.node_id() == other.0.node_id() + fn consolidate(a: Self, b: Self) -> Self { + assert!(a.id() == b.id()); + if a.seq() >= b.seq() { + a + } else { + b + } } } -impl Eq for UtpEnr {} - -impl ConnectionPeer for UtpEnr {} - #[async_trait] -impl AsyncUdpSocket for Discv5UdpSocket { - async fn send_to(&mut self, buf: &[u8], target: &UtpEnr) -> io::Result { +impl AsyncUdpSocket for Discv5UdpSocket { + async fn send_to(&mut self, buf: &[u8], peer: &Peer) -> io::Result { + let peer_id = *peer.id(); + let peer_enr = peer.peer().cloned(); let discv5 = Arc::clone(&self.discv5); - let target = target.0.clone(); + let enr_cache = Arc::clone(&self.enr_cache); + let header_oracle = Arc::clone(&self.header_oracle); let data = buf.to_vec(); tokio::spawn(async move { - match discv5.send_talk_req(target, Subnetwork::Utp, data).await { + let enr = match peer_enr { + Some(enr) => enr.0, + None => match find_enr(&peer_id, &discv5, enr_cache, header_oracle).await { + Ok(enr) => enr, + Err(err) => { + warn!(%err, "unable to send uTP talk request, ENR not found"); + return; + } + }, + }; + match discv5.send_talk_req(enr, Subnetwork::Utp, data).await { // We drop the talk response because it is ignored in the uTP protocol. Ok(..) => {} Err(err) => match err { @@ -476,11 +440,10 @@ impl AsyncUdpSocket for Discv5UdpSocket { Ok(buf.len()) } - async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, UtpEnr)> { + async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, Peer)> { match self.talk_request_receiver.recv().await { Some(talk_req) => { - let src_node_id = talk_req.node_id(); - let enr = self.find_enr(src_node_id).await?; + let node_id = *talk_req.node_id(); let packet = talk_req.body(); let n = std::cmp::min(buf.len(), packet.len()); buf[..n].copy_from_slice(&packet[..n]); @@ -490,9 +453,60 @@ impl AsyncUdpSocket for Discv5UdpSocket { warn!(%err, "failed to respond to uTP talk request"); } - Ok((n, enr)) + Ok((n, Peer::new_id(node_id))) } None => Err(io::Error::from(io::ErrorKind::NotConnected)), } } } + +async fn find_enr( + node_id: &NodeId, + discv5: &Arc, + enr_cache: Arc>>, + header_oracle: Arc>, +) -> io::Result { + if let Some(cached_enr) = enr_cache.write().await.get(node_id).cloned() { + return Ok(cached_enr); + } + + if let Some(enr) = discv5.find_enr(node_id) { + enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(enr); + } + + if let Some(enr) = discv5.cached_node_addr(node_id) { + enr_cache.write().await.put(*node_id, enr.enr.clone()); + return Ok(enr.enr); + } + + let history_jsonrpc_tx = header_oracle.read().await.history_jsonrpc_tx(); + if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx { + if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await { + enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(enr); + } + } + + let state_jsonrpc_tx = header_oracle.read().await.state_jsonrpc_tx(); + if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx { + if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await { + enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(enr); + } + } + + let beacon_jsonrpc_tx = header_oracle.read().await.beacon_jsonrpc_tx(); + if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx { + if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await { + enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(enr); + } + } + + debug!(node_id = %node_id, "uTP packet to unknown target"); + Err(io::Error::new( + io::ErrorKind::Other, + "ENR not found for talk req destination", + )) +} diff --git a/crates/portalnet/src/overlay/protocol.rs b/crates/portalnet/src/overlay/protocol.rs index 3a3d23ad5..055731634 100644 --- a/crates/portalnet/src/overlay/protocol.rs +++ b/crates/portalnet/src/overlay/protocol.rs @@ -42,7 +42,7 @@ use utp_rs::socket::UtpSocket; use super::{ping_extensions::PingExtension, service::OverlayService}; use crate::{ bootnodes::Bootnode, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, events::EventEnvelope, find::query_info::{FindContentResult, RecursiveFindContentResult}, overlay::{ @@ -105,7 +105,7 @@ impl< pub async fn new( config: OverlayConfig, discovery: Arc, - utp_socket: Arc>, + utp_socket: Arc>, store: Arc>, protocol: Subnetwork, validator: Arc, @@ -498,10 +498,10 @@ impl< let cid = utp_rs::cid::ConnectionId { recv: conn_id, send: conn_id.wrapping_add(1), - peer: UtpEnr(enr), + peer_id: enr.node_id(), }; self.utp_controller - .connect_inbound_stream(cid) + .connect_inbound_stream(cid, UtpPeer(enr)) .await .map_err(|err| OverlayRequestError::ContentNotFound { message: format!("Unable to locate content on the network: {err:?}"), diff --git a/crates/portalnet/src/overlay/service/manager.rs b/crates/portalnet/src/overlay/service/manager.rs index 254b890b4..4af2d48b3 100644 --- a/crates/portalnet/src/overlay/service/manager.rs +++ b/crates/portalnet/src/overlay/service/manager.rs @@ -56,7 +56,7 @@ use utp_rs::cid::ConnectionId; use super::OverlayService; use crate::{ accept_queue::AcceptQueue, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, events::{EventEnvelope, OverlayEvent}, find::{ iterators::{ @@ -620,11 +620,11 @@ impl< let cid = utp_rs::cid::ConnectionId { recv: connection_id, send: connection_id.wrapping_add(1), - peer: UtpEnr(source), + peer_id: source.node_id(), }; let data = match utp_processing .utp_controller - .connect_inbound_stream(cid) + .connect_inbound_stream(cid, UtpPeer(source)) .await { Ok(data) => RawContentValue::from(data), @@ -941,15 +941,15 @@ impl< "handle_find_content: unable to find ENR for NodeId".to_string(), ) })?; - let enr = UtpEnr(enr); - let cid = self.utp_controller.cid(enr, false); + let cid = self.utp_controller.cid(enr.node_id(), false); let cid_send = cid.send; // Wait for an incoming connection with the given CID. Then, write the data // over the uTP stream. let utp = Arc::clone(&self.utp_controller); tokio::spawn(async move { - utp.accept_outbound_stream(cid, &content).await; + utp.accept_outbound_stream(cid, UtpPeer(enr), &content) + .await; permit.drop(); }); @@ -1079,13 +1079,12 @@ impl< // Generate a connection ID for the uTP connection if there is data we would like to // accept. - let enr = UtpEnr(enr); let enr_str = if enabled!(Level::TRACE) { - enr.0.to_base64() + enr.to_base64() } else { String::with_capacity(0) }; - let cid: ConnectionId = self.utp_controller.cid(enr, false); + let cid: ConnectionId = self.utp_controller.cid(enr.node_id(), false); let cid_send = cid.send; let content_keys_string: Vec = content_keys @@ -1105,14 +1104,16 @@ impl< let utp_processing = UtpProcessing::from(self); tokio::spawn(async move { + let peer = UtpPeer(enr); + let peer_client = peer.client(); let data = match utp_processing .utp_controller - .accept_inbound_stream(cid.clone()) + .accept_inbound_stream(cid, peer) .await { Ok(data) => data, Err(err) => { - debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), content_keys = ?content_keys_string, "unable to complete uTP transfer"); + debug!(%err, cid.send, cid.recv, peer = ?peer_client, content_keys = ?content_keys_string, "unable to complete uTP transfer"); // Spawn a fallback FINDCONTENT task for each content key // in a payload that failed to be received. // @@ -1417,13 +1418,14 @@ impl< let cid = utp_rs::cid::ConnectionId { recv: conn_id, send: conn_id.wrapping_add(1), - peer: UtpEnr(enr), + peer_id: enr.node_id(), }; let store = Arc::clone(&self.store); let response_clone = response.clone(); let utp_controller = Arc::clone(&self.utp_controller); tokio::spawn(async move { + let peer = UtpPeer(enr); let content_items = match offer { Request::Offer(offer) => { Self::provide_requested_content(store, &response_clone, offer.content_keys) @@ -1456,7 +1458,7 @@ impl< %err, cid.send, cid.recv, - peer = ?cid.peer.client(), + peer = ?peer.client(), "Error decoding previously offered content items" ); if let Some(tx) = gossip_result_tx { @@ -1477,7 +1479,7 @@ impl< } }; let result = utp_controller - .connect_outbound_stream(cid, &content_payload) + .connect_outbound_stream(cid, peer, &content_payload) .await; if let Some(tx) = gossip_result_tx { if result { @@ -1640,11 +1642,11 @@ impl< let cid = utp_rs::cid::ConnectionId { recv: conn_id, send: conn_id.wrapping_add(1), - peer: UtpEnr(fallback_peer.clone()), + peer_id: fallback_peer.node_id(), }; utp_processing .utp_controller - .connect_inbound_stream(cid) + .connect_inbound_stream(cid, UtpPeer(fallback_peer.clone())) .await? .into() } diff --git a/crates/portalnet/src/utp/controller.rs b/crates/portalnet/src/utp/controller.rs index dfe29e556..e7dfc433a 100644 --- a/crates/portalnet/src/utp/controller.rs +++ b/crates/portalnet/src/utp/controller.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::anyhow; use bytes::Bytes; +use discv5::enr::NodeId; use lazy_static::lazy_static; use tokio::sync::Semaphore; use tracing::debug; @@ -9,10 +10,10 @@ use trin_metrics::{ labels::{UtpDirectionLabel, UtpOutcomeLabel}, overlay::OverlayMetricsReporter, }; -use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket}; +use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, peer::Peer, socket::UtpSocket}; use super::timed_semaphore::OwnedTimedSemaphorePermit; -use crate::discovery::UtpEnr; +use crate::discovery::UtpPeer; /// UtpController is meant to be a container which contains all code related to/for managing uTP /// streams We are implementing this because we want the utils of controlling uTP connection to be /// as contained as it can, instead of extending overlay_service even more. @@ -24,7 +25,7 @@ use crate::discovery::UtpEnr; pub struct UtpController { inbound_utp_transfer_semaphore: Arc, outbound_utp_transfer_semaphore: Arc, - utp_socket: Arc>, + utp_socket: Arc>, metrics: OverlayMetricsReporter, } @@ -50,7 +51,7 @@ enum UtpConnectionSide { impl UtpController { pub fn new( utp_transfer_limit: usize, - utp_socket: Arc>, + utp_socket: Arc>, metrics: OverlayMetricsReporter, ) -> Self { Self { @@ -61,8 +62,8 @@ impl UtpController { } } - pub fn cid(&self, peer: UtpEnr, is_initiator: bool) -> ConnectionId { - self.utp_socket.cid(peer, is_initiator) + pub fn cid(&self, node_id: NodeId, is_initiator: bool) -> ConnectionId { + self.utp_socket.cid(node_id, is_initiator) } /// Non-blocking method to try and acquire a permit for an outbound uTP transfer. @@ -109,29 +110,51 @@ impl UtpController { } } - pub async fn connect_inbound_stream(&self, cid: ConnectionId) -> anyhow::Result { - self.inbound_stream(cid, UtpConnectionSide::Connect).await + pub async fn connect_inbound_stream( + &self, + cid: ConnectionId, + peer: UtpPeer, + ) -> anyhow::Result { + self.inbound_stream(cid, peer, UtpConnectionSide::Connect) + .await } - pub async fn accept_inbound_stream(&self, cid: ConnectionId) -> anyhow::Result { - self.inbound_stream(cid, UtpConnectionSide::Accept).await + pub async fn accept_inbound_stream( + &self, + cid: ConnectionId, + peer: UtpPeer, + ) -> anyhow::Result { + self.inbound_stream(cid, peer, UtpConnectionSide::Accept) + .await } - pub async fn connect_outbound_stream(&self, cid: ConnectionId, data: &[u8]) -> bool { - self.outbound_stream(cid, data, UtpConnectionSide::Connect) + pub async fn connect_outbound_stream( + &self, + cid: ConnectionId, + peer: UtpPeer, + data: &[u8], + ) -> bool { + self.outbound_stream(cid, peer, data, UtpConnectionSide::Connect) .await } - pub async fn accept_outbound_stream(&self, cid: ConnectionId, data: &[u8]) -> bool { - self.outbound_stream(cid, data, UtpConnectionSide::Accept) + pub async fn accept_outbound_stream( + &self, + cid: ConnectionId, + peer: UtpPeer, + data: &[u8], + ) -> bool { + self.outbound_stream(cid, peer, data, UtpConnectionSide::Accept) .await } async fn inbound_stream( &self, - cid: ConnectionId, + cid: ConnectionId, + peer: UtpPeer, side: UtpConnectionSide, ) -> anyhow::Result { + let peer_client = peer.client(); // Wait for an incoming connection with the given CID. Then, read the data from the uTP // stream. self.metrics @@ -139,13 +162,13 @@ impl UtpController { let (stream, message) = match side { UtpConnectionSide::Connect => ( self.utp_socket - .connect_with_cid(cid.clone(), *UTP_CONN_CFG) + .connect_with_cid(cid, Peer::new(peer), *UTP_CONN_CFG) .await, "connect inbound uTP stream", ), UtpConnectionSide::Accept => ( self.utp_socket - .accept_with_cid(cid.clone(), *UTP_CONN_CFG) + .accept_with_cid(cid, Peer::new(peer), *UTP_CONN_CFG) .await, "accept inbound uTP stream", ), @@ -155,7 +178,7 @@ impl UtpController { UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedConnection, ); - debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "unable to {message}"); + debug!(%err, cid.send, cid.recv, peer = ?peer_client, "unable to {message}"); anyhow!("Unable to locate content on the network: unable to {message}") })?; @@ -164,7 +187,7 @@ impl UtpController { .map_err(|err| { self.metrics .report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx); - debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from {message}"); + debug!(%err, cid.send, cid.recv, peer = ?peer_client, "error reading data from {message}"); anyhow!( "Unable to locate content on the network: error reading data from {message}" ) @@ -178,22 +201,24 @@ impl UtpController { async fn outbound_stream( &self, - cid: ConnectionId, + cid: ConnectionId, + peer: UtpPeer, data: &[u8], side: UtpConnectionSide, ) -> bool { + let peer_client = peer.client(); self.metrics .report_utp_active_inc(UtpDirectionLabel::Outbound); let (stream, message) = match side { UtpConnectionSide::Connect => ( self.utp_socket - .connect_with_cid(cid.clone(), *UTP_CONN_CFG) + .connect_with_cid(cid, Peer::new(peer), *UTP_CONN_CFG) .await, "outbound connect with cid", ), UtpConnectionSide::Accept => ( self.utp_socket - .accept_with_cid(cid.clone(), *UTP_CONN_CFG) + .accept_with_cid(cid, Peer::new(peer), *UTP_CONN_CFG) .await, "outbound accept with cid", ), @@ -209,7 +234,7 @@ impl UtpController { %err, cid.send, cid.recv, - peer = ?cid.peer.client(), + peer = ?peer_client, "Unable to establish uTP conn based on {message}", ); return false; @@ -226,7 +251,7 @@ impl UtpController { debug!( %cid.send, %cid.recv, - peer = ?cid.peer.client(), + peer = ?peer_client, "Error sending content over uTP, in response to uTP write exited before sending all content: {write_size} bytes written, {} bytes expected", data.len() ); @@ -240,7 +265,7 @@ impl UtpController { %err, %cid.send, %cid.recv, - peer = ?cid.peer.client(), + peer = ?peer_client, "Error sending content over uTP, in response to Error writing content to uTP stream: {err}" ); return false; @@ -255,7 +280,7 @@ impl UtpController { %err, %cid.send, %cid.recv, - peer = ?cid.peer.client(), + peer = ?peer_client, "Error sending content over uTP, in response to Error closing uTP connection: {err}" ); return false; diff --git a/crates/subnetworks/beacon/src/lib.rs b/crates/subnetworks/beacon/src/lib.rs index f0414f845..b1c1c81a4 100644 --- a/crates/subnetworks/beacon/src/lib.rs +++ b/crates/subnetworks/beacon/src/lib.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use ethportal_api::types::jsonrpc::request::BeaconJsonRpcRequest; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, events::{EventEnvelope, OverlayRequest}, }; use tokio::{ @@ -39,7 +39,7 @@ type BeaconEventStream = Option>; pub async fn initialize_beacon_network( discovery: &Arc, - utp_socket: Arc>, + utp_socket: Arc>, portalnet_config: PortalnetConfig, storage_config: PortalStorageConfig, header_oracle: Arc>, diff --git a/crates/subnetworks/beacon/src/network.rs b/crates/subnetworks/beacon/src/network.rs index f54d7b454..f4b331c43 100644 --- a/crates/subnetworks/beacon/src/network.rs +++ b/crates/subnetworks/beacon/src/network.rs @@ -9,7 +9,7 @@ use light_client::{consensus::rpc::portal_rpc::PortalRpc, database::FileDB, Clie use parking_lot::RwLock as PLRwLock; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, overlay::{config::OverlayConfig, protocol::OverlayProtocol}, }; use tokio::sync::{Mutex, RwLock}; @@ -46,7 +46,7 @@ const GOSSIP_DROPPED: bool = false; impl BeaconNetwork { pub async fn new( discovery: Arc, - utp_socket: Arc>, + utp_socket: Arc>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, header_oracle: Arc>, diff --git a/crates/subnetworks/history/src/lib.rs b/crates/subnetworks/history/src/lib.rs index 5803c9660..f9996e305 100644 --- a/crates/subnetworks/history/src/lib.rs +++ b/crates/subnetworks/history/src/lib.rs @@ -14,7 +14,7 @@ use ethportal_api::types::jsonrpc::request::HistoryJsonRpcRequest; use network::HistoryNetwork; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, events::{EventEnvelope, OverlayRequest}, }; use tokio::{ @@ -37,7 +37,7 @@ type HistoryEventStream = Option>; pub async fn initialize_history_network( discovery: &Arc, - utp_socket: Arc>, + utp_socket: Arc>, portalnet_config: PortalnetConfig, storage_config: PortalStorageConfig, header_oracle: Arc>, diff --git a/crates/subnetworks/history/src/network.rs b/crates/subnetworks/history/src/network.rs index f4b96057d..1b1b97b6a 100644 --- a/crates/subnetworks/history/src/network.rs +++ b/crates/subnetworks/history/src/network.rs @@ -7,7 +7,7 @@ use ethportal_api::{ use parking_lot::RwLock as PLRwLock; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, overlay::{config::OverlayConfig, protocol::OverlayProtocol}, }; use tokio::sync::RwLock; @@ -42,7 +42,7 @@ pub struct HistoryNetwork { impl HistoryNetwork { pub async fn new( discovery: Arc, - utp_socket: Arc>, + utp_socket: Arc>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, header_oracle: Arc>, diff --git a/crates/subnetworks/state/src/lib.rs b/crates/subnetworks/state/src/lib.rs index 53773d9ee..03c5e5e0c 100644 --- a/crates/subnetworks/state/src/lib.rs +++ b/crates/subnetworks/state/src/lib.rs @@ -11,7 +11,7 @@ use ethportal_api::types::jsonrpc::request::StateJsonRpcRequest; use network::StateNetwork; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, events::{EventEnvelope, OverlayRequest}, }; use tokio::{ @@ -41,7 +41,7 @@ type StateEventStream = Option>; pub async fn initialize_state_network( discovery: &Arc, - utp_socket: Arc>, + utp_socket: Arc>, portalnet_config: PortalnetConfig, storage_config: PortalStorageConfig, header_oracle: Arc>, diff --git a/crates/subnetworks/state/src/network.rs b/crates/subnetworks/state/src/network.rs index 1a9a5f79f..4f2422a02 100644 --- a/crates/subnetworks/state/src/network.rs +++ b/crates/subnetworks/state/src/network.rs @@ -7,7 +7,7 @@ use ethportal_api::{ use parking_lot::RwLock as PLRwLock; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, overlay::{config::OverlayConfig, protocol::OverlayProtocol}, }; use tokio::sync::RwLock; @@ -45,7 +45,7 @@ const GOSSIP_DROPPED: bool = false; impl StateNetwork { pub async fn new( discovery: Arc, - utp_socket: Arc>, + utp_socket: Arc>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, header_oracle: Arc>, diff --git a/testing/utp/src/lib.rs b/testing/utp/src/lib.rs index 9d3f82011..048848d8c 100644 --- a/testing/utp/src/lib.rs +++ b/testing/utp/src/lib.rs @@ -19,21 +19,21 @@ use jsonrpsee::{ }; use portalnet::{ config::PortalnetConfig, - discovery::{Discovery, UtpEnr}, + discovery::{Discovery, UtpPeer}, }; use tokio::sync::{ mpsc::{self, Receiver}, RwLock, }; use trin_validation::oracle::HeaderOracle; -use utp_rs::{conn::ConnectionConfig, socket::UtpSocket}; +use utp_rs::{conn::ConnectionConfig, peer::Peer, socket::UtpSocket}; use crate::rpc::RpcServer; /// uTP test app pub struct TestApp { pub discovery: Arc, - pub utp_socket: Arc>, + pub utp_socket: Arc>, pub utp_talk_req_tx: mpsc::UnboundedSender, pub utp_payload: Arc>>>, } @@ -64,9 +64,9 @@ impl RpcServer for TestApp { let cid = utp_rs::cid::ConnectionId { send: cid_send, recv: cid_recv, - peer: UtpEnr(src_enr.clone()), + peer_id: src_enr.node_id(), }; - self.discovery.add_enr(src_enr).unwrap(); + self.discovery.add_enr(src_enr.clone()).unwrap(); let utp = Arc::clone(&self.utp_socket); let payload_store = Arc::clone(&self.utp_payload); @@ -78,7 +78,10 @@ impl RpcServer for TestApp { initial_timeout: Duration::from_millis(1250), ..Default::default() }; - let mut conn = utp.accept_with_cid(cid, utp_config).await.unwrap(); + let mut conn = utp + .accept_with_cid(cid, Peer::new(UtpPeer(src_enr)), utp_config) + .await + .unwrap(); let mut data = vec![]; let n = conn.read_to_eof(&mut data).await.unwrap(); @@ -112,9 +115,9 @@ impl RpcServer for TestApp { let cid = utp_rs::cid::ConnectionId { send: cid_send, recv: cid_recv, - peer: UtpEnr(dst_enr.clone()), + peer_id: dst_enr.node_id(), }; - self.discovery.add_enr(dst_enr).unwrap(); + self.discovery.add_enr(dst_enr.clone()).unwrap(); let utp = Arc::clone(&self.utp_socket); let utp_config = ConnectionConfig { @@ -125,7 +128,10 @@ impl RpcServer for TestApp { ..Default::default() }; tokio::spawn(async move { - let mut conn = utp.connect_with_cid(cid, utp_config).await.unwrap(); + let mut conn = utp + .connect_with_cid(cid, Peer::new(UtpPeer(dst_enr)), utp_config) + .await + .unwrap(); conn.write(&payload).await.unwrap();