diff --git a/CHANGELOG.md b/CHANGELOG.md index 55e8b26..ee18251 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased - +## [0.3.0] - 07-01-22 ### Added - Add network transport configuration [#72] [#76] @@ -15,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add configurable FEC redundancy [#82] - Add configurable UDP send interval [#83] - Add UDP network tweak configuration [#86] +- Add dedicated tokio task to handle and decode chunks [#87] - Add logs to pending RwLock [#92] ### Fixed @@ -64,5 +64,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#80]: https://github.com/dusk-network/kadcast/issues/80 [#82]: https://github.com/dusk-network/kadcast/issues/82 [#83]: https://github.com/dusk-network/kadcast/issues/83 +[#87]: https://github.com/dusk-network/kadcast/issues/87 [#90]: https://github.com/dusk-network/kadcast/issues/90 [#92]: https://github.com/dusk-network/kadcast/issues/92 diff --git a/src/lib.rs b/src/lib.rs index 5bab83a..00cb296 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,6 +117,7 @@ impl Peer { listen_address, outbound_channel_rx, transport_conf, + channel_size, ); tokio::spawn(TableMantainer::start( bootstrapping_nodes, diff --git a/src/transport.rs b/src/transport.rs index 4fc1d01..e14bc3b 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -10,7 +10,7 @@ use socket2::SockRef; use tokio::{ io, net::UdpSocket, - sync::mpsc::{Receiver, Sender}, + sync::mpsc::{self, Receiver, Sender}, time::{self}, }; use tracing::*; @@ -27,6 +27,7 @@ use crate::{ }; pub(crate) type MessageBeanOut = (Message, Vec); pub(crate) type MessageBeanIn = (Message, SocketAddr); +type UDPChunk = (Vec, SocketAddr); const MAX_DATAGRAM_SIZE: usize = 65_507; pub(crate) struct WireNetwork {} @@ -40,76 +41,111 @@ impl WireNetwork { listen_address: String, outbound_channel_rx: Receiver, conf: HashMap, + channel_size: usize, ) { let listen_address = listen_address .parse() .expect("Unable to parse public_ip address"); let c = conf.clone(); tokio::spawn(async move { - WireNetwork::listen_out(outbound_channel_rx, &c) + WireNetwork::listen_out(outbound_channel_rx, &conf) .await .unwrap_or_else(|op| error!("Error in listen_out {:?}", op)); }); + + let (dec_chan_tx, dec_chan_rx) = mpsc::channel(channel_size); + + let c1 = c.clone(); tokio::spawn(async move { - WireNetwork::listen_in( - listen_address, - inbound_channel_tx.clone(), - &conf, - ) - .await - .unwrap_or_else(|op| error!("Error in listen_in {:?}", op)); + WireNetwork::decode(inbound_channel_tx.clone(), dec_chan_rx, &c) + .await + .unwrap_or_else(|op| error!("Error in decode {:?}", op)); + }); + + tokio::spawn(async move { + WireNetwork::listen_in(listen_address, dec_chan_tx.clone(), &c1) + .await + .unwrap_or_else(|op| error!("Error in listen_in {:?}", op)); }); } async fn listen_in( listen_address: SocketAddr, - inbound_channel_tx: Sender, + dec_chan_tx: Sender, conf: &HashMap, ) -> io::Result<()> { debug!("WireNetwork::listen_in started"); - let mut decoder = TransportDecoder::configure(conf); + let socket = UdpSocket::bind(listen_address) .await .expect("Unable to bind address"); - WireNetwork::configure_socket(&socket, conf)?; info!("Listening on: {}", socket.local_addr()?); + + // Try to extend socket recv buffer size + WireNetwork::configure_socket(&socket, conf)?; + + // Read UDP socket recv buffer and delegate the processing to decode + // task loop { let mut bytes = [0; MAX_DATAGRAM_SIZE]; - let (_, remote_address) = + let (len, remote_address) = socket.recv_from(&mut bytes).await.map_err(|e| { error!("Error receiving from socket {}", e); e })?; - match Message::unmarshal_binary(&mut &bytes[..]) { - Ok(deser) => { - debug!("> Received raw message {}", deser.type_byte()); - let to_process = decoder.decode(deser); - if let Some(message) = to_process { - let valid_header = PeerNode::verify_header( - message.header(), - &remote_address.ip(), - ); - match valid_header { - true => { - inbound_channel_tx - .send((message, remote_address)) - .await - .unwrap_or_else( - |op| error!("Unable to send to inbound channel {:?}", op), + dec_chan_tx + .send((bytes[0..len].to_vec(), remote_address)) + .await + .unwrap_or_else(|op| { + error!("Unable to send to dec_chan_tx channel {:?}", op) + }); + } + } + + async fn decode( + inbound_channel_tx: Sender, + mut dec_chan_rx: Receiver, + conf: &HashMap, + ) -> io::Result<()> { + debug!("WireNetwork::decode started"); + let mut decoder = TransportDecoder::configure(conf); + + loop { + if let Some((message, remote_address)) = dec_chan_rx.recv().await { + match Message::unmarshal_binary(&mut &message[..]) { + Ok(deser) => { + debug!("> Received raw message {}", deser.type_byte()); + let to_process = decoder.decode(deser); + if let Some(message) = to_process { + let valid_header = PeerNode::verify_header( + message.header(), + &remote_address.ip(), + ); + match valid_header { + true => { + inbound_channel_tx + .send((message, remote_address)) + .await + .unwrap_or_else( + |op| error!("Unable to send to inbound channel {:?}", op), + ); + } + false => { + error!( + "Invalid Id {:?} - {}", + message.header(), + &remote_address.ip() ); - } - false => { - error!( - "Invalid Id {:?} - {}", - message.header(), - &remote_address.ip() - ); + } } } } + Err(e) => error!( + "Error deser from {:?} - {} - {}", + message, remote_address, e + ), } - Err(e) => error!("Error deser from {} - {}", remote_address, e), } } }