Skip to content

Commit

Permalink
feat: closes #23 more granular errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Oct 9, 2024
1 parent 51ebed1 commit e75ae9e
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 239 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ All notable changes to mainline dht will be documented in this file.
- `Dht::shutdown()` is now idempotent, and returns `()`.
- `AsyncDht::shutdown()` is now idempotent, and returns `()`.
- `Rpc::drop` uses `tracing::debug!()` to log dropping the Rpc.
- `Id::as_bytes()` instead of exposing internal `bytes` property.
- Replace crate `Error` with more granular errors.
- Replace Flume's `RecvError` with `expect()` message, since the sender should never be dropped to soon.
- `DhtIsShutdown` error is a standalone error.
- `InvalidIdSize` error is a standalone error.

### Removed

- Removed `mainline::error::Error` and `mainline::error::Result`
119 changes: 75 additions & 44 deletions src/async_dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ use crate::{
GetValueRequestArguments, Id, MutableItem, PutImmutableRequestArguments,
PutMutableRequestArguments, PutRequestSpecific, RequestTypeSpecific,
},
dht::{ActorMessage, Dht},
error::SocketAddrResult,
rpc::{PutResult, ResponseSender},
Result,
dht::{ActorMessage, Dht, DhtIsShutdown, DhtLocalAddrError, DhtPutError},
rpc::{PutError, ResponseSender},
};

impl Dht {
Expand All @@ -33,12 +31,18 @@ impl AsyncDht {
///
/// Returns an error if the actor is shutdown, or if the [std::net::UdpSocket::local_addr]
/// returned an IO error.
pub async fn local_addr(&self) -> Result<SocketAddr> {
let (sender, receiver) = flume::bounded::<SocketAddrResult>(1);
pub async fn local_addr(&self) -> Result<SocketAddr, DhtLocalAddrError> {
let (sender, receiver) = flume::bounded::<Result<SocketAddr, std::io::Error>>(1);

self.0 .0.send(ActorMessage::LocalAddr(sender))?;
self.0
.0
.send(ActorMessage::LocalAddr(sender))
.map_err(|_| DhtIsShutdown)?;

Ok(receiver.recv_async().await??)
Ok(receiver
.recv_async()
.await
.expect("Query was dropped before sending a response, please open an issue.")?)
}

// === Public Methods ===
Expand All @@ -62,7 +66,10 @@ impl AsyncDht {
/// for Bittorrent is that any peer will introduce you to more peers through "peer exchange"
/// so if you are implementing something different from Bittorrent, you might want
/// to implement your own logic for gossipping more peers after you discover the first ones.
pub fn get_peers(&self, info_hash: Id) -> Result<flume::r#async::RecvStream<Vec<SocketAddr>>> {
pub fn get_peers(
&self,
info_hash: Id,
) -> Result<flume::r#async::RecvStream<Vec<SocketAddr>>, DhtIsShutdown> {
// Get requests use unbounded channels to avoid blocking in the run loop.
// Other requests like put_* and getters don't need that and is ok with
// bounded channel with 1 capacity since it only ever sends one message back.
Expand All @@ -72,11 +79,14 @@ impl AsyncDht {

let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash });

self.0 .0.send(ActorMessage::Get(
info_hash,
request,
ResponseSender::Peers(sender),
))?;
self.0
.0
.send(ActorMessage::Get(
info_hash,
request,
ResponseSender::Peers(sender),
))
.map_err(|_| DhtIsShutdown)?;

Ok(receiver.into_stream())
}
Expand All @@ -86,8 +96,8 @@ impl AsyncDht {
/// The peer will be announced on this process IP.
/// If explicit port is passed, it will be used, otherwise the port will be implicitly
/// assumed by remote nodes to be the same ase port they recieved the request from.
pub async fn announce_peer(&self, info_hash: Id, port: Option<u16>) -> Result<Id> {
let (sender, receiver) = flume::bounded::<PutResult>(1);
pub async fn announce_peer(&self, info_hash: Id, port: Option<u16>) -> Result<Id, DhtPutError> {
let (sender, receiver) = flume::bounded::<Result<Id, PutError>>(1);

let (port, implied_port) = match port {
Some(port) => (port, None),
Expand All @@ -102,15 +112,19 @@ impl AsyncDht {

self.0
.0
.send(ActorMessage::Put(info_hash, request, sender))?;
.send(ActorMessage::Put(info_hash, request, sender))
.map_err(|_| DhtIsShutdown)?;

receiver.recv_async().await?
Ok(receiver
.recv_async()
.await
.expect("Query was dropped before sending a response, please open an issue.")?)
}

// === Immutable data ===

/// Get an Immutable data by its sha1 hash.
pub async fn get_immutable(&self, target: Id) -> Result<Bytes> {
pub async fn get_immutable(&self, target: Id) -> Result<Bytes, DhtIsShutdown> {
let (sender, receiver) = flume::unbounded::<Bytes>();

let request = RequestTypeSpecific::GetValue(GetValueRequestArguments {
Expand All @@ -119,29 +133,41 @@ impl AsyncDht {
salt: None,
});

self.0 .0.send(ActorMessage::Get(
target,
request,
ResponseSender::Immutable(sender),
))?;

Ok(receiver.recv_async().await?)
self.0
.0
.send(ActorMessage::Get(
target,
request,
ResponseSender::Immutable(sender),
))
.map_err(|_| DhtIsShutdown)?;

Ok(receiver
.recv_async()
.await
.expect("Query was dropped before sending a response, please open an issue."))
}

/// Put an immutable data to the DHT.
pub async fn put_immutable(&self, value: Bytes) -> Result<Id> {
pub async fn put_immutable(&self, value: Bytes) -> Result<Id, DhtPutError> {
let target: Id = hash_immutable(&value).into();

let (sender, receiver) = flume::bounded::<PutResult>(1);
let (sender, receiver) = flume::bounded::<Result<Id, PutError>>(1);

let request = PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
target,
v: value.clone().into(),
});

self.0 .0.send(ActorMessage::Put(target, request, sender))?;
self.0
.0
.send(ActorMessage::Put(target, request, sender))
.map_err(|_| DhtIsShutdown)?;

receiver.recv_async().await?
Ok(receiver
.recv_async()
.await
.expect("Query was dropped before sending a response, please open an issue.")?)
}

// === Mutable data ===
Expand All @@ -152,25 +178,28 @@ impl AsyncDht {
public_key: &[u8; 32],
salt: Option<Bytes>,
seq: Option<i64>,
) -> Result<flume::r#async::RecvStream<MutableItem>> {
) -> Result<flume::r#async::RecvStream<MutableItem>, DhtIsShutdown> {
let target = MutableItem::target_from_key(public_key, &salt);

let (sender, receiver) = flume::unbounded::<MutableItem>();

let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt });

let _ = self.0 .0.send(ActorMessage::Get(
target,
request,
ResponseSender::Mutable(sender),
));
self.0
.0
.send(ActorMessage::Get(
target,
request,
ResponseSender::Mutable(sender),
))
.map_err(|_| DhtIsShutdown)?;

Ok(receiver.into_stream())
}

/// Put a mutable data to the DHT.
pub async fn put_mutable(&self, item: MutableItem) -> Result<Id> {
let (sender, receiver) = flume::bounded::<PutResult>(1);
pub async fn put_mutable(&self, item: MutableItem) -> Result<Id, DhtPutError> {
let (sender, receiver) = flume::bounded::<Result<Id, PutError>>(1);

let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments {
target: *item.target(),
Expand All @@ -182,12 +211,15 @@ impl AsyncDht {
cas: *item.cas(),
});

let _ = self
.0
self.0
.0
.send(ActorMessage::Put(*item.target(), request, sender));
.send(ActorMessage::Put(*item.target(), request, sender))
.map_err(|_| DhtIsShutdown)?;

receiver.recv_async().await?
Ok(receiver
.recv_async()
.await
.expect("Query was dropped before sending a response, please open an issue.")?)
}
}

Expand All @@ -201,7 +233,6 @@ mod test {
use crate::dht::Testnet;

use super::*;
use crate::Error;

#[test]
fn shutdown() {
Expand All @@ -216,7 +247,7 @@ mod test {

let result = a.get_immutable(Id::random()).await;

assert!(matches!(result, Err(Error::DhtIsShutdown(_))))
assert!(matches!(result, Err(DhtIsShutdown)))
}
futures::executor::block_on(test());
}
Expand Down
50 changes: 36 additions & 14 deletions src/common/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use std::{
str::FromStr,
};

use crate::{Error, Result};

/// The size of node IDs in bits.
pub const ID_SIZE: usize = 20;
pub const MAX_DISTANCE: u8 = ID_SIZE as u8 * 8;
Expand All @@ -29,10 +27,10 @@ impl Id {
Id(bytes)
}
/// Create a new Id from some bytes. Returns Err if the input is not 20 bytes long.
pub fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Id> {
pub fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Id, InvalidIdSize> {
let bytes = bytes.as_ref();
if bytes.len() != ID_SIZE {
return Err(Error::InvalidIdSize(bytes.len()));
return Err(InvalidIdSize(bytes.len()));
}

let mut tmp: [u8; ID_SIZE] = [0; ID_SIZE];
Expand Down Expand Up @@ -120,7 +118,7 @@ fn from_ipv4_and_r(bytes: [u8; 20], ip: Ipv4Addr, r: u8) -> Id {
// Set first 21 bits to the prefix
bytes[0] = prefix[0];
bytes[1] = prefix[1];
// set the first 5 bits of the 3 byte to the remaining 5 bits of the prefix
// set the first 5 bits of the 3rd byte to the remaining 5 bits of the prefix
bytes[2] = (prefix[2] & 0xf8) | (bytes[2] & 0x7);

// Set the last byte to the random r
Expand All @@ -132,10 +130,10 @@ fn from_ipv4_and_r(bytes: [u8; 20], ip: Ipv4Addr, r: u8) -> Id {
fn id_prefix_ipv4(ip: &Ipv4Addr, r: u8) -> [u8; 3] {
let r32: u32 = r.into();
let ip_int: u32 = u32::from_be_bytes(ip.octets());
let nonsense: u32 = (ip_int & IPV4_MASK) | (r32 << 29);
let masked_ip: u32 = (ip_int & IPV4_MASK) | (r32 << 29);

let mut digest = CASTAGNOLI.digest();
digest.update(&nonsense.to_be_bytes());
digest.update(&masked_ip.to_be_bytes());

let crc = digest.finalize();

Expand Down Expand Up @@ -172,13 +170,11 @@ impl From<Id> for [u8; ID_SIZE] {
}

impl FromStr for Id {
type Err = Error;
type Err = DecodeIdError;

fn from_str(s: &str) -> Result<Id> {
fn from_str(s: &str) -> Result<Id, DecodeIdError> {
if s.len() % 2 != 0 {
return Err(Error::InvalidIdEncoding(
"Number of Hex characters should be even".into(),
));
return Err(DecodeIdError::OddNumberOfCharacters);
}

let mut bytes = Vec::with_capacity(s.len() / 2);
Expand All @@ -188,11 +184,11 @@ impl FromStr for Id {
if let Ok(byte) = u8::from_str_radix(byte_str, 16) {
bytes.push(byte);
} else {
return Err(Error::Static("Invalid hex character")); // Invalid hex character
return Err(DecodeIdError::InvalidHexCharacter(byte_str.into()));
}
}

Id::from_bytes(bytes)
Ok(Id::from_bytes(bytes)?)
}
}

Expand All @@ -202,6 +198,32 @@ impl Debug for Id {
}
}

#[derive(Debug)]
pub struct InvalidIdSize(usize);

impl std::error::Error for InvalidIdSize {}

impl std::fmt::Display for InvalidIdSize {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Invalid Id size, expected 20, got {0}", self.0)
}
}

#[derive(thiserror::Error, Debug)]
/// Mainline crate error enum.
pub enum DecodeIdError {
/// Id is expected to by 20 bytes.
#[error(transparent)]
InvalidIdSize(#[from] InvalidIdSize),

#[error("Hex encoding should contain an even number of hex characters")]
OddNumberOfCharacters,

/// Invalid hex character
#[error("Invalid Id encoding: {0}")]
InvalidHexCharacter(String),
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit e75ae9e

Please sign in to comment.