Skip to content

Commit

Permalink
Merge pull request #8 from Nuhvi/feat/server_44
Browse files Browse the repository at this point in the history
Feat: server side storage
  • Loading branch information
Nuhvi authored Feb 18, 2024
2 parents 26f8932 + 2ec58bd commit c840d0d
Show file tree
Hide file tree
Showing 20 changed files with 768 additions and 318 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mainline"
version = "1.1.0"
version = "1.2.0"
authors = ["nuh.dev"]
edition = "2018"
description = "Simple, robust, BitTorrent's Mainline DHT implementation"
Expand All @@ -22,6 +22,7 @@ flume = { version = "0.11.0", features = ["select", "eventual-fairness"], defaul
ed25519-dalek = "2.1.0"
bytes = "1.5.0"
tracing = "0.1"
lru = { version = "0.12.2", default-features = false }

[dev-dependencies]
clap = { version = "4.4.8", features = ["derive"] }
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ let dht = Dht::client(); // or Dht::default();

Supported BEPs:
- [x] [BEP0005 DHT Protocol](https://www.bittorrent.org/beps/bep_0005.html)
- [X] [BEP0042 DHT Security extension](https://www.bittorrent.org/beps/bep_0042.html)
- [x] [BEP0042 DHT Security extension](https://www.bittorrent.org/beps/bep_0042.html)
- [x] [BEP0043 Read-only DHT Nodes](https://www.bittorrent.org/beps/bep_0043.html)
- [x] [BEP0044 Storing arbitrary data in the DHT](https://www.bittorrent.org/beps/bep_0044.html)

Expand All @@ -44,7 +44,7 @@ Supported BEPs:
- [x] [BEP0005 DHT Protocol](https://www.bittorrent.org/beps/bep_0005.html)
- [ ] [BEP0042 DHT Security extension](https://www.bittorrent.org/beps/bep_0042.html)
- [x] [BEP0043 Read-only DHT Nodes](https://www.bittorrent.org/beps/bep_0043.html)
- [ ] [BEP0044 Storing arbitrary data in the DHT](https://www.bittorrent.org/beps/bep_0044.html)
- [x] [BEP0044 Storing arbitrary data in the DHT](https://www.bittorrent.org/beps/bep_0044.html)


## Acknowledgment
Expand Down
49 changes: 44 additions & 5 deletions src/async_dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
use bytes::Bytes;

use crate::common::{
hash_immutable, GetImmutableResponse, GetMutableResponse, GetPeerResponse, Id, MutableItem,
Node, Response, ResponseDone, ResponseMessage, StoreQueryMetdata,
};
use crate::common::{hash_immutable, Id, MutableItem, Node, RoutingTable};
use crate::dht::ActorMessage;
use crate::routing_table::RoutingTable;
use crate::rpc::{
GetImmutableResponse, GetMutableResponse, GetPeerResponse, Response, ResponseDone,
ResponseMessage, StoreQueryMetdata,
};
use crate::{Dht, Result};
use std::net::SocketAddr;

Expand Down Expand Up @@ -202,3 +202,42 @@ impl<T> Response<T> {
}
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use super::*;
use crate::Testnet;

#[cfg(feature = "async")]
#[test]
fn announce_get_peer_async() {
async fn test() {
let testnet = Testnet::new(10);

let a = Dht::builder()
.bootstrap(&testnet.bootstrap)
.build()
.as_async();
let b = Dht::builder()
.bootstrap(&testnet.bootstrap)
.build()
.as_async();

let info_hash = Id::random();

match a.announce_peer(info_hash, Some(45555)).await {
Ok(_) => {
if let Some(r) = b.get_peers(info_hash).next_async().await {
assert_eq!(r.peer.port(), 45555);
} else {
panic!("No respnoses")
}
}
Err(_) => {}
};
}
futures::executor::block_on(test());
}
}
4 changes: 2 additions & 2 deletions src/common/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Id {
let mut rng = rand::thread_rng();
let r: u8 = rng.gen();

let mut bytes: [u8; 20] = rng.gen();
let bytes: [u8; 20] = rng.gen();

match ip {
IpAddr::V4(addr) => from_ipv4_and_r(bytes, addr, r),
Expand All @@ -96,7 +96,7 @@ impl Id {

expected == actual
}
IpAddr::V6(ipv6) => {
IpAddr::V6(_ipv6) => {
unimplemented!()

// // For IPv6, checking the ULA range fc00::/7
Expand Down
4 changes: 2 additions & 2 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ mod id;
mod immutable;
mod mutable;
mod node;
mod response;
mod routing_table;

pub use id::*;
pub use immutable::*;
pub use mutable::*;
pub use node::*;
pub use response::*;
pub use routing_table::*;
15 changes: 15 additions & 0 deletions src/common/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct MutableItem {
signature: [u8; 64],
/// Optional salt
salt: Option<Bytes>,
/// Optional compare and swap seq
cas: Option<i64>,
}

impl MutableItem {
Expand All @@ -38,6 +40,12 @@ impl MutableItem {
)
}

/// Set the cas number if needed.
pub fn with_cas(mut self, cas: i64) -> Self {
self.cas = Some(cas);
self
}

/// Create a new mutable item from an already signed value.
pub fn new_signed_unchecked(
key: [u8; 32],
Expand All @@ -53,6 +61,7 @@ impl MutableItem {
seq,
signature,
salt,
cas: None,
}
}

Expand All @@ -63,6 +72,7 @@ impl MutableItem {
seq: &i64,
signature: &[u8],
salt: &Option<Bytes>,
cas: &Option<i64>,
) -> Result<Self> {
let key = VerifyingKey::try_from(key).map_err(|_| Error::InvalidMutablePublicKey)?;

Expand All @@ -79,6 +89,7 @@ impl MutableItem {
seq: *seq,
signature: signature.to_bytes(),
salt: salt.to_owned(),
cas: *cas,
})
}

Expand Down Expand Up @@ -107,6 +118,10 @@ impl MutableItem {
pub fn salt(&self) -> &Option<Bytes> {
&self.salt
}

pub fn cas(&self) -> &Option<i64> {
&self.cas
}
}

pub fn target_from_key(public_key: &[u8; 32], salt: &Option<Bytes>) -> Id {
Expand Down
5 changes: 1 addition & 4 deletions src/routing_table.rs → src/common/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,7 @@ mod test {
use std::net::SocketAddr;
use std::str::FromStr;

use crate::{
common::{Id, Node},
routing_table::{KBucket, RoutingTable, MAX_BUCKET_SIZE_K},
};
use crate::common::{Id, KBucket, Node, RoutingTable, MAX_BUCKET_SIZE_K};

#[test]
fn table_is_empty() {
Expand Down
119 changes: 81 additions & 38 deletions src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use bytes::Bytes;
use flume::{Receiver, Sender};

use crate::{
common::{
hash_immutable, target_from_key, GetImmutableResponse, GetMutableResponse, GetPeerResponse,
Id, MutableItem, Node, Response, ResponseMessage, ResponseSender, StoreQueryMetdata,
common::{hash_immutable, target_from_key, Id, MutableItem, Node, RoutingTable},
rpc::{
GetImmutableResponse, GetMutableResponse, GetPeerResponse, Response, ResponseMessage,
ResponseSender, Rpc, StoreQueryMetdata,
},
routing_table::RoutingTable,
rpc::Rpc,
Result,
};

Expand Down Expand Up @@ -460,8 +459,11 @@ impl Testnet {

#[cfg(test)]
mod test {
use std::str::FromStr;
use std::time::Duration;

use ed25519_dalek::SigningKey;

use super::*;

#[test]
Expand All @@ -478,6 +480,18 @@ mod test {
dht.block_until_shutdown();
}

#[test]
fn bind_twice() {
let a = Dht::default();
let b = Dht::builder()
.port(a.local_addr().unwrap().port())
.as_server()
.build();

let result = b.handle.unwrap().join();
assert!(result.is_err());
}

#[test]
fn announce_get_peer() {
let testnet = Testnet::new(10);
Expand All @@ -504,46 +518,75 @@ mod test {
};
}

#[cfg(feature = "async")]
#[test]
fn announce_get_peer_async() {
async fn test() {
let testnet = Testnet::new(10);

let a = Dht::builder()
.bootstrap(&testnet.bootstrap)
.build()
.as_async();
let b = Dht::builder()
.bootstrap(&testnet.bootstrap)
.build()
.as_async();

let info_hash = Id::random();

match a.announce_peer(info_hash, Some(45555)).await {
Ok(_) => {
if let Some(r) = b.get_peers(info_hash).next_async().await {
assert_eq!(r.peer.port(), 45555);
} else {
fn put_get_immutable() {
let testnet = Testnet::new(10);

let a = Dht::builder().bootstrap(&testnet.bootstrap).build();
let b = Dht::builder().bootstrap(&testnet.bootstrap).build();

let value: Bytes = "Hello World!".into();
let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap();

match a.put_immutable(value.clone()) {
Ok(result) => {
assert_ne!(result.stored_at().len(), 0);
assert_eq!(result.target(), expected_target);

let responses: Vec<_> = b.get_immutable(result.target()).collect();

match responses.first() {
Some(r) => {
assert_eq!(r.value, value);
}
None => {
panic!("No respnoses")
}
}
Err(_) => {}
};
}
futures::executor::block_on(test());
}
Err(_) => {
panic!("Expected put_immutable to succeeed")
}
};
}

#[test]
fn bind_twice() {
let a = Dht::default();
let b = Dht::builder()
.port(a.local_addr().unwrap().port())
.as_server()
.build();
fn put_get_mutable() {
let testnet = Testnet::new(10);

let result = b.handle.unwrap().join();
assert!(result.is_err());
let a = Dht::builder().bootstrap(&testnet.bootstrap).build();
let b = Dht::builder().bootstrap(&testnet.bootstrap).build();

let signer = SigningKey::from_bytes(&[
56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
]);

let seq = 1000;
let value: Bytes = "Hello World!".into();

let item = MutableItem::new(signer.clone(), value, seq, None);

match a.put_mutable(item.clone()) {
Ok(result) => {
assert_ne!(result.stored_at().len(), 0);

let responses: Vec<_> = b
.get_mutable(signer.verifying_key().as_bytes(), None)
.collect();

match responses.first() {
Some(r) => {
assert_eq!(&r.item, &item);
}
None => {
panic!("No respnoses")
}
}
}
Err(_) => {
panic!("Expected put_immutable to succeeed")
}
};
}
}
5 changes: 0 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@ pub mod common;
pub mod dht;
pub mod error;
mod messages;
mod peers;
mod query;
mod routing_table;
mod rpc;
mod socket;
mod tokens;

pub use crate::common::Id;
pub use crate::error::Error;
Expand Down
Loading

0 comments on commit c840d0d

Please sign in to comment.