Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tracing debug logs #7

Merged
merged 3 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ sha1_smol = "1.0.0"
flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false }
ed25519-dalek = "2.1.0"
bytes = "1.5.0"
tracing = "0.1"

[dev-dependencies]
clap = { version = "4.4.8", features = ["derive"] }
futures = "0.3.29"
tracing-subscriber = "0.3"

[features]
async = ["flume/async"]
Expand Down
7 changes: 7 additions & 0 deletions examples/announce_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

match Id::from_str(cli.infohash.as_str()) {
Expand Down
7 changes: 7 additions & 0 deletions examples/async/get_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

async fn async_main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let target_parse_result: Result<Id, _> = Id::from_str(cli.target.as_str());
Expand Down
16 changes: 16 additions & 0 deletions examples/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::{thread, time::Duration};

use mainline::Dht;

use tracing::Level;
use tracing_subscriber;

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

Dht::default();

thread::sleep(Duration::from_secs(5));
}
7 changes: 7 additions & 0 deletions examples/get_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let target_parse_result: Result<Id, _> = Id::from_str(cli.target.as_str());
Expand Down
6 changes: 6 additions & 0 deletions examples/get_mutable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use ed25519_dalek::VerifyingKey;
use std::convert::TryFrom;
use tracing::Level;
use tracing_subscriber;

use std::time::Instant;

Expand All @@ -15,6 +17,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let public_key = from_hex(cli.public_key.clone());
Expand Down
7 changes: 7 additions & 0 deletions examples/get_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::{Dht, Id};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let infohash_parse_result: Result<Id, _> = Id::from_str(cli.infohash.as_str());
Expand Down
7 changes: 7 additions & 0 deletions examples/put_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use mainline::Dht;

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -12,6 +15,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let dht = Dht::default();
Expand Down
9 changes: 8 additions & 1 deletion examples/put_mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use mainline::{common::MutableItem, Dht};

use clap::Parser;

use tracing::Level;
use tracing_subscriber;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
Expand All @@ -18,6 +21,10 @@ struct Cli {
}

fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

let cli = Cli::parse();

let dht = Dht::default();
Expand All @@ -42,7 +49,7 @@ fn main() {
let metadata = dht.put_mutable(item).expect("put mutable failed");

println!(
"Stored immutable data as {:?} in {:?} seconds",
"Stored mutable data as {:?} in {:?} seconds",
metadata.target(),
start.elapsed().as_secs_f32()
);
Expand Down
6 changes: 6 additions & 0 deletions src/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ impl RoutingTable {
self.buckets.values().all(|bucket| bucket.is_empty())
}

pub fn size(&self) -> usize {
self.buckets
.values()
.fold(0, |acc, bucket| acc + bucket.nodes.len())
}

pub fn contains(&self, node_id: &Id) -> bool {
let distance = self.id.distance(node_id);

Expand Down
32 changes: 25 additions & 7 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::{SocketAddr, ToSocketAddrs};
use std::time::{Duration, Instant};

use bytes::Bytes;
use tracing::{debug, error};

use crate::common::{
validate_immutable, GetImmutableResponse, GetMutableResponse, GetPeerResponse, Id, MutableItem,
Expand Down Expand Up @@ -72,7 +73,7 @@ impl Rpc {
tokens: Tokens::new(),
peers: PeersStore::new(),

last_table_refresh: Instant::now(),
last_table_refresh: Instant::now() - REFRESH_TABLE_INTERVAL,
last_table_ping: Instant::now(),
})
}
Expand Down Expand Up @@ -134,7 +135,22 @@ impl Rpc {
// disconnect response receivers too soon.
//
// Has to happen _before_ await to recv_from the socket.
self.queries.retain(|_, query| !query.is_done());
let self_id = self.id;
let table_size = self.routing_table.size();

self.queries.retain(|id, query| {
let done = query.is_done();

if done && id == &self_id {
if table_size == 0 {
error!("Could not bootstrap the routing table");
} else {
debug!(table_size, "Populated the routing table");
}
}

!done
});
self.store_queries.retain(|_, query| !query.is_done());

self.maintain_routing_table();
Expand All @@ -150,8 +166,8 @@ impl Rpc {
MessageType::Response(_) => {
self.handle_response(from, &message);
}
MessageType::Error(_err) => {
// TODO: Handle error messages!
MessageType::Error(error) => {
debug!(?message, "RPC Error response");
}
}
};
Expand Down Expand Up @@ -464,7 +480,8 @@ impl Rpc {
},
)) => {
if !validate_immutable(v, query.target()) {
// TODO: log error
let target = query.target();
debug!(?v, ?target, "Invalid immutable value");
return;
}

Expand All @@ -489,6 +506,7 @@ impl Rpc {
}) => salt,
_ => &None,
};
let target = query.target();

if let Ok(item) = MutableItem::from_dht_message(
query.target(),
Expand All @@ -503,7 +521,7 @@ impl Rpc {
item,
}));
} else {
// TODO: log error
debug!(?v, ?seq, ?sig, ?salt, ?target, "Invalid mutable record");
}
}
// Ping response is already handled in add_node()
Expand All @@ -525,7 +543,7 @@ impl Rpc {

fn maintain_routing_table(&mut self) {
if self.routing_table.is_empty()
|| self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL
&& self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL
{
self.last_table_refresh = Instant::now();
self.populate();
Expand Down
14 changes: 9 additions & 5 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::collections::HashMap;
use std::net::{SocketAddr, UdpSocket};
use std::time::{Duration, Instant};
use tracing::{debug, trace};

use crate::messages::{ErrorSpecific, Message, MessageType, RequestSpecific, ResponseSpecific};

Expand Down Expand Up @@ -129,7 +130,9 @@ impl KrpcSocket {
.retain(|_, request| request.sent_at.elapsed() < request_timeout);

if let Ok((amt, from)) = self.socket.recv_from(&mut buf) {
match Message::from_bytes(&buf[..amt]) {
let bytes = &buf[..amt];

match Message::from_bytes(bytes) {
Ok(message) => {
// Parsed correctly.
match message.message_type {
Expand All @@ -150,16 +153,16 @@ impl KrpcSocket {
self.inflight_requests.remove(&message.transaction_id);
return Some((message, from));
} else {
// TODO: handle/log response from wrong address.
debug!("Response from the wrong address");
}
} else {
// TODO: handle/log unexpected transaction id.
debug!("Unexpected response id");
};
}
}
}
Err(_err) => {
// TODO: handle/log parsing errors.
Err(error) => {
debug!(?error, ?bytes, "Received invalid message");
}
};
};
Expand Down Expand Up @@ -211,6 +214,7 @@ impl KrpcSocket {

/// Send a raw dht message
fn send(&mut self, address: SocketAddr, message: Message) -> Result<()> {
trace!(?message, "Sending a message");
self.socket.send_to(&message.to_bytes()?, address)?;
Ok(())
}
Expand Down
Loading