Skip to content

Commit

Permalink
feat(rust): add docs and minor fixes for core tcp workers
Browse files Browse the repository at this point in the history
  • Loading branch information
conectado committed Jan 23, 2022
1 parent 0570ac1 commit 1942f29
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 36 deletions.
1 change: 0 additions & 1 deletion implementations/rust/ockam/ockam_transport_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod error;
#[cfg(test)]
mod error_test;
pub mod tcp;
pub mod utils;

/// TCP address type constant
pub const TCP: u8 = 1;
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
//! TCP functionality shared across transport protocols.
pub mod traits;
pub mod workers;
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,21 @@ where

/// Registers the passed pair using accepting from the address format `<PROTOCOL>#<peer>` where PROTOCOL is a universal constant identifying the protocol used and `peer` is either the address of hostname of the peer of the given pair.
async fn register(&self, pair: &WorkerPair<T, U>) -> Result<(), ockam_core::Error> {
let tcp_address: Address = format!("{}#{}", TCP, pair.peer()).into();
// definition on how to create the used address for the pair.
// Note: not using a lambda to be able to be generic over addr.
fn get_addr(addr: impl Display) -> Address {
format!("{TCP}#{addr}").into()
}

// define all external addresses for the pair
let tcp_address: Address = get_addr(pair.peer());
let mut accepts = vec![tcp_address];
accepts.extend(
pair.hostnames()
.iter()
.map(|x| Address::from_string(format!("{}#{}", TCP, x))),
);
accepts.extend(pair.hostnames().iter().map(get_addr));

// internal address of the pair
let self_addr = pair.tx_addr();

// Send registration request to router address(Note: the implementation needs to ensure that this message is correctly supported)
self.ctx()
.send(
self.addr().clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use tracing::{debug, trace};
use crate::tcp::traits::io::{AsyncReadExt, AsyncWriteExt};
use crate::tcp::traits::{IntoSplit, PairRegister, TcpAccepter, TcpBinder};

/// A TCP Processor that listen for new connections and launch a new [WorkerPair](super::sender::WorkerPair) for each one.
///
/// The new Pair will be then registered with a `Router`.
///
/// Create and start this processor by using [TcpListenProcessor::start].
pub struct TcpListenProcessor<T, U> {
inner: T,
router_handle: U,
Expand All @@ -26,6 +31,7 @@ where
Y: AsyncWriteExt + Send + Unpin + 'static,
W: Send + Sync + Clone + Display + 'static,
{
/// Creates and start a new [TcpListenProcessor].
pub async fn start<A, B>(
ctx: &Context,
router_handle: U,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Workers used to implement TCP transport protocols.
mod listener;
mod receiver;
mod sender;

use receiver::TcpRecvProcessor;

pub use listener::TcpListenProcessor;
pub use receiver::TcpRecvProcessor;
pub use sender::{TcpSendWorker, WorkerPair};
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ use tracing::{error, info, trace};

use crate::tcp::traits::io::AsyncReadExt;

pub struct TcpRecvProcessor<R> {
/// A TCP receiving message worker
///
/// This type will be created by [TcpSendWorker::start_pair](super::sender::TcpSendWorker::start_pair)
///
/// This half of the worker is created when spawning a new connection
/// worker pair, and listens for incoming TCP packets, to relay into
/// the node message system.
pub(crate) struct TcpRecvProcessor<R> {
peer_addr: Address,
tcp_stream: R,
cluster_name: &'static str,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::TcpRecvProcessor;
use crate::tcp::traits::io::{AsyncReadExt, AsyncWriteExt};
use crate::tcp::traits::{IntoSplit, TcpStreamConnector};
use crate::{utils, TransportError};
use crate::TransportError;
use core::ops::Deref;
use core::time::Duration;
use futures::future::{AbortHandle, Abortable};
use ockam_core::compat::boxed::Box;
use ockam_core::{async_trait, route, Any, Decodable};
use ockam_core::{async_trait, route, Any, Decodable, Encodable};
use ockam_core::{Address, Result, Routed, TransportMessage, Worker};
use ockam_node::Context;
use tracing::{debug, trace, warn};
Expand All @@ -24,12 +24,17 @@ where
U: Clone,
T: Deref<Target = [V]>,
{
/// Returns the hostnames of the remote peer associated with the worker pair.
pub fn hostnames(&self) -> &[V] {
&self.hostnames
}

/// Returns the socket address of the peer to the worker pair.
pub fn peer(&self) -> U {
self.peer.clone()
}

/// Returns the external address of the pair.
pub fn tx_addr(&self) -> Address {
self.tx_addr.clone()
}
Expand Down Expand Up @@ -93,6 +98,9 @@ where
}
}

/// Starts a pair of recv/send workers to process messages from `peer`.
///
/// It returns a [WorkerPair] that holds the information of the pair of workers.
pub async fn start_pair<Y, Z>(
ctx: &Context,
stream: Option<Y>,
Expand Down Expand Up @@ -170,6 +178,25 @@ where
}
}

fn prepare_message(msg: TransportMessage) -> Result<Vec<u8>> {
let mut msg_buf = msg.encode().map_err(|_| TransportError::SendBadMessage)?;

// Create a buffer that includes the message length in big endian
let mut len = (msg_buf.len() as u16).to_be_bytes().to_vec();

// Fun fact: reversing a vector in place, appending the length,
// and then reversing it again is faster for large message sizes
// than adding the large chunk of data.
//
// https://play.rust-lang.org/?version=stable&mode=release&edition=2018&gist=8669a640004ac85c7be38b19e3e73dcb
msg_buf.reverse();
len.reverse();
msg_buf.append(&mut len);
msg_buf.reverse();

Ok(msg_buf)
}

#[async_trait]
impl<T, U, V, W, X> Worker for TcpSendWorker<T, U, V, W>
where
Expand Down Expand Up @@ -237,7 +264,7 @@ where
let recipient = msg.msg_addr();
if recipient == self.internal_addr {
let msg = TransportMessage::v1(route![], route![], vec![]);
let msg = utils::prepare_message(msg)?;
let msg = prepare_message(msg)?;
// Sending empty heartbeat
if tx.write_all(&msg).await.is_err() {
warn!("Failed to send heartbeat to peer {}", self.peer);
Expand All @@ -253,7 +280,7 @@ where
// knows what to do with the incoming message
msg.onward_route.step()?;
// Create a message buffer with pre-pended length
let msg = utils::prepare_message(msg)?;
let msg = prepare_message(msg)?;

if tx.write_all(msg.as_slice()).await.is_err() {
warn!("Failed to send message to peer {}", self.peer);
Expand Down
23 changes: 0 additions & 23 deletions implementations/rust/ockam/ockam_transport_core/src/utils.rs

This file was deleted.

0 comments on commit 1942f29

Please sign in to comment.