Skip to content

Commit

Permalink
remove mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
supernovahs committed Jan 5, 2024
1 parent 922d0b5 commit 9bd89f9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 53 deletions.
36 changes: 9 additions & 27 deletions crates/net/discv5/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::future::Future;
pub use discv5::{
enr, enr::CombinedKey, service::Service, Config as Discv5Config,
ConfigBuilder as Discv5ConfigBuilder, Discv5, Enr, Event,
Expand All @@ -14,19 +13,18 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::{sync::mpsc, task};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};

// Wrapper struct for Discv5
#[derive(Clone)]
pub struct Discv5Handle {
inner: Arc<Mutex<Discv5>>,
inner: Discv5,
}

impl Discv5Handle {
// Constructor to create a new Discv5Handle
pub fn new(discv5: Discv5) -> Self {
Discv5Handle { inner: Arc::new(Mutex::new(discv5)) }
Discv5Handle { inner: discv5 }
}

pub fn from_secret_key(
Expand All @@ -46,34 +44,18 @@ impl Discv5Handle {
))
}

pub fn convert_to_discv5(&self) -> Arc<Mutex<Discv5>> {
self.inner.clone()
pub fn convert_to_discv5(&self) -> &Discv5 {
&self.inner
}

pub async fn start_service(&self) -> Result<(), Discv5Error> {
let discv5 = Arc::clone(&self.inner);

tokio::task::spawn_blocking(move || {
let mut discv5_guard = discv5.lock();

let _ = discv5_guard.start();
drop(discv5_guard);
})
.await
.map_err(|_| Discv5Error::Discv5Construct.into())
pub async fn start_service(&mut self) -> Result<(), Discv5Error> {
self.inner.start().await.map_err(|_| Discv5Error::Discv5Construct.into())
}

pub async fn create_event_stream(
&self,
&mut self,
) -> Result<tokio::sync::mpsc::Receiver<Event>, Discv5Error> {
let discv5 = Arc::clone(&self.inner);
let discv5_guard = discv5.lock();
let res = discv5_guard
.event_stream()
.await
.map_err(|_| Discv5Error::Discv5EventStreamStart.into());
drop(discv5_guard);
return res;
self.inner.event_stream().await.map_err(|_| Discv5Error::Discv5EventStreamStart.into())
}
}

Expand Down
41 changes: 15 additions & 26 deletions crates/net/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use reth_primitives::{ForkId, NodeRecord, PeerId, B512};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
fmt,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -86,18 +85,13 @@ impl Discovery {
};

// setup discv5
let (discv5,) = if let Some(discv5_config) = discv5_config {
let discv5 = Discv5Handle::from_secret_key(sk, discv5_config)?;
discv5.clone().start_service().await.unwrap();
// let discv5_event_stream : ;
// let discv5_event = discv5
// .create_event_stream()
// .await
// .map_err(|_e| NetworkError::Discv5(Discv5Error::SecretKeyDecode))?;
// let discv5_event_stream = Some(ReceiverStream::new(discv5_event));
(Some(discv5),)
let (discv5, discv5_event_stream) = if let Some(discv5_config) = discv5_config {
let mut discv5 = Discv5Handle::from_secret_key(sk, discv5_config)?;
discv5.start_service().await.unwrap();
let discv5_event_stream = discv5.create_event_stream().await.unwrap();
(Some(discv5), Some(discv5_event_stream))
} else {
(None,)
(None, None)
};

// setup DNS discovery
Expand Down Expand Up @@ -232,23 +226,21 @@ impl Discovery {
Event::EnrAdded { enr, replaced } => {
if let Some(discv5) = &self.discv5 {
let discv5 = Discv5Handle::convert_to_discv5(discv5);
let discv5_guard = discv5.lock();
let _ = discv5_guard.add_enr(enr);
let _ = discv5.add_enr(enr);
if let Some(replaced_enr) = replaced {
let node_id = replaced_enr.node_id();
discv5_guard.remove_node(&node_id);
discv5.remove_node(&node_id);
}
}
} // #5576 handle replaced variable
Event::NodeInserted { node_id, replaced } => {
if let Some(discv5) = &self.discv5 {
let discv5 = Discv5Handle::convert_to_discv5(discv5);
let discv5_guard = discv5.lock();
let enr = discv5_guard.find_enr(&node_id);
discv5_guard.add_enr(enr.unwrap()).unwrap(); // TODO # 5576 handle unwrap in the
// end properly
let enr = discv5.find_enr(&node_id);
discv5.add_enr(enr.unwrap()).unwrap(); // TODO # 5576 handle unwrap in the
// end properly
if let Some(replaced_enr) = replaced {
discv5_guard.remove_node(&replaced_enr);
discv5.remove_node(&replaced_enr);
}
}
}
Expand All @@ -267,21 +259,18 @@ impl Discovery {
Event::SocketUpdated(socket_address) => {
if let Some(discv5) = &self.discv5 {
let discv5 = Discv5Handle::convert_to_discv5(discv5);
let discv5_guard = discv5.lock();
let mut local_enr = discv5_guard.local_enr();
let mut local_enr = discv5.local_enr();
// local_enr.set_ip(socket_address.ip(),); #5576 figure out how to get signing
}
}
Event::TalkRequest(talk_request) => {
if let Some(discv5) = &self.discv5 {
let discv5 = Discv5Handle::convert_to_discv5(discv5);
let discv5_guard = discv5.lock();
let node_id = talk_request.node_id();
let enr = discv5_guard.find_enr(&node_id);
let enr = discv5.find_enr(&node_id);
let protocol = talk_request.protocol();
let request = talk_request.body();
let _ =
discv5_guard.talk_req(enr.unwrap(), protocol.to_vec(), request.to_vec());
let _ = discv5.talk_req(enr.unwrap(), protocol.to_vec(), request.to_vec());
}
}
}
Expand Down

0 comments on commit 9bd89f9

Please sign in to comment.