Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/comms behaviour protocol #210

Merged
merged 43 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
de28de7
feat(comms): init new network behaviour protocol
elenaf9 Apr 19, 2021
f8bc8c9
feat(comms): integrate mdns + relay to behaviour
elenaf9 Apr 23, 2021
d5fbf6b
feat(comms): peer connection and addrs management
elenaf9 Apr 24, 2021
e633fa7
feat(comms): send Res from outbound handshake via channel
elenaf9 Apr 27, 2021
0d95863
feat(comms): remove events for outbound responses
elenaf9 Apr 30, 2021
756f3db
feat(comms): rename types
elenaf9 May 4, 2021
749c8b0
feat(comms): add firewall, manage pending requests
elenaf9 May 7, 2021
609dd7f
chore(comms): update libp2p v0.37 -> v0.38
elenaf9 May 21, 2021
a45d418
test(comms): add firewall tests
elenaf9 May 21, 2021
8e3af02
feat(comms): manage peer addresses and relays
elenaf9 May 24, 2021
1aefc2d
fix(comms): no implicit fallback for using relays
elenaf9 May 27, 2021
753aabd
test(comms): addresses, dialing and using relays
elenaf9 May 27, 2021
6740ade
feat(comms): clean code
elenaf9 Jun 3, 2021
1c644a2
feat(comms): add interface for the comms-library
elenaf9 Jun 3, 2021
bb04d1a
feat(comms): interface: publish network events
elenaf9 Jun 8, 2021
ff421e6
fix(comms): fix no-default-features, fix interface
elenaf9 Jun 8, 2021
17fcc70
test(comms): test dialing via interface
elenaf9 Jun 8, 2021
e0fd5d9
fix(workflows): temp check comms-refactor folder
elenaf9 Jun 8, 2021
5852852
feat(comms): add Errs, move types mod to interface
elenaf9 Jun 10, 2021
0e19adb
docs(comms): add documentation, clean code
elenaf9 Jun 18, 2021
cfb7264
Merge branch 'dev' of github.com:iotaledger/stronghold.rs into feat/c…
elenaf9 Jun 21, 2021
7c6847e
feat(comms): add Builder for ShCommunication
elenaf9 Jun 21, 2021
b634dc2
fix(comms): extend / fix docs, clean code
elenaf9 Jun 22, 2021
6c9ff5d
feat(comms): return Outbound Err via Response Chan
elenaf9 Jun 23, 2021
9aefd77
feat(comms): add outbound failure for shutdown
elenaf9 Jun 24, 2021
c08b66f
feat(comms): use concurrent hashmap in interface
elenaf9 Jun 30, 2021
0efbfe3
fix(comms): use wasm_timer in handler
elenaf9 Jul 2, 2021
3075df6
feat(comms): refactor interface
elenaf9 Jul 8, 2021
49df5e3
fix(comms): fix listeners
elenaf9 Jul 9, 2021
96ea60c
feat(comms): Use Fn in FW-Rule to approve Requests
elenaf9 Jul 9, 2021
f19130d
Merge branch 'dev' of github.com:iotaledger/stronghold.rs into feat/c…
elenaf9 Jul 10, 2021
d758286
feat(comms): clean code, firewall types
elenaf9 Jul 13, 2021
3614bbc
feat(comms): actix CommunicationActor draft
elenaf9 Jul 14, 2021
5a21c45
feat(comms): ARegistry as Service; add impl_handler macro
elenaf9 Jul 15, 2021
442f846
feat(comms): Executor config, fix start_listening
elenaf9 Jul 20, 2021
dc18c19
refactor(p2p): rename library communication -> p2p
elenaf9 Jul 27, 2021
d93c9f6
feat(p2p): wrap channel for inbound requests/ events
elenaf9 Jul 28, 2021
1298fd6
Merge branch 'dev' of github.com:iotaledger/stronghold.rs into HEAD
elenaf9 Aug 25, 2021
dbcd9a3
feat(p2p): integrate Network-Actor into Stronghold
elenaf9 Aug 12, 2021
ceeeee0
fix(comms): remove outdated stronghold-comms crate
elenaf9 Aug 13, 2021
98e455d
fix(p2p): fix documentation
elenaf9 Aug 25, 2021
4e7c1c9
fix(covector): remove comms library from covector
elenaf9 Aug 25, 2021
7b0d58b
fix(p2p): clean code
elenaf9 Aug 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
project: [engine, client, products/commandline, derive, utils, communication]
project: [engine, client, products/commandline, derive, utils, communication-refactored]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
project: [engine, client, products/commandline, derive, utils, communication]
project: [engine, client, products/commandline, derive, utils, communication-refactored]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
project: [engine, client, products/commandline, derive, utils, communication]
project: [engine, client, products/commandline, derive, utils, communication-refactored]
os: [ubuntu-latest, macos-latest, windows-latest]

steps:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"engine",
"engine/runtime",
"client",
"communication",
"communication-refactored",
"utils",
"derive"
]
Expand Down
1 change: 1 addition & 0 deletions communication-refactored/.license_template
39 changes: 39 additions & 0 deletions communication-refactored/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[package]
name = "stronghold-communication-refactored"
version = "0.3.0"
authors = [
"IOTA Stiftung",
"Elena Frank <[email protected]"
]
edition = "2018"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/iotaledger/stronghold.rs"
homepage = "https://stronghold.docs.iota.org"

[lib]
name = "communication_refactored"

[dependencies]
actix = { version = "0.12", optional = true }
futures = "0.3"
libp2p = { version = "0.39", default-features = false, features = ["noise", "yamux"] }
serde = { version = "1.0", default-features = false, features = [ "alloc", "derive" ] }
serde_json = { version = "1.0", default-features = false, features = [ "alloc" ] }
smallvec = "1.6.1"
stronghold-derive = { path = "../derive", version = "0.2.0" }
tokio = { version = "1.7", default-features = false, features = ["rt", "sync"] }
wasm-timer = "0.2.5"

[features]
default = [ "mdns", "relay", "tcp-transport"]
mdns = ["libp2p/mdns"]
relay = ["libp2p/relay"]
tcp-transport = ["libp2p/tcp-tokio", "libp2p/dns-tokio", "libp2p/websocket"]
actor = ["actix"]

[dev-dependencies]
rand = "0.8.3"
tokio = {version = "1.7", features = ["time", "macros"]}
libp2p = { version = "0.39", default-features = false, features = ["tcp-tokio"] }
actix-rt = { version = "2.2"}
1 change: 1 addition & 0 deletions communication-refactored/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Stronghold Communication
166 changes: 166 additions & 0 deletions communication-refactored/src/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#![cfg(feature = "actor")]

pub mod messages;
use crate::{ListenErr, Multiaddr, OutboundFailure, PeerId, ReceiveRequest, RqRsMessage, ShCommunication};
use actix::{dev::ToEnvelope, prelude::*};
use futures::{channel::mpsc, FutureExt, TryFutureExt};
use messages::*;
use std::{borrow::Borrow, io, marker::PhantomData};

#[macro_use]
macro_rules! impl_handler {
($mty:ty => $rty:ty, |$cid:ident, $mid:ident| $($body:stmt)+ ) => {
impl<ARegistry, C, Rq, Rs, TRq> Handler<$mty> for CommunicationActor<ARegistry, C, Rq, Rs, TRq>
where
ARegistry: ArbiterService + Handler<GetClient<Rq, C>>,
ARegistry::Context: ToEnvelope<ARegistry, GetClient<Rq, C>>,
C: Actor + Handler<Rq> + Send,
C::Context: ToEnvelope<C, Rq>,
Rq: Message<Result = Rs> + RqRsMessage + Borrow<TRq> + Clone,
Rs: RqRsMessage + Clone,
TRq: Clone + Send + 'static,
{
type Result = ResponseActFuture<Self, $rty>;
fn handle(&mut self, $mid: $mty, _: &mut Self::Context) -> Self::Result {
let mut $cid = self.comms.clone();
async move { $($body)+ }.into_actor(self).boxed_local()
}
}
};
}

pub struct GetClient<Rq: Message, C: Actor + Handler<Rq>> {
pub remote: PeerId,
_marker: (PhantomData<C>, PhantomData<Rq>),
}

impl<Rq: Message, C: Actor + Handler<Rq>> Message for GetClient<Rq, C> {
type Result = Addr<C>;
}

pub struct CommunicationActor<ARegistry, C, Rq, Rs, TRq = Rq>
where
ARegistry: Actor,
Rq: Message + RqRsMessage + Borrow<TRq>,
Rs: RqRsMessage,
TRq: Clone + Send + 'static,
{
comms: ShCommunication<Rq, Rs, TRq>,
inbound_request_rx: Option<mpsc::Receiver<ReceiveRequest<Rq, Rs>>>,
_marker: (PhantomData<ARegistry>, PhantomData<C>),
}

impl<ARegistry, C, Rq, Rs, TRq> CommunicationActor<ARegistry, C, Rq, Rs, TRq>
where
ARegistry: Actor,
Rq: Message + RqRsMessage + Borrow<TRq>,
Rs: RqRsMessage,
TRq: Clone + Send + 'static,
{
#[cfg(feature = "tcp-transport")]
pub async fn new() -> Result<Self, io::Error> {
let (firewall_tx, _) = mpsc::channel(0);
let (inbound_request_tx, inbound_request_rx) = mpsc::channel(1);
let comms = ShCommunication::new(firewall_tx, inbound_request_tx, None).await?;
let actor = Self {
comms,
inbound_request_rx: Some(inbound_request_rx),
_marker: (PhantomData, PhantomData),
};
Ok(actor)
}
}

impl<ARegistry, C, Rq, Rs, TRq> Actor for CommunicationActor<ARegistry, C, Rq, Rs, TRq>
where
ARegistry: ArbiterService + Handler<GetClient<Rq, C>>,
ARegistry::Context: ToEnvelope<ARegistry, GetClient<Rq, C>>,
C: Actor + Handler<Rq> + Send,
C::Context: ToEnvelope<C, Rq>,
Rq: Message<Result = Rs> + RqRsMessage + Borrow<TRq>,
Rs: RqRsMessage,
TRq: Clone + Send + 'static,
{
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
let inbound_request_rx = self.inbound_request_rx.take().unwrap();
Self::add_stream(inbound_request_rx, ctx);
}
}

impl<ARegistry, C, Rq, Rs, TRq> StreamHandler<ReceiveRequest<Rq, Rs>> for CommunicationActor<ARegistry, C, Rq, Rs, TRq>
where
ARegistry: ArbiterService + Handler<GetClient<Rq, C>>,
ARegistry::Context: ToEnvelope<ARegistry, GetClient<Rq, C>>,
C: Actor + Handler<Rq> + Send,
C::Context: ToEnvelope<C, Rq>,
Rq: Message<Result = Rs> + RqRsMessage + Borrow<TRq>,
Rs: RqRsMessage,
TRq: Clone + Send + 'static,
{
fn handle(&mut self, item: ReceiveRequest<Rq, Rs>, ctx: &mut Self::Context) {
let ReceiveRequest {
request,
response_tx,
peer,
..
} = item;

let registry = ARegistry::from_registry();
let fut = registry
.send(GetClient {
remote: peer,
_marker: (PhantomData, PhantomData),
})
.and_then(|client| client.send(request))
.map_ok(|response| response_tx.send(response))
.map(|_| ())
.into_actor(self);
ctx.wait(fut);
}
}

impl<ARegistry, C, Rq, Rs, TRq> From<(ShCommunication<Rq, Rs, TRq>, mpsc::Receiver<ReceiveRequest<Rq, Rs>>)>
for CommunicationActor<ARegistry, C, Rq, Rs, TRq>
where
ARegistry: Actor,
Rq: Message + RqRsMessage + Borrow<TRq>,
Rs: RqRsMessage,
TRq: Clone + Send + 'static,
{
fn from((comms, request_rx): (ShCommunication<Rq, Rs, TRq>, mpsc::Receiver<ReceiveRequest<Rq, Rs>>)) -> Self {
Self {
comms,
inbound_request_rx: Some(request_rx),
_marker: (PhantomData, PhantomData),
}
}
}

impl_handler!(SendRequest<Rq, Rs> => Result<Rs, OutboundFailure>, |comms, msg| {
comms.send_request(msg.peer, msg.request).await
});

impl_handler!(StartListening => Result<Multiaddr, ListenErr>, |comms, msg| {
comms.start_listening(msg.address).await
});

impl_handler!(GetLocalPeerId => PeerId, |_comms, _msg| {
_comms.get_peer_id()
});

impl_handler!(AddPeerAddr => (), |comms, msg| {
comms.add_address(msg.peer, msg.address).await
});

impl_handler!(SetFirewallRule<TRq> => (), |comms, msg| {
comms.set_peer_rule(msg.peer, msg.direction, msg.rule).await
});

impl_handler!(SetFirewallDefault<TRq> => (), |comms, msg| {
comms.set_firewall_default(msg.direction, msg.rule).await
});
61 changes: 61 additions & 0 deletions communication-refactored/src/actor/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::marker::PhantomData;

use crate::{
firewall::{Rule, RuleDirection},
ListenErr, OutboundFailure,
};
use actix::Message;
use libp2p::{Multiaddr, PeerId};

#[derive(Message)]
#[rtype(result = "Result<Rs, OutboundFailure>")]
pub struct SendRequest<Rq, Rs: 'static> {
pub peer: PeerId,
pub request: Rq,
_marker: PhantomData<Rs>,
}

impl<Rq, Rs: 'static> SendRequest<Rq, Rs> {
pub fn new(peer: PeerId, request: Rq) -> Self {
SendRequest {
peer,
request,
_marker: PhantomData,
}
}
}

#[derive(Message)]
#[rtype(result = "Result<Multiaddr, ListenErr>")]
pub struct StartListening {
pub address: Option<Multiaddr>,
}

#[derive(Message)]
#[rtype(result = "PeerId")]
pub struct GetLocalPeerId;

#[derive(Message)]
#[rtype(result = "()")]
pub struct AddPeerAddr {
pub peer: PeerId,
pub address: Multiaddr,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct SetFirewallRule<TRq> {
pub peer: PeerId,
pub direction: RuleDirection,
pub rule: Rule<TRq>,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct SetFirewallDefault<TRq> {
pub direction: RuleDirection,
pub rule: Rule<TRq>,
}
Loading