examples: Showcase how to use rust-libp2p in larger application #2158
I've found […]

What would be the benefit of […]

It is primarily less noisy, but it also comes with a built-in timeout, which is useful if you have other parts of the application that talk to the remote control and don't want to wait forever on results coming back from the network layer. But really, the main value is convenience through e.g. the […]
I'd be happy to add such an example, similar to how it is done in the interface of the iotaledger/stronghold.rs#210 PR you linked above. I like […]

That is likely relevant for the timer inside the channel. There needs to be a tokio reactor running somewhere for this to work, I believe; otherwise it will panic at runtime.

🙏 Thanks! To keep things simple I would prefer an example with plain old futures channels instead of […]
I tried implementing something like that but I'm hopelessly stuck. Please help.

Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.9", features = ["full"] }
libp2p = { version = "0.39.1", features = ["mdns", "tcp-tokio"] }
async-std = { version = "1", features = ["attributes", "tokio1"] }
```

main.rs:

```rust
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use libp2p::swarm::{NetworkBehaviourEventProcess, NetworkBehaviour, SwarmEvent};
use libp2p::NetworkBehaviour;
use libp2p::PeerId;
use libp2p::kad::{Kademlia, QueryId, KademliaEvent, Quorum, Record, QueryResult, PutRecordError, PutRecordOk};
use libp2p::kad::store::MemoryStore;
use libp2p::mdns::{Mdns, MdnsEvent, MdnsConfig};
use libp2p::kad::record::Key;
use libp2p::{identity, noise, Transport, mplex, Swarm};
use libp2p::tcp::{TcpConfig, TokioTcpConfig};
use libp2p::core::upgrade;
use tokio::sync::oneshot;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::io::AsyncBufReadExt;
use libp2p::futures::StreamExt;
// use libp2p::futures::channel::oneshot;
// use libp2p::futures::channel::oneshot::Sender;

#[derive(NetworkBehaviour)]
struct MyBehaviour {
    kademlia: Kademlia<MemoryStore>,
    mdns: Mdns,
    #[behaviour(ignore)]
    ht: Arc<Mutex<HashMap<QueryId, Sender<QueryResult>>>>,
}

impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
    fn inject_event(&mut self, event: MdnsEvent) {
        match event {
            MdnsEvent::Discovered(list) => {
                for (peer_id, multiaddr) in list {
                    self.kademlia.add_address(&peer_id, multiaddr);
                }
            }
            MdnsEvent::Expired(list) => {
                for (peer_id, multiaddr) in list {
                    self.kademlia.remove_address(&peer_id, &multiaddr)
                        .expect("Error removing address");
                }
            }
        }
    }
}

impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
    fn inject_event(&mut self, event: KademliaEvent) {
        println!("event => {:?}", event);
        match event {
            KademliaEvent::OutboundQueryCompleted { result, id, .. } => {
                println!("Result => {:?}", result);
                // let res = tokio::task::block_in_place(move || self.ht.lock()).remove(&id).unwrap();
                let res = self.ht.lock().unwrap().remove(&id).unwrap();
                res.send(result).unwrap();
            }
            _ => {
                println!("Something other happened");
            }
        }
    }
}

async fn handle_input_line(kademlia: &mut Dht, line: String) {
    let mut args = line.split(' ');
    match args.next() {
        Some("GET") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            println!("Key => {:?}", key);
            let res = kademlia.get(key).await.unwrap();
            println!("Record is => {:?}", res);
        }
        Some("PUT") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            let value = {
                match args.next() {
                    Some(value) => value.as_bytes().to_vec(),
                    None => {
                        eprintln!("Expected value");
                        return;
                    }
                }
            };
            let record = Record {
                key,
                value,
                publisher: None,
                expires: None,
            };
            // kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
            let res = kademlia.put(record).await.unwrap();
            println!("Success Key was => {:?}", res.to_vec());
        }
        _ => {}
    }
}

struct Dht(Swarm<MyBehaviour>);

impl Dht {
    pub fn new(swarm: Swarm<MyBehaviour>) -> Self {
        Self(swarm)
    }

    pub async fn put(&mut self, value: Record) -> Result<Key, &'static str> {
        let behaviour = self.0.behaviour_mut();
        let (sx, rx) = oneshot::channel();
        let query_id = behaviour.kademlia.put_record(value, Quorum::One).unwrap();
        behaviour.ht.lock().unwrap().insert(query_id, sx);
        println!("HT => {:?}", behaviour.ht);
        drop(behaviour);
        let res = rx.await.unwrap();
        match res {
            QueryResult::PutRecord(d) => match d {
                Ok(dd) => Ok(dd.key),
                Err(e) => Err("Something went wrong again"),
            },
            _ => Err("Something went wrong"),
        }
    }

    pub async fn get(&mut self, key: Key) -> Result<Record, &'static str> {
        let behaviour = self.0.behaviour_mut();
        let (sx, rx) = oneshot::channel();
        let res = tokio::spawn(async {
            rx.await.unwrap()
        });
        let query_id = behaviour.kademlia.get_record(&key, Quorum::One);
        behaviour.ht.lock().unwrap().insert(query_id, sx);
        println!("{:?}", behaviour.ht);
        match res.await.unwrap() {
            QueryResult::GetRecord(d) => match d {
                Ok(dd) => {
                    println!("DD => {:?}", dd);
                    Ok(dd.records.get(0).unwrap().record.clone())
                }
                Err(_) => Err("something went wrong again"),
            },
            _ => Err("Something went wrong"),
        }
    }
}

#[tokio::main]
async fn main() {
    let id_keys = identity::Keypair::generate_ed25519();
    let peer_id = PeerId::from(id_keys.public());
    println!("Local peer id => {:?}", peer_id);

    let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
        .into_authentic(&id_keys)
        .expect("Signing libp2p-noise static DH keypair failed.");

    let transport = TokioTcpConfig::new()
        .nodelay(true)
        .upgrade(upgrade::Version::V1)
        .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
        .multiplex(mplex::MplexConfig::new())
        .boxed();

    let mut swarm = {
        let store = MemoryStore::new(peer_id);
        let kademlia = Kademlia::new(peer_id, store);
        let mdns = Mdns::new(MdnsConfig::default()).await.unwrap();
        let ht = Arc::new(Mutex::new(HashMap::new()));
        let behaviour = MyBehaviour { kademlia, mdns, ht };
        Swarm::new(transport, behaviour, peer_id)
    };

    let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();

    let mut managed_swarm = Dht::new(swarm);
    managed_swarm.put(Record {
        key: Key::new(&[0, 0, 1]),
        value: vec![1, 3, 6],
        publisher: None,
        expires: None,
    });

    loop {
        tokio::select! {
            line = stdin.next_line() => {
                let line = line.unwrap().expect("stdin closed");
                handle_input_line(&mut managed_swarm, line).await;
            }
            event = managed_swarm.0.select_next_some() => {
                println!("Event => {:?}", event);
                if let SwarmEvent::NewListenAddr { address, .. } = event {
                    println!("Listening on {:?}", address);
                }
            }
        }
    }
}
```

When I execute it, the program prints the current hash table and then stalls. What am I doing wrong?
First of all, you are not calling […]

But apart from that, I think the general problem is that you are blocking for a […]. In your method […]. With your current implementation, this could maybe be solved with something like: […]

But since you are then essentially sending the event from within the […]
OK, I think I understand, and I even got the first code to work. However, I'm struggling with the second thing. Could you give an example of how to set the behaviour flag correctly?
If I got it right from the documentation, my `NetworkBehaviour` struct has to look something like this:

```rust
#[derive(NetworkBehaviour)]
struct MyBehaviour {
    #[behaviour(out_event = "KademliaEvent")]
    #[behaviour(event_process = false)]
    kademlia: Kademlia<MemoryStore>,
    mdns: Mdns,
    #[behaviour(ignore)]
    ht: Arc<Mutex<HashMap<QueryId, Sender<QueryResult>>>>,
}
```

Without implementing […]
@umgefahren The attribute macro […]

With […]

And when polling the swarm, the […]

This is quite analogous to how it is done in the project that @thomaseizinger linked in #2024 (comment), so I'd recommend taking a deeper look into their implementation in case of further questions.
Thanks, that worked :)
Here is the result: […]
Whilst this may work, it may impose some problems down the line :/ In particular, I think polling the swarm (i.e. calling […]) […]

This is only one pattern, but thinking of it as an actor-based design, where you have a single worker that drives the […]
That would definitely be great.
I've posted something here: #2171
While we have many examples which showcase rust-libp2p in isolation, it would be great to have an example of how to integrate rust-libp2p into a larger application.

I was thinking of something along the lines of:

- A single task polling (a) the `Swarm` and (b) a channel of incoming commands.
- The commands coming through (b) would contain a `oneshot::Sender`, allowing the task to send back a response when ready.
- A remote control providing `async` methods, each sending a command down to the task and awaiting the `oneshot::Receiver` side.

The example should derive its own `NetworkBehaviour` based on e.g. `libp2p-request-response` and `libp2p-identify`.

See the following resources for additional background: […]

Happy to expand / discuss details in case someone wants to tackle this.