Skip to content

Commit

Permalink
feat: use bytes::Bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Nov 22, 2023
1 parent 148905e commit 8b1fa1d
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ crc = "3.0.1"
sha1_smol = "1.0.0"
flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false }
ed25519-dalek = "2.1.0"
bytes = "1.5.0"

[dev-dependencies]
clap = { version = "4.4.8", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion examples/get_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn main() {
// No need to stream responses, just print the first result, since
// all immutable data items are guaranteed to be the same.

match String::from_utf8(res.value.clone()) {
match String::from_utf8(res.value.to_vec()) {
Ok(string) => {
println!("Got immutable data: {:?} | from: {:?}", string, res.from);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/get_mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn main() {

count += 1;

match String::from_utf8(res.item.value().clone()) {
match String::from_utf8(res.item.value().to_vec()) {
Ok(string) => {
println!(
"Got mutable item: {:?}, seq: {:?} | from: {:?}",
Expand Down
2 changes: 1 addition & 1 deletion examples/put_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
println!("\nStoring immutable data: {} ...\n", cli.value);

let metadata = dht
.put_immutable(cli.value.as_bytes().to_vec())
.put_immutable(cli.value.into())
.expect("put immutable failed");
println!(
"Stored immutable data as {:?} in {:?} seconds",
Expand Down
2 changes: 1 addition & 1 deletion examples/put_mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn main() {
.expect("time drift")
.as_micros() as i64;

let item = MutableItem::new(signer, cli.value.as_bytes().to_vec(), seq, None);
let item = MutableItem::new(signer, cli.value.as_bytes().to_owned().into(), seq, None);

let metadata = dht.put_mutable(item).expect("put mutable failed");

Expand Down
7 changes: 4 additions & 3 deletions src/async_dht.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Dht node with async api.
use bytes::Bytes;
use ed25519_dalek::VerifyingKey;

use crate::common::{
Expand Down Expand Up @@ -91,7 +92,7 @@ impl AsyncDht {
}

/// Async version of [put_immutable](Dht::put_immutable).
pub async fn put_immutable(&self, value: Vec<u8>) -> Result<StoreQueryMetdata> {
pub async fn put_immutable(&self, value: Bytes) -> Result<StoreQueryMetdata> {
let target = Id::from_bytes(hash_immutable(&value)).unwrap();

let (sender, receiver) = flume::unbounded::<ResponseMessage<GetImmutableResponse>>();
Expand All @@ -113,7 +114,7 @@ impl AsyncDht {
pub async fn put_immutable_to(
&self,
target: Id,
value: Vec<u8>,
value: Bytes,
nodes: Vec<Node>,
) -> Result<StoreQueryMetdata> {
let (sender, receiver) = flume::bounded::<StoreQueryMetdata>(1);
Expand All @@ -132,7 +133,7 @@ impl AsyncDht {
pub async fn get_mutable(
&self,
public_key: VerifyingKey,
salt: Option<Vec<u8>>,
salt: Option<Bytes>,
) -> Response<GetMutableResponse> {
self.0.get_mutable(public_key, salt)
}
Expand Down
48 changes: 26 additions & 22 deletions src/common/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Helper functions and structs for mutable items.
use bytes::Bytes;
use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
use sha1_smol::Sha1;
use std::convert::TryFrom;
Expand All @@ -15,16 +16,16 @@ pub struct MutableItem {
/// sequence number
seq: i64,
/// mutable value
value: Vec<u8>,
value: Bytes,
/// ed25519 signature
signature: [u8; 64],
/// Optional salt
salt: Option<Vec<u8>>,
salt: Option<Bytes>,
}

impl MutableItem {
/// Create a new mutable item from a signing key, value, sequence number and optional salt.
pub fn new(signer: SigningKey, value: Vec<u8>, seq: i64, salt: Option<Vec<u8>>) -> Self {
pub fn new(signer: SigningKey, value: Bytes, seq: i64, salt: Option<Bytes>) -> Self {
let signable = encode_signable(&seq, &value, &salt);
let signature = signer.sign(&signable);

Expand All @@ -41,9 +42,9 @@ impl MutableItem {
pub fn new_signed_unchecked(
key: [u8; 32],
signature: [u8; 64],
value: Vec<u8>,
value: Bytes,
seq: i64,
salt: Option<Vec<u8>>,
salt: Option<Bytes>,
) -> Self {
Self {
target: target_from_key(&key, &salt),
Expand All @@ -57,28 +58,27 @@ impl MutableItem {

pub(crate) fn from_dht_message(
target: &Id,
key: &Vec<u8>,
v: &[u8],
key: &[u8],
v: Bytes,
seq: &i64,
signature: &[u8],
salt: &Option<Vec<u8>>,
salt: &Option<Bytes>,
) -> Result<Self> {
let key =
VerifyingKey::try_from(key.as_slice()).map_err(|_| Error::InvalidMutablePublicKey)?;
let key = VerifyingKey::try_from(key).map_err(|_| Error::InvalidMutablePublicKey)?;

let signature =
Signature::from_slice(signature).map_err(|_| Error::InvalidMutableSignature)?;

key.verify(&encode_signable(seq, v, salt), &signature)
key.verify(&encode_signable(seq, &v, salt), &signature)
.map_err(|_| Error::InvalidMutableSignature)?;

Ok(Self {
target: *target,
key: key.to_bytes(),
value: v.to_owned(),
value: v,
seq: *seq,
signature: signature.to_bytes(),
salt: salt.clone(),
salt: salt.to_owned(),
})
}

Expand All @@ -92,7 +92,7 @@ impl MutableItem {
&self.key
}

pub fn value(&self) -> &Vec<u8> {
pub fn value(&self) -> &Bytes {
&self.value
}

Expand All @@ -104,12 +104,12 @@ impl MutableItem {
&self.signature
}

pub fn salt(&self) -> &Option<Vec<u8>> {
pub fn salt(&self) -> &Option<Bytes> {
&self.salt
}
}

pub fn target_from_key(public_key: &[u8; 32], salt: &Option<Vec<u8>>) -> Id {
pub fn target_from_key(public_key: &[u8; 32], salt: &Option<Bytes>) -> Id {
let mut encoded = vec![];

encoded.extend(public_key);
Expand All @@ -125,7 +125,7 @@ pub fn target_from_key(public_key: &[u8; 32], salt: &Option<Vec<u8>>) -> Id {
Id::from_bytes(hash).unwrap()
}

pub fn encode_signable(seq: &i64, value: &[u8], salt: &Option<Vec<u8>>) -> Vec<u8> {
pub fn encode_signable(seq: &i64, value: &Bytes, salt: &Option<Bytes>) -> Bytes {
let mut signable = vec![];

if let Some(salt) = salt {
Expand All @@ -136,7 +136,7 @@ pub fn encode_signable(seq: &i64, value: &[u8], salt: &Option<Vec<u8>>) -> Vec<u
signable.extend(format!("3:seqi{}e1:v{}:", seq, value.len()).into_bytes());
signable.extend(value);

signable
signable.into()
}

#[cfg(test)]
Expand All @@ -145,14 +145,18 @@ mod tests {

#[test]
fn signable_without_salt() {
let signable = encode_signable(&4, &b"Hello world!".to_vec(), &None);
let signable = encode_signable(&4, &Bytes::from_static(b"Hello world!"), &None);

assert_eq!(signable, b"3:seqi4e1:v12:Hello world!");
assert_eq!(&*signable, b"3:seqi4e1:v12:Hello world!");
}
#[test]
fn signable_with_salt() {
let signable = encode_signable(&4, &b"Hello world!".to_vec(), &Some(b"foobar".to_vec()));
let signable = encode_signable(
&4,
&Bytes::from_static(b"Hello world!"),
&Some(Bytes::from_static(b"foobar")),
);

assert_eq!(signable, b"4:salt6:foobar3:seqi4e1:v12:Hello world!");
assert_eq!(&*signable, b"4:salt6:foobar3:seqi4e1:v12:Hello world!");
}
}
3 changes: 2 additions & 1 deletion src/common/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A struct to iterate on incoming responses for a query.
use bytes::Bytes;
use flume::{Receiver, Sender};
use std::net::SocketAddr;

Expand Down Expand Up @@ -73,7 +74,7 @@ pub struct GetPeerResponse {
#[derive(Clone, Debug)]
pub struct GetImmutableResponse {
pub from: Node,
pub value: Vec<u8>,
pub value: Bytes,
}

#[derive(Clone, Debug)]
Expand Down
11 changes: 6 additions & 5 deletions src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
thread::{self, JoinHandle},
};

use bytes::Bytes;
use ed25519_dalek::VerifyingKey;
use flume::{Receiver, Sender};

Expand Down Expand Up @@ -265,7 +266,7 @@ impl Dht {
}

/// Put an immutable data to the DHT.
pub fn put_immutable(&self, value: Vec<u8>) -> Result<StoreQueryMetdata> {
pub fn put_immutable(&self, value: Bytes) -> Result<StoreQueryMetdata> {
let target = Id::from_bytes(hash_immutable(&value)).unwrap();

let (sender, receiver) = flume::unbounded::<ResponseMessage<GetImmutableResponse>>();
Expand All @@ -284,7 +285,7 @@ impl Dht {
pub fn put_immutable_to(
&self,
target: Id,
value: Vec<u8>,
value: Bytes,
nodes: Vec<Node>,
) -> Result<StoreQueryMetdata> {
let (sender, receiver) = flume::bounded::<StoreQueryMetdata>(1);
Expand All @@ -302,7 +303,7 @@ impl Dht {
pub fn get_mutable(
&self,
public_key: VerifyingKey,
salt: Option<Vec<u8>>,
salt: Option<Bytes>,
) -> Response<GetMutableResponse> {
let target = target_from_key(&public_key.to_bytes(), &salt);

Expand Down Expand Up @@ -418,11 +419,11 @@ pub(crate) enum ActorMessage {
AnnouncePeer(Id, Vec<Node>, Option<u16>, Sender<StoreQueryMetdata>),

GetImmutable(Id, Sender<ResponseMessage<GetImmutableResponse>>),
PutImmutable(Id, Vec<u8>, Vec<Node>, Sender<StoreQueryMetdata>),
PutImmutable(Id, Bytes, Vec<Node>, Sender<StoreQueryMetdata>),

GetMutable(
Id,
Option<Vec<u8>>,
Option<Bytes>,
Sender<ResponseMessage<GetMutableResponse>>,
),
PutMutable(MutableItem, Vec<Node>, Sender<StoreQueryMetdata>),
Expand Down
4 changes: 3 additions & 1 deletion src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod internal;
use std::convert::TryInto;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use bytes::Bytes;

use crate::common::{Id, Node, ID_SIZE};
use crate::{Error, Result};

Expand Down Expand Up @@ -150,7 +152,7 @@ pub struct GetMutableRequestArguments {
// A bit of a hack, using this to carry an optional
// salt in the query.request field of [crate::query]
// not really encoded, decoded or sent over the wire.
pub salt: Option<Vec<u8>>,
pub salt: Option<Bytes>,
}

#[derive(Debug, PartialEq, Clone)]
Expand Down
23 changes: 15 additions & 8 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;

use bytes::Bytes;

use crate::common::{
validate_immutable, GetImmutableResponse, GetMutableResponse, GetPeerResponse, Id, MutableItem,
Node, ResponseSender, ResponseValue,
Expand Down Expand Up @@ -202,7 +204,7 @@ impl Rpc {
pub fn put_immutable(
&mut self,
target: Id,
value: Vec<u8>,
value: Bytes,
nodes: Vec<Node>,
sender: ResponseSender,
) {
Expand All @@ -216,7 +218,7 @@ impl Rpc {
requester_id: self.id,
target,
token,
v: value.clone(),
v: value.clone().into(),
}),
&mut self.socket,
);
Expand All @@ -226,7 +228,7 @@ impl Rpc {
self.store_queries.insert(target, query);
}

pub fn get_mutable(&mut self, target: Id, salt: Option<Vec<u8>>, sender: ResponseSender) {
pub fn get_mutable(&mut self, target: Id, salt: Option<Bytes>, sender: ResponseSender) {
self.query(
target,
RequestSpecific::GetMutable(GetMutableRequestArguments {
Expand All @@ -249,7 +251,7 @@ impl Rpc {
requester_id: self.id,
target: *item.target(),
token,
v: item.value().clone(),
v: item.value().clone().into(),
k: item.key().to_vec(),
seq: *item.seq(),
sig: item.signature().to_vec(),
Expand Down Expand Up @@ -472,7 +474,7 @@ impl Rpc {

query.response(ResponseValue::Immutable(GetImmutableResponse {
from: Node::new(*responder_id, from),
value: v.clone(),
value: v.to_owned().into(),
}));
}
MessageType::Response(ResponseSpecific::GetMutable(
Expand All @@ -492,9 +494,14 @@ impl Rpc {
_ => &None,
};

if let Ok(item) =
MutableItem::from_dht_message(query.target(), k, v, seq, sig, salt)
{
if let Ok(item) = MutableItem::from_dht_message(
query.target(),
k,
v.to_owned().into(),
seq,
sig,
salt,
) {
query.response(ResponseValue::Mutable(GetMutableResponse {
from: Node::new(*responder_id, from),
item,
Expand Down

0 comments on commit 8b1fa1d

Please sign in to comment.