diff --git a/Cargo.lock b/Cargo.lock index 80310d8205..3eac623a5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,17 +24,6 @@ version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" -[[package]] -name = "async-trait" -version = "0.1.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c4f3195085c36ea8d24d32b2f828d23296a9370a28aa39d111f6f16bef9f3b" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atty" version = "0.2.14" @@ -75,7 +64,6 @@ name = "casper-node" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "displaydoc", "either", "enum-iterator", diff --git a/Cargo.toml b/Cargo.toml index 1ea7a03a0e..18c428eb72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,14 @@ [package] name = "casper-node" version = "0.1.0" -authors = ["Marc Brinkmann ", - "Fraser Hutchison "] +authors = ["Marc Brinkmann ", "Fraser Hutchison "] edition = "2018" -description = "The CasperLabs blockchain node server" +description = "The CasperLabs blockchain node" publish = false # Prevent accidental `cargo publish` for now. license-file = "LICENSE" [dependencies] anyhow = "1.0.28" -async-trait = "0.1.31" displaydoc = "0.1.6" either = "1.5.3" enum-iterator = "0.6.0" diff --git a/README.md b/README.md index 733de2ba2a..0ead1ebf80 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,21 @@ -# CasperLabs node +# casper-node -The is the core application for the CasperLabs blockchain. - -## Building - -To compile this application, simply run `cargo build` on a recent stable Rust (`>= 1.43.1`) version. +This is the core application for the CasperLabs blockchain. ## Running a validator node -Launching a validator node with the default configuration is done by simply launching the application: +To run a validator node with the default configuration: ``` -casper-node validator +cargo run --release -- validator ``` It is very likely that the configuration requires editing though, so typically one will want to generate a configuration file first, edit it and then launch: ``` -casper-node generate-config > mynode.toml +cargo run --release -- generate-config > mynode.toml # ... edit mynode.toml -casper-node validator -c mynode.toml +cargo run --release -- validator --config=mynode.toml ``` ## Development @@ -30,4 +26,4 @@ A good starting point is to build the documentation and read it in your browser: cargo doc --no-deps --open ``` -When generating a configuration file, it is usually helpful to set the log-level to `DEBUG` during development. \ No newline at end of file +When generating a configuration file, it is usually helpful to set the log-level to `DEBUG` during development. diff --git a/images/CasperLabs_Logo_Favicon_RGB_50px.png b/images/CasperLabs_Logo_Favicon_RGB_50px.png new file mode 100644 index 0000000000..593254f7bd Binary files /dev/null and b/images/CasperLabs_Logo_Favicon_RGB_50px.png differ diff --git a/images/CasperLabs_Logo_Symbol_RGB.png b/images/CasperLabs_Logo_Symbol_RGB.png new file mode 100644 index 0000000000..fcc47b453c Binary files /dev/null and b/images/CasperLabs_Logo_Symbol_RGB.png differ diff --git a/src/cli.rs b/src/cli.rs index b59ae5a348..855fcdd9c4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,20 +1,28 @@ //! Command-line option parsing. //! -//! Most configuration is done through the configuration, which is the only required command-line -//! argument. However some configuration values can be overwritten for convenience's sake. -use std::{io, io::Write, path}; +//! Most configuration is done via config files (see [`config`](../config/index.html) for details). + +use std::{io, io::Write, path::PathBuf}; + +use anyhow::bail; use structopt::StructOpt; +use tracing::Level; -use crate::{config, reactor, tls}; +use crate::{ + config, + reactor::{self, validator::Reactor}, + tls, +}; // Note: The docstring on `Cli` is the help shown when calling the binary with `--help`. #[derive(Debug, StructOpt)] /// CasperLabs blockchain node. pub enum Cli { - /// Generate a self-signed node certificate + /// Generate a self-signed node certificate. GenerateCert { - /// Output path base the certificate, private key pair in PEM format. The cert will be stored as `output.crt.pem`, while the key will be stored as `output.key.pem`. - output: path::PathBuf, + /// Output path base of the certificate. The certificate will be stored as + /// `output.crt.pem`, while the key will be stored as `output.key.pem`. + output: PathBuf, }, /// Generate a configuration file from defaults and dump it to stdout. GenerateConfig {}, @@ -26,7 +34,7 @@ pub enum Cli { Validator { #[structopt(short, long, env)] /// Path to configuration file. - config: Option, + config: Option, /// Override log-level, forcing debug output. #[structopt(short, long)] @@ -35,12 +43,12 @@ pub enum Cli { } impl Cli { - /// Execute selected CLI command. + /// Executes selected CLI command. pub async fn run(self) -> anyhow::Result<()> { match self { Cli::GenerateCert { output } => { if output.file_name().is_none() { - anyhow::bail!("not a valid output path"); + bail!("not a valid output path"); } let mut cert_path = output.clone(); @@ -69,11 +77,11 @@ impl Cli { .transpose()? .unwrap_or_default(); if debug { - cfg.log.level = tracing::Level::DEBUG; + cfg.log.level = Level::DEBUG; } cfg.log.setup_logging()?; - reactor::launch::(cfg).await + reactor::launch::(cfg).await } } } diff --git a/src/components/small_network.rs b/src/components/small_network.rs index 89ba540376..e967cdc2d1 100644 --- a/src/components/small_network.rs +++ b/src/components/small_network.rs @@ -38,31 +38,53 @@ //! all nodes in the list and simultaneously tell all of its connected nodes about the new node, //! repeating the process. -use crate::effect::{Effect, EffectExt, EffectResultExt}; -use crate::tls::{self, Signed, TlsCert}; -use crate::util::{DisplayIter, Multiple}; -use crate::{config, reactor}; -use anyhow::Context; -use futures::{FutureExt, SinkExt, StreamExt}; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, + fmt::{self, Debug, Display, Formatter}, + hash::Hash, + io, + net::{SocketAddr, TcpListener}, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::{anyhow, bail, Context}; +use futures::{ + stream::{SplitSink, SplitStream}, + FutureExt, SinkExt, StreamExt, +}; use maplit::hashmap; use openssl::{pkey, x509}; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -use std::hash::Hash; -use std::sync::Arc; -use std::{cmp, collections, fmt, io, net, time}; -use tokio::sync::mpsc; +use pkey::{PKey, Private}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::{ + net::TcpStream, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, +}; +use tokio_openssl::SslStream; +use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{debug, error, info, warn}; +use x509::X509; -/// A node id. +use crate::{ + config, + effect::{Effect, EffectExt, EffectResultExt}, + reactor::{EventQueueHandle, Queue, Reactor}, + tls::{self, KeyFingerprint, Signed, TlsCert}, + utils::{DisplayIter, Multiple}, +}; + +/// A node ID. /// /// The key fingerprint found on TLS certificates. -pub type NodeId = tls::KeyFingerprint; +pub type NodeId = KeyFingerprint; #[derive(Clone, Debug, Deserialize, Serialize)] pub enum Message

{ /// A pruned set of all endpoint announcements the server has received. - Snapshot(collections::HashSet>), + Snapshot(HashSet>), /// Broadcast a new endpoint known to the sender. BroadcastEndpoint(Signed), /// A payload message. @@ -76,7 +98,7 @@ pub struct Endpoint { /// Will overflow earliest November 2262. timestamp_ns: u64, /// Socket address the node is listening on. - addr: net::SocketAddr, + addr: SocketAddr, /// Certificate. cert: TlsCert, } @@ -88,21 +110,18 @@ pub enum Event

{ /// Connection to the root node failed. RootFailed { error: anyhow::Error }, /// A new TCP connection has been established from an incoming connection. - IncomingNew { - stream: tokio::net::TcpStream, - addr: net::SocketAddr, - }, + IncomingNew { stream: TcpStream, addr: SocketAddr }, /// The TLS handshake completed on the incoming connection. IncomingHandshakeCompleted { result: anyhow::Result<(NodeId, Transport)>, - addr: net::SocketAddr, + addr: SocketAddr, }, /// Received network message. IncomingMessage { node_id: NodeId, msg: Message

}, /// Incoming connection closed. IncomingClosed { result: io::Result<()>, - addr: net::SocketAddr, + addr: SocketAddr, }, /// A new outgoing connection was successfully established. @@ -120,36 +139,36 @@ pub enum Event

{ pub struct SmallNetwork where - R: reactor::Reactor, + R: Reactor, { /// Configuration. cfg: config::SmallNetwork, /// Server certificate. - cert: Arc, + cert: Arc, /// Server private key. - private_key: Arc>, + private_key: Arc>, /// Handle to event queue. - eq: reactor::EventQueueHandle>, - /// A list of known endpoints by node id. - endpoints: collections::HashMap, + eq: EventQueueHandle>, + /// A list of known endpoints by node ID. + endpoints: HashMap, /// Stored signed endpoints that can be sent to other nodes. - signed_endpoints: collections::HashMap>, - /// Outgoing network connections messages. - outgoing: collections::HashMap>>, + signed_endpoints: HashMap>, + /// Outgoing network connections' messages. + outgoing: HashMap>>, } impl SmallNetwork where - R: reactor::Reactor + 'static, - P: Serialize + DeserializeOwned + Clone + fmt::Debug + Send + 'static, + R: Reactor + 'static, + P: Serialize + DeserializeOwned + Clone + Debug + Send + 'static, { #[allow(clippy::type_complexity)] pub fn new( - eq: reactor::EventQueueHandle>, + eq: EventQueueHandle>, cfg: config::SmallNetwork, ) -> anyhow::Result<(SmallNetwork, Multiple>>)> where - R: reactor::Reactor + 'static, + R: Reactor + 'static, { // First, we load or generate the TLS keys. let (cert, private_key) = match (&cfg.cert, &cfg.private_key) { @@ -165,7 +184,7 @@ where (None, None) => tls::generate_node_cert()?, // If we get only one of the two, return an error. - _ => anyhow::bail!("need either both or none of cert, private_key in network config"), + _ => bail!("need either both or none of cert, private_key in network config"), }; // We can now create a listener. @@ -174,9 +193,7 @@ where // Create the model. Initially we know our own endpoint address. let our_endpoint = Endpoint { - timestamp_ns: time::SystemTime::now() - .duration_since(time::UNIX_EPOCH)? - .as_nanos() as u64, + timestamp_ns: SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64, addr, cert: tls::validate_cert(cert.clone())?, }; @@ -194,7 +211,7 @@ where cert: Arc::new(cert), private_key: Arc::new(private_key), eq, - outgoing: collections::HashMap::new(), + outgoing: HashMap::new(), }; // Connect to the root node (even if we are the root node, just loopback). @@ -203,7 +220,7 @@ where Ok((model, effects)) } - /// Attempt to connect to the root node. + /// Attempts to connect to the root node. fn connect_to_root(&self) -> Multiple>> { connect_trusted( self.cfg.root_addr, @@ -270,7 +287,7 @@ where Event::IncomingClosed { result, addr } => { match result { Ok(()) => info!(%addr, "connection closed"), - Err(err) => warn!(%addr, %err, "connection dopped"), + Err(err) => warn!(%addr, %err, "connection dropped"), } Multiple::new() } @@ -326,14 +343,14 @@ where } } - /// Queue a message to be sent to all nodes. + /// Queues a message to be sent to all nodes. fn broadcast_message(&self, msg: Message

) { for node_id in self.outgoing.keys() { self.send_message(*node_id, msg.clone()); } } - /// Queue a message to be sent to a specific node. + /// Queues a message to be sent to a specific node. fn send_message(&self, dest: NodeId, msg: Message

) { // Try to send the message. if let Some(sender) = self.outgoing.get(&dest) { @@ -347,9 +364,9 @@ where } } - /// Update the internal endpoint store from a given endpoint. + /// Updates the internal endpoint store from a given endpoint. /// - /// Returns the node id of the endpoint if it was new. + /// Returns the node ID of the endpoint if it was new. #[inline] fn update_endpoint(&mut self, endpoint: &Endpoint) -> Option { let fp = endpoint.cert.public_key_fingerprint(); @@ -365,7 +382,7 @@ where Some(fp) } - /// Update internal endpoint store and if new, output a `BroadcastEndpoint` effect. + /// Updates internal endpoint store and if new, output a `BroadcastEndpoint` effect. #[inline] fn update_and_broadcast_if_new( &mut self, @@ -416,7 +433,7 @@ where } } - /// Setup an established outgoing connection. + /// Sets up an established outgoing connection. fn setup_outgoing( &mut self, node_id: NodeId, @@ -428,8 +445,8 @@ where let (sender, receiver) = mpsc::unbounded_channel(); if self.outgoing.insert(node_id, sender).is_some() { // We assume that for a reconnect to have happened, the outgoing entry must have - // been either non-existant yet or cleaned up by the handler of the connection - // closing event. If this not the case, an assumed invariant has been violated. + // been either non-existent yet or cleaned up by the handler of the connection + // closing event. If this is not the case, an assumed invariant has been violated. error!(%node_id, "did not expect leftover channel in outgoing map"); } @@ -444,7 +461,7 @@ where }) } - /// Handle received message. + /// Handles a received message. // Internal function to keep indentation and nesting sane. fn handle_message(&mut self, node_id: NodeId, msg: Message

) -> Multiple>> { match msg { @@ -467,14 +484,14 @@ where } } -/// Determine bind address for now. +/// Determines bind address for now. /// /// Will attempt to bind on the root address first if the `bind_interface` is the same as the /// interface of `root_addr`. Otherwise uses an unused port on `bind_interface`. -fn create_listener(cfg: &config::SmallNetwork) -> io::Result { +fn create_listener(cfg: &config::SmallNetwork) -> io::Result { if cfg.root_addr.ip() == cfg.bind_interface { // Try to become the root node, if the root nodes interface is available. - match net::TcpListener::bind(cfg.root_addr) { + match TcpListener::bind(cfg.root_addr) { Ok(listener) => { info!("we are the root node!"); return Ok(listener); @@ -489,38 +506,37 @@ fn create_listener(cfg: &config::SmallNetwork) -> io::Result { } // We did not become the root node, bind on random port. - Ok(net::TcpListener::bind((cfg.bind_interface, 0u16))?) + Ok(TcpListener::bind((cfg.bind_interface, 0u16))?) } /// Core accept loop for the networking server. /// /// Never terminates. -async fn server_task( - eq: reactor::EventQueueHandle>, +async fn server_task( + eq: EventQueueHandle>, mut listener: tokio::net::TcpListener, ) { loop { - // We handle accept errors here, since they can be caused by a temporary - // resource shortage or the remote side closing the connection while - // it is waiting in the queue. + // We handle accept errors here, since they can be caused by a temporary resource shortage + // or the remote side closing the connection while it is waiting in the queue. match listener.accept().await { Ok((stream, addr)) => { // Move the incoming connection to the event queue for handling. let ev = Event::IncomingNew { stream, addr }; - eq.schedule(ev, reactor::Queue::NetworkIncoming).await; + eq.schedule(ev, Queue::NetworkIncoming).await; } Err(err) => warn!(%err, "dropping incoming connection during accept"), } } } -/// Server-side TLS handshake +/// Server-side TLS handshake. /// /// This function groups the TLS handshake into a convenient function, enabling the `?` operator. async fn setup_tls( - stream: tokio::net::TcpStream, - cert: Arc, - private_key: Arc>, + stream: TcpStream, + cert: Arc, + private_key: Arc>, ) -> anyhow::Result<(NodeId, Transport)> { let tls_stream = tokio_openssl::accept( &tls::create_tls_acceptor(&cert.as_ref(), &private_key.as_ref())?, @@ -532,7 +548,7 @@ async fn setup_tls( let peer_cert = tls_stream .ssl() .peer_certificate() - .ok_or_else(|| anyhow::anyhow!("no peer certificate presented"))?; + .ok_or_else(|| anyhow!("no peer certificate presented"))?; Ok(( tls::validate_cert(peer_cert)?.public_key_fingerprint(), @@ -542,14 +558,14 @@ async fn setup_tls( /// Network message reader. /// -/// Schedules all received messages until the stream is closed or an error occurred. +/// Schedules all received messages until the stream is closed or an error occurs. async fn message_reader( - eq: reactor::EventQueueHandle>, - mut stream: futures::stream::SplitStream>, + eq: EventQueueHandle>, + mut stream: SplitStream>, node_id: NodeId, ) -> io::Result<()> where - R: reactor::Reactor, + R: Reactor, P: DeserializeOwned + Send, { while let Some(msg_result) = stream.next().await { @@ -558,7 +574,7 @@ where // We've received a message, push it to the reactor. eq.schedule( Event::IncomingMessage { node_id, msg }, - reactor::Queue::NetworkIncoming, + Queue::NetworkIncoming, ) .await; } @@ -571,12 +587,12 @@ where Ok(()) } -/// Network message sender +/// Network message sender. /// -/// Reads from a channel and sends all messages, until the stream is closed or an error occured. +/// Reads from a channel and sends all messages, until the stream is closed or an error occurs. async fn message_sender

( - mut queue: mpsc::UnboundedReceiver>, - mut sink: futures::stream::SplitSink, Message

>, + mut queue: UnboundedReceiver>, + mut sink: SplitSink, Message

>, ) -> io::Result<()> where P: Serialize + Send, @@ -590,47 +606,46 @@ where } /// Transport type alias for base encrypted connections. -type Transport = tokio_openssl::SslStream; +type Transport = SslStream; /// A framed transport for `Message`s. -type FramedTransport

= tokio_serde::SymmetricallyFramed< - tokio_util::codec::Framed, +type FramedTransport

= SymmetricallyFramed< + Framed, Message

, - tokio_serde::formats::SymmetricalMessagePack>, + SymmetricalMessagePack>, >; -/// Construct a new framed transport on a stream. +/// Constructs a new framed transport on a stream. fn framed

(stream: Transport) -> FramedTransport

{ - let length_delimited = - tokio_util::codec::Framed::new(stream, tokio_util::codec::LengthDelimitedCodec::new()); - tokio_serde::SymmetricallyFramed::new( + let length_delimited = Framed::new(stream, LengthDelimitedCodec::new()); + SymmetricallyFramed::new( length_delimited, - tokio_serde::formats::SymmetricalMessagePack::>::default(), + SymmetricalMessagePack::>::default(), ) } -/// Initiate a TLS connection to an endpoint. +/// Initiates a TLS connection to an endpoint. async fn connect_outgoing( endpoint: Endpoint, - cert: Arc, - private_key: Arc>, + cert: Arc, + private_key: Arc>, ) -> anyhow::Result { let (server_cert, transport) = connect_trusted(endpoint.addr, cert, private_key).await?; let remote_id = server_cert.public_key_fingerprint(); if remote_id != endpoint.cert.public_key_fingerprint() { - anyhow::bail!("remote node has wrong ID"); + bail!("remote node has wrong ID"); } Ok(transport) } -/// Initiate a TLS connection to a remote address, regardless of what ID the remote node reports. +/// Initiates a TLS connection to a remote address, regardless of what ID the remote node reports. async fn connect_trusted( - addr: net::SocketAddr, - cert: Arc, - private_key: Arc>, + addr: SocketAddr, + cert: Arc, + private_key: Arc>, ) -> anyhow::Result<(TlsCert, Transport)> { let mut config = tls::create_tls_connector(&cert, &private_key) .context("could not create TLS connector")? @@ -648,14 +663,14 @@ async fn connect_trusted( let server_cert = tls_stream .ssl() .peer_certificate() - .ok_or_else(|| anyhow::anyhow!("no server certificate presented"))?; + .ok_or_else(|| anyhow!("no server certificate presented"))?; Ok((tls::validate_cert(server_cert)?, tls_stream)) } // Impose a total ordering on endpoints. Compare timestamps first, if the same, order by actual // address. If both of these are the same, use the TLS certificate's fingerprint as a tie-breaker. impl Ord for Endpoint { - fn cmp(&self, other: &Self) -> cmp::Ordering { + fn cmp(&self, other: &Self) -> Ordering { Ord::cmp(&self.timestamp_ns, &other.timestamp_ns) .then_with(|| { Ord::cmp( @@ -668,16 +683,13 @@ impl Ord for Endpoint { } impl PartialOrd for Endpoint { #[inline] - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl

fmt::Display for Message

-where - P: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Message

{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Message::Snapshot(snapshot) => { write!(f, "snapshot: {:10}", DisplayIter::new(snapshot.iter())) @@ -688,11 +700,8 @@ where } } -impl

fmt::Display for Event

-where - P: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Event

{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Event::RootConnected { cert, .. } => { write!(f, "root connected @ {}", cert.public_key_fingerprint()) @@ -722,8 +731,8 @@ where } } -impl fmt::Display for Endpoint { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Endpoint { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, "{}@{} [{}]", @@ -734,12 +743,12 @@ impl fmt::Display for Endpoint { } } -impl fmt::Debug for SmallNetwork +impl Debug for SmallNetwork where - R: reactor::Reactor, - P: fmt::Debug, + R: Reactor, + P: Debug, { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("SmallNetwork") .field("cert", &"") .field("private_key", &"") diff --git a/src/config.rs b/src/config.rs index 57368ecb1b..aabb81c848 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,22 +1,32 @@ -//! Configuration file management +//! Configuration file management. //! -//! Configuration for the node is loaded from TOML files, but all configuration values have -//! sensible defaults. +//! Configuration for the node is loaded from TOML files, but all configuration values have sensible +//! defaults. //! -//! The `cli` offers an option to generate a configuration from defaults for editing. +//! The [`Cli`](../cli/enum.Cli.html#variant.GenerateConfig) offers an option to generate a +//! configuration from defaults for editing. I.e. running the following will dump a default +//! configuration file to stdout: +//! ``` +//! cargo run --release -- generate-config +//! ``` //! //! # Adding a configuration section //! //! When adding a section to the configuration, ensure that //! -//! * it has an entry in the root configuration `Config`, +//! * it has an entry in the root configuration [`Config`](struct.Config.html), //! * `Default` is implemented (derived or manually) with sensible defaults, and //! * it is completely documented. +use std::{ + fs, io, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::{Path, PathBuf}, +}; + use anyhow::Context; use serde::{Deserialize, Serialize}; -use std::{fs, io, net, path}; -use tracing::debug; +use tracing::{debug, Level}; /// Root configuration. #[derive(Debug, Deserialize, Serialize)] @@ -34,7 +44,7 @@ pub struct Config { pub struct Log { /// Log level. #[serde(with = "log_level")] - pub level: tracing::Level, + pub level: Level, } #[derive(Debug, Deserialize, Serialize)] @@ -42,31 +52,31 @@ pub struct Log { pub struct SmallNetwork { /// Interface to bind to. If it is the same as the in `root_addr`, attempt /// become the root node for this particular small network. - pub bind_interface: net::IpAddr, + pub bind_interface: IpAddr, /// Port to bind to when not the root node. Use 0 for a random port. pub bind_port: u16, /// Address to connect to join the network. - pub root_addr: net::SocketAddr, + pub root_addr: SocketAddr, /// Path to certificate file. - pub cert: Option, + pub cert: Option, /// Path to private key for certificate. - pub private_key: Option, + pub private_key: Option, /// Maximum number of retries when trying to connect to an outgoing node. Unlimited if `None`. pub max_outgoing_retries: Option, } impl SmallNetwork { - /// Create a default instance for `SmallNetwork` with a constant port. + /// Creates a default instance for `SmallNetwork` with a constant port. fn default_on_port(port: u16) -> Self { SmallNetwork { - bind_interface: net::Ipv4Addr::new(127, 0, 0, 1).into(), + bind_interface: Ipv4Addr::new(127, 0, 0, 1).into(), bind_port: 0, - root_addr: (net::Ipv4Addr::new(127, 0, 0, 1), port).into(), + root_addr: (Ipv4Addr::new(127, 0, 0, 1), port).into(), cert: None, private_key: None, max_outgoing_retries: None, @@ -86,14 +96,12 @@ impl Default for Config { impl Default for Log { fn default() -> Self { - Log { - level: tracing::Level::INFO, - } + Log { level: Level::INFO } } } impl Log { - /// Initialize logging system based on settings in configuration. + /// Initializes logging system based on settings in configuration. /// /// Will setup logging as described in this configuration for the whole application. This /// function should only be called once during the lifetime of the application. @@ -112,7 +120,7 @@ impl Log { } /// Loads a TOML-formatted configuration from a given file. -pub fn load_from_file>(config_path: P) -> anyhow::Result { +pub fn load_from_file>(config_path: P) -> anyhow::Result { let path_ref = config_path.as_ref(); Ok(toml::from_str( &fs::read_to_string(path_ref) @@ -121,29 +129,31 @@ pub fn load_from_file>(config_path: P) -> anyhow::Result anyhow::Result { toml::to_string_pretty(cfg).with_context(|| "Failed to serialize default configuration") } /// Serialization/deserialization mod log_level { - use serde::{self, Deserialize}; use std::str::FromStr; + + use serde::{self, de::Error, Deserialize, Deserializer, Serializer}; use tracing::Level; pub fn serialize(value: &Level, serializer: S) -> Result where - S: serde::Serializer, + S: Serializer, { serializer.serialize_str(value.to_string().as_str()) } pub fn deserialize<'de, D>(deserializer: D) -> Result where - D: serde::Deserializer<'de>, + D: Deserializer<'de>, { let s = String::deserialize(deserializer)?; - Level::from_str(s.as_str()).map_err(serde::de::Error::custom) + + Level::from_str(s.as_str()).map_err(Error::custom) } } diff --git a/src/effect.rs b/src/effect.rs index 90d3fa82dc..5098e78b40 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -1,32 +1,33 @@ //! Effects subsystem. //! -//! Effects describe things that the creator of the effect intends to happen, -//! producing a value upon completion. They are, in fact, futures. +//! Effects describe things that the creator of the effect intends to happen, producing a value upon +//! completion. They are, in fact, futures. //! //! A boxed, pinned future returning an event is called an effect and typed as an `Effect`, //! where `Ev` is the event's type. //! //! ## Using effects //! -//! To create an effect, an events factory is used that implements one or more of the factory -//! traits of this module. For example, given an events factory `eff`, we can create a +//! To create an effect, an events factory is used that implements one or more of the factory traits +//! of this module. For example, given an events factory `events_factory`, we can create a //! `set_timeout` future and turn it into an effect: //! //! ``` -//! # use std::time; +//! use std::time::Duration; //! use crate::effect::EffectExt; //! //! enum Event { -//! ThreeSecondsElapsed(time::Duration) +//! ThreeSecondsElapsed(Duration) //! } //! -//! eff.set_timeout(time::Duration::from_secs(3)) -//! .event(Event::ThreeSecondsElapsed) +//! events_factory +//! .set_timeout(Duration::from_secs(3)) +//! .event(Event::ThreeSecondsElapsed); //! ``` //! //! This example will produce an effect that, after three seconds, creates an //! `Event::ThreeSecondsElapsed`. Note that effects do nothing on their own, they need to be passed -//! to the `Reactor` (see `reactor` module) to be executed. +//! to a [`reactor`](../reactor/index.html) to be executed. //! //! ## Chaining futures and effects //! @@ -37,24 +38,32 @@ //! It is possible to create an effect from multiple effects being run in parallel using `.also`: //! //! ``` -//! # use std::time; +//! use std::time::Duration; //! use crate::effect::{EffectExt, EffectAlso}; //! //! enum Event { -//! ThreeSecondsElapsed(time::Duration), -//! FiveSecondsElapsed(time::Duration), +//! ThreeSecondsElapsed(Duration), +//! FiveSecondsElapsed(Duration), //! } //! //! // This effect produces a single event after five seconds: -//! eff.set_timeout(time::Duration::from_secs(3)) -//! .then(|_| eff.set_timeout(time::Duration::from_secs(2)) -//! .event(Event::FiveSecondsElapsed); +//! events_factory +//! .set_timeout(Duration::from_secs(3)) +//! .then(|_| { +//! events_factory +//! .set_timeout(Duration::from_secs(2)) +//! .event(Event::FiveSecondsElapsed) +//! }); //! //! // Here, two effects are run in parallel, resulting in two events: -//! eff.set_timeout(time::Duration::from_secs(3)) -//! .event(Event::ThreeSecondsElapsed) -//! .also(eff.set_timeout(time::Duration::from_secs(5)) -//! .event(Event::FiveSecondsElapsed)); +//! events_factory +//! .set_timeout(Duration::from_secs(3)) +//! .event(Event::ThreeSecondsElapsed) +//! .also( +//! events_factory +//! .set_timeout(Duration::from_secs(5)) +//! .event(Event::FiveSecondsElapsed), +//! ); //! ``` //! //! ## Arbitrary effects @@ -63,12 +72,12 @@ //! the effects explicitly listed in this module through traits to create them. Post-processing on //! effects to turn them into events should also be kept brief. -use crate::util::Multiple; -use futures::future::BoxFuture; -use futures::FutureExt; +use std::{future::Future, time::Duration}; + +use futures::{future::BoxFuture, FutureExt}; use smallvec::smallvec; -use std::future::Future; -use std::time; + +use crate::utils::Multiple; /// Effect type. /// @@ -79,7 +88,7 @@ pub type Effect = BoxFuture<'static, Multiple>; /// /// Used to convert futures into actual effects. pub trait EffectExt: Future + Send { - /// Finalize a future into an effect that returns an event. + /// Finalizes a future into an effect that returns an event. /// /// The function `f` is used to translate the returned value from an effect into an event. fn event(self, f: F) -> Multiple> @@ -96,7 +105,7 @@ pub trait EffectResultExt { type Value; type Error; - /// Finalize a future returning a `Result` into two different effects. + /// Finalizes a future returning a `Result` into two different effects. /// /// The function `f` is used to translate the returned value from an effect into an event, while /// the function `g` does the same for a potential error. @@ -146,15 +155,15 @@ where /// Core effects. pub trait Core { - /// Do not do anything. + /// Immediately completes without doing anything. /// - /// Immediately completes, can be used to trigger an event. + /// Can be used to trigger an event. fn immediately(self) -> BoxFuture<'static, ()>; - /// Set a timeout. + /// Sets a timeout. /// /// Once the timeout fires, it will return the actual elapsed time since the execution (not /// creation!) of this effect. Event loops typically execute effects right after a called event /// handling function completes. - fn set_timeout(self, timeout: time::Duration) -> BoxFuture<'static, time::Duration>; + fn set_timeout(self, timeout: Duration) -> BoxFuture<'static, Duration>; } diff --git a/src/main.rs b/src/main.rs index 425d838b57..344231339e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,17 +9,30 @@ //! its core event loop is found inside the [reactor](reactor/index.html). To get a tour of the //! sourcecode, be sure to run `cargo doc --open`. +#![doc( + html_favicon_url = "https://raw.githubusercontent.com/CasperLabs/casper-node/master/images/CasperLabs_Logo_Favicon_RGB_50px.png", + html_logo_url = "https://raw.githubusercontent.com/CasperLabs/casper-node/master/images/CasperLabs_Logo_Symbol_RGB.png", + test(attr(forbid(warnings))) +)] +#![warn( + missing_docs, + trivial_casts, + trivial_numeric_casts, + // unreachable_pub, + unused_qualifications +)] + mod cli; mod components; mod config; mod effect; mod reactor; mod tls; -mod util; +mod utils; use structopt::StructOpt; -/// Parse [command-line arguments](cli/index.html) and run application. +/// Parses [command-line arguments](cli/index.html) and run application. #[tokio::main] pub async fn main() -> anyhow::Result<()> { // Parse CLI args and run selected subcommand. diff --git a/src/reactor.rs b/src/reactor.rs index 7b740ffa3c..752de45eb3 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -2,9 +2,9 @@ //! //! Any long running instance of the node application uses an event-dispatch pattern: Events are //! generated and stored on an event queue, then processed one-by-one. This process happens inside -//! the *reactor*, which also exclusively holds the state of the application besides pending events: +//! the reactor*, which also exclusively holds the state of the application besides pending events: //! -//! 1. The reactor pops an event off of the event queue (called a `Scheduler`). +//! 1. The reactor pops an event off the event queue (called a [`Scheduler`](type.Scheduler.html)). //! 2. The event is dispatched by the reactor. Since the reactor holds mutable state, it can grant //! any component that processes an event mutable, exclusive access to its state. //! 3. Once the (synchronous) event processing has completed, the component returns an effect. @@ -13,25 +13,30 @@ //! //! # Reactors //! -//! There no single reactor, but a reactor for each application type, since it defines which -//! components are used and how they are wired up. The reactor defines the state by being a `struct` -//! of components, their initialization through the `Reactor::new` and a function to `dispatch` -//! events to components. +//! There is no single reactor, but rather a reactor for each application type, since it defines +//! which components are used and how they are wired up. The reactor defines the state by being a +//! `struct` of components, their initialization through the +//! [`Reactor::new()`](trait.Reactor.html#tymethod.new) and a method +//! [`Reactor::dispatch_event()`](trait.Reactor.html#tymethod.dispatch_event) to dispatch events to +//! components. //! -//! With all these set up, a reactor can be `launch`ed, causing it to run indefinitely, processing -//! events. +//! With all these set up, a reactor can be [`launch`](fn.launch.html)ed, causing it to run +//! indefinitely, processing events. pub mod non_validator; mod queue_kind; pub mod validator; -use crate::util::Multiple; -use crate::{config, effect, util}; -use async_trait::async_trait; -use futures::FutureExt; use std::{fmt, mem}; + +use futures::FutureExt; use tracing::{debug, info, trace, warn}; +use crate::{ + config, + effect::Effect, + utils::{self, Multiple, WeightedRoundRobin}, +}; pub use queue_kind::Queue; /// Event scheduler @@ -40,7 +45,7 @@ pub use queue_kind::Queue; /// is the central hook for any part of the program that schedules events directly. /// /// Components rarely use this, but use a bound `EventQueueHandle` instead. -pub type Scheduler = util::round_robin::WeightedRoundRobin; +pub type Scheduler = WeightedRoundRobin; /// Bound event queue handle /// @@ -79,7 +84,7 @@ impl EventQueueHandle where R: Reactor, { - /// Create a new event queue handle with an associated wrapper function. + /// Creates a new event queue handle with an associated wrapper function. fn bind(scheduler: &'static Scheduler, wrapper: fn(Ev) -> R::Event) -> Self { EventQueueHandle { scheduler, wrapper } } @@ -93,8 +98,8 @@ where /// Reactor core. /// -/// Any reactor implements should implement this trait and be launched by the `launch` function. -#[async_trait] +/// Any reactor should implement this trait and be launched by the [`launch`](fn.launch.html) +/// function. pub trait Reactor: Sized { // Note: We've gone for the `Sized` bound here, since we return an instance in `new`. As an // alternative, `new` could return a boxed instance instead, removing this requirement. @@ -104,14 +109,14 @@ pub trait Reactor: Sized { /// Defines what kind of event the reactor processes. type Event: Send + fmt::Debug + fmt::Display + 'static; - /// Dispatch an event on the reactor. + /// Dispatches an event on the reactor. /// /// This function is typically only called by the reactor itself to dispatch an event. It is /// safe to call regardless, but will cause the event to skip the queue and things like /// accounting. - fn dispatch_event(&mut self, event: Self::Event) -> Multiple>; + fn dispatch_event(&mut self, event: Self::Event) -> Multiple>; - /// Create a new instance of the reactor. + /// Creates a new instance of the reactor. /// /// This method creates the full state, which consists of all components, and returns a reactor /// instances along with the effects the components generated upon instantiation. @@ -120,12 +125,12 @@ pub trait Reactor: Sized { fn new( cfg: config::Config, scheduler: &'static Scheduler, - ) -> anyhow::Result<(Self, Multiple>)>; + ) -> anyhow::Result<(Self, Multiple>)>; } -/// Run a reactor. +/// Runs a reactor. /// -/// Start the reactor and associated background tasks, then enter main the event processing loop. +/// Starts the reactor and associated background tasks, then enters main the event processing loop. /// /// `launch` will leak memory on start for global structures each time it is called. /// @@ -146,7 +151,7 @@ pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { let scheduler = Scheduler::::new(Queue::weights()); // Create a new event queue for this reactor run. - let scheduler = util::leak(scheduler); + let scheduler = utils::leak(scheduler); let (mut reactor, initial_effects) = R::new(cfg, scheduler)?; @@ -167,14 +172,10 @@ pub async fn launch(cfg: config::Config) -> anyhow::Result<()> { } } -/// Process effects. -/// /// Spawns tasks that will process the given effects. #[inline] -async fn process_effects( - scheduler: &'static Scheduler, - effects: Multiple>, -) where +async fn process_effects(scheduler: &'static Scheduler, effects: Multiple>) +where Ev: Send + 'static, { // TODO: Properly carry around priorities. @@ -189,9 +190,9 @@ async fn process_effects( } } -/// Convert a single effect into another by wrapping it. +/// Converts a single effect into another by wrapping it. #[inline] -pub fn wrap_effect(wrap: F, effect: effect::Effect) -> effect::Effect +pub fn wrap_effect(wrap: F, effect: Effect) -> Effect where F: Fn(Ev) -> REv + Send + 'static, Ev: Send + 'static, @@ -205,12 +206,9 @@ where .boxed() } -/// Convert multiple effects into another by wrapping. +/// Converts multiple effects into another by wrapping. #[inline] -pub fn wrap_effects( - wrap: F, - effects: Multiple>, -) -> Multiple> +pub fn wrap_effects(wrap: F, effects: Multiple>) -> Multiple> where F: Fn(Ev) -> REv + Send + 'static + Clone, Ev: Send + 'static, diff --git a/src/reactor/queue_kind.rs b/src/reactor/queue_kind.rs index c05f5820b9..2ac1149d3e 100644 --- a/src/reactor/queue_kind.rs +++ b/src/reactor/queue_kind.rs @@ -1,11 +1,12 @@ -//! Queue kinds +//! Queue kinds. //! //! The reactor's event queue uses different queues to group events by priority and polls them in a //! round-robin manner. This way, events are only competing for time within one queue, non-congested //! queues can always assume to be speedily processed. +use std::num::NonZeroUsize; + use enum_iterator::IntoEnumIterator; -use std::num; /// Scheduling priority. /// @@ -24,7 +25,7 @@ pub enum Queue { Regular, /// Reporting events on the local node. /// - /// Metric events take precendence over most other events since missing a request for metrics + /// Metric events take precedence over most other events since missing a request for metrics /// might cause the requester to assume that the node is down and forcefully restart it. Metrics, } @@ -36,12 +37,12 @@ impl Default for Queue { } impl Queue { - /// Return the weight of a specific queue. + /// Returns the weight of a specific queue. /// /// The weight determines how many events are at most processed from a specific queue during /// each event processing round. - fn weight(self) -> num::NonZeroUsize { - num::NonZeroUsize::new(match self { + fn weight(self) -> NonZeroUsize { + NonZeroUsize::new(match self { Queue::NetworkIncoming => 4, Queue::Network => 4, Queue::Regular => 8, @@ -51,7 +52,7 @@ impl Queue { } /// Return weights of all possible `Queue`s. - pub(super) fn weights() -> Vec<(Self, num::NonZeroUsize)> { + pub(super) fn weights() -> Vec<(Self, NonZeroUsize)> { Queue::into_enum_iter().map(|q| (q, q.weight())).collect() } } diff --git a/src/reactor/validator.rs b/src/reactor/validator.rs index 0503f940d8..9a99f09cd5 100644 --- a/src/reactor/validator.rs +++ b/src/reactor/validator.rs @@ -1,11 +1,18 @@ //! Reactor for validator nodes. //! -//! Validator nodes join the validator only network upon startup. -use crate::components::small_network; -use crate::util::Multiple; -use crate::{config, effect, reactor}; +//! Validator nodes join the validator-only network upon startup. + +use std::fmt::{self, Display, Formatter}; + use serde::{Deserialize, Serialize}; -use std::fmt; + +use crate::{ + components::small_network::{self, SmallNetwork}, + config::Config, + effect::Effect, + reactor::{self, EventQueueHandle, Scheduler}, + utils::Multiple, +}; /// Top-level event for the reactor. #[derive(Debug)] @@ -19,18 +26,18 @@ pub enum Message {} /// Validator node reactor. pub struct Reactor { - net: small_network::SmallNetwork, + net: SmallNetwork, } impl reactor::Reactor for Reactor { type Event = Event; fn new( - cfg: config::Config, - scheduler: &'static reactor::Scheduler, - ) -> anyhow::Result<(Self, Multiple>)> { - let (net, net_effects) = small_network::SmallNetwork::new( - reactor::EventQueueHandle::bind(scheduler, Event::Network), + cfg: Config, + scheduler: &'static Scheduler, + ) -> anyhow::Result<(Self, Multiple>)> { + let (net, net_effects) = SmallNetwork::new( + EventQueueHandle::bind(scheduler, Event::Network), cfg.validator_net, )?; @@ -40,23 +47,23 @@ impl reactor::Reactor for Reactor { )) } - fn dispatch_event(&mut self, event: Event) -> Multiple> { + fn dispatch_event(&mut self, event: Event) -> Multiple> { match event { Event::Network(ev) => reactor::wrap_effects(Event::Network, self.net.handle_event(ev)), } } } -impl fmt::Display for Event { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Event { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Event::Network(ev) => write!(f, "network: {}", ev), } } } -impl fmt::Display for Message { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Message { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "TODO: MessagePayload") } } diff --git a/src/tls.rs b/src/tls.rs index 610151c957..cde7c6cb1b 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -1,8 +1,8 @@ //! Transport layer security and signing based on OpenSSL. //! -//! This module wraps some of the lower-level TLS constructs to provide a reasonably safe to use API -//! surface for the rest of the application. It also fixates the security parameters of the TLS -//! level in a central place. +//! This module wraps some of the lower-level TLS constructs to provide a reasonably safe-to-use API +//! surface for the rest of the application. It also fixes the security parameters of the TLS level +//! in a central place. //! //! Features include //! @@ -20,44 +20,64 @@ //! ([`Signature`](struct.Signature.html), [`Signed`](struct.Signed.html)), and //! * `serde` support for certificates ([`x509_serde`](x509_serde/index.html)) -use anyhow::Context; +use std::{ + cmp::Ordering, + convert::TryInto, + fmt::{self, Debug, Display, Formatter}, + fs, + hash::Hash, + marker::PhantomData, + path::Path, + str, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::{anyhow, Context}; use displaydoc::Display; use hex_fmt::HexFmt; -use openssl::{asn1, bn, ec, error, hash, nid, pkey, sha, sign, ssl, x509}; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; +use nid::Nid; +use openssl::{ + asn1::{Asn1Integer, Asn1IntegerRef, Asn1Time}, + bn::{BigNum, BigNumContext}, + ec, + error::ErrorStack, + hash::{DigestBytes, MessageDigest}, + nid, + pkey::{PKey, PKeyRef, Private, Public}, + sha, + sign::{Signer, Verifier}, + ssl::{SslAcceptor, SslConnector, SslContextBuilder, SslMethod, SslVerifyMode, SslVersion}, + x509::{X509Builder, X509Name, X509NameBuilder, X509NameRef, X509Ref, X509}, +}; +use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer}; use serde_big_array::big_array; -use std::convert::TryInto; -use std::hash::Hash; -use std::marker::PhantomData; -use std::{cmp, fmt, path, str, time}; use thiserror::Error; big_array! { BigArray; } /// The chosen signature algorithm (**ECDSA with SHA512**). -const SIGNATURE_ALGORITHM: nid::Nid = nid::Nid::ECDSA_WITH_SHA512; +const SIGNATURE_ALGORITHM: Nid = Nid::ECDSA_WITH_SHA512; /// The underlying elliptic curve (**P-521**). -const SIGNATURE_CURVE: nid::Nid = nid::Nid::SECP521R1; +const SIGNATURE_CURVE: Nid = Nid::SECP521R1; /// The chosen signature algorithm (**SHA512**). -const SIGNATURE_DIGEST: nid::Nid = nid::Nid::SHA512; +const SIGNATURE_DIGEST: Nid = Nid::SHA512; /// OpenSSL result type alias. /// /// Many functions rely solely on `openssl` functions and return this kind of result. -pub type SslResult = Result; +pub type SslResult = Result; /// SHA512 hash. #[derive(Copy, Clone, Deserialize, Serialize)] pub struct Sha512(#[serde(with = "BigArray")] [u8; Sha512::SIZE]); -/// Certificate fingerprint +/// Certificate fingerprint. #[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] pub struct CertFingerprint(Sha512); -/// Public key fingerprint +/// Public key fingerprint. #[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] pub struct KeyFingerprint(Sha512); @@ -67,11 +87,11 @@ pub struct Signature(Vec); /// TLS certificate. /// -/// Thin wrapper around `X509` enabling things like serde serialization and fingerprint caching. +/// Thin wrapper around `X509` enabling things like Serde serialization and fingerprint caching. #[derive(Clone)] pub struct TlsCert { /// The wrapped x509 certificate. - x509: openssl::x509::X509, + x509: X509, /// Cached certificate fingerprint. cert_fingerprint: CertFingerprint, @@ -81,19 +101,19 @@ pub struct TlsCert { } // Serialization and deserialization happens only via x509, which is checked upon deserialization. -impl<'de> serde::Deserialize<'de> for TlsCert { +impl<'de> Deserialize<'de> for TlsCert { fn deserialize(deserializer: D) -> Result where - D: serde::Deserializer<'de>, + D: Deserializer<'de>, { validate_cert(x509_serde::deserialize(deserializer)?).map_err(serde::de::Error::custom) } } -impl serde::Serialize for TlsCert { +impl Serialize for TlsCert { fn serialize(&self, serializer: S) -> Result where - S: serde::Serializer, + S: Serializer, { x509_serde::serialize(&self.x509, serializer) } @@ -114,10 +134,10 @@ impl Signed where V: Serialize, { - /// Create new signed value. + /// Creates a new signed value. /// /// Serializes the value to a buffer and signs the buffer. - pub fn new(value: &V, signing_key: &pkey::PKeyRef) -> anyhow::Result { + pub fn new(value: &V, signing_key: &PKeyRef) -> anyhow::Result { let data = rmp_serde::to_vec(value)?; let signature = Signature::create(signing_key, &data)?; @@ -133,23 +153,23 @@ impl Signed where V: DeserializeOwned, { - /// Validate signature and restore value. + /// Validates signature and restore value. #[allow(dead_code)] - pub fn validate(&self, public_key: &pkey::PKeyRef) -> anyhow::Result { + pub fn validate(&self, public_key: &PKeyRef) -> anyhow::Result { if self.signature.verify(public_key, &self.data)? { Ok(rmp_serde::from_read(self.data.as_slice())?) } else { - Err(anyhow::anyhow!("invalid signature")) + Err(anyhow!("invalid signature")) } } - /// Validate a self-signed values. + /// Validates a self-signed value. /// /// Allows for extraction of a public key prior to validating a value. #[inline] pub fn validate_self_signed(&self, extract: F) -> anyhow::Result where - F: FnOnce(&V) -> anyhow::Result>, + F: FnOnce(&V) -> anyhow::Result>, { let unverified = rmp_serde::from_read(self.data.as_slice())?; { @@ -158,7 +178,7 @@ where if self.signature.verify(&public_key, &self.data)? { Ok(unverified) } else { - Err(anyhow::anyhow!("invalid signature")) + Err(anyhow!("invalid signature")) } } } @@ -169,7 +189,7 @@ impl Sha512 { pub const SIZE: usize = 64; /// OpenSSL NID. - const NID: nid::Nid = nid::Nid::SHA512; + const NID: Nid = Nid::SHA512; /// Create a new Sha512 by hashing a slice. pub fn new>(data: B) -> Self { @@ -178,7 +198,7 @@ impl Sha512 { Sha512(openssl_sha.finish()) } - /// Return bytestring of the hash, with length `Self::SIZE`. + /// Returns bytestring of the hash, with length `Self::SIZE`. pub fn bytes(&self) -> &[u8] { let bs = &self.0[..]; @@ -186,8 +206,8 @@ impl Sha512 { bs } - /// Convert an OpenSSL digest into an `Sha512`. - fn from_openssl_digest(digest: &hash::DigestBytes) -> Self { + /// Converts an OpenSSL digest into an `Sha512`. + fn from_openssl_digest(digest: &DigestBytes) -> Self { let digest_bytes = digest.as_ref(); debug_assert_eq!( @@ -202,23 +222,23 @@ impl Sha512 { Sha512(buf) } - /// Return a new OpenSSL `MessageDigest` set to SHA-512. - fn create_message_digest() -> hash::MessageDigest { + /// Returns a new OpenSSL `MessageDigest` set to SHA-512. + fn create_message_digest() -> MessageDigest { // This can only fail if we specify a `Nid` that does not exist, which cannot happen unless // there is something wrong with `Self::NID`. - hash::MessageDigest::from_nid(Self::NID).expect("Sha512::NID is invalid") + MessageDigest::from_nid(Self::NID).expect("Sha512::NID is invalid") } } impl Signature { - /// Sign a binary blob with the blessed ciphers and TLS parameters. - pub fn create(private_key: &pkey::PKeyRef, data: &[u8]) -> SslResult { + /// Signs a binary blob with the blessed ciphers and TLS parameters. + pub fn create(private_key: &PKeyRef, data: &[u8]) -> SslResult { // TODO: This needs verification to ensure we're not doing stupid/textbook RSA-ish. // Sha512 is hardcoded, so check we're creating the correct signature. assert_eq!(Sha512::NID, SIGNATURE_DIGEST); - let mut signer = sign::Signer::new(Sha512::create_message_digest(), private_key)?; + let mut signer = Signer::new(Sha512::create_message_digest(), private_key)?; // The API of OpenSSL is a bit weird here; there is no constant size for the buffer required // to create the signatures. Additionally, we need to truncate it to the returned size. @@ -230,22 +250,18 @@ impl Signature { Ok(Signature(sig_buf)) } - /// Verify that signature matches on a binary blob. - pub fn verify( - self: &Signature, - public_key: &pkey::PKeyRef, - data: &[u8], - ) -> SslResult { + /// Verifies that signature matches on a binary blob. + pub fn verify(self: &Signature, public_key: &PKeyRef, data: &[u8]) -> SslResult { assert_eq!(Sha512::NID, SIGNATURE_DIGEST); - let mut verifier = sign::Verifier::new(Sha512::create_message_digest(), public_key)?; + let mut verifier = Verifier::new(Sha512::create_message_digest(), public_key)?; verifier.verify_oneshot(&self.0, data) } } impl TlsCert { - /// Return the certificates fingerprint. + /// Returns the certificate's fingerprint. /// /// In contrast to the `public_key_fingerprint`, this fingerprint also contains the certificate /// information. @@ -253,28 +269,28 @@ impl TlsCert { self.cert_fingerprint } - /// Extract the public key from the certificate. - pub fn public_key(&self) -> pkey::PKey { + /// Extracts the public key from the certificate. + pub fn public_key(&self) -> PKey { // This can never fail, we validate the certificate on construction and deserialization. self.x509 .public_key() .expect("public key extraction failed, how did we end up with an invalid cert?") } - /// Return the public key fingerprint. + /// Returns the public key fingerprint. pub fn public_key_fingerprint(&self) -> KeyFingerprint { self.key_fingerprint } #[allow(dead_code)] - /// Return OpenSSL X509 certificate. - fn x509(&self) -> &x509::X509 { + /// Returns OpenSSL X509 certificate. + fn x509(&self) -> &X509 { &self.x509 } } -impl fmt::Debug for TlsCert { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Debug for TlsCert { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "TlsCert({:?})", self.fingerprint()) } } @@ -293,55 +309,55 @@ impl PartialEq for TlsCert { impl Eq for TlsCert {} -/// Generate a self-signed (key, certificate) pair suitable for TLS and signing. +/// Generates a self-signed (key, certificate) pair suitable for TLS and signing. /// -/// The common name of the certificate will be "casperlabs-node". -pub fn generate_node_cert() -> SslResult<(x509::X509, pkey::PKey)> { +/// The common name of the certificate will be "casper-node". +pub fn generate_node_cert() -> SslResult<(X509, PKey)> { let private_key = generate_private_key()?; - let cert = generate_cert(&private_key, "casperlabs-node")?; + let cert = generate_cert(&private_key, "casper-node")?; Ok((cert, private_key)) } -/// Create a TLS acceptor for a server. +/// Creates a TLS acceptor for a server. /// /// The acceptor will restrict TLS parameters to secure one defined in this crate that are /// compatible with connectors built with `create_tls_connector`. /// /// Incoming certificates must still be validated using `validate_cert`. pub fn create_tls_acceptor( - cert: &x509::X509Ref, - private_key: &pkey::PKeyRef, -) -> SslResult { - let mut builder = ssl::SslAcceptor::mozilla_modern_v5(ssl::SslMethod::tls_server())?; + cert: &X509Ref, + private_key: &PKeyRef, +) -> SslResult { + let mut builder = SslAcceptor::mozilla_modern_v5(SslMethod::tls_server())?; set_context_options(&mut builder, cert, private_key)?; Ok(builder.build()) } -/// Create a TLS acceptor for a client. +/// Creates a TLS acceptor for a client. /// /// A connector compatible with the acceptor created using `create_tls_acceptor`. Server /// certificates must always be validated using `validate_cert` after connecting. pub fn create_tls_connector( - cert: &x509::X509Ref, - private_key: &pkey::PKeyRef, -) -> SslResult { - let mut builder = ssl::SslConnector::builder(ssl::SslMethod::tls_client())?; + cert: &X509Ref, + private_key: &PKeyRef, +) -> SslResult { + let mut builder = SslConnector::builder(SslMethod::tls_client())?; set_context_options(&mut builder, cert, private_key)?; Ok(builder.build()) } -/// Set common options of both acceptor and connector on TLS context. +/// Sets common options of both acceptor and connector on TLS context. /// /// Used internally to set various TLS parameters. fn set_context_options( - ctx: &mut ssl::SslContextBuilder, - cert: &x509::X509Ref, - private_key: &pkey::PKeyRef, + ctx: &mut SslContextBuilder, + cert: &X509Ref, + private_key: &PKeyRef, ) -> SslResult<()> { - ctx.set_min_proto_version(Some(ssl::SslVersion::TLS1_3))?; + ctx.set_min_proto_version(Some(SslVersion::TLS1_3))?; ctx.set_certificate(cert)?; ctx.set_private_key(private_key)?; @@ -351,7 +367,7 @@ fn set_context_options( // no certificate and there will be no error from OpenSSL. For this reason, we pass set `PEER` // (causing the request of a cert), but pass all of them through and verify them after the // handshake has completed. - ctx.set_verify_callback(ssl::SslVerifyMode::PEER, |_, _| true); + ctx.set_verify_callback(SslVerifyMode::PEER, |_, _| true); Ok(()) } @@ -360,46 +376,46 @@ fn set_context_options( #[derive(Debug, Display, Error)] pub enum ValidationError { /// error reading public key from certificate: {0:?} - CannotReadPublicKey(#[source] error::ErrorStack), + CannotReadPublicKey(#[source] ErrorStack), /// error reading subject or issuer name: {0:?} - CorruptSubjectOrIssuer(#[source] error::ErrorStack), + CorruptSubjectOrIssuer(#[source] ErrorStack), /// wrong signature scheme WrongSignatureAlgorithm, /// there was an issue reading or converting times: {0:?} - TimeIssue(#[source] error::ErrorStack), + TimeIssue(#[source] ErrorStack), /// the certificate is not yet valid NotYetValid, /// the certificate expired Expired, /// the serial number could not be compared to the reference: {0:?} - InvalidSerialNumber(#[source] error::ErrorStack), + InvalidSerialNumber(#[source] ErrorStack), /// wrong serial number WrongSerialNumber, - /// no valid elliptic curve key could be extracted from cert: {0:?} - CouldNotExtractEcKey(#[source] error::ErrorStack), + /// no valid elliptic curve key could be extracted from certificate: {0:?} + CouldNotExtractEcKey(#[source] ErrorStack), /// the given public key fails basic sanity checks: {0:?} - KeyFailsCheck(#[source] error::ErrorStack), + KeyFailsCheck(#[source] ErrorStack), /// underlying elliptic curve is wrong WrongCurve, /// certificate is not self-signed NotSelfSigned, /// the signature could not be validated - FailedToValidateSignature(#[source] error::ErrorStack), + FailedToValidateSignature(#[source] ErrorStack), /// the signature is invalid InvalidSignature, /// failed to read fingerprint - InvalidFingerprint(#[source] error::ErrorStack), + InvalidFingerprint(#[source] ErrorStack), /// could not create a big num context - BigNumContextNotAvailable(#[source] error::ErrorStack), + BigNumContextNotAvailable(#[source] ErrorStack), /// could not encode public key as bytes - PublicKeyEncodingFailed(#[source] error::ErrorStack), + PublicKeyEncodingFailed(#[source] ErrorStack), } -/// Check that the cryptographic parameters on a certificate are correct and return the fingerprint -/// of the public key. +/// Checks that the cryptographic parameters on a certificate are correct and returns the +/// fingerprint of the public key. /// /// At the very least this ensures that no weaker ciphers have been used to forge a certificate. -pub fn validate_cert(cert: x509::X509) -> Result { +pub fn validate_cert(cert: X509) -> Result { if cert.signature_algorithm().object().nid() != SIGNATURE_ALGORITHM { // The signature algorithm is not of the exact kind we are using to generate our // certificates, an attacker could have used a weaker one to generate colliding keys. @@ -424,11 +440,11 @@ pub fn validate_cert(cert: x509::X509) -> Result { } // Check expiration times against current time. - let asn1_now = asn1::Asn1Time::from_unix(now()).map_err(ValidationError::TimeIssue)?; + let asn1_now = Asn1Time::from_unix(now()).map_err(ValidationError::TimeIssue)?; if asn1_now .compare(cert.not_before()) .map_err(ValidationError::TimeIssue)? - != cmp::Ordering::Greater + != Ordering::Greater { return Err(ValidationError::NotYetValid); } @@ -436,7 +452,7 @@ pub fn validate_cert(cert: x509::X509) -> Result { if asn1_now .compare(cert.not_after()) .map_err(ValidationError::TimeIssue)? - != cmp::Ordering::Less + != Ordering::Less { return Err(ValidationError::Expired); } @@ -473,7 +489,7 @@ pub fn validate_cert(cert: x509::X509) -> Result { // Additionally we can calculate a fingerprint for the public key: let mut big_num_context = - bn::BigNumContext::new().map_err(ValidationError::BigNumContextNotAvailable)?; + BigNumContext::new().map_err(ValidationError::BigNumContextNotAvailable)?; let buf = ec_key .public_key() @@ -495,78 +511,73 @@ pub fn validate_cert(cert: x509::X509) -> Result { }) } -/// Load a certificate from a file. -pub fn load_cert>(src: P) -> anyhow::Result { - let pem = std::fs::read(src.as_ref()) +/// Loads a certificate from a file. +pub fn load_cert>(src: P) -> anyhow::Result { + let pem = fs::read(src.as_ref()) .with_context(|| format!("failed to load certificate {:?}", src.as_ref()))?; - Ok(x509::X509::from_pem(&pem).context("parsing certificate")?) + Ok(X509::from_pem(&pem).context("parsing certificate")?) } -/// Load a private key from a file. -pub fn load_private_key>(src: P) -> anyhow::Result> { - let pem = std::fs::read(src.as_ref()) +/// Loads a private key from a file. +pub fn load_private_key>(src: P) -> anyhow::Result> { + let pem = fs::read(src.as_ref()) .with_context(|| format!("failed to load private key {:?}", src.as_ref()))?; - // TODO: It might be that we need to call `pkey::PKey::private_key_from_pkcs8` instead. - Ok(pkey::PKey::private_key_from_pem(&pem).context("parsing private key")?) + // TODO: It might be that we need to call `PKey::private_key_from_pkcs8` instead. + Ok(PKey::private_key_from_pem(&pem).context("parsing private key")?) } -/// Save a certificate to a file. -pub fn save_cert>(cert: &x509::X509Ref, dest: P) -> anyhow::Result<()> { +/// Saves a certificate to a file. +pub fn save_cert>(cert: &X509Ref, dest: P) -> anyhow::Result<()> { let pem = cert.to_pem().context("converting certificate to PEM")?; - std::fs::write(dest.as_ref(), pem) + fs::write(dest.as_ref(), pem) .with_context(|| format!("failed to write certificate {:?}", dest.as_ref()))?; Ok(()) } -/// Save a private key to a file. -pub fn save_private_key>( - key: &pkey::PKeyRef, - dest: P, -) -> anyhow::Result<()> { +/// Saves a private key to a file. +pub fn save_private_key>(key: &PKeyRef, dest: P) -> anyhow::Result<()> { let pem = key .private_key_to_pem_pkcs8() .context("converting private key to PEM")?; - std::fs::write(dest.as_ref(), pem) + fs::write(dest.as_ref(), pem) .with_context(|| format!("failed to write private key {:?}", dest.as_ref()))?; Ok(()) } -/// Return an OpenSSL compatible timestamp. -/// - +/// Returns an OpenSSL compatible timestamp. fn now() -> i64 { // Note: We could do the timing dance a little better going straight to the UNIX time functions, // but this saves us having to bring in `libc` as a dependency. - let now = time::SystemTime::now(); + let now = SystemTime::now(); let ts: i64 = now - .duration_since(time::UNIX_EPOCH) + .duration_since(UNIX_EPOCH) // This should work unless the clock is set to before 1970. .expect("Great Scott! Your clock is horribly broken, Marty.") .as_secs() - // This will fail past year 2038 on 32 bit systems and a very far into the future, both - // cases we consider out of scope. + // This will fail past year 2038 on 32 bit systems and very far into the future, both cases + // we consider out of scope. .try_into() .expect("32-bit systems and far future are not supported"); ts } -/// Create an ASN1 integer from a `u32`. -fn mknum(n: u32) -> Result { - let bn = openssl::bn::BigNum::from_u32(n)?; +/// Creates an ASN1 integer from a `u32`. +fn mknum(n: u32) -> Result { + let bn = BigNum::from_u32(n)?; bn.to_asn1_integer() } -/// Create an ASN1 name from string components. +/// Creates an ASN1 name from string components. /// /// If `c` or `o` are empty string, they are omitted from the result. -fn mkname(c: &str, o: &str, cn: &str) -> Result { - let mut builder = x509::X509NameBuilder::new()?; +fn mkname(c: &str, o: &str, cn: &str) -> Result { + let mut builder = X509NameBuilder::new()?; if !c.is_empty() { builder.append_entry_by_text("C", c)?; @@ -580,8 +591,8 @@ fn mkname(c: &str, o: &str, cn: &str) -> Result SslResult { +/// Converts an `X509NameRef` to a human readable string. +fn name_to_string(name: &X509NameRef) -> SslResult { let mut output = String::new(); for entry in name.entries() { @@ -594,20 +605,20 @@ fn name_to_string(name: &x509::X509NameRef) -> SslResult { Ok(output) } -/// Check if an `Asn1IntegerRef` is equal to a given u32. -fn num_eq(num: &asn1::Asn1IntegerRef, other: u32) -> SslResult { +/// Checks if an `Asn1IntegerRef` is equal to a given u32. +fn num_eq(num: &Asn1IntegerRef, other: u32) -> SslResult { let l = num.to_bn()?; - let r = bn::BigNum::from_u32(other)?; + let r = BigNum::from_u32(other)?; // The `BigNum` API seems to be really lacking here. - Ok(l.is_negative() == r.is_negative() && l.ucmp(&r.as_ref()) == cmp::Ordering::Equal) + Ok(l.is_negative() == r.is_negative() && l.ucmp(&r.as_ref()) == Ordering::Equal) } -/// Generate a secret key suitable for TLS encryption. -fn generate_private_key() -> SslResult> { +/// Generates a secret key suitable for TLS encryption. +fn generate_private_key() -> SslResult> { // We do not care about browser-compliance, so we're free to use elliptic curves that are more // likely to hold up under pressure than the NIST ones. We want to go with ED25519 because djb - // know's best: pkey::PKey::generate_ed25519() + // knows best: PKey::generate_ed25519() // // However the following bug currently prevents us from doing so: // https://mta.openssl.org/pipermail/openssl-users/2018-July/008362.html (The same error occurs @@ -623,12 +634,12 @@ fn generate_private_key() -> SslResult> { let ec_group = ec::EcGroup::from_curve_name(SIGNATURE_CURVE)?; let ec_key = ec::EcKey::generate(ec_group.as_ref())?; - pkey::PKey::from_ec_key(ec_key) + PKey::from_ec_key(ec_key) } -/// Generate a self-signed certificate based on `private_key` with given CN. -fn generate_cert(private_key: &pkey::PKey, cn: &str) -> SslResult { - let mut builder = x509::X509Builder::new()?; +/// Generates a self-signed certificate based on `private_key` with given CN. +fn generate_cert(private_key: &PKey, cn: &str) -> SslResult { + let mut builder = X509Builder::new()?; // x509 v3 commonly used, the version is 0-indexed, thus 2 == v3. builder.set_version(2)?; @@ -644,10 +655,10 @@ fn generate_cert(private_key: &pkey::PKey, cn: &str) -> SslResult let ts = now(); // We set valid-from to one minute into the past to allow some clock-skew. - builder.set_not_before(asn1::Asn1Time::from_unix(ts - 60)?.as_ref())?; + builder.set_not_before(Asn1Time::from_unix(ts - 60)?.as_ref())?; // Valid-until is a little under 10 years, missing at least 2 leap days. - builder.set_not_after(asn1::Asn1Time::from_unix(ts + 10 * 365 * 24 * 60 * 60)?.as_ref())?; + builder.set_not_after(Asn1Time::from_unix(ts + 10 * 365 * 24 * 60 * 60)?.as_ref())?; // Set the public key and sign. builder.set_pubkey(private_key.as_ref())?; @@ -665,18 +676,21 @@ fn generate_cert(private_key: &pkey::PKey, cn: &str) -> SslResult Ok(cert) } -/// Serde support for `openssl::x509::X509` certificates. +/// Serde support for `openx509::X509` certificates. /// /// Will also check if certificates are valid according to `validate_cert` when deserializing. mod x509_serde { - use super::validate_cert; - use openssl::x509::X509; use std::str; + use openssl::x509::X509; + use serde::{Deserialize, Deserializer, Serializer}; + + use super::validate_cert; + /// Serde-compatible serialization for X509 certificates. pub fn serialize(value: &X509, serializer: S) -> Result where - S: serde::Serializer, + S: Serializer, { let encoded = value.to_pem().map_err(serde::ser::Error::custom)?; @@ -687,10 +701,8 @@ mod x509_serde { /// Serde-compatible deserialization for X509 certificates. pub fn deserialize<'de, D>(deserializer: D) -> Result where - D: serde::Deserializer<'de>, + D: Deserializer<'de>, { - use serde::Deserialize; - // Create an extra copy for simplicity here. If this becomes a bottleneck, feel free to try // to leverage Cow here, or implement a custom visitor that handles both cases. let s: String = Deserialize::deserialize(deserializer)?; @@ -715,53 +727,53 @@ impl Eq for Sha512 {} impl Ord for Sha512 { #[inline] - fn cmp(&self, other: &Self) -> cmp::Ordering { + fn cmp(&self, other: &Self) -> Ordering { Ord::cmp(self.bytes(), other.bytes()) } } impl PartialOrd for Sha512 { #[inline] - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { Some(Ord::cmp(self, other)) } } -impl fmt::Debug for Sha512 { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Debug for Sha512 { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "{}", HexFmt(&self.0[..])) } } -impl fmt::Display for Sha512 { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Sha512 { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "{}", HexFmt(&self.0[0..7])) } } -impl fmt::Display for CertFingerprint { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.0, f) +impl Display for CertFingerprint { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Display::fmt(&self.0, f) } } -impl fmt::Display for KeyFingerprint { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.0, f) +impl Display for KeyFingerprint { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Display::fmt(&self.0, f) } } -impl fmt::Display for Signature { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for Signature { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "{}", HexFmt(&self.0[0..7])) } } -impl fmt::Display for Signed +impl Display for Signed where - T: fmt::Display + for<'de> Deserialize<'de>, + T: Display + for<'de> Deserialize<'de>, { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { // Decode the data here, even if it is expensive. match rmp_serde::from_read::<_, T>(self.data.as_slice()) { Ok(item) => write!(f, "signed[{}]<{} bytes>", self.signature, item), diff --git a/src/util.rs b/src/utils.rs similarity index 60% rename from src/util.rs rename to src/utils.rs index f8ba81eb33..7da11e649e 100644 --- a/src/util.rs +++ b/src/utils.rs @@ -1,14 +1,17 @@ -//! Various utilities. -//! -//! The Generic functions that are not limited to a particular module, but are too small to warrant +//! Various functions that are not limited to a particular module, but are too small to warrant //! being factored out into standalone crates. -use std::fmt; -pub mod round_robin; -use std::cell; +mod round_robin; + +use std::{ + cell::RefCell, + fmt::{self, Display, Formatter}, +}; + +use smallvec::SmallVec; + +pub use round_robin::WeightedRoundRobin; -/// Leak a value. -/// /// Moves a value to the heap and then forgets about, leaving only a static reference behind. #[inline] pub fn leak(value: T) -> &'static T { @@ -17,28 +20,28 @@ pub fn leak(value: T) -> &'static T { /// Small amount store. /// -/// Stored in a smallvec to avoid allocations in case there are less than three items grouped. The +/// Stored in a `SmallVec` to avoid allocations in case there are less than three items grouped. The /// size of two items is chosen because one item is the most common use case, and large items are -/// typically boxed. In the latter case two pointers and one enum variant discriminator is almost +/// typically boxed. In the latter case two pointers and one enum variant discriminator is almost /// the same size as an empty vec, which is two pointers. -pub type Multiple = smallvec::SmallVec<[T; 2]>; +pub type Multiple = SmallVec<[T; 2]>; /// A display-helper that shows iterators display joined by ",". #[derive(Debug)] -pub struct DisplayIter(cell::RefCell>); +pub struct DisplayIter(RefCell>); impl DisplayIter { pub fn new(item: T) -> Self { - DisplayIter(cell::RefCell::new(Some(item))) + DisplayIter(RefCell::new(Some(item))) } } -impl fmt::Display for DisplayIter +impl Display for DisplayIter where I: IntoIterator, - T: fmt::Display, + T: Display, { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { if let Some(src) = self.0.borrow_mut().take() { let mut first = true; for item in src.into_iter().take(f.width().unwrap_or(usize::MAX)) { diff --git a/src/util/round_robin.rs b/src/utils/round_robin.rs similarity index 81% rename from src/util/round_robin.rs rename to src/utils/round_robin.rs index c83bec328e..8d4ef6cfd4 100644 --- a/src/util/round_robin.rs +++ b/src/utils/round_robin.rs @@ -1,13 +1,16 @@ -//! Round-robin scheduling. +//! Weighted round-robin scheduling. //! //! This module implements a weighted round-robin scheduler that ensures no deadlocks occur, but -//! still allows prioriting events from one source over another. The module uses `tokio`s +//! still allows prioritizing events from one source over another. The module uses `tokio`'s //! synchronization primitives under the hood. -use std::collections::{HashMap, VecDeque}; -use std::hash::Hash; -use std::num::NonZeroUsize; -use tokio::sync; +use std::{ + collections::{HashMap, VecDeque}, + hash::Hash, + num::NonZeroUsize, +}; + +use tokio::sync::{Mutex, Semaphore}; /// Weighted round-robin scheduler. /// @@ -22,16 +25,16 @@ use tokio::sync; #[derive(Debug)] pub struct WeightedRoundRobin { /// Current iteration state. - state: sync::Mutex>, + state: Mutex>, /// A list of slots that are round-robin'd. slots: Vec>, /// Actual queues. - queues: HashMap>>, + queues: HashMap>>, /// Number of items in all queues combined. - total: sync::Semaphore, + total: Semaphore, } /// The inner state of the queue iteration. @@ -48,8 +51,8 @@ struct IterationState { /// An internal slot in the round-robin scheduler. /// -/// A slot marks the scheduling position, i.e. which queue we are currently -/// polling and how many tickets it has left before the next one is due. +/// A slot marks the scheduling position, i.e. which queue we are currently polling and how many +/// tickets it has left before the next one is due. #[derive(Copy, Clone, Debug)] struct Slot { /// The key, identifying a queue. @@ -63,17 +66,16 @@ impl WeightedRoundRobin where K: Copy + Clone + Eq + Hash, { - /// Create new weighted round-robin scheduler. + /// Creates a new weighted round-robin scheduler. /// - /// Creates a queue for each pair given in `weights`. The second component - /// of each `weight` is the number of times to return items from one - /// queue before moving on to the next one. + /// Creates a queue for each pair given in `weights`. The second component of each `weight` is + /// the number of times to return items from one queue before moving on to the next one. pub fn new(weights: Vec<(K, NonZeroUsize)>) -> Self { assert!(!weights.is_empty(), "must provide at least one slot"); let queues = weights .iter() - .map(|(idx, _)| (*idx, sync::Mutex::new(VecDeque::new()))) + .map(|(idx, _)| (*idx, Mutex::new(VecDeque::new()))) .collect(); let slots: Vec> = weights .into_iter() @@ -85,17 +87,17 @@ where let active_slot = slots[0]; WeightedRoundRobin { - state: sync::Mutex::new(IterationState { + state: Mutex::new(IterationState { active_slot, active_slot_idx: 0, }), slots, queues, - total: sync::Semaphore::new(0), + total: Semaphore::new(0), } } - /// Push an item to a queue identified by key. + /// Pushes an item to a queue identified by key. /// /// ## Panics /// @@ -103,7 +105,7 @@ where pub async fn push(&self, item: I, queue: K) { self.queues .get(&queue) - .expect("tried to push to non-existant queue") + .expect("tried to push to non-existent queue") .lock() .await .push_back(item); @@ -112,7 +114,7 @@ where self.total.add_permits(1); } - /// Return the next item from queue. + /// Returns the next item from queue. /// /// Returns `None` if the queue is empty or an internal error occurred. The /// latter should never happen.