From 7ac14e558a73c6e6ab65b726ee10885aade3da18 Mon Sep 17 00:00:00 2001 From: nazeh Date: Fri, 9 Feb 2024 17:48:51 +0300 Subject: [PATCH 1/3] feat: add tracing debug logs --- Cargo.toml | 2 ++ examples/announce_peer.rs | 7 +++++++ examples/async/get_immutable.rs | 7 +++++++ examples/get_immutable.rs | 7 +++++++ examples/get_mutable.rs | 6 ++++++ examples/get_peers.rs | 7 +++++++ examples/put_immutable.rs | 7 +++++++ examples/put_mutable.rs | 9 ++++++++- src/rpc.rs | 11 +++++++---- src/socket.rs | 14 +++++++++----- 10 files changed, 67 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 92cc0808..6f7de300 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,12 @@ sha1_smol = "1.0.0" flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false } ed25519-dalek = "2.1.0" bytes = "1.5.0" +tracing = "0.1" [dev-dependencies] clap = { version = "4.4.8", features = ["derive"] } futures = "0.3.29" +tracing-subscriber = "0.3" [features] async = ["flume/async"] diff --git a/examples/announce_peer.rs b/examples/announce_peer.rs index 1ef2e608..f5bc33d5 100644 --- a/examples/announce_peer.rs +++ b/examples/announce_peer.rs @@ -4,6 +4,9 @@ use mainline::{Dht, Id}; use clap::Parser; +use tracing::Level; +use tracing_subscriber; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -12,6 +15,10 @@ struct Cli { } fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); match Id::from_str(cli.infohash.as_str()) { diff --git a/examples/async/get_immutable.rs b/examples/async/get_immutable.rs index c82528f1..6a74eeb7 100644 --- a/examples/async/get_immutable.rs +++ b/examples/async/get_immutable.rs @@ -4,6 +4,9 @@ use mainline::{Dht, Id}; use clap::Parser; +use tracing::Level; +use tracing_subscriber; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -12,6 +15,10 @@ struct Cli { } async fn async_main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); let target_parse_result: Result = Id::from_str(cli.target.as_str()); diff --git a/examples/get_immutable.rs b/examples/get_immutable.rs index e4c37c8e..9832556b 100644 --- a/examples/get_immutable.rs +++ b/examples/get_immutable.rs @@ -4,6 +4,9 @@ use mainline::{Dht, Id}; use clap::Parser; +use tracing::Level; +use tracing_subscriber; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -12,6 +15,10 @@ struct Cli { } fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); let target_parse_result: Result = Id::from_str(cli.target.as_str()); diff --git a/examples/get_mutable.rs b/examples/get_mutable.rs index 134720ef..eeb6a44e 100644 --- a/examples/get_mutable.rs +++ b/examples/get_mutable.rs @@ -1,5 +1,7 @@ use ed25519_dalek::VerifyingKey; use std::convert::TryFrom; +use tracing::Level; +use tracing_subscriber; use std::time::Instant; @@ -15,6 +17,10 @@ struct Cli { } fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); let public_key = from_hex(cli.public_key.clone()); diff --git a/examples/get_peers.rs b/examples/get_peers.rs index 750b63de..0085a044 100644 --- a/examples/get_peers.rs +++ b/examples/get_peers.rs @@ -4,6 +4,9 @@ use mainline::{Dht, Id}; use clap::Parser; +use tracing::Level; +use tracing_subscriber; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -12,6 +15,10 @@ struct Cli { } fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); let infohash_parse_result: Result = Id::from_str(cli.infohash.as_str()); diff --git a/examples/put_immutable.rs b/examples/put_immutable.rs index 1f872515..b358efb7 100644 --- a/examples/put_immutable.rs +++ b/examples/put_immutable.rs @@ -4,6 +4,9 @@ use mainline::Dht; use clap::Parser; +use tracing::Level; +use tracing_subscriber; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -12,6 +15,10 @@ struct Cli { } fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); let dht = Dht::default(); diff --git a/examples/put_mutable.rs b/examples/put_mutable.rs index dfe7bfed..1cc3b596 100644 --- a/examples/put_mutable.rs +++ b/examples/put_mutable.rs @@ -8,6 +8,9 @@ use mainline::{common::MutableItem, Dht}; use clap::Parser; +use tracing::Level; +use tracing_subscriber; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -18,6 +21,10 @@ struct Cli { } fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + let cli = Cli::parse(); let dht = Dht::default(); @@ -42,7 +49,7 @@ fn main() { let metadata = dht.put_mutable(item).expect("put mutable failed"); println!( - "Stored immutable data as {:?} in {:?} seconds", + "Stored mutable data as {:?} in {:?} seconds", metadata.target(), start.elapsed().as_secs_f32() ); diff --git a/src/rpc.rs b/src/rpc.rs index acf16b37..8b14db4a 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,6 +3,7 @@ use std::net::{SocketAddr, ToSocketAddrs}; use std::time::{Duration, Instant}; use bytes::Bytes; +use tracing::debug; use crate::common::{ validate_immutable, GetImmutableResponse, GetMutableResponse, GetPeerResponse, Id, MutableItem, @@ -150,8 +151,8 @@ impl Rpc { MessageType::Response(_) => { self.handle_response(from, &message); } - MessageType::Error(_err) => { - // TODO: Handle error messages! + MessageType::Error(error) => { + debug!(?message, "RPC Error response"); } } }; @@ -464,7 +465,8 @@ impl Rpc { }, )) => { if !validate_immutable(v, query.target()) { - // TODO: log error + let target = query.target(); + debug!(?v, ?target, "Invalid immutable value"); return; } @@ -489,6 +491,7 @@ impl Rpc { }) => salt, _ => &None, }; + let target = query.target(); if let Ok(item) = MutableItem::from_dht_message( query.target(), @@ -503,7 +506,7 @@ impl Rpc { item, })); } else { - // TODO: log error + debug!(?v, ?seq, ?sig, ?salt, ?target, "Invalid mutable record"); } } // Ping response is already handled in add_node() diff --git a/src/socket.rs b/src/socket.rs index 55aa69ac..8a833e73 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::net::{SocketAddr, UdpSocket}; use std::time::{Duration, Instant}; +use tracing::{debug, trace}; use crate::messages::{ErrorSpecific, Message, MessageType, RequestSpecific, ResponseSpecific}; @@ -129,7 +130,9 @@ impl KrpcSocket { .retain(|_, request| request.sent_at.elapsed() < request_timeout); if let Ok((amt, from)) = self.socket.recv_from(&mut buf) { - match Message::from_bytes(&buf[..amt]) { + let bytes = &buf[..amt]; + + match Message::from_bytes(bytes) { Ok(message) => { // Parsed correctly. match message.message_type { @@ -150,16 +153,16 @@ impl KrpcSocket { self.inflight_requests.remove(&message.transaction_id); return Some((message, from)); } else { - // TODO: handle/log response from wrong address. + debug!("Response from the wrong address"); } } else { - // TODO: handle/log unexpected transaction id. + debug!("Unexpected response id"); }; } } } - Err(_err) => { - // TODO: handle/log parsing errors. + Err(error) => { + debug!(?error, ?bytes, "Received invalid message"); } }; }; @@ -211,6 +214,7 @@ impl KrpcSocket { /// Send a raw dht message fn send(&mut self, address: SocketAddr, message: Message) -> Result<()> { + trace!(?message, "Sending a message"); self.socket.send_to(&message.to_bytes()?, address)?; Ok(()) } From 4f2e1b1c028a73ee1b7d6549bdb33a76fe56bc64 Mon Sep 17 00:00:00 2001 From: nazeh Date: Fri, 9 Feb 2024 18:55:47 +0300 Subject: [PATCH 2/3] feat: log error if the bootstrap query failed to populate the routing table --- examples/bootstrap.rs | 16 ++++++++++++++++ src/routing_table.rs | 6 ++++++ src/rpc.rs | 23 +++++++++++++++++++---- 3 files changed, 41 insertions(+), 4 deletions(-) create mode 100644 examples/bootstrap.rs diff --git a/examples/bootstrap.rs b/examples/bootstrap.rs new file mode 100644 index 00000000..3ceb3721 --- /dev/null +++ b/examples/bootstrap.rs @@ -0,0 +1,16 @@ +use std::{thread, time::Duration}; + +use mainline::Dht; + +use tracing::Level; +use tracing_subscriber; + +fn main() { + tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .init(); + + Dht::default(); + + thread::sleep(Duration::from_secs(5)); +} diff --git a/src/routing_table.rs b/src/routing_table.rs index c2fa61ac..81721eeb 100644 --- a/src/routing_table.rs +++ b/src/routing_table.rs @@ -106,6 +106,12 @@ impl RoutingTable { self.buckets.values().all(|bucket| bucket.is_empty()) } + pub fn size(&self) -> usize { + self.buckets + .values() + .fold(0, |acc, bucket| acc + bucket.nodes.len()) + } + pub fn contains(&self, node_id: &Id) -> bool { let distance = self.id.distance(node_id); diff --git a/src/rpc.rs b/src/rpc.rs index 8b14db4a..35b625ca 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,7 +3,7 @@ use std::net::{SocketAddr, ToSocketAddrs}; use std::time::{Duration, Instant}; use bytes::Bytes; -use tracing::debug; +use tracing::{debug, error}; use crate::common::{ validate_immutable, GetImmutableResponse, GetMutableResponse, GetPeerResponse, Id, MutableItem, @@ -73,7 +73,7 @@ impl Rpc { tokens: Tokens::new(), peers: PeersStore::new(), - last_table_refresh: Instant::now(), + last_table_refresh: Instant::now() - REFRESH_TABLE_INTERVAL, last_table_ping: Instant::now(), }) } @@ -135,7 +135,22 @@ impl Rpc { // disconnect response receivers too soon. // // Has to happen _before_ await to recv_from the socket. - self.queries.retain(|_, query| !query.is_done()); + let self_id = self.id; + let table_size = self.routing_table.size(); + + self.queries.retain(|id, query| { + let done = query.is_done(); + + if done && id == &self_id { + if table_size == 0 { + error!("Could not bootstrap the routing table"); + } else { + debug!(table_size, "Populated the routing table"); + } + } + + !done + }); self.store_queries.retain(|_, query| !query.is_done()); self.maintain_routing_table(); @@ -528,7 +543,7 @@ impl Rpc { fn maintain_routing_table(&mut self) { if self.routing_table.is_empty() - || self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL + && self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL { self.last_table_refresh = Instant::now(); self.populate(); From d94b0547ffebee8af6248db1404ac47562667296 Mon Sep 17 00:00:00 2001 From: nazeh Date: Fri, 9 Feb 2024 19:00:37 +0300 Subject: [PATCH 3/3] chore: fmt --- examples/bootstrap.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/bootstrap.rs b/examples/bootstrap.rs index 3ceb3721..08c33814 100644 --- a/examples/bootstrap.rs +++ b/examples/bootstrap.rs @@ -12,5 +12,5 @@ fn main() { Dht::default(); - thread::sleep(Duration::from_secs(5)); + thread::sleep(Duration::from_secs(5)); }