Skip to content

Commit

Permalink
Merge pull request #258 from input-output-hk/p2p-propagation
Browse files Browse the repository at this point in the history
network: Can't have ConnectionState in service
  • Loading branch information
mzabaluev authored Apr 5, 2019
2 parents ce35960 + 32a14ca commit cbd15d4
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 38 deletions.
17 changes: 10 additions & 7 deletions src/network/grpc/server.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
use super::super::{service::NodeServer, Channels, ConnectionState};
use super::super::{service::NodeService, Channels, GlobalState};
use crate::settings::start::network::Listen;

use network_grpc::server::{listen, Server};
use network_grpc::server::{self, Server};

use tokio::executor::DefaultExecutor;
use tokio::prelude::*;

use std::net::SocketAddr;
use std::sync::Arc;

pub fn run_listen_socket(
sockaddr: SocketAddr,
state: ConnectionState,
listen: Listen,
state: Arc<GlobalState>,
channels: Channels,
) -> impl Future<Item = (), Error = ()> {
let sockaddr = listen.address();

info!(
"start listening and accepting gRPC connections on {}",
sockaddr
);

match listen(&sockaddr) {
match server::listen(&sockaddr) {
Err(error) => {
error!("Error while listening to {}", error ; sockaddr = sockaddr);
unimplemented!()
}
Ok(listener_stream) => {
let node_server = NodeServer::new(state, channels);
let node_server = NodeService::new(channels, state);
let server = Server::new(node_server, DefaultExecutor::current());

listener_stream
Expand Down
16 changes: 3 additions & 13 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,7 @@ pub struct ConnectionState {
}

impl ConnectionState {
fn new_listen(global: Arc<GlobalState>, listen: &Listen) -> Self {
ConnectionState {
global,
timeout: listen.timeout,
connection: listen.connection,
propagation: Mutex::new(propagate::PeerHandles::new()),
}
}

fn new_peer(global: Arc<GlobalState>, peer: &Peer) -> Self {
fn new(global: Arc<GlobalState>, peer: &Peer) -> Self {
ConnectionState {
global,
timeout: peer.timeout,
Expand Down Expand Up @@ -140,8 +131,7 @@ pub fn run(
match protocol {
Protocol::Grpc => {
let listen = Listen::new(public_address, protocol);
let conn_state = ConnectionState::new_listen(state.clone(), &listen);
grpc::run_listen_socket(public_address, conn_state, channels.clone())
grpc::run_listen_socket(listen, state.clone(), channels.clone())
}
Protocol::Ntt => unimplemented!(),
}
Expand All @@ -157,7 +147,7 @@ pub fn run(
.collect::<Vec<_>>();
let connections = stream::iter_ok(addrs).for_each(move |addr| {
let peer = Peer::new(addr, Protocol::Grpc);
let conn_state = ConnectionState::new_peer(state.clone(), &peer);
let conn_state = ConnectionState::new(state.clone(), &peer);
let conn = grpc::run_connect_socket(addr, conn_state, channels.clone());
conn // TODO: manage propagation peers in a map
});
Expand Down
47 changes: 29 additions & 18 deletions src/network/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{p2p_topology as p2p, propagate::Subscription, Channels, ConnectionState};
use super::{p2p_topology as p2p, propagate::Subscription, Channels, GlobalState};

use crate::blockcfg::{Block, BlockDate, Header, HeaderHash, Message, MessageId};
use crate::intercom::{
Expand All @@ -16,27 +16,30 @@ use network_core::{
},
};

use futures::future::{self, FutureResult};
use futures::prelude::*;
use futures::{
future::{self, FutureResult},
stream,
};

use std::sync::Arc;

#[derive(Clone)]
pub struct NodeServer {
state: Arc<ConnectionState>,
pub struct NodeService {
channels: Channels,
global_state: Arc<GlobalState>,
}

impl NodeServer {
pub fn new(state: ConnectionState, channels: Channels) -> Self {
NodeServer {
state: Arc::new(state),
impl NodeService {
pub fn new(channels: Channels, global_state: Arc<GlobalState>) -> Self {
NodeService {
channels,
global_state,
}
}
}

impl Node for NodeServer {
impl Node for NodeService {
type BlockService = Self;
type ContentService = Self;
type GossipService = Self;
Expand All @@ -61,7 +64,7 @@ impl From<intercom::Error> for core_error::Error {
}
}

impl BlockService for NodeServer {
impl BlockService for NodeService {
type BlockId = HeaderHash;
type BlockDate = BlockDate;
type Block = Block;
Expand All @@ -75,7 +78,7 @@ impl BlockService for NodeServer {
type PullHeadersFuture = FutureResult<Self::PullHeadersStream, core_error::Error>;
type GetHeadersStream = ReplyStream<Header, core_error::Error>;
type GetHeadersFuture = FutureResult<Self::GetHeadersStream, core_error::Error>;
type BlockSubscription = Subscription<Header>;
type BlockSubscription = stream::Empty<Header, core_error::Error>;
type BlockSubscriptionFuture = FutureResult<Self::BlockSubscription, core_error::Error>;

fn tip(&mut self) -> Self::TipFuture {
Expand Down Expand Up @@ -145,12 +148,17 @@ impl BlockService for NodeServer {
error!("Block subscription failed: {:?}", err);
}),
);
let mut handles = self.state.propagation.lock().unwrap();
future::ok(handles.blocks.subscribe())

// FIXME: we can't have per-connection state associated with
// NodeService in the current tower-h2 design. Need to come up
// with a way to identify the peer making the subscription, so that
// we can use this stream for p2p propagation.
// See https://github.com/tower-rs/tower-h2/issues/64
future::ok(stream::empty())
}
}

impl ContentService for NodeServer {
impl ContentService for NodeService {
type Message = Message;
type MessageId = MessageId;
type ProposeTransactionsFuture =
Expand Down Expand Up @@ -179,17 +187,17 @@ impl ContentService for NodeServer {
}
}

impl GossipService for NodeServer {
impl GossipService for NodeService {
type NodeId = p2p::NodeId;
type Node = p2p::Node;
type GossipSubscription = Subscription<Gossip<p2p::Node>>;
type GossipSubscription = stream::Empty<Gossip<p2p::Node>, core_error::Error>;
type GossipSubscriptionFuture = FutureResult<Self::GossipSubscription, core_error::Error>;

fn gossip_subscription<In>(&mut self, inbound: In) -> Self::GossipSubscriptionFuture
where
In: Stream<Item = Gossip<Self::Node>, Error = core_error::Error> + Send + 'static,
{
let global_state = self.state.global.clone();
let global_state = self.global_state.clone();
tokio::spawn(
inbound
.for_each(move |gossip| {
Expand All @@ -200,7 +208,10 @@ impl GossipService for NodeServer {
error!("gossip subscription inbound stream error: {:?}", err);
}),
);

// TODO: send periodic updates to nodes
unimplemented!()
// See the BlockService::block_subscription impl for why this is
// currently not implemented.
future::ok(stream::empty())
}
}

0 comments on commit cbd15d4

Please sign in to comment.