Skip to content

Commit

Permalink
feat(rust): adds smoltcp transport implementation necessary for client
Browse files Browse the repository at this point in the history
This version uses busy polling and that can cause some issues when
closing the connection which causes the processor to hang.
  • Loading branch information
conectado committed Nov 19, 2021
1 parent e8800c9 commit a717755
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 33 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ ockam_transport_smoltcp = { path = "../../../ockam_transport_smoltcp" }
# https://github.com/rust-lang/rust/issues/27812
serde = { version = "1.0", features = ["derive"] }
tracing = { version = "0.1", default-features = false }
rand = { version = "0.8.4", default-features = false }
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,49 @@ extern crate tracing;

use ockam::{Context, Result, Route};
use ockam_transport_smoltcp::{SmolTcpTransport, TCP};
use rand::Rng;

fn get_peer_addr() -> String {
std::env::args()
.skip(1)
.take(1)
.next()
.unwrap_or(format!("127.0.0.1:10222"))
fn get_peer_addr() -> (String, String) {
let mut args = std::env::args().skip(1).take(2);

(
args.next().unwrap_or("192.168.69.100:6969".to_string()),
args.next().unwrap_or("192.168.69.1".to_string()),
)
}

#[ockam::node]
async fn main(mut ctx: Context) -> Result<()> {
// Get our peer address
let peer_addr = get_peer_addr();
let (peer_ip_addr, bind_ip_addr) = get_peer_addr();

// TODO: Should the transport parse it?
let (peer_ip_addr, peer_port) = match peer_ip_addr.split(":").collect::<Vec<&str>>()[..] {
[peer_ip_addr, peer_port, ..] => (peer_ip_addr, peer_port.parse().unwrap()),
_ => panic!("Cannot parse address"),
};

// Use a random port
// Note: the linux kernel lets a connection hangs for a while so if you re-use a port
// and you're using a server running on linux the example might randomly fail.
let bind_port = rand::thread_rng().gen_range(4096..65535);

let default_gateway = "192.168.69.100";

// Initialize the TCP stack by opening a connection to a the remote
let tcp = SmolTcpTransport::create(&ctx).await?;
tcp.connect(&peer_addr).await?;
tcp.connect(
&bind_ip_addr,
bind_port,
&peer_ip_addr,
peer_port,
&default_gateway,
)
.await?;

// Send a message to the remote
ctx.send(
Route::new()
.append_t(TCP, &peer_addr)
.append_t(TCP, format!("{}:{}", peer_ip_addr, peer_port))
.append("echo_service"),
String::from("Hello you over there!"),
)
Expand Down
3 changes: 2 additions & 1 deletion implementations/rust/ockam/ockam_transport_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub enum TransportError {
UnknownRoute,
/// Failed to parse the socket address
InvalidAddress,
/// Failed to read message (buffer exhausted) or failed to send it (size is too big)
/// Failed to read message (buffer exhausted), failed to send it (size is too big)
Capacity,
/// Failed to encode message
Encoding,
Expand Down Expand Up @@ -60,6 +60,7 @@ impl From<smoltcp::Error> for TransportError {
fn from(e: smoltcp::Error) -> Self {
match e {
smoltcp::Error::Illegal => Self::BindFailed,
smoltcp::Error::Exhausted => Self::Capacity, // TODO: update docs or change error mapping
_ => Self::GenericIo,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use ockam_core::{async_trait, RouterMessage};
use ockam_core::{Address, AsyncTryClone, Result};
use ockam_node::Context;
use smoltcp::wire::IpAddress;
use smoltcp::wire::{IpAddress, Ipv4Address};

use crate::workers::{TcpListenerProcessor, WorkerPair};
use crate::workers::{ConnectConfiguration, TcpListenerProcessor, TcpMode, WorkerPair};
use crate::TCP;

pub(crate) struct TcpRouterHandle {
Expand All @@ -28,7 +28,43 @@ impl TcpRouterHandle {
impl TcpRouterHandle {
pub async fn bind(&self, ip_addr: impl Into<IpAddress>, port: u16) -> Result<()> {
let ip_addr = ip_addr.into();
TcpListenerProcessor::start(&self.ctx, self.async_try_clone().await?, ip_addr, port).await
TcpListenerProcessor::start(
&self.ctx,
self.async_try_clone().await?,
ip_addr,
port,
TcpMode::Listen,
)
.await
}

pub async fn connect(
&self,
bind_ip_addr: impl Into<IpAddress>,
bind_port: u16,
peer_ip: impl Into<IpAddress>,
peer_port: u16,
default_gateway: Ipv4Address,
) -> Result<()> {
// TODO: Resolve names if at all possible
//let (peer_addr, hostnames) = Self::resolve_peer(peer.as_ref())?;

let connect_config = ConnectConfiguration {
default_gateway,
peer_addr: peer_ip.into(),
peer_port,
};

TcpListenerProcessor::start(
&self.ctx,
self.async_try_clone().await?,
bind_ip_addr.into(),
bind_port,
TcpMode::Connect(connect_config),
)
.await?;

Ok(())
}

pub async fn register(&self, pair: &WorkerPair) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub(crate) use handle::*;
use ockam_core::{async_trait, Address, LocalMessage, Result, Routed, RouterMessage, Worker};
use ockam_node::Context;
use ockam_transport_core::TransportError;
use tracing::{debug, trace};
use tracing::{debug, error, trace};

use std::collections::BTreeMap;

Expand Down Expand Up @@ -81,6 +81,7 @@ impl TcpRouter {
// Connection already exists
next = n.clone();
} else {
error!("Unkown route");
/* === TODO: Allow auto connect ===
// No existing connection
let peer_str;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use ockam_core::Result;
use ockam_node::Context;
use ockam_transport_core::TransportError;

use smoltcp::wire::IpAddress;
use smoltcp::wire::{IpAddress, Ipv4Address};

use crate::router::{TcpRouter, TcpRouterHandle};

Expand All @@ -18,14 +18,39 @@ impl SmolTcpTransport {
}

// TODO: Accept and parse str
// Or maybe use `IpEndpoint`
pub async fn listen<S: AsRef<str>>(&self, bind_ip_addr: S, bind_port: u16) -> Result<()> {
let bind_ip_addr = IpAddress::from_str(bind_ip_addr.as_ref())
.map_err(|_| TransportError::InvalidAddress)?;
self.router_handle.bind(bind_ip_addr, bind_port).await?;
Ok(())
}

pub async fn connect<S: AsRef<str>>(&self, _peer: S) -> Result<()> {
Ok(())
pub async fn connect<S: AsRef<str>, U: AsRef<str>, T: AsRef<str>>(
&self,
bind_ip_addr: S,
bind_port: u16,
peer_ip_addr: U,
peer_port: u16,
default_gateway: T,
) -> Result<()> {
let bind_ip_addr = IpAddress::from_str(bind_ip_addr.as_ref())
.map_err(|_| TransportError::InvalidAddress)?;

let peer_ip_addr = IpAddress::from_str(peer_ip_addr.as_ref())
.map_err(|_| TransportError::InvalidAddress)?;

let default_gateway = Ipv4Address::from_str(default_gateway.as_ref())
.map_err(|_| TransportError::InvalidAddress)?;

self.router_handle
.connect(
bind_ip_addr,
bind_port,
peer_ip_addr,
peer_port,
default_gateway,
)
.await
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod listener;
mod receiver;
mod sender;
mod tcp_core;

pub(crate) use listener::*;
pub(crate) use sender::*;
pub(crate) use tcp_core::*;
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl Worker for TcpSendWorker {

if socket_can_send {
socket.send_slice(&msg[..]).expect("Socket was closed");
socket.close();
}
}

Expand Down
Loading

0 comments on commit a717755

Please sign in to comment.