From c40f9dfe582f8127a27fe2f2b2b128338db61fb2 Mon Sep 17 00:00:00 2001 From: Boris Sabatier Date: Sun, 31 Dec 2023 08:39:28 +0100 Subject: [PATCH 1/2] feat: Allow to create tarpc on existing UDS listener --- tarpc/src/serde_transport.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/tarpc/src/serde_transport.rs b/tarpc/src/serde_transport.rs index 1773e846..51ad9fbd 100644 --- a/tarpc/src/serde_transport.rs +++ b/tarpc/src/serde_transport.rs @@ -364,7 +364,19 @@ pub mod unix { Codec: Serializer + Deserializer, CodecFn: Fn() -> Codec, { - let listener = UnixListener::bind(path)?; + listen_on(UnixListener::bind(path)?, codec_fn).await + } + + /// Wrap accepted connections from `listener` in Unix Domain Socket transports. + pub async fn listen_on( + listener: UnixListener, + codec_fn: CodecFn, + ) -> io::Result> + where + Item: for<'de> Deserialize<'de>, + Codec: Serializer + Deserializer, + CodecFn: Fn() -> Codec, + { let local_addr = listener.local_addr()?; Ok(Incoming { listener, @@ -669,4 +681,25 @@ mod tests { assert_matches!(transport.next().await, None); Ok(()) } + + #[cfg(all(unix, feature = "unix"))] + #[tokio::test] + async fn uds_on_existing_transport() -> io::Result<()> { + use super::unix; + use super::*; + + let sock = unix::TempPathBuf::with_random("uds"); + let transport = tokio::net::UnixListener::bind(&sock)?; + let mut listener = unix::listen_on(transport, SymmetricalJson::::default).await?; + tokio::spawn(async move { + let mut transport = listener.next().await.unwrap().unwrap(); + let message = transport.next().await.unwrap().unwrap(); + transport.send(message).await.unwrap(); + }); + let mut transport = unix::connect(&sock, SymmetricalJson::::default).await?; + transport.send(String::from("test")).await?; + assert_matches!(transport.next().await, Some(Ok(s)) if s == "test"); + assert_matches!(transport.next().await, None); + Ok(()) + } } From 21570b678a25384b3af01fd9875eef6ab699df50 Mon Sep 17 00:00:00 2001 From: Boris Sabatier Date: Sun, 31 Dec 2023 08:39:40 +0100 Subject: [PATCH 2/2] feat: Allow to create tarpc on existing TCP listener --- tarpc/src/serde_transport.rs | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/tarpc/src/serde_transport.rs b/tarpc/src/serde_transport.rs index 51ad9fbd..a8eedd52 100644 --- a/tarpc/src/serde_transport.rs +++ b/tarpc/src/serde_transport.rs @@ -210,7 +210,19 @@ pub mod tcp { Codec: Serializer + Deserializer, CodecFn: Fn() -> Codec, { - let listener = TcpListener::bind(addr).await?; + listen_on(TcpListener::bind(addr).await?, codec_fn).await + } + + /// Wrap accepted connections from `listener` in TCP transports. + pub async fn listen_on( + listener: TcpListener, + codec_fn: CodecFn, + ) -> io::Result> + where + Item: for<'de> Deserialize<'de>, + Codec: Serializer + Deserializer, + CodecFn: Fn() -> Codec, + { let local_addr = listener.local_addr()?; Ok(Incoming { listener, @@ -662,6 +674,26 @@ mod tests { Ok(()) } + #[cfg(tcp)] + #[tokio::test] + async fn tcp_on_existing_transport() -> io::Result<()> { + use super::tcp; + + let transport = TcpListener::bind("0.0.0.0:0").await?; + let mut listener = tcp::listen_on(transport, SymmetricalJson::::default).await?; + let addr = listener.local_addr(); + tokio::spawn(async move { + let mut transport = listener.next().await.unwrap().unwrap(); + let message = transport.next().await.unwrap().unwrap(); + transport.send(message).await.unwrap(); + }); + let mut transport = tcp::connect(addr, SymmetricalJson::::default).await?; + transport.send(String::from("test")).await?; + assert_matches!(transport.next().await, Some(Ok(s)) if s == "test"); + assert_matches!(transport.next().await, None); + Ok(()) + } + #[cfg(all(unix, feature = "unix"))] #[tokio::test] async fn uds() -> io::Result<()> {