Skip to content

Commit

Permalink
Cap mux simple (#5577)
Browse files Browse the repository at this point in the history
Signed-off-by: Emilia Hane <[email protected]>
  • Loading branch information
emhane authored Dec 8, 2023
1 parent 27da72c commit cd4d6c5
Show file tree
Hide file tree
Showing 12 changed files with 703 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/net/eth-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reth-metrics.workspace = true
metrics.workspace = true

bytes.workspace = true
derive_more = "0.99.17"
thiserror.workspace = true
serde = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"] }
Expand All @@ -38,10 +39,12 @@ proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }

[dev-dependencies]
reth-net-common.workspace = true
reth-primitives = { workspace = true, features = ["arbitrary"] }
reth-tracing.workspace = true
ethers-core = { workspace = true, default-features = false }


test-fuzz = "4"
tokio-util = { workspace = true, features = ["io", "codec"] }
rand.workspace = true
Expand Down
41 changes: 27 additions & 14 deletions crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
EthMessage, EthMessageID, EthVersion,
};
use alloy_rlp::{Decodable, Encodable, RlpDecodable, RlpEncodable};
use derive_more::{Deref, DerefMut};
use reth_codecs::add_arbitrary_tests;
use reth_primitives::bytes::{BufMut, Bytes};
#[cfg(feature = "serde")]
Expand Down Expand Up @@ -249,14 +250,23 @@ pub enum SharedCapability {
/// This represents the message ID offset for the first message of the eth capability in
/// the message id space.
offset: u8,
/// The number of messages of this capability. Needed to calculate range of message IDs in
/// demuxing.
messages: u8,
},
}

impl SharedCapability {
/// Creates a new [`SharedCapability`] based on the given name, offset, and version.
/// Creates a new [`SharedCapability`] based on the given name, offset, version (and messages
/// if the capability is custom).
///
/// Returns an error if the offset is equal or less than [`MAX_RESERVED_MESSAGE_ID`].
pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result<Self, SharedCapabilityError> {
pub(crate) fn new(
name: &str,
version: u8,
offset: u8,
messages: u8,
) -> Result<Self, SharedCapabilityError> {
if offset <= MAX_RESERVED_MESSAGE_ID {
return Err(SharedCapabilityError::ReservedMessageIdOffset(offset))
}
Expand All @@ -266,6 +276,7 @@ impl SharedCapability {
_ => Ok(Self::UnknownCapability {
cap: Capability::new(name.to_string(), version as usize),
offset,
messages,
}),
}
}
Expand Down Expand Up @@ -324,18 +335,18 @@ impl SharedCapability {
}

/// Returns the number of protocol messages supported by this capability.
pub fn num_messages(&self) -> Result<u8, SharedCapabilityError> {
pub fn num_messages(&self) -> u8 {
match self {
SharedCapability::Eth { version: _version, .. } => Ok(EthMessageID::max() + 1),
_ => Err(SharedCapabilityError::UnknownCapability),
SharedCapability::Eth { version: _version, .. } => EthMessageID::max() + 1,
SharedCapability::UnknownCapability { messages, .. } => *messages,
}
}
}

/// Non-empty,ordered list of recognized shared capabilities.
///
/// Shared capabilities are ordered alphabetically by case sensitive name.
#[derive(Debug)]
#[derive(Debug, Clone, Deref, DerefMut, PartialEq, Eq)]
pub struct SharedCapabilities(Vec<SharedCapability>);

impl SharedCapabilities {
Expand Down Expand Up @@ -500,9 +511,14 @@ pub fn shared_capability_offsets(
for name in shared_capability_names {
let proto_version = shared_capabilities.get(&name).expect("shared; qed");

let shared_capability = SharedCapability::new(&name, proto_version.version as u8, offset)?;
let shared_capability = SharedCapability::new(
&name,
proto_version.version as u8,
offset,
proto_version.messages,
)?;

offset += proto_version.messages;
offset += shared_capability.num_messages();
shared_with_offsets.push(shared_capability);
}

Expand All @@ -519,9 +535,6 @@ pub enum SharedCapabilityError {
/// Unsupported `eth` version.
#[error(transparent)]
UnsupportedVersion(#[from] ParseVersionError),
/// Cannot determine the number of messages for unknown capabilities.
#[error("cannot determine the number of messages for unknown capabilities")]
UnknownCapability,
/// Thrown when the message id for a [SharedCapability] overlaps with the reserved p2p message
/// id space [`MAX_RESERVED_MESSAGE_ID`].
#[error("message id offset `{0}` is reserved")]
Expand All @@ -541,7 +554,7 @@ mod tests {

#[test]
fn from_eth_68() {
let capability = SharedCapability::new("eth", 68, MAX_RESERVED_MESSAGE_ID + 1).unwrap();
let capability = SharedCapability::new("eth", 68, MAX_RESERVED_MESSAGE_ID + 1, 13).unwrap();

assert_eq!(capability.name(), "eth");
assert_eq!(capability.version(), 68);
Expand All @@ -556,7 +569,7 @@ mod tests {

#[test]
fn from_eth_67() {
let capability = SharedCapability::new("eth", 67, MAX_RESERVED_MESSAGE_ID + 1).unwrap();
let capability = SharedCapability::new("eth", 67, MAX_RESERVED_MESSAGE_ID + 1, 13).unwrap();

assert_eq!(capability.name(), "eth");
assert_eq!(capability.version(), 67);
Expand All @@ -571,7 +584,7 @@ mod tests {

#[test]
fn from_eth_66() {
let capability = SharedCapability::new("eth", 66, MAX_RESERVED_MESSAGE_ID + 1).unwrap();
let capability = SharedCapability::new("eth", 66, MAX_RESERVED_MESSAGE_ID + 1, 15).unwrap();

assert_eq!(capability.name(), "eth");
assert_eq!(capability.version(), 66);
Expand Down
2 changes: 1 addition & 1 deletion crates/net/eth-wire/src/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl Decodable for DisconnectReason {
/// lower-level disconnect functions (such as those that exist in the `p2p` protocol) if the
/// underlying stream supports it.
#[async_trait::async_trait]
pub trait CanDisconnect<T>: Sink<T> + Unpin + Sized {
pub trait CanDisconnect<T>: Sink<T> + Unpin {
/// Disconnects from the underlying stream, using a [`DisconnectReason`] as disconnect
/// information if the stream implements a protocol that can carry the additional disconnect
/// metadata.
Expand Down
9 changes: 8 additions & 1 deletion crates/net/eth-wire/src/errors/eth.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Error handling for (`EthStream`)[crate::EthStream]
use crate::{
errors::P2PStreamError, version::ParseVersionError, DisconnectReason, EthMessageID, EthVersion,
errors::{MuxDemuxError, P2PStreamError},
version::ParseVersionError,
DisconnectReason, EthMessageID, EthVersion,
};
use reth_primitives::{Chain, GotExpected, GotExpectedBoxed, ValidationError, B256};
use std::io;
Expand All @@ -13,6 +15,9 @@ pub enum EthStreamError {
/// Error of the underlying P2P connection.
P2PStreamError(#[from] P2PStreamError),
#[error(transparent)]
/// Error of the underlying de-/muxed P2P connection.
MuxDemuxError(#[from] MuxDemuxError),
#[error(transparent)]
/// Failed to parse peer's version.
ParseVersionError(#[from] ParseVersionError),
#[error(transparent)]
Expand Down Expand Up @@ -43,6 +48,8 @@ impl EthStreamError {
pub fn as_disconnected(&self) -> Option<DisconnectReason> {
if let EthStreamError::P2PStreamError(err) = self {
err.as_disconnected()
} else if let EthStreamError::MuxDemuxError(MuxDemuxError::P2PStreamError(err)) = self {
err.as_disconnected()
} else {
None
}
Expand Down
2 changes: 2 additions & 0 deletions crates/net/eth-wire/src/errors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Error types for stream variants
mod eth;
mod muxdemux;
mod p2p;

pub use eth::*;
pub use muxdemux::*;
pub use p2p::*;
47 changes: 47 additions & 0 deletions crates/net/eth-wire/src/errors/muxdemux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use thiserror::Error;

use crate::capability::{SharedCapabilityError, UnsupportedCapabilityError};

use super::P2PStreamError;

/// Errors thrown by de-/muxing.
#[derive(Error, Debug)]
pub enum MuxDemuxError {
/// Error of the underlying P2P connection.
#[error(transparent)]
P2PStreamError(#[from] P2PStreamError),
/// Stream is in use by secondary stream impeding disconnect.
#[error("secondary streams are still running")]
StreamInUse,
/// Stream has already been set up for this capability stream type.
#[error("stream already init for stream type")]
StreamAlreadyExists,
/// Capability stream type is not shared with peer on underlying p2p connection.
#[error("stream type is not shared on this p2p connection")]
CapabilityNotShared,
/// Capability stream type has not been configured in [`crate::muxdemux::MuxDemuxer`].
#[error("stream type is not configured")]
CapabilityNotConfigured,
/// Capability stream type has not been configured for
/// [`crate::capability::SharedCapabilities`] type.
#[error("stream type is not recognized")]
CapabilityNotRecognized,
/// Message ID is out of range.
#[error("message id out of range, {0}")]
MessageIdOutOfRange(u8),
/// Demux channel failed.
#[error("sending demuxed bytes to secondary stream failed")]
SendIngressBytesFailed,
/// Mux channel failed.
#[error("sending bytes from secondary stream to mux failed")]
SendEgressBytesFailed,
/// Attempt to disconnect the p2p stream via a stream clone.
#[error("secondary stream cannot disconnect p2p stream")]
CannotDisconnectP2PStream,
/// Shared capability error.
#[error(transparent)]
SharedCapabilityError(#[from] SharedCapabilityError),
/// Capability not supported on the p2p connection.
#[error(transparent)]
UnsupportedCapabilityError(#[from] UnsupportedCapabilityError),
}
2 changes: 1 addition & 1 deletion crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ where
// start_disconnect, which would ideally be a part of the CanDisconnect trait, or at
// least similar.
//
// Other parts of reth do not need traits like CanDisconnect because they work
// Other parts of reth do not yet need traits like CanDisconnect because atm they work
// exclusively with EthStream<P2PStream<S>>, where the inner P2PStream is accessible,
// allowing for its start_disconnect method to be called.
//
Expand Down
6 changes: 5 additions & 1 deletion crates/net/eth-wire/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod errors;
mod ethstream;
mod hello;
pub mod multiplex;
pub mod muxdemux;
mod p2pstream;
mod pinger;
pub mod protocol;
Expand All @@ -37,11 +38,14 @@ pub use tokio_util::codec::{
};

pub use crate::{
capability::Capability,
disconnect::{CanDisconnect, DisconnectReason},
ethstream::{EthStream, UnauthedEthStream, MAX_MESSAGE_SIZE},
hello::{HelloMessage, HelloMessageBuilder, HelloMessageWithProtocols},
muxdemux::{MuxDemuxStream, StreamClone},
p2pstream::{
P2PMessage, P2PMessageID, P2PStream, ProtocolVersion, UnauthedP2PStream,
DisconnectP2P, P2PMessage, P2PMessageID, P2PStream, ProtocolVersion, UnauthedP2PStream,
MAX_RESERVED_MESSAGE_ID,
},
types::EthVersion,
};
Loading

0 comments on commit cd4d6c5

Please sign in to comment.