From 863a1ceca293e418e620696dc517b16f29c66e71 Mon Sep 17 00:00:00 2001 From: Francisco Joray <121896075+franciscojoray@users.noreply.github.com> Date: Wed, 15 Jan 2025 11:57:22 -0300 Subject: [PATCH] feat(network): add `peersharing` protocol module (#574) --- pallas-network/src/miniprotocols/mod.rs | 1 + .../src/miniprotocols/peersharing/client.rs | 134 ++++++++++++++++++ .../src/miniprotocols/peersharing/codec.rs | 100 +++++++++++++ .../src/miniprotocols/peersharing/mod.rs | 8 ++ .../src/miniprotocols/peersharing/protocol.rs | 27 ++++ .../src/miniprotocols/peersharing/server.rs | 126 ++++++++++++++++ 6 files changed, 396 insertions(+) create mode 100644 pallas-network/src/miniprotocols/peersharing/client.rs create mode 100644 pallas-network/src/miniprotocols/peersharing/codec.rs create mode 100644 pallas-network/src/miniprotocols/peersharing/mod.rs create mode 100644 pallas-network/src/miniprotocols/peersharing/protocol.rs create mode 100644 pallas-network/src/miniprotocols/peersharing/server.rs diff --git a/pallas-network/src/miniprotocols/mod.rs b/pallas-network/src/miniprotocols/mod.rs index dbbd59c1..37633641 100644 --- a/pallas-network/src/miniprotocols/mod.rs +++ b/pallas-network/src/miniprotocols/mod.rs @@ -8,6 +8,7 @@ pub mod handshake; pub mod keepalive; pub mod localstate; pub mod localtxsubmission; +pub mod peersharing; pub mod txmonitor; pub mod txsubmission; diff --git a/pallas-network/src/miniprotocols/peersharing/client.rs b/pallas-network/src/miniprotocols/peersharing/client.rs new file mode 100644 index 00000000..e38b3384 --- /dev/null +++ b/pallas-network/src/miniprotocols/peersharing/client.rs @@ -0,0 +1,134 @@ +use std::fmt::Debug; +use thiserror::*; +use tracing::debug; + +use super::protocol::*; +use crate::multiplexer; + +#[derive(Error, Debug)] +pub enum ClientError { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("requested amount mismatch")] + RequestedAmountMismatch, + + #[error("error while sending or receiving data through the channel")] + Plexer(multiplexer::Error), +} + +pub struct Client(State, multiplexer::ChannelBuffer); + +impl Client { + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) + } + + pub fn state(&self) -> &State { + &self.0 + } + + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + fn has_agency(&self) -> bool { + match &self.0 { + State::Idle => true, + State::Busy(..) => false, + State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), ClientError> { + if !self.has_agency() { + Err(ClientError::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), ClientError> { + if self.has_agency() { + Err(ClientError::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> { + match (&self.0, msg) { + (State::Idle, Message::ShareRequest(..)) => Ok(()), + (State::Idle, Message::Done) => Ok(()), + _ => Err(ClientError::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> { + match (&self.0, msg) { + (State::Busy(..), Message::SharePeers(..)) => Ok(()), + _ => Err(ClientError::InvalidInbound), + } + } + + pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1 + .send_msg_chunks(msg) + .await + .map_err(ClientError::Plexer)?; + + Ok(()) + } + + pub async fn recv_message(&mut self) -> Result { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().await.map_err(ClientError::Plexer)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub async fn send_share_request(&mut self, amount: Amount) -> Result<(), ClientError> { + let msg = Message::ShareRequest(amount); + self.send_message(&msg).await?; + self.0 = State::Busy(amount); + debug!(amount, "sent share request message"); + + Ok(()) + } + + pub async fn recv_peer_addresses(&mut self) -> Result, ClientError> { + let msg = self.recv_message().await?; + match msg { + Message::SharePeers(addresses) => { + debug!( + length = addresses.len(), + ?addresses, + "received peer addresses" + ); + self.0 = State::Idle; + Ok(addresses) + } + _ => Err(ClientError::InvalidInbound), + } + } + + pub async fn send_done(&mut self) -> Result<(), ClientError> { + let msg = Message::Done; + self.send_message(&msg).await?; + self.0 = State::Done; + + Ok(()) + } +} diff --git a/pallas-network/src/miniprotocols/peersharing/codec.rs b/pallas-network/src/miniprotocols/peersharing/codec.rs new file mode 100644 index 00000000..c9df8d38 --- /dev/null +++ b/pallas-network/src/miniprotocols/peersharing/codec.rs @@ -0,0 +1,100 @@ +use super::protocol::*; +use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; + +impl Encode<()> for PeerAddress { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + PeerAddress::V4(address, port) => { + e.array(3)?.u16(0)?; + e.encode(address)?; + e.encode(port)?; + } + PeerAddress::V6(address, port) => { + e.array(3)?.u16(1)?; + e.encode(address)?; + e.encode(port)?; + } + } + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for PeerAddress { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + _ctx: &mut (), + ) -> Result { + d.array()?; + let label = d.u16()?; + + match label { + 0 => { + let address = d.decode()?; + let port = d.decode()?; + Ok(PeerAddress::V4(address, port)) + } + 1 => { + let address = d.decode()?; + let port = d.decode()?; + Ok(PeerAddress::V6(address, port)) + } + _ => Err(decode::Error::message("can't decode PeerAddress")), + } + } +} + +impl Encode<()> for Message { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Message::ShareRequest(amount) => { + e.array(2)?.u16(0)?; + e.encode(amount)?; + } + Message::SharePeers(addresses) => { + e.array(2)?.u16(1)?; + e.begin_array()?; + for address in addresses { + e.encode(address)?; + } + e.end()?; + } + Message::Done => { + e.array(1)?.u16(2)?; + } + } + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for Message { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + _ctx: &mut (), + ) -> Result { + d.array()?; + let label = d.u16()?; + + match label { + 0 => { + let amount = d.decode()?; + Ok(Message::ShareRequest(amount)) + } + 1 => { + let addresses = d.decode()?; + Ok(Message::SharePeers(addresses)) + } + 2 => Ok(Message::Done), + _ => Err(decode::Error::message("can't decode Message")), + } + } +} diff --git a/pallas-network/src/miniprotocols/peersharing/mod.rs b/pallas-network/src/miniprotocols/peersharing/mod.rs new file mode 100644 index 00000000..bc68ddef --- /dev/null +++ b/pallas-network/src/miniprotocols/peersharing/mod.rs @@ -0,0 +1,8 @@ +mod client; +mod codec; +mod protocol; +mod server; + +pub use client::*; +pub use protocol::*; +pub use server::*; diff --git a/pallas-network/src/miniprotocols/peersharing/protocol.rs b/pallas-network/src/miniprotocols/peersharing/protocol.rs new file mode 100644 index 00000000..5678f549 --- /dev/null +++ b/pallas-network/src/miniprotocols/peersharing/protocol.rs @@ -0,0 +1,27 @@ +use std::fmt::Debug; + +use std::net::{Ipv4Addr, Ipv6Addr}; + +use crate::miniprotocols::localstate::queries_v16::primitives::Port; + +pub type Amount = u8; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum State { + Idle, + Busy(Amount), + Done, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum PeerAddress { + V4(Ipv4Addr, Port), + V6(Ipv6Addr, Port), +} + +#[derive(Debug)] +pub enum Message { + ShareRequest(Amount), + SharePeers(Vec), + Done, +} diff --git a/pallas-network/src/miniprotocols/peersharing/server.rs b/pallas-network/src/miniprotocols/peersharing/server.rs new file mode 100644 index 00000000..a8858857 --- /dev/null +++ b/pallas-network/src/miniprotocols/peersharing/server.rs @@ -0,0 +1,126 @@ +use std::fmt::Debug; +use thiserror::*; +use tracing::debug; + +use super::protocol::*; +use crate::multiplexer; + +#[derive(Error, Debug)] +pub enum ServerError { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("error while sending or receiving data through the channel")] + Plexer(multiplexer::Error), +} + +pub struct Server(State, multiplexer::ChannelBuffer); + +impl Server { + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) + } + + pub fn state(&self) -> &State { + &self.0 + } + + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + fn has_agency(&self) -> bool { + match &self.0 { + State::Idle => false, + State::Busy(..) => true, + State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), ServerError> { + if !self.has_agency() { + Err(ServerError::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), ServerError> { + if self.has_agency() { + Err(ServerError::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), ServerError> { + match (&self.0, msg) { + (State::Busy(..), Message::SharePeers(..)) => Ok(()), + _ => Err(ServerError::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), ServerError> { + match (&self.0, msg) { + (State::Idle, Message::ShareRequest(..)) => Ok(()), + (State::Idle, Message::Done) => Ok(()), + _ => Err(ServerError::InvalidInbound), + } + } + + pub async fn send_message(&mut self, msg: &Message) -> Result<(), ServerError> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1 + .send_msg_chunks(msg) + .await + .map_err(ServerError::Plexer)?; + + Ok(()) + } + + pub async fn recv_message(&mut self) -> Result { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().await.map_err(ServerError::Plexer)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub async fn recv_share_request(&mut self) -> Result, ServerError> { + let msg = self.recv_message().await?; + match msg { + Message::ShareRequest(amount) => { + debug!(amount, "received share request"); + self.0 = State::Busy(amount); + Ok(Some(amount)) + } + Message::Done => { + debug!("client sent done message in peersharing protocol"); + self.0 = State::Done; + Ok(None) + } + _ => Err(ServerError::InvalidInbound), + } + } + + pub async fn send_peer_addresses( + &mut self, + response: Vec, + ) -> Result<(), ServerError> { + let msg = Message::SharePeers(response); + self.send_message(&msg).await?; + self.0 = State::Idle; + + Ok(()) + } +}