diff --git a/CHANGELOG.md b/CHANGELOG.md index cd37c59f..a174abc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,3 +13,12 @@ All notable changes to mainline dht will be documented in this file. - `Dht::shutdown()` is now idempotent, and returns `()`. - `AsyncDht::shutdown()` is now idempotent, and returns `()`. - `Rpc::drop` uses `tracing::debug!()` to log dropping the Rpc. +- `Id::as_bytes()` instead of exposing internal `bytes` property. +- Replace crate `Error` with more granular errors. +- Replace Flume's `RecvError` with `expect()` message, since the sender should never be dropped to soon. +- `DhtIsShutdown` error is a standalone error. +- `InvalidIdSize` error is a standalone error. + +### Removed + +- Removed `mainline::error::Error` and `mainline::error::Result` diff --git a/src/async_dht.rs b/src/async_dht.rs index a9230c21..2b967a07 100644 --- a/src/async_dht.rs +++ b/src/async_dht.rs @@ -9,10 +9,8 @@ use crate::{ GetValueRequestArguments, Id, MutableItem, PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific, RequestTypeSpecific, }, - dht::{ActorMessage, Dht}, - error::SocketAddrResult, - rpc::{PutResult, ResponseSender}, - Result, + dht::{ActorMessage, Dht, DhtIsShutdown, DhtLocalAddrError, DhtPutError}, + rpc::{PutError, ResponseSender}, }; impl Dht { @@ -33,12 +31,18 @@ impl AsyncDht { /// /// Returns an error if the actor is shutdown, or if the [std::net::UdpSocket::local_addr] /// returned an IO error. - pub async fn local_addr(&self) -> Result { - let (sender, receiver) = flume::bounded::(1); + pub async fn local_addr(&self) -> Result { + let (sender, receiver) = flume::bounded::>(1); - self.0 .0.send(ActorMessage::LocalAddr(sender))?; + self.0 + .0 + .send(ActorMessage::LocalAddr(sender)) + .map_err(|_| DhtIsShutdown)?; - Ok(receiver.recv_async().await??) + Ok(receiver + .recv_async() + .await + .expect("Query was dropped before sending a response, please open an issue.")?) } // === Public Methods === @@ -62,7 +66,10 @@ impl AsyncDht { /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange" /// so if you are implementing something different from Bittorrent, you might want /// to implement your own logic for gossipping more peers after you discover the first ones. - pub fn get_peers(&self, info_hash: Id) -> Result>> { + pub fn get_peers( + &self, + info_hash: Id, + ) -> Result>, DhtIsShutdown> { // Get requests use unbounded channels to avoid blocking in the run loop. // Other requests like put_* and getters don't need that and is ok with // bounded channel with 1 capacity since it only ever sends one message back. @@ -72,11 +79,14 @@ impl AsyncDht { let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash }); - self.0 .0.send(ActorMessage::Get( - info_hash, - request, - ResponseSender::Peers(sender), - ))?; + self.0 + .0 + .send(ActorMessage::Get( + info_hash, + request, + ResponseSender::Peers(sender), + )) + .map_err(|_| DhtIsShutdown)?; Ok(receiver.into_stream()) } @@ -86,8 +96,8 @@ impl AsyncDht { /// The peer will be announced on this process IP. /// If explicit port is passed, it will be used, otherwise the port will be implicitly /// assumed by remote nodes to be the same ase port they recieved the request from. - pub async fn announce_peer(&self, info_hash: Id, port: Option) -> Result { - let (sender, receiver) = flume::bounded::(1); + pub async fn announce_peer(&self, info_hash: Id, port: Option) -> Result { + let (sender, receiver) = flume::bounded::>(1); let (port, implied_port) = match port { Some(port) => (port, None), @@ -102,15 +112,19 @@ impl AsyncDht { self.0 .0 - .send(ActorMessage::Put(info_hash, request, sender))?; + .send(ActorMessage::Put(info_hash, request, sender)) + .map_err(|_| DhtIsShutdown)?; - receiver.recv_async().await? + Ok(receiver + .recv_async() + .await + .expect("Query was dropped before sending a response, please open an issue.")?) } // === Immutable data === /// Get an Immutable data by its sha1 hash. - pub async fn get_immutable(&self, target: Id) -> Result { + pub async fn get_immutable(&self, target: Id) -> Result { let (sender, receiver) = flume::unbounded::(); let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { @@ -119,29 +133,41 @@ impl AsyncDht { salt: None, }); - self.0 .0.send(ActorMessage::Get( - target, - request, - ResponseSender::Immutable(sender), - ))?; - - Ok(receiver.recv_async().await?) + self.0 + .0 + .send(ActorMessage::Get( + target, + request, + ResponseSender::Immutable(sender), + )) + .map_err(|_| DhtIsShutdown)?; + + Ok(receiver + .recv_async() + .await + .expect("Query was dropped before sending a response, please open an issue.")) } /// Put an immutable data to the DHT. - pub async fn put_immutable(&self, value: Bytes) -> Result { + pub async fn put_immutable(&self, value: Bytes) -> Result { let target: Id = hash_immutable(&value).into(); - let (sender, receiver) = flume::bounded::(1); + let (sender, receiver) = flume::bounded::>(1); let request = PutRequestSpecific::PutImmutable(PutImmutableRequestArguments { target, v: value.clone().into(), }); - self.0 .0.send(ActorMessage::Put(target, request, sender))?; + self.0 + .0 + .send(ActorMessage::Put(target, request, sender)) + .map_err(|_| DhtIsShutdown)?; - receiver.recv_async().await? + Ok(receiver + .recv_async() + .await + .expect("Query was dropped before sending a response, please open an issue.")?) } // === Mutable data === @@ -152,25 +178,28 @@ impl AsyncDht { public_key: &[u8; 32], salt: Option, seq: Option, - ) -> Result> { + ) -> Result, DhtIsShutdown> { let target = MutableItem::target_from_key(public_key, &salt); let (sender, receiver) = flume::unbounded::(); let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt }); - let _ = self.0 .0.send(ActorMessage::Get( - target, - request, - ResponseSender::Mutable(sender), - )); + self.0 + .0 + .send(ActorMessage::Get( + target, + request, + ResponseSender::Mutable(sender), + )) + .map_err(|_| DhtIsShutdown)?; Ok(receiver.into_stream()) } /// Put a mutable data to the DHT. - pub async fn put_mutable(&self, item: MutableItem) -> Result { - let (sender, receiver) = flume::bounded::(1); + pub async fn put_mutable(&self, item: MutableItem) -> Result { + let (sender, receiver) = flume::bounded::>(1); let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments { target: *item.target(), @@ -182,12 +211,15 @@ impl AsyncDht { cas: *item.cas(), }); - let _ = self - .0 + self.0 .0 - .send(ActorMessage::Put(*item.target(), request, sender)); + .send(ActorMessage::Put(*item.target(), request, sender)) + .map_err(|_| DhtIsShutdown)?; - receiver.recv_async().await? + Ok(receiver + .recv_async() + .await + .expect("Query was dropped before sending a response, please open an issue.")?) } } @@ -201,7 +233,6 @@ mod test { use crate::dht::Testnet; use super::*; - use crate::Error; #[test] fn shutdown() { @@ -216,7 +247,7 @@ mod test { let result = a.get_immutable(Id::random()).await; - assert!(matches!(result, Err(Error::DhtIsShutdown(_)))) + assert!(matches!(result, Err(DhtIsShutdown))) } futures::executor::block_on(test()); } diff --git a/src/common/id.rs b/src/common/id.rs index bdfe3025..5de6f4d9 100644 --- a/src/common/id.rs +++ b/src/common/id.rs @@ -8,8 +8,6 @@ use std::{ str::FromStr, }; -use crate::{Error, Result}; - /// The size of node IDs in bits. pub const ID_SIZE: usize = 20; pub const MAX_DISTANCE: u8 = ID_SIZE as u8 * 8; @@ -29,10 +27,10 @@ impl Id { Id(bytes) } /// Create a new Id from some bytes. Returns Err if the input is not 20 bytes long. - pub fn from_bytes>(bytes: T) -> Result { + pub fn from_bytes>(bytes: T) -> Result { let bytes = bytes.as_ref(); if bytes.len() != ID_SIZE { - return Err(Error::InvalidIdSize(bytes.len())); + return Err(InvalidIdSize(bytes.len())); } let mut tmp: [u8; ID_SIZE] = [0; ID_SIZE]; @@ -120,7 +118,7 @@ fn from_ipv4_and_r(bytes: [u8; 20], ip: Ipv4Addr, r: u8) -> Id { // Set first 21 bits to the prefix bytes[0] = prefix[0]; bytes[1] = prefix[1]; - // set the first 5 bits of the 3 byte to the remaining 5 bits of the prefix + // set the first 5 bits of the 3rd byte to the remaining 5 bits of the prefix bytes[2] = (prefix[2] & 0xf8) | (bytes[2] & 0x7); // Set the last byte to the random r @@ -132,10 +130,10 @@ fn from_ipv4_and_r(bytes: [u8; 20], ip: Ipv4Addr, r: u8) -> Id { fn id_prefix_ipv4(ip: &Ipv4Addr, r: u8) -> [u8; 3] { let r32: u32 = r.into(); let ip_int: u32 = u32::from_be_bytes(ip.octets()); - let nonsense: u32 = (ip_int & IPV4_MASK) | (r32 << 29); + let masked_ip: u32 = (ip_int & IPV4_MASK) | (r32 << 29); let mut digest = CASTAGNOLI.digest(); - digest.update(&nonsense.to_be_bytes()); + digest.update(&masked_ip.to_be_bytes()); let crc = digest.finalize(); @@ -172,13 +170,11 @@ impl From for [u8; ID_SIZE] { } impl FromStr for Id { - type Err = Error; + type Err = DecodeIdError; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { if s.len() % 2 != 0 { - return Err(Error::InvalidIdEncoding( - "Number of Hex characters should be even".into(), - )); + return Err(DecodeIdError::OddNumberOfCharacters); } let mut bytes = Vec::with_capacity(s.len() / 2); @@ -188,11 +184,11 @@ impl FromStr for Id { if let Ok(byte) = u8::from_str_radix(byte_str, 16) { bytes.push(byte); } else { - return Err(Error::Static("Invalid hex character")); // Invalid hex character + return Err(DecodeIdError::InvalidHexCharacter(byte_str.into())); } } - Id::from_bytes(bytes) + Ok(Id::from_bytes(bytes)?) } } @@ -202,6 +198,32 @@ impl Debug for Id { } } +#[derive(Debug)] +pub struct InvalidIdSize(usize); + +impl std::error::Error for InvalidIdSize {} + +impl std::fmt::Display for InvalidIdSize { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Invalid Id size, expected 20, got {0}", self.0) + } +} + +#[derive(thiserror::Error, Debug)] +/// Mainline crate error enum. +pub enum DecodeIdError { + /// Id is expected to by 20 bytes. + #[error(transparent)] + InvalidIdSize(#[from] InvalidIdSize), + + #[error("Hex encoding should contain an even number of hex characters")] + OddNumberOfCharacters, + + /// Invalid hex character + #[error("Invalid Id encoding: {0}")] + InvalidHexCharacter(String), +} + #[cfg(test)] mod test { use super::*; diff --git a/src/common/messages.rs b/src/common/messages.rs index 5f770977..21003f72 100644 --- a/src/common/messages.rs +++ b/src/common/messages.rs @@ -10,7 +10,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use bytes::Bytes; use crate::common::{Id, Node, ID_SIZE}; -use crate::{Error, Result}; + +use super::InvalidIdSize; #[derive(Debug, PartialEq, Clone)] pub struct Message { @@ -389,7 +390,7 @@ impl Message { } } - fn from_serde_message(msg: internal::DHTMessage) -> Result { + fn from_serde_message(msg: internal::DHTMessage) -> Result { Ok(Message { transaction_id: transaction_id(msg.transaction_id)?, version: msg.version, @@ -568,32 +569,26 @@ impl Message { internal::DHTMessageVariant::Error(err) => { if err.error_info.len() < 2 { - return Err(Error::Static( - "Error packet should have at least 2 elements", - )); + return Err(DecodeMessageError::InvalidErrorDescription); } MessageType::Error(ErrorSpecific { code: match err.error_info[0] { serde_bencode::value::Value::Int(code) => match code.try_into() { Ok(code) => code, - Err(_) => return Err(Error::Static("error parsing error code")), + Err(_) => return Err(DecodeMessageError::InvalidErrorCode), }, - _ => return Err(Error::Static("Expected error code as first element")), + _ => return Err(DecodeMessageError::InvalidErrorCode), }, description: match &err.error_info[1] { serde_bencode::value::Value::Bytes(desc) => { match std::str::from_utf8(desc) { Ok(desc) => desc.to_string(), Err(_) => { - return Err(Error::Static( - "error parsing error description", - )) + return Err(DecodeMessageError::InvalidErrorDescription) } } } - _ => { - return Err(Error::Static("Expected description as second element")) - } + _ => return Err(DecodeMessageError::InvalidErrorDescription), }, }) } @@ -601,11 +596,11 @@ impl Message { }) } - pub fn to_bytes(&self) -> Result> { + pub fn to_bytes(&self) -> Result, serde_bencode::Error> { self.clone().into_serde_message().to_bytes() } - pub fn from_bytes>(bytes: T) -> Result { + pub fn from_bytes>(bytes: T) -> Result { Message::from_serde_message(internal::DHTMessage::from_bytes(bytes)?) } @@ -679,17 +674,17 @@ impl Message { } // Return the transaction Id as a u16 -pub fn transaction_id(bytes: Vec) -> Result { +fn transaction_id(bytes: Vec) -> Result { if bytes.len() == 2 { return Ok(((bytes[0] as u16) << 8) | (bytes[1] as u16)); } else if bytes.len() == 1 { return Ok(bytes[0] as u16); } - Err(Error::InvalidTransactionId(bytes)) + Err(DecodeMessageError::InvalidTransactionId(bytes)) } -fn bytes_to_sockaddr>(bytes: T) -> Result { +fn bytes_to_sockaddr>(bytes: T) -> Result { let bytes = bytes.as_ref(); match bytes.len() { 6 => { @@ -697,16 +692,16 @@ fn bytes_to_sockaddr>(bytes: T) -> Result { let port_bytes_as_array: [u8; 2] = bytes[4..6] .try_into() - .map_err(|_| Error::Static("wrong number of bytes for port"))?; + .map_err(|_| DecodeMessageError::InvalidPortEncoding)?; let port: u16 = u16::from_be_bytes(port_bytes_as_array); Ok(SocketAddr::new(IpAddr::V4(ip), port)) } - 18 => Err(Error::Static("IPv6 is not yet implemented")), + 18 => Err(DecodeMessageError::Ipv6Unsupported), - _ => Err(Error::Static("Wrong number of bytes for sockaddr")), + _ => Err(DecodeMessageError::InvalidSocketAddrEncodingLength), } } @@ -745,14 +740,11 @@ fn nodes4_to_bytes(nodes: &[Node]) -> Vec { vec } -fn bytes_to_nodes4>(bytes: T) -> Result> { +fn bytes_to_nodes4>(bytes: T) -> Result, DecodeMessageError> { let bytes = bytes.as_ref(); let node4_byte_size: usize = ID_SIZE + 6; if bytes.len() % node4_byte_size != 0 { - return Err(Error::Generic(format!( - "Wrong number of bytes for nodes message ({})", - bytes.len() - ))); + return Err(DecodeMessageError::InvalidNodes4); } let expected_num = bytes.len() / node4_byte_size; @@ -775,11 +767,48 @@ fn peers_to_bytes(peers: Vec) -> Vec { .collect() } -fn bytes_to_peers>(bytes: T) -> Result> { +fn bytes_to_peers>( + bytes: T, +) -> Result, DecodeMessageError> { let bytes = bytes.as_ref(); bytes.iter().map(bytes_to_sockaddr).collect() } +#[derive(thiserror::Error, Debug)] +/// Mainline crate error enum. +pub enum DecodeMessageError { + #[error("Wrong number of bytes for nodes")] + InvalidNodes4, + + /// Message transaction_id is not two bytes. + #[error("Invalid transaction_id: {0:?}")] + InvalidTransactionId(Vec), + + #[error("wrong number of bytes for port")] + InvalidPortEncoding, + + #[error("IPv6 is not yet implemented")] + Ipv6Unsupported, + + #[error("Wrong number of bytes for sockaddr")] + InvalidSocketAddrEncodingLength, + + #[error("Failed to parse packet bytes: {0}")] + BencodeError(#[from] serde_bencode::Error), + + #[error(transparent)] + InvalidIdSize(#[from] InvalidIdSize), + + #[error("Error packet should have at least 2 elements")] + InvalidErrorPacket, + + #[error("error parsing error code")] + InvalidErrorCode, + + #[error("error parsing error description")] + InvalidErrorDescription, +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/common/messages/internal.rs b/src/common/messages/internal.rs index b0f76a2b..27408c3b 100644 --- a/src/common/messages/internal.rs +++ b/src/common/messages/internal.rs @@ -1,7 +1,5 @@ use serde::{Deserialize, Serialize}; -use crate::{Error, Result}; - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct DHTMessage { #[serde(rename = "t", with = "serde_bytes")] @@ -24,14 +22,14 @@ pub struct DHTMessage { } impl DHTMessage { - pub fn from_bytes>(bytes: T) -> Result { + pub fn from_bytes>(bytes: T) -> Result { let bytes = bytes.as_ref(); let obj = serde_bencode::from_bytes(bytes)?; Ok(obj) } - pub fn to_bytes(&self) -> Result> { - serde_bencode::to_bytes(self).map_err(Error::BencodeError) + pub fn to_bytes(&self) -> Result, serde_bencode::Error> { + serde_bencode::to_bytes(self) } } diff --git a/src/common/mutable.rs b/src/common/mutable.rs index bd8f8372..0aa201a0 100644 --- a/src/common/mutable.rs +++ b/src/common/mutable.rs @@ -5,7 +5,7 @@ use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey}; use sha1_smol::Sha1; use std::convert::TryFrom; -use crate::{Error, Id, Result}; +use crate::Id; #[derive(Clone, Debug, PartialEq)] /// [Bep_0044](https://www.bittorrent.org/beps/bep_0044.html)'s Mutable item. @@ -91,14 +91,14 @@ impl MutableItem { signature: &[u8], salt: Option, cas: &Option, - ) -> Result { - let key = VerifyingKey::try_from(key).map_err(|_| Error::InvalidMutablePublicKey)?; + ) -> Result { + let key = VerifyingKey::try_from(key).map_err(|_| MutableError::InvalidMutablePublicKey)?; let signature = - Signature::from_slice(signature).map_err(|_| Error::InvalidMutableSignature)?; + Signature::from_slice(signature).map_err(|_| MutableError::InvalidMutableSignature)?; key.verify(&encode_signable(seq, &v, &salt), &signature) - .map_err(|_| Error::InvalidMutableSignature)?; + .map_err(|_| MutableError::InvalidMutableSignature)?; Ok(Self { target: *target, @@ -156,6 +156,16 @@ pub fn encode_signable(seq: &i64, value: &Bytes, salt: &Option) -> Bytes signable.into() } +#[derive(thiserror::Error, Debug)] +/// Mainline crate error enum. +pub enum MutableError { + #[error("Invalid mutable item signature")] + InvalidMutableSignature, + + #[error("Invalid mutable item public key")] + InvalidMutablePublicKey, +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/dht.rs b/src/dht.rs index 0560d078..83626fa0 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -1,6 +1,6 @@ //! Dht node. -use std::{net::SocketAddr, thread, time::Duration}; +use std::{fmt::Formatter, net::SocketAddr, thread, time::Duration}; use bytes::Bytes; use flume::{Receiver, Sender}; @@ -13,10 +13,8 @@ use crate::{ GetValueRequestArguments, Id, MutableItem, PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific, RequestTypeSpecific, }, - error::SocketAddrResult, - rpc::{PutResult, ReceivedFrom, ReceivedMessage, ResponseSender, Rpc}, + rpc::{PutError, ReceivedFrom, ReceivedMessage, ResponseSender, Rpc}, server::{DhtServer, Server}, - Result, }; #[derive(Debug, Clone)] @@ -29,7 +27,7 @@ pub struct Builder { impl Builder { /// Create a Dht node. - pub fn build(self) -> Result { + pub fn build(self) -> Result { Dht::new(self.settings) } @@ -91,7 +89,7 @@ impl Dht { } /// Create a new DHT client with default bootstrap nodes. - pub fn client() -> Result { + pub fn client() -> Result { Dht::builder().build() } @@ -100,7 +98,7 @@ impl Dht { /// /// Note: this is only useful if the node has a public IP address and is able to receive /// incoming udp packets. - pub fn server() -> Result { + pub fn server() -> Result { Dht::builder().server().build() } @@ -108,7 +106,7 @@ impl Dht { /// /// Could return an error if it failed to bind to the specified /// port or other io errors while binding the udp socket. - pub fn new(settings: DhtSettings) -> Result { + pub fn new(settings: DhtSettings) -> Result { let (sender, receiver) = flume::bounded(32); let rpc = Rpc::new(&settings)?; @@ -130,12 +128,16 @@ impl Dht { /// /// Returns an error if the actor is shutdown, or if the [std::net::UdpSocket::local_addr] /// returned an IO error. - pub fn local_addr(&self) -> Result { - let (sender, receiver) = flume::bounded::(1); + pub fn local_addr(&self) -> Result { + let (sender, receiver) = flume::bounded::>(1); - self.0.send(ActorMessage::LocalAddr(sender))?; + self.0 + .send(ActorMessage::LocalAddr(sender)) + .map_err(|_| DhtIsShutdown)?; - Ok(receiver.recv()??) + Ok(receiver + .recv() + .expect("Query was dropped before sending a response, please open an issue.")?) } // === Public Methods === @@ -159,7 +161,10 @@ impl Dht { /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange" /// so if you are implementing something different from Bittorrent, you might want /// to implement your own logic for gossipping more peers after you discover the first ones. - pub fn get_peers(&self, info_hash: Id) -> Result>> { + pub fn get_peers( + &self, + info_hash: Id, + ) -> Result>, DhtIsShutdown> { // Get requests use unbounded channels to avoid blocking in the run loop. // Other requests like put_* and getters don't need that and is ok with // bounded channel with 1 capacity since it only ever sends one message back. @@ -169,11 +174,13 @@ impl Dht { let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash }); - self.0.send(ActorMessage::Get( - info_hash, - request, - ResponseSender::Peers(sender), - ))?; + self.0 + .send(ActorMessage::Get( + info_hash, + request, + ResponseSender::Peers(sender), + )) + .map_err(|_| DhtIsShutdown)?; Ok(receiver.into_iter()) } @@ -183,8 +190,8 @@ impl Dht { /// The peer will be announced on this process IP. /// If explicit port is passed, it will be used, otherwise the port will be implicitly /// assumed by remote nodes to be the same ase port they recieved the request from. - pub fn announce_peer(&self, info_hash: Id, port: Option) -> Result { - let (sender, receiver) = flume::bounded::(1); + pub fn announce_peer(&self, info_hash: Id, port: Option) -> Result { + let (sender, receiver) = flume::bounded::>(1); let (port, implied_port) = match port { Some(port) => (port, None), @@ -197,15 +204,19 @@ impl Dht { implied_port, }); - self.0.send(ActorMessage::Put(info_hash, request, sender))?; + self.0 + .send(ActorMessage::Put(info_hash, request, sender)) + .map_err(|_| DhtIsShutdown)?; - receiver.recv()? + Ok(receiver + .recv() + .expect("Query was dropped before sending a response, please open an issue.")?) } // === Immutable data === /// Get an Immutable data by its sha1 hash. - pub fn get_immutable(&self, target: Id) -> Result { + pub fn get_immutable(&self, target: Id) -> Result { let (sender, receiver) = flume::unbounded::(); let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { @@ -214,29 +225,37 @@ impl Dht { salt: None, }); - self.0.send(ActorMessage::Get( - target, - request, - ResponseSender::Immutable(sender), - ))?; - - Ok(receiver.recv()?) + self.0 + .send(ActorMessage::Get( + target, + request, + ResponseSender::Immutable(sender), + )) + .map_err(|_| DhtIsShutdown)?; + + Ok(receiver + .recv() + .expect("Query was dropped before sending a response, please open an issue.")) } /// Put an immutable data to the DHT. - pub fn put_immutable(&self, value: Bytes) -> Result { + pub fn put_immutable(&self, value: Bytes) -> Result { let target: Id = hash_immutable(&value).into(); - let (sender, receiver) = flume::bounded::(1); + let (sender, receiver) = flume::bounded::>(1); let request = PutRequestSpecific::PutImmutable(PutImmutableRequestArguments { target, v: value.clone().into(), }); - self.0.send(ActorMessage::Put(target, request, sender))?; + self.0 + .send(ActorMessage::Put(target, request, sender)) + .map_err(|_| DhtIsShutdown)?; - receiver.recv()? + Ok(receiver + .recv() + .expect("Query was dropped before sending a response, please open an issue.")?) } // === Mutable data === @@ -247,25 +266,27 @@ impl Dht { public_key: &[u8; 32], salt: Option, seq: Option, - ) -> Result> { + ) -> Result, DhtIsShutdown> { let target = MutableItem::target_from_key(public_key, &salt); let (sender, receiver) = flume::unbounded::(); let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt }); - let _ = self.0.send(ActorMessage::Get( - target, - request, - ResponseSender::Mutable(sender), - )); + self.0 + .send(ActorMessage::Get( + target, + request, + ResponseSender::Mutable(sender), + )) + .map_err(|_| DhtIsShutdown)?; Ok(receiver.into_iter()) } /// Put a mutable data to the DHT. - pub fn put_mutable(&self, item: MutableItem) -> Result { - let (sender, receiver) = flume::bounded::(1); + pub fn put_mutable(&self, item: MutableItem) -> Result { + let (sender, receiver) = flume::bounded::>(1); let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments { target: *item.target(), @@ -277,11 +298,13 @@ impl Dht { cas: *item.cas(), }); - let _ = self - .0 - .send(ActorMessage::Put(*item.target(), request, sender)); + self.0 + .send(ActorMessage::Put(*item.target(), request, sender)) + .map_err(|_| DhtIsShutdown)?; - receiver.recv()? + Ok(receiver + .recv() + .expect("Query was dropped before sending a response, please open an issue.")?) } } @@ -322,9 +345,9 @@ fn run(mut rpc: Rpc, server: &mut Option>, receiver: Receiver), + Put(Id, PutRequestSpecific, Sender>), Get(Id, RequestTypeSpecific, ResponseSender), - LocalAddr(Sender), + LocalAddr(Sender>), Shutdown(Sender<()>), } @@ -336,7 +359,7 @@ pub struct Testnet { } impl Testnet { - pub fn new(count: usize) -> Result { + pub fn new(count: usize) -> Result { let mut nodes: Vec = vec![]; let mut bootstrap = vec![]; @@ -344,7 +367,9 @@ impl Testnet { if i == 0 { let node = Dht::builder().server().bootstrap(&[]).build()?; - let addr = node.local_addr()?; + let addr = node + .local_addr() + .expect("node should not be shutdown in Testnet"); bootstrap.push(format!("127.0.0.1:{}", addr.port())); nodes.push(node) @@ -358,6 +383,37 @@ impl Testnet { } } +#[derive(thiserror::Error, Debug)] +/// Dht Actor errors +pub enum DhtPutError { + #[error(transparent)] + PutError(#[from] PutError), + + #[error(transparent)] + DhtIsShutdown(#[from] DhtIsShutdown), +} + +#[derive(thiserror::Error, Debug)] +/// Dht Actor errors +pub enum DhtLocalAddrError { + #[error(transparent)] + Io(#[from] std::io::Error), + + #[error(transparent)] + DhtIsShutdown(#[from] DhtIsShutdown), +} + +#[derive(Debug)] +pub struct DhtIsShutdown; + +impl std::error::Error for DhtIsShutdown {} + +impl std::fmt::Display for DhtIsShutdown { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "The Dht was shutdown") + } +} + #[cfg(test)] mod test { use std::str::FromStr; @@ -365,7 +421,6 @@ mod test { use ed25519_dalek::SigningKey; use super::*; - use crate::Error; #[test] fn shutdown() { @@ -379,7 +434,7 @@ mod test { let result = a.get_immutable(Id::random()); - assert!(matches!(result, Err(Error::DhtIsShutdown(_)))) + assert!(matches!(result, Err(DhtIsShutdown))) } #[test] diff --git a/src/error.rs b/src/error.rs deleted file mode 100644 index d43cb536..00000000 --- a/src/error.rs +++ /dev/null @@ -1,69 +0,0 @@ -//! Main Crate Error - -use std::net::SocketAddr; - -use crate::{common::ErrorSpecific, dht::ActorMessage, Id}; - -// Alias Result to be the crate Result. -pub type Result = core::result::Result; - -pub type SocketAddrResult = core::result::Result; - -#[derive(thiserror::Error, Debug)] -/// Mainline crate error enum. -pub enum Error { - #[error("Generic error: {0}")] - Generic(String), - /// For starter, to remove as code matures. - #[error("Static error: {0}")] - Static(&'static str), - - #[error(transparent)] - /// Transparent [std::io::Error] - IO(#[from] std::io::Error), - - // Id - /// Id is expected to by 20 bytes. - #[error("Invalid Id size, expected 20, got {0}")] - InvalidIdSize(usize), - - /// hex encoding issue - #[error("Invalid Id encoding: {0}")] - InvalidIdEncoding(String), - - // DHT messages - /// Errors related to parsing DHT messages. - #[error("Failed to parse packet bytes: {0}")] - BencodeError(#[from] serde_bencode::Error), - - /// Indicates that the message transaction_id is not two bytes. - #[error("Invalid transaction_id: {0:?}")] - InvalidTransactionId(Vec), - - #[error(transparent)] - /// Transparent [flume::RecvError] - Receive(#[from] flume::RecvError), - - #[error(transparent)] - /// The dht was shutdown. - DhtIsShutdown(#[from] flume::SendError), - - #[error("Invalid mutable item signature")] - InvalidMutableSignature, - - #[error("Invalid mutable item public key")] - InvalidMutablePublicKey, - - /// Failed to find any nodes close, usually means dht node failed to bootstrap, - /// so the routing table is empty. Check the machine's access to UDP socket, - /// or find better bootstrapping nodes. - #[error("Failed to find any nodes close to store value at")] - NoClosestNodes, - - #[error("Query Error")] - QueryError(ErrorSpecific), - - #[error("Put query is already inflight to the same target: {0}")] - /// [crate::rpc::Rpc::put] query is already inflight to the same target - PutQueryIsInflight(Id), -} diff --git a/src/lib.rs b/src/lib.rs index 09dfc992..7cdd7b3b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ // Public modules mod common; -mod error; #[cfg(feature = "async")] pub mod async_dht; @@ -15,4 +14,3 @@ pub use bytes::Bytes; pub use dht::{Dht, Testnet}; pub use ed25519_dalek::SigningKey; -pub use error::{Error, Result}; diff --git a/src/rpc.rs b/src/rpc.rs index 9ab40114..8b5320a9 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -20,12 +20,12 @@ use crate::common::{ PutRequestSpecific, RequestSpecific, RequestTypeSpecific, ResponseSpecific, RoutingTable, }; -use crate::error::SocketAddrResult; -use crate::{dht::DhtSettings, Error, Result}; +use crate::dht::DhtSettings; use query::{PutQuery, Query}; use socket::KrpcSocket; pub use crate::common::messages; +pub use query::PutError; pub use socket::DEFAULT_PORT; pub use socket::DEFAULT_REQUEST_TIMEOUT; @@ -69,7 +69,7 @@ pub struct Rpc { impl Rpc { /// Create a new Rpc - pub fn new(settings: &DhtSettings) -> Result { + pub fn new(settings: &DhtSettings) -> Result { // TODO: One day I might implement BEP42 on Routing nodes. let id = Id::random(); @@ -124,7 +124,7 @@ impl Rpc { /// Returns the address the server is listening to. #[inline] - pub fn local_addr(&self) -> SocketAddrResult { + pub fn local_addr(&self) -> Result { self.socket.local_addr() } @@ -252,11 +252,11 @@ impl Rpc { &mut self, target: Id, request: PutRequestSpecific, - sender: Option>, + sender: Option>>, ) { if self.put_queries.contains_key(&target) { if let Some(sender) = sender { - let _ = sender.send(Err(Error::PutQueryIsInflight(target))); + let _ = sender.send(Err(PutError::PutQueryIsInflight(target))); }; debug!(?target, "Put query for the same target is already inflight"); @@ -662,7 +662,3 @@ pub enum ResponseSender { Mutable(Sender), Immutable(Sender), } - -/// Returns the info_hash or target of the operation. -/// Useful for put_immutable. -pub type PutResult = Result; diff --git a/src/rpc/query.rs b/src/rpc/query.rs index b40b54ab..f8033b2c 100644 --- a/src/rpc/query.rs +++ b/src/rpc/query.rs @@ -7,13 +7,12 @@ use flume::Sender; use tracing::{debug, error, info, trace, warn}; use super::socket::KrpcSocket; -use crate::Error; use crate::{ common::{ ErrorSpecific, Id, Node, PutRequest, PutRequestSpecific, RequestSpecific, RequestTypeSpecific, RoutingTable, }, - rpc::{PutResult, Response, ResponseSender}, + rpc::{Response, ResponseSender}, }; /// A query is an iterative process of concurrently sending a request to the closest known nodes to @@ -162,13 +161,17 @@ pub struct PutQuery { /// Nodes that confirmed success stored_at: u8, inflight_requests: Vec, - sender: Option>, + sender: Option>>, request: PutRequestSpecific, error: Option, } impl PutQuery { - pub fn new(target: Id, request: PutRequestSpecific, sender: Option>) -> Self { + pub fn new( + target: Id, + request: PutRequestSpecific, + sender: Option>>, + ) -> Self { Self { target, stored_at: 0, @@ -190,7 +193,7 @@ impl PutQuery { if let Some(sender) = &self.sender { if nodes.is_empty() { - let _ = sender.send(Err(Error::NoClosestNodes)); + let _ = sender.send(Err(PutError::NoClosestNodes)); } } @@ -250,7 +253,7 @@ impl PutQuery { let _ = self .sender .to_owned() - .map(|sender| sender.send(Err(Error::QueryError(error)))); + .map(|sender| sender.send(Err(PutError::ErrorResponse(error)))); } } else { info!(?target, stored_at = ?self.stored_at, "PutQuery Done"); @@ -264,3 +267,22 @@ impl PutQuery { false } } + +#[derive(thiserror::Error, Debug)] +/// Query errors +pub enum PutError { + /// Failed to find any nodes close, usually means dht node failed to bootstrap, + /// so the routing table is empty. Check the machine's access to UDP socket, + /// or find better bootstrapping nodes. + #[error("Failed to find any nodes close to store value at")] + NoClosestNodes, + + /// Put Query faild to store at any nodes, and got at least one + /// 3xx error response + #[error("Query Error Response")] + ErrorResponse(ErrorSpecific), + + /// [crate::rpc::Rpc::put] query is already inflight to the same target + #[error("Put query is already inflight to the same target: {0}")] + PutQueryIsInflight(Id), +} diff --git a/src/rpc/socket.rs b/src/rpc/socket.rs index c1865618..e9eec2e2 100644 --- a/src/rpc/socket.rs +++ b/src/rpc/socket.rs @@ -7,8 +7,7 @@ use tracing::{debug, trace}; use crate::common::{ErrorSpecific, Message, MessageType, RequestSpecific, ResponseSpecific}; -use crate::error::SocketAddrResult; -use crate::{dht::DhtSettings, Result}; +use crate::dht::DhtSettings; const VERSION: [u8; 4] = [82, 83, 0, 1]; // "RS" version 01 const MTU: usize = 2048; @@ -37,7 +36,7 @@ pub struct InflightRequest { } impl KrpcSocket { - pub fn new(settings: &DhtSettings) -> Result { + pub fn new(settings: &DhtSettings) -> Result { let socket = if let Some(port) = settings.port { UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))? } else { @@ -62,7 +61,7 @@ impl KrpcSocket { /// Returns the address the server is listening to. #[inline] - pub fn local_addr(&self) -> SocketAddrResult { + pub fn local_addr(&self) -> Result { self.socket.local_addr() } @@ -229,13 +228,25 @@ impl KrpcSocket { } /// Send a raw dht message - fn send(&mut self, address: SocketAddr, message: Message) -> Result<()> { + fn send(&mut self, address: SocketAddr, message: Message) -> Result<(), SendMessageError> { trace!(?message, "Sending a message"); self.socket.send_to(&message.to_bytes()?, address)?; Ok(()) } } +#[derive(thiserror::Error, Debug)] +/// Mainline crate error enum. +pub enum SendMessageError { + /// Errors related to parsing DHT messages. + #[error("Failed to parse packet bytes: {0}")] + BencodeError(#[from] serde_bencode::Error), + + #[error(transparent)] + /// Transparent [std::io::Error] + IO(#[from] std::io::Error), +} + // Same as SocketAddr::eq but ingores the ip if it is unspecified for testing reasons. fn compare_socket_addr(a: &SocketAddr, b: &SocketAddr) -> bool { if a.port() != b.port() {