Skip to content

Commit

Permalink
Merge pull request #7 from Nuhvi/feat/tracing
Browse files Browse the repository at this point in the history
feat: add tracing debug logs
  • Loading branch information
Nuhvi authored Feb 9, 2024
2 parents b5e3288 + d94b054 commit 14c67f2
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ sha1_smol = "1.0.0"
flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false }
ed25519-dalek = "2.1.0"
bytes = "1.5.0"
tracing = "0.1"

[dev-dependencies]
clap = { version = "4.4.8", features = ["derive"] }
futures = "0.3.29"
tracing-subscriber = "0.3"

[features]
async = ["flume/async"]
Expand Down
7 changes: 7 additions & 0 deletions examples/announce_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

match Id::from_str(cli.infohash.as_str()) {
Expand Down
7 changes: 7 additions & 0 deletions examples/async/get_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

async fn async_main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let target_parse_result: Result<Id, _> = Id::from_str(cli.target.as_str());
Expand Down
16 changes: 16 additions & 0 deletions examples/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::{thread, time::Duration};

use mainline::Dht;

use tracing::Level;
use tracing_subscriber;

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

Dht::default();

thread::sleep(Duration::from_secs(5));
}
7 changes: 7 additions & 0 deletions examples/get_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let target_parse_result: Result<Id, _> = Id::from_str(cli.target.as_str());
Expand Down
6 changes: 6 additions & 0 deletions examples/get_mutable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use ed25519_dalek::VerifyingKey;
use std::convert::TryFrom;
use tracing::Level;
use tracing_subscriber;

use std::time::Instant;

Expand All @@ -15,6 +17,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let public_key = from_hex(cli.public_key.clone());
Expand Down
7 changes: 7 additions & 0 deletions examples/get_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let infohash_parse_result: Result<Id, _> = Id::from_str(cli.infohash.as_str());
Expand Down
7 changes: 7 additions & 0 deletions examples/put_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::Dht;

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let dht = Dht::default();
Expand Down
9 changes: 8 additions & 1 deletion examples/put_mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use mainline::{common::MutableItem, Dht};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -18,6 +21,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let dht = Dht::default();
Expand All @@ -42,7 +49,7 @@ fn main() {
let metadata = dht.put_mutable(item).expect("put mutable failed");

println!(
"Stored immutable data as {:?} in {:?} seconds",
"Stored mutable data as {:?} in {:?} seconds",
metadata.target(),
start.elapsed().as_secs_f32()
);
Expand Down
6 changes: 6 additions & 0 deletions src/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ impl RoutingTable {
self.buckets.values().all(|bucket| bucket.is_empty())
}

pub fn size(&self) -> usize {
self.buckets
.values()
.fold(0, |acc, bucket| acc + bucket.nodes.len())
}

pub fn contains(&self, node_id: &Id) -> bool {
let distance = self.id.distance(node_id);

Expand Down
32 changes: 25 additions & 7 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::{SocketAddr, ToSocketAddrs};
use std::time::{Duration, Instant};

use bytes::Bytes;
use tracing::{debug, error};

use crate::common::{
validate_immutable, GetImmutableResponse, GetMutableResponse, GetPeerResponse, Id, MutableItem,
Expand Down Expand Up @@ -72,7 +73,7 @@ impl Rpc {
tokens: Tokens::new(),
peers: PeersStore::new(),

last_table_refresh: Instant::now(),
last_table_refresh: Instant::now() - REFRESH_TABLE_INTERVAL,
last_table_ping: Instant::now(),
})
}
Expand Down Expand Up @@ -134,7 +135,22 @@ impl Rpc {
// disconnect response receivers too soon.
//
// Has to happen _before_ await to recv_from the socket.
self.queries.retain(|_, query| !query.is_done());
let self_id = self.id;
let table_size = self.routing_table.size();

self.queries.retain(|id, query| {
let done = query.is_done();

if done && id == &self_id {
if table_size == 0 {
error!("Could not bootstrap the routing table");
} else {
debug!(table_size, "Populated the routing table");
}
}

!done
});
self.store_queries.retain(|_, query| !query.is_done());

self.maintain_routing_table();
Expand All @@ -150,8 +166,8 @@ impl Rpc {
MessageType::Response(_) => {
self.handle_response(from, &message);
}
MessageType::Error(_err) => {
// TODO: Handle error messages!
MessageType::Error(error) => {
debug!(?message, "RPC Error response");
}
}
};
Expand Down Expand Up @@ -464,7 +480,8 @@ impl Rpc {
},
)) => {
if !validate_immutable(v, query.target()) {
// TODO: log error
let target = query.target();
debug!(?v, ?target, "Invalid immutable value");
return;
}

Expand All @@ -489,6 +506,7 @@ impl Rpc {
}) => salt,
_ => &None,
};
let target = query.target();

if let Ok(item) = MutableItem::from_dht_message(
query.target(),
Expand All @@ -503,7 +521,7 @@ impl Rpc {
item,
}));
} else {
// TODO: log error
debug!(?v, ?seq, ?sig, ?salt, ?target, "Invalid mutable record");
}
}
// Ping response is already handled in add_node()
Expand All @@ -525,7 +543,7 @@ impl Rpc {

fn maintain_routing_table(&mut self) {
if self.routing_table.is_empty()
|| self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL
&& self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL
{
self.last_table_refresh = Instant::now();
self.populate();
Expand Down
14 changes: 9 additions & 5 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::collections::HashMap;
use std::net::{SocketAddr, UdpSocket};
use std::time::{Duration, Instant};
use tracing::{debug, trace};

use crate::messages::{ErrorSpecific, Message, MessageType, RequestSpecific, ResponseSpecific};

Expand Down Expand Up @@ -129,7 +130,9 @@ impl KrpcSocket {
.retain(|_, request| request.sent_at.elapsed() < request_timeout);

if let Ok((amt, from)) = self.socket.recv_from(&mut buf) {
match Message::from_bytes(&buf[..amt]) {
let bytes = &buf[..amt];

match Message::from_bytes(bytes) {
Ok(message) => {
// Parsed correctly.
match message.message_type {
Expand All @@ -150,16 +153,16 @@ impl KrpcSocket {
self.inflight_requests.remove(&message.transaction_id);
return Some((message, from));
} else {
// TODO: handle/log response from wrong address.
debug!("Response from the wrong address");
}
} else {
// TODO: handle/log unexpected transaction id.
debug!("Unexpected response id");
};
}
}
}
Err(_err) => {
// TODO: handle/log parsing errors.
Err(error) => {
debug!(?error, ?bytes, "Received invalid message");
}
};
};
Expand Down Expand Up @@ -211,6 +214,7 @@ impl KrpcSocket {

/// Send a raw dht message
fn send(&mut self, address: SocketAddr, message: Message) -> Result<()> {
trace!(?message, "Sending a message");
self.socket.send_to(&message.to_bytes()?, address)?;
Ok(())
}
Expand Down

0 comments on commit 14c67f2

Please sign in to comment.