Skip to content

Commit

Permalink
feat(network): add peersharing protocol module (#574)
Browse files Browse the repository at this point in the history
  • Loading branch information
franciscojoray authored Jan 15, 2025
1 parent 307495b commit 863a1ce
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 0 deletions.
1 change: 1 addition & 0 deletions pallas-network/src/miniprotocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod handshake;
pub mod keepalive;
pub mod localstate;
pub mod localtxsubmission;
pub mod peersharing;
pub mod txmonitor;
pub mod txsubmission;

Expand Down
134 changes: 134 additions & 0 deletions pallas-network/src/miniprotocols/peersharing/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::fmt::Debug;
use thiserror::*;
use tracing::debug;

use super::protocol::*;
use crate::multiplexer;

#[derive(Error, Debug)]
pub enum ClientError {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,

#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,

#[error("inbound message is not valid for current state")]
InvalidInbound,

#[error("outbound message is not valid for current state")]
InvalidOutbound,

#[error("requested amount mismatch")]
RequestedAmountMismatch,

#[error("error while sending or receiving data through the channel")]
Plexer(multiplexer::Error),
}

pub struct Client(State, multiplexer::ChannelBuffer);

impl Client {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
}

pub fn state(&self) -> &State {
&self.0
}

pub fn is_done(&self) -> bool {
self.0 == State::Done
}

fn has_agency(&self) -> bool {
match &self.0 {
State::Idle => true,
State::Busy(..) => false,
State::Done => false,
}
}

fn assert_agency_is_ours(&self) -> Result<(), ClientError> {
if !self.has_agency() {
Err(ClientError::AgencyIsTheirs)
} else {
Ok(())
}
}

fn assert_agency_is_theirs(&self) -> Result<(), ClientError> {
if self.has_agency() {
Err(ClientError::AgencyIsOurs)
} else {
Ok(())
}
}

fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> {
match (&self.0, msg) {
(State::Idle, Message::ShareRequest(..)) => Ok(()),
(State::Idle, Message::Done) => Ok(()),
_ => Err(ClientError::InvalidOutbound),
}
}

fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> {
match (&self.0, msg) {
(State::Busy(..), Message::SharePeers(..)) => Ok(()),
_ => Err(ClientError::InvalidInbound),
}
}

pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(ClientError::Plexer)?;

Ok(())
}

pub async fn recv_message(&mut self) -> Result<Message, ClientError> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(ClientError::Plexer)?;
self.assert_inbound_state(&msg)?;

Ok(msg)
}

pub async fn send_share_request(&mut self, amount: Amount) -> Result<(), ClientError> {
let msg = Message::ShareRequest(amount);
self.send_message(&msg).await?;
self.0 = State::Busy(amount);
debug!(amount, "sent share request message");

Ok(())
}

pub async fn recv_peer_addresses(&mut self) -> Result<Vec<PeerAddress>, ClientError> {
let msg = self.recv_message().await?;
match msg {
Message::SharePeers(addresses) => {
debug!(
length = addresses.len(),
?addresses,
"received peer addresses"
);
self.0 = State::Idle;
Ok(addresses)
}
_ => Err(ClientError::InvalidInbound),
}
}

pub async fn send_done(&mut self) -> Result<(), ClientError> {
let msg = Message::Done;
self.send_message(&msg).await?;
self.0 = State::Done;

Ok(())
}
}
100 changes: 100 additions & 0 deletions pallas-network/src/miniprotocols/peersharing/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::protocol::*;
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};

impl Encode<()> for PeerAddress {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
PeerAddress::V4(address, port) => {
e.array(3)?.u16(0)?;
e.encode(address)?;
e.encode(port)?;
}
PeerAddress::V6(address, port) => {
e.array(3)?.u16(1)?;
e.encode(address)?;
e.encode(port)?;
}
}

Ok(())
}
}

impl<'b> Decode<'b, ()> for PeerAddress {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
_ctx: &mut (),
) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;

match label {
0 => {
let address = d.decode()?;
let port = d.decode()?;
Ok(PeerAddress::V4(address, port))
}
1 => {
let address = d.decode()?;
let port = d.decode()?;
Ok(PeerAddress::V6(address, port))
}
_ => Err(decode::Error::message("can't decode PeerAddress")),
}
}
}

impl Encode<()> for Message {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::ShareRequest(amount) => {
e.array(2)?.u16(0)?;
e.encode(amount)?;
}
Message::SharePeers(addresses) => {
e.array(2)?.u16(1)?;
e.begin_array()?;
for address in addresses {
e.encode(address)?;
}
e.end()?;
}
Message::Done => {
e.array(1)?.u16(2)?;
}
}

Ok(())
}
}

impl<'b> Decode<'b, ()> for Message {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
_ctx: &mut (),
) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;

match label {
0 => {
let amount = d.decode()?;
Ok(Message::ShareRequest(amount))
}
1 => {
let addresses = d.decode()?;
Ok(Message::SharePeers(addresses))
}
2 => Ok(Message::Done),
_ => Err(decode::Error::message("can't decode Message")),
}
}
}
8 changes: 8 additions & 0 deletions pallas-network/src/miniprotocols/peersharing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod client;
mod codec;
mod protocol;
mod server;

pub use client::*;
pub use protocol::*;
pub use server::*;
27 changes: 27 additions & 0 deletions pallas-network/src/miniprotocols/peersharing/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::fmt::Debug;

use std::net::{Ipv4Addr, Ipv6Addr};

use crate::miniprotocols::localstate::queries_v16::primitives::Port;

pub type Amount = u8;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Idle,
Busy(Amount),
Done,
}

#[derive(Debug, PartialEq, Clone)]
pub enum PeerAddress {
V4(Ipv4Addr, Port),
V6(Ipv6Addr, Port),
}

#[derive(Debug)]
pub enum Message {
ShareRequest(Amount),
SharePeers(Vec<PeerAddress>),
Done,
}
Loading

0 comments on commit 863a1ce

Please sign in to comment.