From 1942f29c8d8670948748cf7fd15cc40bcb227ff6 Mon Sep 17 00:00:00 2001 From: conectado Date: Sun, 23 Jan 2022 17:44:55 -0300 Subject: [PATCH] feat(rust): add docs and minor fixes for core tcp workers --- .../ockam/ockam_transport_core/src/lib.rs | 1 - .../ockam/ockam_transport_core/src/tcp/mod.rs | 1 + .../src/tcp/traits/mod.rs | 18 ++++++---- .../src/tcp/workers/listener.rs | 6 ++++ .../src/tcp/workers/mod.rs | 4 ++- .../src/tcp/workers/receiver.rs | 9 ++++- .../src/tcp/workers/sender.rs | 35 ++++++++++++++++--- .../ockam/ockam_transport_core/src/utils.rs | 23 ------------ 8 files changed, 61 insertions(+), 36 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_transport_core/src/utils.rs diff --git a/implementations/rust/ockam/ockam_transport_core/src/lib.rs b/implementations/rust/ockam/ockam_transport_core/src/lib.rs index fe2f7f8b212..073dc29e729 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/lib.rs @@ -10,7 +10,6 @@ mod error; #[cfg(test)] mod error_test; pub mod tcp; -pub mod utils; /// TCP address type constant pub const TCP: u8 = 1; diff --git a/implementations/rust/ockam/ockam_transport_core/src/tcp/mod.rs b/implementations/rust/ockam/ockam_transport_core/src/tcp/mod.rs index c9ed482aa92..dfa2274d413 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/tcp/mod.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/tcp/mod.rs @@ -1,2 +1,3 @@ +//! TCP functionality shared across transport protocols. pub mod traits; pub mod workers; diff --git a/implementations/rust/ockam/ockam_transport_core/src/tcp/traits/mod.rs b/implementations/rust/ockam/ockam_transport_core/src/tcp/traits/mod.rs index 5a2abdb9ac4..d4ac2f5d6dc 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/tcp/traits/mod.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/tcp/traits/mod.rs @@ -209,15 +209,21 @@ where /// Registers the passed pair using accepting from the address format `#` where PROTOCOL is a universal constant identifying the protocol used and `peer` is either the address of hostname of the peer of the given pair. async fn register(&self, pair: &WorkerPair) -> Result<(), ockam_core::Error> { - let tcp_address: Address = format!("{}#{}", TCP, pair.peer()).into(); + // definition on how to create the used address for the pair. + // Note: not using a lambda to be able to be generic over addr. + fn get_addr(addr: impl Display) -> Address { + format!("{TCP}#{addr}").into() + } + + // define all external addresses for the pair + let tcp_address: Address = get_addr(pair.peer()); let mut accepts = vec![tcp_address]; - accepts.extend( - pair.hostnames() - .iter() - .map(|x| Address::from_string(format!("{}#{}", TCP, x))), - ); + accepts.extend(pair.hostnames().iter().map(get_addr)); + + // internal address of the pair let self_addr = pair.tx_addr(); + // Send registration request to router address(Note: the implementation needs to ensure that this message is correctly supported) self.ctx() .send( self.addr().clone(), diff --git a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/listener.rs b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/listener.rs index d5be2ec6270..bf18730e8e3 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/listener.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/listener.rs @@ -11,6 +11,11 @@ use tracing::{debug, trace}; use crate::tcp::traits::io::{AsyncReadExt, AsyncWriteExt}; use crate::tcp::traits::{IntoSplit, PairRegister, TcpAccepter, TcpBinder}; +/// A TCP Processor that listen for new connections and launch a new [WorkerPair](super::sender::WorkerPair) for each one. +/// +/// The new Pair will be then registered with a `Router`. +/// +/// Create and start this processor by using [TcpListenProcessor::start]. pub struct TcpListenProcessor { inner: T, router_handle: U, @@ -26,6 +31,7 @@ where Y: AsyncWriteExt + Send + Unpin + 'static, W: Send + Sync + Clone + Display + 'static, { + /// Creates and start a new [TcpListenProcessor]. pub async fn start( ctx: &Context, router_handle: U, diff --git a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/mod.rs b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/mod.rs index 99fae961778..e55f7806799 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/mod.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/mod.rs @@ -1,7 +1,9 @@ +//! Workers used to implement TCP transport protocols. mod listener; mod receiver; mod sender; +use receiver::TcpRecvProcessor; + pub use listener::TcpListenProcessor; -pub use receiver::TcpRecvProcessor; pub use sender::{TcpSendWorker, WorkerPair}; diff --git a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/receiver.rs index bd8ac7ff1d2..60164aa4d6c 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/receiver.rs @@ -8,7 +8,14 @@ use tracing::{error, info, trace}; use crate::tcp::traits::io::AsyncReadExt; -pub struct TcpRecvProcessor { +/// A TCP receiving message worker +/// +/// This type will be created by [TcpSendWorker::start_pair](super::sender::TcpSendWorker::start_pair) +/// +/// This half of the worker is created when spawning a new connection +/// worker pair, and listens for incoming TCP packets, to relay into +/// the node message system. +pub(crate) struct TcpRecvProcessor { peer_addr: Address, tcp_stream: R, cluster_name: &'static str, diff --git a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/sender.rs b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/sender.rs index d34b4f4486b..ca01e6e3782 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/tcp/workers/sender.rs @@ -1,12 +1,12 @@ use super::TcpRecvProcessor; use crate::tcp::traits::io::{AsyncReadExt, AsyncWriteExt}; use crate::tcp::traits::{IntoSplit, TcpStreamConnector}; -use crate::{utils, TransportError}; +use crate::TransportError; use core::ops::Deref; use core::time::Duration; use futures::future::{AbortHandle, Abortable}; use ockam_core::compat::boxed::Box; -use ockam_core::{async_trait, route, Any, Decodable}; +use ockam_core::{async_trait, route, Any, Decodable, Encodable}; use ockam_core::{Address, Result, Routed, TransportMessage, Worker}; use ockam_node::Context; use tracing::{debug, trace, warn}; @@ -24,12 +24,17 @@ where U: Clone, T: Deref, { + /// Returns the hostnames of the remote peer associated with the worker pair. pub fn hostnames(&self) -> &[V] { &self.hostnames } + + /// Returns the socket address of the peer to the worker pair. pub fn peer(&self) -> U { self.peer.clone() } + + /// Returns the external address of the pair. pub fn tx_addr(&self) -> Address { self.tx_addr.clone() } @@ -93,6 +98,9 @@ where } } + /// Starts a pair of recv/send workers to process messages from `peer`. + /// + /// It returns a [WorkerPair] that holds the information of the pair of workers. pub async fn start_pair( ctx: &Context, stream: Option, @@ -170,6 +178,25 @@ where } } +fn prepare_message(msg: TransportMessage) -> Result> { + let mut msg_buf = msg.encode().map_err(|_| TransportError::SendBadMessage)?; + + // Create a buffer that includes the message length in big endian + let mut len = (msg_buf.len() as u16).to_be_bytes().to_vec(); + + // Fun fact: reversing a vector in place, appending the length, + // and then reversing it again is faster for large message sizes + // than adding the large chunk of data. + // + // https://play.rust-lang.org/?version=stable&mode=release&edition=2018&gist=8669a640004ac85c7be38b19e3e73dcb + msg_buf.reverse(); + len.reverse(); + msg_buf.append(&mut len); + msg_buf.reverse(); + + Ok(msg_buf) +} + #[async_trait] impl Worker for TcpSendWorker where @@ -237,7 +264,7 @@ where let recipient = msg.msg_addr(); if recipient == self.internal_addr { let msg = TransportMessage::v1(route![], route![], vec![]); - let msg = utils::prepare_message(msg)?; + let msg = prepare_message(msg)?; // Sending empty heartbeat if tx.write_all(&msg).await.is_err() { warn!("Failed to send heartbeat to peer {}", self.peer); @@ -253,7 +280,7 @@ where // knows what to do with the incoming message msg.onward_route.step()?; // Create a message buffer with pre-pended length - let msg = utils::prepare_message(msg)?; + let msg = prepare_message(msg)?; if tx.write_all(msg.as_slice()).await.is_err() { warn!("Failed to send message to peer {}", self.peer); diff --git a/implementations/rust/ockam/ockam_transport_core/src/utils.rs b/implementations/rust/ockam/ockam_transport_core/src/utils.rs deleted file mode 100644 index 214e2e70385..00000000000 --- a/implementations/rust/ockam/ockam_transport_core/src/utils.rs +++ /dev/null @@ -1,23 +0,0 @@ -use ockam_core::compat::vec::Vec; -use ockam_core::{Encodable, Result, TransportMessage}; - -use crate::TransportError; - -pub fn prepare_message(msg: TransportMessage) -> Result> { - let mut msg_buf = msg.encode().map_err(|_| TransportError::SendBadMessage)?; - - // Create a buffer that includes the message length in big endian - let mut len = (msg_buf.len() as u16).to_be_bytes().to_vec(); - - // Fun fact: reversing a vector in place, appending the length, - // and then reversing it again is faster for large message sizes - // than adding the large chunk of data. - // - // https://play.rust-lang.org/?version=stable&mode=release&edition=2018&gist=8669a640004ac85c7be38b19e3e73dcb - msg_buf.reverse(); - len.reverse(); - msg_buf.append(&mut len); - msg_buf.reverse(); - - Ok(msg_buf) -}