From ee98a261f3778e374724f11a2bf370abae22d2c7 Mon Sep 17 00:00:00 2001 From: nazeh Date: Fri, 27 Sep 2024 18:05:39 +0300 Subject: [PATCH] feat: remove all unwrap and deny using it --- CHANGELOG.md | 12 ++++++ src/async_dht.rs | 44 ++++++++++++---------- src/common/messages.rs | 22 +++++------ src/common/mutable.rs | 4 +- src/common/routing_table.rs | 4 +- src/dht.rs | 74 ++++++++++++++++++------------------- src/error.rs | 8 +++- src/lib.rs | 6 +-- src/rpc.rs | 7 +++- src/rpc/socket.rs | 21 ++++++----- src/server.rs | 13 ++++--- 11 files changed, 117 insertions(+), 98 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..0f181f6a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to mainline dht will be documented in this file. + +## [Unreleased] + +### Changed + +- Removed all internal panic `#![deny(clippy::unwrap_used)]` +- `Testnet::new(size)` returns a `Result`. +- `Dht::local_addr()` returns a `Result`. + `AsyncDht::local_addr()` returns a `Result`. diff --git a/src/async_dht.rs b/src/async_dht.rs index 3c6675c3..f273aef6 100644 --- a/src/async_dht.rs +++ b/src/async_dht.rs @@ -10,6 +10,7 @@ use crate::{ PutMutableRequestArguments, PutRequestSpecific, RequestTypeSpecific, }, dht::{ActorMessage, Dht}, + error::SocketAddrResult, rpc::{PutResult, ResponseSender}, Result, }; @@ -29,8 +30,15 @@ impl AsyncDht { // === Getters === /// Returns the local address of the udp socket this node is listening on. - pub fn local_addr(&self) -> Option { - self.0.local_addr() + /// + /// Returns an error if the actor is shutdown, or if the [std::net::UdpSocket::local_addr] + /// returned an IO error. + pub async fn local_addr(&self) -> Result { + let (sender, receiver) = flume::bounded::(1); + + self.0 .0.send(ActorMessage::LocalAddr(sender))?; + + Ok(receiver.recv_async().await??) } // === Public Methods === @@ -39,12 +47,10 @@ impl AsyncDht { pub async fn shutdown(&mut self) -> Result<()> { let (sender, receiver) = flume::bounded::<()>(1); - self.0.sender.send(ActorMessage::Shutdown(sender))?; + self.0 .0.send(ActorMessage::Shutdown(sender))?; receiver.recv_async().await?; - self.0.address = None; - Ok(()) } @@ -69,7 +75,7 @@ impl AsyncDht { let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash }); - self.0.sender.send(ActorMessage::Get( + self.0 .0.send(ActorMessage::Get( info_hash, request, ResponseSender::Peers(sender), @@ -98,7 +104,7 @@ impl AsyncDht { }); self.0 - .sender + .0 .send(ActorMessage::Put(info_hash, request, sender))?; receiver.recv_async().await? @@ -116,7 +122,7 @@ impl AsyncDht { salt: None, }); - self.0.sender.send(ActorMessage::Get( + self.0 .0.send(ActorMessage::Get( target, request, ResponseSender::Immutable(sender), @@ -127,7 +133,7 @@ impl AsyncDht { /// Put an immutable data to the DHT. pub async fn put_immutable(&self, value: Bytes) -> Result { - let target = Id::from_bytes(hash_immutable(&value)).unwrap(); + let target: Id = hash_immutable(&value).into(); let (sender, receiver) = flume::bounded::(1); @@ -136,9 +142,7 @@ impl AsyncDht { v: value.clone().into(), }); - self.0 - .sender - .send(ActorMessage::Put(target, request, sender))?; + self.0 .0.send(ActorMessage::Put(target, request, sender))?; receiver.recv_async().await? } @@ -158,7 +162,7 @@ impl AsyncDht { let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt }); - let _ = self.0.sender.send(ActorMessage::Get( + let _ = self.0 .0.send(ActorMessage::Get( target, request, ResponseSender::Mutable(sender), @@ -183,7 +187,7 @@ impl AsyncDht { let _ = self .0 - .sender + .0 .send(ActorMessage::Put(*item.target(), request, sender)); receiver.recv_async().await? @@ -207,7 +211,7 @@ mod test { async fn test() { let mut dht = Dht::client().unwrap().as_async(); - dht.local_addr(); + dht.local_addr().await.unwrap(); let a = dht.clone(); @@ -223,7 +227,7 @@ mod test { #[test] fn announce_get_peer() { async fn test() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -258,7 +262,7 @@ mod test { #[test] fn put_get_immutable() { async fn test() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -287,7 +291,7 @@ mod test { #[test] fn put_get_mutable() { async fn test() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -328,7 +332,7 @@ mod test { #[test] fn put_get_mutable_no_more_recent_value() { async fn test() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -368,7 +372,7 @@ mod test { #[test] fn repeated_put_query() { async fn test() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) diff --git a/src/common/messages.rs b/src/common/messages.rs index e3509678..5f770977 100644 --- a/src/common/messages.rs +++ b/src/common/messages.rs @@ -437,13 +437,9 @@ impl Message { token: arguments.token, put_request_type: PutRequestSpecific::AnnouncePeer( AnnouncePeerRequestArguments { - implied_port: if arguments.implied_port.is_none() { - None - } else if arguments.implied_port.unwrap() != 0 { - Some(true) - } else { - Some(false) - }, + implied_port: arguments + .implied_port + .map(|implied_port| implied_port != 0), info_hash: Id::from_bytes(&arguments.info_hash)?, port: arguments.port, }, @@ -452,7 +448,7 @@ impl Message { } } internal::DHTRequestSpecific::PutValue { arguments } => { - if arguments.k.is_some() { + if let Some(k) = arguments.k { RequestSpecific { requester_id: Id::from_bytes(arguments.id)?, @@ -462,10 +458,14 @@ impl Message { PutMutableRequestArguments { target: Id::from_bytes(arguments.target)?, v: arguments.v, - k: arguments.k.unwrap(), + k, // Should panic if missing. - seq: arguments.seq.unwrap(), - sig: arguments.sig.unwrap(), + seq: arguments.seq.expect( + "Put mutable message to have sequence number", + ), + sig: arguments.sig.expect( + "Put mutable message to have a signature", + ), salt: arguments.salt, cas: arguments.cas, }, diff --git a/src/common/mutable.rs b/src/common/mutable.rs index a6b425d2..874550cd 100644 --- a/src/common/mutable.rs +++ b/src/common/mutable.rs @@ -53,9 +53,9 @@ impl MutableItem { let mut hasher = Sha1::new(); hasher.update(&encoded); - let hash = hasher.digest().bytes(); + let bytes = hasher.digest().bytes(); - Id::from_bytes(hash).unwrap() + Id { bytes } } /// Set the cas number if needed. diff --git a/src/common/routing_table.rs b/src/common/routing_table.rs index 2dbf382b..419313c2 100644 --- a/src/common/routing_table.rs +++ b/src/common/routing_table.rs @@ -43,9 +43,7 @@ impl RoutingTable { return false; } - self.buckets.entry(distance).or_default(); - - let bucket = self.buckets.get_mut(&distance).unwrap(); + let bucket = self.buckets.entry(distance).or_default(); bucket.add(node) } diff --git a/src/dht.rs b/src/dht.rs index ee5646c4..2ca85322 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -13,6 +13,7 @@ use crate::{ GetValueRequestArguments, Id, MutableItem, PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific, RequestTypeSpecific, }, + error::SocketAddrResult, rpc::{PutResult, ReceivedFrom, ReceivedMessage, ResponseSender, Rpc}, server::{DhtServer, Server}, Result, @@ -20,10 +21,7 @@ use crate::{ #[derive(Debug, Clone)] /// Mainlin eDht node. -pub struct Dht { - pub(crate) sender: Sender, - pub(crate) address: Option, -} +pub struct Dht(pub(crate) Sender); pub struct Builder { settings: DhtSettings, @@ -115,7 +113,7 @@ impl Dht { let rpc = Rpc::new(&settings)?; - let address = rpc.local_addr(); + let address = rpc.local_addr()?; info!(?address, "Mainline DHT listening"); @@ -123,19 +121,21 @@ impl Dht { thread::spawn(move || run(rpc, &mut server, receiver)); - Ok(Dht { - sender, - address: Some(address), - }) + Ok(Dht(sender)) } // === Getters === /// Returns the local address of the udp socket this node is listening on. /// - /// Returns `None` if the node is shutdown - pub fn local_addr(&self) -> Option { - self.address + /// Returns an error if the actor is shutdown, or if the [std::net::UdpSocket::local_addr] + /// returned an IO error. + pub fn local_addr(&self) -> Result { + let (sender, receiver) = flume::bounded::(1); + + self.0.send(ActorMessage::LocalAddr(sender))?; + + Ok(receiver.recv()??) } // === Public Methods === @@ -144,12 +144,10 @@ impl Dht { pub fn shutdown(&mut self) -> Result<()> { let (sender, receiver) = flume::bounded::<()>(1); - self.sender.send(ActorMessage::Shutdown(sender))?; + self.0.send(ActorMessage::Shutdown(sender))?; receiver.recv()?; - self.address = None; - Ok(()) } @@ -174,7 +172,7 @@ impl Dht { let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash }); - self.sender.send(ActorMessage::Get( + self.0.send(ActorMessage::Get( info_hash, request, ResponseSender::Peers(sender), @@ -202,8 +200,7 @@ impl Dht { implied_port, }); - self.sender - .send(ActorMessage::Put(info_hash, request, sender))?; + self.0.send(ActorMessage::Put(info_hash, request, sender))?; receiver.recv()? } @@ -220,7 +217,7 @@ impl Dht { salt: None, }); - self.sender.send(ActorMessage::Get( + self.0.send(ActorMessage::Get( target, request, ResponseSender::Immutable(sender), @@ -231,7 +228,7 @@ impl Dht { /// Put an immutable data to the DHT. pub fn put_immutable(&self, value: Bytes) -> Result { - let target = Id::from_bytes(hash_immutable(&value)).unwrap(); + let target: Id = hash_immutable(&value).into(); let (sender, receiver) = flume::bounded::(1); @@ -240,8 +237,7 @@ impl Dht { v: value.clone().into(), }); - self.sender - .send(ActorMessage::Put(target, request, sender))?; + self.0.send(ActorMessage::Put(target, request, sender))?; receiver.recv()? } @@ -261,7 +257,7 @@ impl Dht { let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt }); - let _ = self.sender.send(ActorMessage::Get( + let _ = self.0.send(ActorMessage::Get( target, request, ResponseSender::Mutable(sender), @@ -285,7 +281,7 @@ impl Dht { }); let _ = self - .sender + .0 .send(ActorMessage::Put(*item.target(), request, sender)); receiver.recv()? @@ -301,6 +297,9 @@ fn run(mut rpc: Rpc, server: &mut Option>, receiver: Receiver { + let _ = sender.send(rpc.local_addr()); + } ActorMessage::Put(target, request, sender) => { rpc.put(target, request, Some(sender)); } @@ -328,6 +327,7 @@ fn run(mut rpc: Rpc, server: &mut Option>, receiver: Receiver), Get(Id, RequestTypeSpecific, ResponseSender), + LocalAddr(Sender), Shutdown(Sender<()>), } @@ -339,29 +339,25 @@ pub struct Testnet { } impl Testnet { - pub fn new(count: usize) -> Self { + pub fn new(count: usize) -> Result { let mut nodes: Vec = vec![]; let mut bootstrap = vec![]; for i in 0..count { if i == 0 { - let node = Dht::builder().server().bootstrap(&[]).build().unwrap(); + let node = Dht::builder().server().bootstrap(&[]).build()?; - let addr = node.local_addr().unwrap(); + let addr = node.local_addr()?; bootstrap.push(format!("127.0.0.1:{}", addr.port())); nodes.push(node) } else { - let node = Dht::builder() - .server() - .bootstrap(&bootstrap) - .build() - .unwrap(); + let node = Dht::builder().server().bootstrap(&bootstrap).build()?; nodes.push(node) } } - Self { bootstrap, nodes } + Ok(Self { bootstrap, nodes }) } } @@ -378,7 +374,7 @@ mod test { fn shutdown() { let mut dht = Dht::client().unwrap(); - dht.local_addr(); + dht.local_addr().unwrap(); let a = dht.clone(); @@ -402,7 +398,7 @@ mod test { #[test] fn announce_get_peer() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -425,7 +421,7 @@ mod test { #[test] fn put_get_immutable() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -448,7 +444,7 @@ mod test { #[test] fn put_get_mutable() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -482,7 +478,7 @@ mod test { #[test] fn put_get_mutable_no_more_recent_value() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) @@ -515,7 +511,7 @@ mod test { #[test] fn repeated_put_query() { - let testnet = Testnet::new(10); + let testnet = Testnet::new(10).unwrap(); let a = Dht::builder() .bootstrap(&testnet.bootstrap) diff --git a/src/error.rs b/src/error.rs index 319de704..d43cb536 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,11 +1,17 @@ //! Main Crate Error +use std::net::SocketAddr; + use crate::{common::ErrorSpecific, dht::ActorMessage, Id}; +// Alias Result to be the crate Result. +pub type Result = core::result::Result; + +pub type SocketAddrResult = core::result::Result; + #[derive(thiserror::Error, Debug)] /// Mainline crate error enum. pub enum Error { - /// For starter, to remove as code matures. #[error("Generic error: {0}")] Generic(String), /// For starter, to remove as code matures. diff --git a/src/lib.rs b/src/lib.rs index f4e9a7c2..b42436d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![doc = include_str!("../README.md")] +#![deny(clippy::unwrap_used)] // Public modules mod common; @@ -15,7 +16,4 @@ pub use bytes::Bytes; pub use dht::{Dht, Testnet}; pub use ed25519_dalek::SigningKey; -pub use error::Error; - -// Alias Result to be the crate Result. -pub type Result = core::result::Result; +pub use error::{Error, Result}; diff --git a/src/rpc.rs b/src/rpc.rs index c106d868..7ecc3e0b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -20,6 +20,7 @@ use crate::common::{ PutRequestSpecific, RequestSpecific, RequestTypeSpecific, ResponseSpecific, RoutingTable, }; +use crate::error::SocketAddrResult; use crate::{dht::DhtSettings, Error, Result}; use query::{PutQuery, Query}; use socket::KrpcSocket; @@ -95,7 +96,9 @@ impl Rpc { routing_table: RoutingTable::new().with_id(id), queries: HashMap::new(), put_queries: HashMap::new(), - closest_nodes: LruCache::new(NonZeroUsize::new(MAX_CACHED_BUCKETS).unwrap()), + closest_nodes: LruCache::new( + NonZeroUsize::new(MAX_CACHED_BUCKETS).expect("MAX_CACHED_BUCKETS is NonZeroUsize"), + ), last_table_refresh: Instant::now() .checked_sub(REFRESH_TABLE_INTERVAL) @@ -121,7 +124,7 @@ impl Rpc { /// Returns the address the server is listening to. #[inline] - pub fn local_addr(&self) -> SocketAddr { + pub fn local_addr(&self) -> SocketAddrResult { self.socket.local_addr() } diff --git a/src/rpc/socket.rs b/src/rpc/socket.rs index bdfa6e91..728bd2d2 100644 --- a/src/rpc/socket.rs +++ b/src/rpc/socket.rs @@ -7,6 +7,7 @@ use tracing::{debug, trace}; use crate::common::{ErrorSpecific, Message, MessageType, RequestSpecific, ResponseSpecific}; +use crate::error::SocketAddrResult; use crate::{dht::DhtSettings, Result}; const VERSION: [u8; 4] = [82, 83, 0, 1]; // "RS" version 01 @@ -61,8 +62,8 @@ impl KrpcSocket { /// Returns the address the server is listening to. #[inline] - pub fn local_addr(&self) -> SocketAddr { - self.socket.local_addr().unwrap() + pub fn local_addr(&self) -> SocketAddrResult { + self.socket.local_addr() } // === Public Methods === @@ -282,12 +283,12 @@ mod test { port: None, }) .unwrap(); - let server_address = server.local_addr(); + let server_address = server.local_addr().unwrap(); let mut client = KrpcSocket::new(&DhtSettings::default()).unwrap(); client.next_tid = 120; - let client_address = client.local_addr(); + let client_address = client.local_addr().unwrap(); let request = RequestSpecific { requester_id: Id::random(), request_type: RequestTypeSpecific::Ping, @@ -318,11 +319,11 @@ mod test { #[test] fn recv_response() { let mut server = KrpcSocket::new(&DhtSettings::default()).unwrap(); - let server_address = server.local_addr(); + let server_address = server.local_addr().unwrap(); let mut client = KrpcSocket::new(&DhtSettings::default()).unwrap(); - let client_address = client.local_addr(); + let client_address = client.local_addr().unwrap(); server.inflight_requests.push(InflightRequest { tid: 8, @@ -362,7 +363,7 @@ mod test { #[test] fn ignore_unexcpected_response() { let mut server = KrpcSocket::new(&DhtSettings::default()).unwrap(); - let server_address = server.local_addr(); + let server_address = server.local_addr().unwrap(); let mut client = KrpcSocket::new(&DhtSettings::default()).unwrap(); @@ -390,11 +391,11 @@ mod test { #[test] fn ignore_response_from_wrong_address() { let mut server = KrpcSocket::new(&DhtSettings::default()).unwrap(); - let server_address = server.local_addr(); + let server_address = server.local_addr().unwrap(); let mut client = KrpcSocket::new(&DhtSettings::default()).unwrap(); - let client_address = client.local_addr(); + let client_address = client.local_addr().unwrap(); server.inflight_requests.push(InflightRequest { tid: 8, @@ -424,7 +425,7 @@ mod test { #[test] fn ignore_request_in_read_only() { let mut server = KrpcSocket::new(&DhtSettings::default()).unwrap(); - let server_address = server.local_addr(); + let server_address = server.local_addr().unwrap(); let mut client = KrpcSocket::new(&DhtSettings::default()).unwrap(); client.next_tid = 120; diff --git a/src/server.rs b/src/server.rs index a18ca88b..db660d82 100644 --- a/src/server.rs +++ b/src/server.rs @@ -29,7 +29,7 @@ pub const MAX_INFO_HASHES: usize = 2000; pub const MAX_PEERS: usize = 500; pub const MAX_VALUES: usize = 1000; -/// +/// Dht server that can handle incoming rpc requests pub trait Server: std::fmt::Debug + Send + Sync { /// Handle incoming requests. /// @@ -92,19 +92,20 @@ impl DhtServer { Self { tokens, peers: PeersStore::new( - NonZeroUsize::new(settings.max_info_hashes) - .unwrap_or(NonZeroUsize::new(MAX_INFO_HASHES).unwrap()), + NonZeroUsize::new(settings.max_info_hashes).unwrap_or( + NonZeroUsize::new(MAX_INFO_HASHES).expect("MAX_PEERS is NonZeroUsize"), + ), NonZeroUsize::new(settings.max_peers_per_info_hash) - .unwrap_or(NonZeroUsize::new(MAX_PEERS).unwrap()), + .unwrap_or(NonZeroUsize::new(MAX_PEERS).expect("MAX_PEERS is NonZeroUsize")), ), immutable_values: LruCache::new( NonZeroUsize::new(settings.max_immutable_values) - .unwrap_or(NonZeroUsize::new(MAX_VALUES).unwrap()), + .unwrap_or(NonZeroUsize::new(MAX_VALUES).expect("MAX_VALUES is NonZeroUsize")), ), mutable_values: LruCache::new( NonZeroUsize::new(settings.max_mutable_values) - .unwrap_or(NonZeroUsize::new(MAX_VALUES).unwrap()), + .unwrap_or(NonZeroUsize::new(MAX_VALUES).expect("MAX_VALUES is NonZeroUsize")), ), } }