diff --git a/Cargo.lock b/Cargo.lock index 895d278c40..accad03a05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2307,6 +2307,7 @@ dependencies = [ "hash-db", "hex", "ip_network", + "libp2p", "log", "lru", "network-clique", @@ -6326,6 +6327,8 @@ version = "0.1.0" dependencies = [ "futures", "log", + "parking_lot 0.12.3", + "rand", "tokio", ] diff --git a/bin/node/src/aleph_cli.rs b/bin/node/src/aleph_cli.rs index 0f53012116..fafcf5ed4f 100644 --- a/bin/node/src/aleph_cli.rs +++ b/bin/node/src/aleph_cli.rs @@ -47,9 +47,13 @@ pub struct AlephCli { #[clap(long, default_value_t = false)] enable_pruning: bool, - /// Maximum bit-rate per node in bytes per second of the alephbft validator network. - #[clap(long, default_value_t = 64 * 1024)] - alephbft_bit_rate_per_connection: u64, + /// Maximum bit-rate in bits per second of the alephbft validator network. + #[clap(long, default_value_t = 768 * 1024)] + alephbft_network_bit_rate: u64, + + /// Maximum bit-rate in bits per second of the substrate network. + #[clap(long, default_value_t = 5*1024*1024)] + substrate_network_bit_rate: u64, /// Don't spend some extra time to collect more debugging data (e.g. validator network details). /// By default collecting is enabled, as the impact on performance is negligible, if any. @@ -93,8 +97,12 @@ impl AlephCli { self.enable_pruning } - pub fn alephbft_bit_rate_per_connection(&self) -> u64 { - self.alephbft_bit_rate_per_connection + pub fn alephbft_network_bit_rate(&self) -> u64 { + self.alephbft_network_bit_rate + } + + pub fn substrate_network_bit_rate(&self) -> u64 { + self.substrate_network_bit_rate } pub fn no_collection_of_extra_debugging_data(&self) -> bool { diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index e78ae0e114..54f1f01c3c 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -234,10 +234,8 @@ fn get_proposer_factory( fn get_rate_limit_config(aleph_config: &AlephCli) -> RateLimiterConfig { RateLimiterConfig { - alephbft_bit_rate_per_connection: aleph_config - .alephbft_bit_rate_per_connection() - .try_into() - .unwrap_or(usize::MAX), + alephbft_network_bit_rate: aleph_config.alephbft_network_bit_rate(), + substrate_network_bit_rate: aleph_config.substrate_network_bit_rate(), } } @@ -296,6 +294,11 @@ pub fn new_authority( )?; let import_queue_handle = BlockImporter::new(service_components.import_queue.service()); + let rate_limiter_config = get_rate_limit_config(&aleph_config); + let network_config = finality_aleph::SubstrateNetworkConfig { + substrate_network_bit_rate: rate_limiter_config.substrate_network_bit_rate, + network_config: config.network.clone(), + }; let BuildNetworkOutput { network, @@ -305,7 +308,7 @@ pub fn new_authority( tx_handler_controller, system_rpc_tx, } = build_network( - &config.network, + network_config, config.protocol_id(), service_components.client.clone(), major_sync, @@ -370,8 +373,6 @@ pub fn new_authority( .spawn_essential_handle() .spawn_blocking("aura", None, aura); - let rate_limiter_config = get_rate_limit_config(&aleph_config); - let AlephRuntimeVars { millisecs_per_block, session_period, diff --git a/clique/src/rate_limiting.rs b/clique/src/rate_limiting.rs index c1e0f043a0..3ae0b2dc22 100644 --- a/clique/src/rate_limiting.rs +++ b/clique/src/rate_limiting.rs @@ -1,34 +1,10 @@ -use rate_limiter::{RateLimiter, SleepingRateLimiter}; -use tokio::io::AsyncRead; +use rate_limiter::{RateLimitedAsyncRead, RateLimiterImpl, SharingRateLimiter}; use crate::{ConnectionInfo, Data, Dialer, Listener, PeerAddressInfo, Splittable, 
Splitted}; -pub struct RateLimitedAsyncRead { - rate_limiter: RateLimiter, - read: Read, -} - -impl RateLimitedAsyncRead { - pub fn new(read: Read, rate_limiter: RateLimiter) -> Self { - Self { rate_limiter, read } - } -} - -impl AsyncRead for RateLimitedAsyncRead { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let this = self.get_mut(); - let read = std::pin::Pin::new(&mut this.read); - this.rate_limiter.rate_limit(read, cx, buf) - } -} - impl ConnectionInfo for RateLimitedAsyncRead { fn peer_address_info(&self) -> PeerAddressInfo { - self.read.peer_address_info() + self.inner().peer_address_info() } } @@ -36,11 +12,11 @@ impl ConnectionInfo for RateLimitedAsyncRead { #[derive(Clone)] pub struct RateLimitingDialer { dialer: D, - rate_limiter: SleepingRateLimiter, + rate_limiter: SharingRateLimiter, } impl RateLimitingDialer { - pub fn new(dialer: D, rate_limiter: SleepingRateLimiter) -> Self { + pub fn new(dialer: D, rate_limiter: SharingRateLimiter) -> Self { Self { dialer, rate_limiter, @@ -66,7 +42,7 @@ where let connection = self.dialer.connect(address).await?; let (sender, receiver) = connection.split(); Ok(Splitted( - RateLimitedAsyncRead::new(receiver, RateLimiter::new(self.rate_limiter.clone())), + RateLimitedAsyncRead::new(receiver, RateLimiterImpl::new(self.rate_limiter.clone())), sender, )) } @@ -75,11 +51,11 @@ where /// Implementation of the [Listener] trait governing all returned [Listener::Connection] instances by a rate-limiting wrapper. pub struct RateLimitingListener { listener: L, - rate_limiter: SleepingRateLimiter, + rate_limiter: SharingRateLimiter, } impl RateLimitingListener { - pub fn new(listener: L, rate_limiter: SleepingRateLimiter) -> Self { + pub fn new(listener: L, rate_limiter: SharingRateLimiter) -> Self { Self { listener, rate_limiter, @@ -88,7 +64,10 @@ impl RateLimitingListener { } #[async_trait::async_trait] -impl Listener for RateLimitingListener { +impl Listener for RateLimitingListener +where + L: Listener + Send, +{ type Connection = Splitted< RateLimitedAsyncRead<::Receiver>, ::Sender, @@ -99,7 +78,7 @@ impl Listener for RateLimitingListener { let connection = self.listener.accept().await?; let (sender, receiver) = connection.split(); Ok(Splitted( - RateLimitedAsyncRead::new(receiver, RateLimiter::new(self.rate_limiter.clone())), + RateLimitedAsyncRead::new(receiver, RateLimiterImpl::new(self.rate_limiter.clone())), sender, )) } diff --git a/docker/docker_entrypoint.sh b/docker/docker_entrypoint.sh index 0b098560cc..bb9792554a 100644 --- a/docker/docker_entrypoint.sh +++ b/docker/docker_entrypoint.sh @@ -35,6 +35,8 @@ MAX_RUNTIME_INSTANCES=${MAX_RUNTIME_INSTANCES:-8} BACKUP_PATH=${BACKUP_PATH:-${BASE_PATH}/backup-stash} DATABASE_ENGINE=${DATABASE_ENGINE:-} PRUNING_ENABLED=${PRUNING_ENABLED:-false} +ALEPHBFT_NETWORK_BIT_RATE=${ALEPHBFT_NETWORK_BIT_RATE:-} +SUBSTRATE_NETWORK_BIT_RATE=${SUBSTRATE_NETWORK_BIT_RATE:-} if [[ "true" == "$PURGE_BEFORE_START" ]]; then echo "Purging chain (${CHAIN}) at path ${BASE_PATH}" @@ -141,4 +143,12 @@ if [[ -n "${MAX_SUBSCRIPTIONS_PER_CONNECTION:-}" ]]; then ARGS+=(--rpc-max-subscriptions-per-connection ${MAX_SUBSCRIPTIONS_PER_CONNECTION}) fi +if [[ -n "${ALEPHBFT_NETWORK_BIT_RATE}" ]]; then + ARGS+=(--alephbft-network-bit-rate ${ALEPHBFT_NETWORK_BIT_RATE}) +fi + +if [[ -n "${SUBSTRATE_NETWORK_BIT_RATE}" ]]; then + ARGS+=(--substrate-network-bit-rate ${SUBSTRATE_NETWORK_BIT_RATE}) +fi + echo "${CUSTOM_ARGS}" | xargs 
aleph-node "${ARGS[@]}" diff --git a/finality-aleph/Cargo.toml b/finality-aleph/Cargo.toml index c7f99077f7..e5349b415c 100644 --- a/finality-aleph/Cargo.toml +++ b/finality-aleph/Cargo.toml @@ -44,6 +44,7 @@ serde = { workspace = true } static_assertions = { workspace = true } tiny-bip39 = { workspace = true } tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] } +libp2p = { workspace = true } substrate-prometheus-endpoint = { workspace = true } diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 73d0fdd7b6..f4af4f7cfe 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -72,7 +72,8 @@ pub use crate::{ justification::AlephJustification, network::{ address_cache::{ValidatorAddressCache, ValidatorAddressingInfo}, - build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId, + build_network, BuildNetworkOutput, ProtocolNetwork, SubstrateNetworkConfig, + SubstratePeerId, }, nodes::run_validator_node, session::SessionPeriod, @@ -255,8 +256,10 @@ type Hasher = abft::HashWrapper; #[derive(Clone)] pub struct RateLimiterConfig { - /// Maximum bit-rate per node in bytes per second of the alephbft validator network. - pub alephbft_bit_rate_per_connection: usize, + /// Maximum bit-rate in bits per second of the alephbft validator network. + pub alephbft_network_bit_rate: u64, + /// Maximum bit-rate in bits per second of the substrate network (shared by sync, gossip, etc.). + pub substrate_network_bit_rate: u64, } pub struct AlephConfig { diff --git a/finality-aleph/src/network/build/base.rs b/finality-aleph/src/network/build/base.rs index 43b70dbc50..8140083e20 100644 --- a/finality-aleph/src/network/build/base.rs +++ b/finality-aleph/src/network/build/base.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use libp2p::{core::StreamMuxer, PeerId, Transport}; use sc_client_api::Backend; use sc_network::{ config::{ @@ -8,6 +9,7 @@ use sc_network::{ }, error::Error as NetworkError, peer_store::PeerStore, + transport::NetworkConfig, NetworkService, NetworkWorker, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; @@ -69,8 +71,9 @@ type BaseNetworkOutput = ( ); /// Create a base network with all the protocols already included. Also spawn (almost) all the necessary services. 
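/// The `transport_builder` closure receives the low-level `sc_network::transport::NetworkConfig` and returns the libp2p
/// transport for the network worker; in this crate it is used (see `build/mod.rs`) to wrap the default transport with the
/// rate-limiting stream muxer from `transport::build_transport`.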
-pub fn network( +pub fn network( network_config: &NetworkConfiguration, + transport_builder: impl FnOnce(NetworkConfig) -> T, protocol_id: ProtocolId, client: Arc, spawn_handle: &SpawnTaskHandle, @@ -82,6 +85,13 @@ where B::Header: Header, BE: Backend, C: ClientForAleph, + T: Transport + Send + Unpin + 'static, + T::Dial: Send, + T::ListenerUpgrade: Send, + T::Error: Send + Sync, + SM: StreamMuxer + Unpin + Send + 'static, + SM::Substream: Unpin + Send, + SM::Error: Send + Sync, { let mut full_network_config = FullNetworkConfiguration::new(network_config); let genesis_hash = client @@ -135,7 +145,8 @@ where block_announce_config: base_protocol_config, }; - let network_service = NetworkWorker::new(network_params)?; + let network_service = + NetworkWorker::new_with_custom_transport(network_params, transport_builder)?; let network = network_service.service().clone(); spawn_handle.spawn_blocking("network-worker", SPAWN_CATEGORY, network_service.run()); Ok((network, networks, transactions_prototype)) diff --git a/finality-aleph/src/network/build/mod.rs b/finality-aleph/src/network/build/mod.rs index 66f4aee625..250b40d167 100644 --- a/finality-aleph/src/network/build/mod.rs +++ b/finality-aleph/src/network/build/mod.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicBool, Arc}; use log::error; +use rate_limiter::SharingRateLimiter; use sc_client_api::Backend; use sc_network::{ config::{NetworkConfiguration, ProtocolId}, @@ -28,6 +29,7 @@ mod base; mod own_protocols; mod rpc; mod transactions; +mod transport; use base::network as base_network; use own_protocols::Networks; @@ -47,10 +49,17 @@ pub struct NetworkOutput { pub system_rpc_tx: TracingUnboundedSender>, } +pub struct SubstrateNetworkConfig { + /// Maximum bit-rate in bits per second of the substrate network (shared by sync, gossip, etc.). + pub substrate_network_bit_rate: u64, + /// Configuration of the network service. + pub network_config: NetworkConfiguration, +} + /// Start everything necessary to run the inter-node network and return the interfaces for it. /// This includes everything in the base network, the base protocol service, and services for handling transactions and RPCs. 
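/// A caller first wraps the Substrate network configuration together with the configured bit-rate, roughly as `new_authority`
/// in `bin/node/src/service.rs` does (sketch based on that call site):
/// ```ignore
/// let network_config = SubstrateNetworkConfig {
///     substrate_network_bit_rate: rate_limiter_config.substrate_network_bit_rate,
///     network_config: config.network.clone(),
/// };
/// ```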
pub fn network( - network_config: &NetworkConfiguration, + network_config: SubstrateNetworkConfig, protocol_id: ProtocolId, client: Arc, major_sync: Arc, @@ -72,6 +81,11 @@ where .expect("Genesis block exists."); let (base_protocol_config, events_from_network) = setup_base_protocol::(genesis_hash); + + let network_rate_limit = network_config.substrate_network_bit_rate; + let rate_limiter = SharingRateLimiter::new(network_rate_limit.into()); + let transport_builder = |config| transport::build_transport(rate_limiter, config); + let ( network, Networks { @@ -80,7 +94,8 @@ where }, transaction_prototype, ) = base_network( - network_config, + &network_config.network_config, + transport_builder, protocol_id, client.clone(), spawn_handle, @@ -91,7 +106,7 @@ where let (base_service, syncing_service) = BaseProtocolService::new( major_sync, genesis_hash, - network_config, + &network_config.network_config, protocol_names, network.clone(), events_from_network, diff --git a/finality-aleph/src/network/build/transport.rs b/finality-aleph/src/network/build/transport.rs new file mode 100644 index 0000000000..4ff1c64317 --- /dev/null +++ b/finality-aleph/src/network/build/transport.rs @@ -0,0 +1,104 @@ +use libp2p::{core::muxing::StreamMuxer, PeerId, Transport}; +use rate_limiter::{FuturesRateLimitedAsyncReadWrite, FuturesRateLimiter, SharingRateLimiter}; + +struct RateLimitedStreamMuxer { + rate_limiter: SharingRateLimiter, + stream_muxer: SM, +} + +impl RateLimitedStreamMuxer { + pub fn new(stream_muxer: SM, rate_limiter: SharingRateLimiter) -> Self { + Self { + rate_limiter, + stream_muxer, + } + } + + fn inner(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut SM> + where + SM: Unpin, + { + let this = self.get_mut(); + std::pin::Pin::new(&mut this.stream_muxer) + } +} + +impl StreamMuxer for RateLimitedStreamMuxer +where + SM: StreamMuxer + Unpin, + SM::Substream: Unpin, +{ + type Substream = FuturesRateLimitedAsyncReadWrite; + + type Error = SM::Error; + + fn poll_inbound( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let rate_limiter = self.rate_limiter.clone(); + self.inner().poll_inbound(cx).map(|result| { + result.map(|substream| { + FuturesRateLimitedAsyncReadWrite::new( + substream, + FuturesRateLimiter::new(rate_limiter), + ) + }) + }) + } + + fn poll_outbound( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let rate_limiter = self.rate_limiter.clone(); + self.inner().poll_outbound(cx).map(|result| { + result.map(|substream| { + FuturesRateLimitedAsyncReadWrite::new( + substream, + FuturesRateLimiter::new(rate_limiter), + ) + }) + }) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner().poll_close(cx) + } + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner().poll(cx) + } +} + +pub fn build_transport( + rate_limiter: SharingRateLimiter, + config: sc_network::transport::NetworkConfig, +) -> impl Transport< + Output = ( + PeerId, + impl StreamMuxer + Send, + ), + Dial = impl Send, + ListenerUpgrade = impl Send, + Error = impl Send, +> + Send { + sc_network::transport::build_transport( + config.keypair, + config.memory_only, + config.muxer_window_size, + config.muxer_maximum_buffer_size, + ) + .map(move |(peer_id, stream_muxer), _| { + ( + peer_id, + RateLimitedStreamMuxer::new(stream_muxer, rate_limiter), + ) + }) +} diff --git 
a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index b4c35ff5ce..991f77d9df 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -4,7 +4,9 @@ use std::{ hash::Hash, }; -pub use build::{network as build_network, NetworkOutput as BuildNetworkOutput}; +pub use build::{ + network as build_network, NetworkOutput as BuildNetworkOutput, SubstrateNetworkConfig, +}; use network_clique::{AddressingInformation, NetworkIdentity, PeerId}; use parity_scale_codec::Codec; pub use substrate::{PeerId as SubstratePeerId, ProtocolNetwork}; diff --git a/finality-aleph/src/network/substrate.rs b/finality-aleph/src/network/substrate.rs index ac3ab1db42..5569d5c6ea 100644 --- a/finality-aleph/src/network/substrate.rs +++ b/finality-aleph/src/network/substrate.rs @@ -1,4 +1,5 @@ use std::{ + borrow::{Borrow, BorrowMut}, collections::HashSet, fmt::{Debug, Display, Error as FmtError, Formatter}, }; @@ -18,17 +19,31 @@ use crate::{ STATUS_REPORT_INTERVAL, }; +pub type BoxedNotificationService = Box; + /// A thin wrapper around sc_network::config::NotificationService that stores a list /// of all currently connected peers, and introduces a few convenience methods to /// allow broadcasting messages and sending data to random peers. pub struct ProtocolNetwork { - service: Box, + service: BoxedNotificationService, connected_peers: HashSet, last_status_report: time::Instant, } +impl Borrow for ProtocolNetwork { + fn borrow(&self) -> &BoxedNotificationService { + &self.service + } +} + +impl BorrowMut for ProtocolNetwork { + fn borrow_mut(&mut self) -> &mut BoxedNotificationService { + &mut self.service + } +} + impl ProtocolNetwork { - pub fn new(service: Box) -> Self { + pub fn new(service: BoxedNotificationService) -> Self { Self { service, connected_peers: HashSet::new(), diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index c4a1142aeb..1e6b8eac7f 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -6,7 +6,7 @@ use log::{debug, error}; use network_clique::{RateLimitingDialer, RateLimitingListener, Service, SpawnHandleT}; use pallet_aleph_runtime_api::AlephSessionApi; use primitives::TransactionHash; -use rate_limiter::SleepingRateLimiter; +use rate_limiter::SharingRateLimiter; use sc_client_api::Backend; use sc_keystore::{Keystore, LocalKeystore}; use sc_transaction_pool_api::TransactionPool; @@ -97,7 +97,7 @@ where debug!( target: LOG_TARGET, "Initializing rate-limiter for the validator-network with {} byte(s) per second.", - rate_limiter_config.alephbft_bit_rate_per_connection + rate_limiter_config.alephbft_network_bit_rate ); let (dialer, listener, network_identity) = new_tcp_network( @@ -109,7 +109,7 @@ where .expect("we should have working networking"); let alephbft_rate_limiter = - SleepingRateLimiter::new(rate_limiter_config.alephbft_bit_rate_per_connection); + SharingRateLimiter::new(rate_limiter_config.alephbft_network_bit_rate.into()); let dialer = RateLimitingDialer::new(dialer, alephbft_rate_limiter.clone()); let listener = RateLimitingListener::new(listener, alephbft_rate_limiter); diff --git a/rate-limiter/Cargo.toml b/rate-limiter/Cargo.toml index 2fe3156b48..e77dd3d5c5 100644 --- a/rate-limiter/Cargo.toml +++ b/rate-limiter/Cargo.toml @@ -11,4 +11,8 @@ repository.workspace = true [dependencies] futures = { workspace = true } log = { workspace = true } -tokio = { workspace = true, features = ["time"] } +tokio = { workspace = true, features = ["time", "sync", "macros", "rt-multi-thread"] } + 
+[dev-dependencies] +parking_lot = { workspace = true } +rand = { workspace = true, features = ["std", "std_rng"] } diff --git a/rate-limiter/src/lib.rs b/rate-limiter/src/lib.rs index ed8a548347..452ef90cdb 100644 --- a/rate-limiter/src/lib.rs +++ b/rate-limiter/src/lib.rs @@ -1,6 +1,172 @@ mod rate_limiter; mod token_bucket; -pub use crate::rate_limiter::{RateLimiter, SleepingRateLimiter}; +use std::num::{NonZeroU64, TryFromIntError}; + +pub use rate_limiter::RateLimiterImpl; +use tokio::io::AsyncRead; + +pub use crate::{ + rate_limiter::{FuturesRateLimiter, SharingRateLimiter}, + token_bucket::SharedTokenBucket, +}; const LOG_TARGET: &str = "rate-limiter"; + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct NonZeroRatePerSecond(NonZeroU64); + +pub const MIN: NonZeroRatePerSecond = NonZeroRatePerSecond(NonZeroU64::MIN); + +impl From for NonZeroU64 { + fn from(NonZeroRatePerSecond(value): NonZeroRatePerSecond) -> Self { + value + } +} + +impl From for u64 { + fn from(NonZeroRatePerSecond(value): NonZeroRatePerSecond) -> Self { + value.into() + } +} + +impl From for NonZeroRatePerSecond { + fn from(value: NonZeroU64) -> Self { + NonZeroRatePerSecond(value) + } +} + +impl TryFrom for NonZeroRatePerSecond { + type Error = TryFromIntError; + + fn try_from(value: u64) -> Result { + Ok(NonZeroRatePerSecond(value.try_into()?)) + } +} + +#[derive(PartialEq, Eq)] +pub enum RatePerSecond { + Block, + Rate(NonZeroRatePerSecond), +} + +impl From for u64 { + fn from(value: RatePerSecond) -> Self { + match value { + RatePerSecond::Block => 0, + RatePerSecond::Rate(NonZeroRatePerSecond(value)) => value.into(), + } + } +} + +impl From for RatePerSecond { + fn from(value: u64) -> Self { + NonZeroU64::try_from(value) + .map(NonZeroRatePerSecond::from) + .map(Self::Rate) + .unwrap_or(Self::Block) + } +} + +impl From for RatePerSecond { + fn from(value: NonZeroRatePerSecond) -> Self { + RatePerSecond::Rate(value) + } +} + +pub struct RateLimitedAsyncRead { + rate_limiter: RateLimiterImpl, + inner: Read, +} + +impl RateLimitedAsyncRead { + pub fn new(read: Read, rate_limiter: RateLimiterImpl) -> Self { + Self { + rate_limiter, + inner: read, + } + } + + pub fn inner(&self) -> &Read { + &self.inner + } +} + +impl AsyncRead for RateLimitedAsyncRead +where + Read: AsyncRead + Unpin, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let read = std::pin::Pin::new(&mut this.inner); + this.rate_limiter.rate_limit(read, cx, buf) + } +} + +pub struct FuturesRateLimitedAsyncReadWrite { + rate_limiter: FuturesRateLimiter, + inner: ReadWrite, +} + +impl FuturesRateLimitedAsyncReadWrite { + pub fn new(wrapped: ReadWrite, rate_limiter: FuturesRateLimiter) -> Self { + Self { + rate_limiter, + inner: wrapped, + } + } + + fn get_inner(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut ReadWrite> + where + ReadWrite: Unpin, + { + let this = self.get_mut(); + std::pin::Pin::new(&mut this.inner) + } +} + +impl futures::AsyncRead for FuturesRateLimitedAsyncReadWrite +where + Read: futures::AsyncRead + Unpin, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let this = self.get_mut(); + let read = std::pin::Pin::new(&mut this.inner); + this.rate_limiter.rate_limit(read, cx, buf) + } +} + +impl futures::AsyncWrite for FuturesRateLimitedAsyncReadWrite +where + Write: futures::AsyncWrite + Unpin, +{ + fn 
poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.get_inner().poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_inner().poll_flush(cx) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_inner().poll_close(cx) + } +} diff --git a/rate-limiter/src/rate_limiter.rs b/rate-limiter/src/rate_limiter.rs index 41cf85bbd4..e852f8421d 100644 --- a/rate-limiter/src/rate_limiter.rs +++ b/rate-limiter/src/rate_limiter.rs @@ -1,92 +1,121 @@ -use std::time::Instant; +use std::{task::ready, time::Instant}; -use futures::{future::BoxFuture, FutureExt}; -use log::trace; -use tokio::{io::AsyncRead, time::sleep}; +use futures::{ + future::{pending, BoxFuture}, + FutureExt, +}; +use tokio::io::AsyncRead; -use crate::{token_bucket::TokenBucket, LOG_TARGET}; +use crate::{token_bucket::SharedTokenBucket, RatePerSecond}; -/// Allows to limit access to some resource. Given a preferred rate (units of something) and last used amount of units of some -/// resource, it calculates how long we should delay our next access to that resource in order to satisfy that rate. -pub struct SleepingRateLimiter { - rate_limiter: TokenBucket, +pub type SharingRateLimiter = RateLimiterFacade; + +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum Deadline { + Never, + Instant(Instant), } -impl Clone for SleepingRateLimiter { - fn clone(&self) -> Self { - Self { - rate_limiter: self.rate_limiter.clone(), +impl From for Option { + fn from(value: Deadline) -> Self { + match value { + Deadline::Never => None, + Deadline::Instant(value) => Some(value), } } } -impl SleepingRateLimiter { - /// Constructs a instance of [SleepingRateLimiter] with given target rate-per-second. - pub fn new(rate_per_second: usize) -> Self { +/// Wrapper around [RateLimiterFacade] to simplify implementation of the [AsyncRead](tokio::io::AsyncRead) trait. +pub struct RateLimiterImpl { + rate_limiter: BoxFuture<'static, RateLimiterFacade>, +} + +impl RateLimiterImpl { + /// Constructs an instance of [RateLimiterImpl] that uses already configured rate-limiting access governor + /// ([RateLimiterFacade]). + pub fn new(rate_limiter: RateLimiterFacade) -> Self { Self { - rate_limiter: TokenBucket::new(rate_per_second), + rate_limiter: Box::pin(rate_limiter.rate_limit(0)), } } - /// Given `read_size`, that is an amount of units of some governed resource, delays return of `Self` to satisfy configure - /// rate. - pub async fn rate_limit(mut self, read_size: usize) -> Self { - trace!( - target: LOG_TARGET, - "Rate-Limiter attempting to read {}.", - read_size - ); - - let now = Instant::now(); - let delay = self.rate_limiter.rate_limit(read_size, now); - - if let Some(delay) = delay { - trace!( - target: LOG_TARGET, - "Rate-Limiter will sleep {:?} after reading {} byte(s).", - delay, - read_size - ); - sleep(delay).await; - } + /// Helper method for the use of the [AsyncRead](tokio::io::AsyncRead) implementation. 
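+    /// The number of bytes read from the wrapped reader is converted to bits (multiplied by 8) before being charged to the
+    /// limiter, since the configured rates are expressed in bits per second.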
+ pub fn rate_limit( + &mut self, + read: std::pin::Pin<&mut Read>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let sleeping_rate_limiter = ready!(self.rate_limiter.poll_unpin(cx)); + + let filled_before = buf.filled().len(); + let result = read.poll_read(cx, buf); + let filled_after: &[u8] = buf.filled(); + let filled_after = 8 * filled_after.len(); + let last_read_size = filled_after.saturating_sub(filled_before); + + self.rate_limiter = sleeping_rate_limiter.rate_limit(last_read_size).boxed(); - self + result } } -/// Wrapper around [SleepingRateLimiter] to simplify implementation of the [AsyncRead](tokio::io::AsyncRead) trait. -pub struct RateLimiter { - rate_limiter: BoxFuture<'static, SleepingRateLimiter>, +pub struct FuturesRateLimiter { + rate_limiter: BoxFuture<'static, RateLimiterFacade>, } -impl RateLimiter { +impl FuturesRateLimiter { /// Constructs an instance of [RateLimiter] that uses already configured rate-limiting access governor /// ([SleepingRateLimiter]). - pub fn new(rate_limiter: SleepingRateLimiter) -> Self { + pub fn new(rate_limiter: RateLimiterFacade) -> Self { Self { rate_limiter: Box::pin(rate_limiter.rate_limit(0)), } } - /// Helper method for the use of the [AsyncRead](tokio::io::AsyncRead) implementation. - pub fn rate_limit( + /// Helper method for the use of the [AsyncRead](futures::AsyncRead) implementation. + pub fn rate_limit( &mut self, read: std::pin::Pin<&mut Read>, cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let sleeping_rate_limiter = match self.rate_limiter.poll_unpin(cx) { - std::task::Poll::Ready(rate_limiter) => rate_limiter, - _ => return std::task::Poll::Pending, - }; + buf: &mut [u8], + ) -> std::task::Poll> { + let sleeping_rate_limiter = ready!(self.rate_limiter.poll_unpin(cx)); - let filled_before = buf.filled().len(); let result = read.poll_read(cx, buf); - let filled_after = buf.filled().len(); - let last_read_size = filled_after.saturating_sub(filled_before); + let last_read_size = match &result { + std::task::Poll::Ready(Ok(read_size)) => 8 * *read_size, + _ => 0, + }; self.rate_limiter = sleeping_rate_limiter.rate_limit(last_read_size).boxed(); result } } + +#[derive(Clone)] +pub enum RateLimiterFacade { + NoTraffic, + RateLimiter(SharedTokenBucket), +} + +impl RateLimiterFacade { + pub fn new(rate: RatePerSecond) -> Self { + match rate { + RatePerSecond::Block => Self::NoTraffic, + RatePerSecond::Rate(rate) => Self::RateLimiter(SharedTokenBucket::new(rate)), + } + } + + async fn rate_limit(self, read_size: usize) -> Self { + match self { + RateLimiterFacade::NoTraffic => pending().await, + RateLimiterFacade::RateLimiter(rate_limiter) => RateLimiterFacade::RateLimiter( + rate_limiter + .rate_limit(read_size.try_into().unwrap_or(u64::MAX)) + .await, + ), + } + } +} diff --git a/rate-limiter/src/token_bucket.rs b/rate-limiter/src/token_bucket.rs index bfcba1d353..1555ca30eb 100644 --- a/rate-limiter/src/token_bucket.rs +++ b/rate-limiter/src/token_bucket.rs @@ -1,211 +1,1178 @@ use std::{ cmp::min, + num::NonZeroU64, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant}, }; +use futures::{future::pending, Future, FutureExt}; use log::trace; +use tokio::time::sleep; -use crate::LOG_TARGET; +use crate::{NonZeroRatePerSecond, LOG_TARGET, MIN}; -/// Implementation of the `Token Bucket` algorithm for the purpose of rate-limiting access to some abstract resource. 
-#[derive(Clone, Debug)] -pub struct TokenBucket { - rate_per_second: usize, - available: usize, - requested: usize, +pub trait TimeProvider { + fn now(&self) -> Instant; +} + +#[derive(Clone, Default)] +pub struct DefaultTimeProvider; + +impl TimeProvider for DefaultTimeProvider { + fn now(&self) -> Instant { + Instant::now() + } +} + +/// Implementation of some sleeping mechanism that doesn't block runtime's executor thread. +/// Implementations should be cancellation-safe. +pub trait SleepUntil { + fn sleep_until(&mut self, instant: Instant) -> impl Future + Send; +} + +#[derive(Clone, Default)] +pub struct TokioSleepUntil; + +impl SleepUntil for TokioSleepUntil { + async fn sleep_until(&mut self, instant: Instant) { + tokio::time::sleep_until(instant.into()).await; + } +} + +/// Implementation of the `Token Bucket` algorithm for the purpose of rate-limiting access to some abstract resource, e.g. an incoming network traffic. +#[derive(Clone)] +struct TokenBucket { last_update: Instant, + rate_per_second: NonZeroU64, + requested: u64, + time_provider: T, +} + +impl std::fmt::Debug for TokenBucket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TokenBucket") + .field("last_update", &self.last_update) + .field("rate_per_second", &self.rate_per_second) + .field("requested", &self.requested) + .finish() + } } impl TokenBucket { - /// Constructs a instance of [TokenBucket] with given target rate-per-second. - pub fn new(rate_per_second: usize) -> Self { - Self { - rate_per_second, - available: rate_per_second, - requested: 0, - last_update: Instant::now(), - } + /// Constructs a instance of [`TokenBucket`] with given target rate-per-second. + pub fn new(rate_per_second: NonZeroRatePerSecond) -> Self { + let time_provider = DefaultTimeProvider; + Self::new_internal(rate_per_second, time_provider) } +} - #[cfg(test)] - pub fn new_with_now(rate_per_second: usize, now: Instant) -> Self { +impl TokenBucket +where + TP: TimeProvider, +{ + fn new_internal(rate_per_second: NonZeroRatePerSecond, time_provider: TP) -> Self { + let now = time_provider.now(); Self { + time_provider, last_update: now, - ..Self::new(rate_per_second) + rate_per_second: rate_per_second.into(), + requested: NonZeroU64::from(rate_per_second).into(), } } - fn calculate_delay(&self) -> Duration { - let delay_micros = (self.requested - self.available) + fn max_possible_available_tokens(&self) -> u64 { + self.rate_per_second.into() + } + + fn available(&self) -> Option { + (self.requested <= self.max_possible_available_tokens()) + .then(|| self.max_possible_available_tokens() - self.requested) + } + + fn account_requested_tokens(&mut self, requested: u64) { + self.requested = self.requested.saturating_add(requested); + } + + fn calculate_delay(&self) -> Option { + if self.available().is_some() { + return None; + } + + let scheduled_for_later = self.requested - self.max_possible_available_tokens(); + let delay_micros = scheduled_for_later .saturating_mul(1_000_000) - .saturating_div(self.rate_per_second); - Duration::from_micros(delay_micros.try_into().unwrap_or(u64::MAX)) + .saturating_div(self.rate_per_second.into()); + + Some(self.last_update + Duration::from_micros(delay_micros)) } - fn update_units(&mut self, now: Instant) -> usize { + fn update_tokens(&mut self) { + let now = self.time_provider.now(); + assert!( + now >= self.last_update, + "Provided value for `now` should be at least equal to `self.last_update`: now = {:#?} self.last_update = {:#?}.", + now, + self.last_update + ); 
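+        // Replenishment is proportional to the elapsed time; e.g. (illustrative numbers) at a rate of 1024 per second,
+        // 500 ms since the last update removes 500_000 µs * 1024 / 1_000_000 = 512 tokens from `requested` below.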
+ let time_since_last_update = now.duration_since(self.last_update); + self.last_update = now; let new_units = time_since_last_update .as_micros() - .saturating_mul(self.rate_per_second as u128) + .saturating_mul(u64::from(self.rate_per_second).into()) .saturating_div(1_000_000) .try_into() - .unwrap_or(usize::MAX); - self.available = self.available.saturating_add(new_units); - self.last_update = now; + .unwrap_or(u64::MAX); + self.requested = self.requested.saturating_sub(new_units); + } + + /// Get current rate in bits per second. + pub fn rate(&self) -> NonZeroRatePerSecond { + self.rate_per_second.into() + } - let used = min(self.available, self.requested); - self.available -= used; - self.requested -= used; - self.available = min(self.available, self.token_limit()); - self.available + /// Set a rate in bits per second. + pub fn set_rate(&mut self, rate_per_second: NonZeroRatePerSecond) { + self.update_tokens(); + let available = self.available(); + let previous_rate_per_second = self.rate_per_second.get(); + self.rate_per_second = rate_per_second.into(); + if available.is_some() { + let max_for_available = self.max_possible_available_tokens(); + let available_after_rate_update = min(available.unwrap_or(0), max_for_available); + self.requested = self.rate_per_second.get() - available_after_rate_update; + } else { + self.requested = self.requested - previous_rate_per_second + self.rate_per_second.get(); + } } - /// Calculates [Duration](time::Duration) by which we should delay next call to some governed resource in order to satisfy - /// configured rate limit. - pub fn rate_limit(&mut self, requested: usize, now: Instant) -> Option { + /// Calculates amount of time by which we should delay next call to some governed resource in order to satisfy + /// specified rate limit. + pub fn rate_limit(&mut self, requested: u64) -> Option { trace!( target: LOG_TARGET, - "TokenBucket called for {} of requested bytes. Internal state: {:?}.", - requested, - self + "TokenBucket called for {requested} of requested bytes. Internal state: {self:?}.", ); - if self.requested > 0 || self.available < requested { - assert!( - now >= self.last_update, - "Provided value for `now` should be at least equal to `self.last_update`: now = {:#?} self.last_update = {:#?}.", - now, - self.last_update - ); - if self.update_units(now) < requested { - self.requested = self.requested.saturating_add(requested); - let required_delay = self.calculate_delay(); - return Some(required_delay); + let now_available = self.available().unwrap_or(0); + if now_available < requested { + self.update_tokens() + } + self.account_requested_tokens(requested); + let delay = self.calculate_delay(); + trace!( + target: LOG_TARGET, + "TokenBucket calculated delay after receiving a request of {requested}: {delay:?}.", + ); + delay + } +} + +const BANDWIDTH_CHECK_INTERVAL: Duration = Duration::from_millis(250); + +/// Implementation of the bandwidth sharing strategy that attempts to assign equal portion of the total bandwidth to all active +/// consumers of the bandwidth. 
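+/// For example (numbers mirroring the unit tests below): with a total rate of 10 units per second and three active consumers,
+/// each consumer is assigned 10 / 3 = 3 units per second (integer division, but never less than 1 unit per second).
+/// ```ignore
+/// let rate = 10.try_into().expect("10 > 0 qed");
+/// let mut shared = SharedBandwidthManager::new(rate);
+/// let my_share = shared.request_bandwidth(); // the full 10/s while this is the only active consumer
+/// ```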
+pub struct SharedBandwidthManager { + max_rate: NonZeroRatePerSecond, + peers_count: Arc, + already_requested: Option, +} + +impl Clone for SharedBandwidthManager { + fn clone(&self) -> Self { + Self { + max_rate: self.max_rate, + peers_count: self.peers_count.clone(), + already_requested: None, + } + } +} + +impl SharedBandwidthManager { + pub fn new(max_rate: NonZeroRatePerSecond) -> Self { + Self { + max_rate, + peers_count: Arc::new(AtomicU64::new(0)), + already_requested: None, + } + } + + fn calculate_bandwidth_without_children_increment( + &mut self, + active_children: Option, + ) -> NonZeroRatePerSecond { + let active_children = + active_children.unwrap_or_else(|| self.peers_count.load(Ordering::SeqCst)); + let rate = u64::from(self.max_rate) / active_children; + NonZeroU64::try_from(rate) + .map(NonZeroRatePerSecond::from) + .unwrap_or(MIN) + } + + pub fn request_bandwidth(&mut self) -> NonZeroRatePerSecond { + let active_children = (self.already_requested.is_none()) + .then(|| 1 + self.peers_count.fetch_add(1, Ordering::SeqCst)); + let rate = self.calculate_bandwidth_without_children_increment(active_children); + self.already_requested = Some(rate); + rate + } + + pub fn notify_idle(&mut self) { + if self.already_requested.take().is_some() { + self.peers_count.fetch_sub(1, Ordering::SeqCst); + } + } + + pub async fn bandwidth_changed(&mut self) -> NonZeroRatePerSecond { + let Some(previous_rate) = self.already_requested else { + return pending().await; + }; + let mut rate = self.calculate_bandwidth_without_children_increment(None); + while rate == previous_rate { + sleep(BANDWIDTH_CHECK_INTERVAL).await; + rate = self.calculate_bandwidth_without_children_increment(None); + } + self.already_requested = Some(rate); + rate + } +} + +/// Wrapper around the [TokenBucket] that allows conveniently managing its internal token rate and idling/sleeping in order +/// to fulfill its rate limit. +#[derive(Clone)] +struct AsyncTokenBucket { + token_bucket: TokenBucket, + next_deadline: Option, + sleep_until: SU, +} + +impl AsyncTokenBucket { + pub fn new(token_bucket: TokenBucket, sleep_until: SU) -> Self { + Self { + token_bucket, + next_deadline: None, + sleep_until, + } + } +} + +impl AsyncTokenBucket +where + TP: TimeProvider, +{ + pub fn rate_limit(&mut self, requested: u64) { + self.next_deadline = TokenBucket::rate_limit(&mut self.token_bucket, requested); + } + + pub fn set_rate(&mut self, rate: NonZeroRatePerSecond) { + if self.token_bucket.rate() != rate { + self.token_bucket.set_rate(rate); + self.next_deadline = self.token_bucket.rate_limit(0); + } + } + + pub async fn wait(&mut self) + where + TP: TimeProvider + Send, + SU: SleepUntil + Send, + { + if let Some(deadline) = self.next_deadline { + self.sleep_until.sleep_until(deadline).await; + self.next_deadline = None; + } + } +} + +/// Allows sharing a given amount of bandwidth between multiple instances of [TokenBucket]. Each time an instance requests to +/// share the bandwidth, it is given a fair share of it, i.e. all available bandwidth / # of active instances. All active +/// instances are actively polling (with some predefined interval) for changes in their allocated bandwidth. Alternatively, we +/// could devise a method where on each new request for bandwidth, we actively query every active instance to confirm a change +/// before we allocate it for a new peer. This would provide more accurate bandwidth allocation but with a huge disadvantage for +/// performance.
We believe the current solution is a good trade-off between accuracy and performance. In the worst case, the utilized +/// bandwidth can reach `bandwidth * (1 + 1/2 + ... + 1/n) ≈ bandwidth * (ln n + O(1))`. This can happen when each +/// instance of [TokenBucket] requests slightly more data than its initially acquired bandwidth (equal to `bandwidth / (# of all +/// instances before it + 1)`), small enough that none of them receives a notification about the ongoing bandwidth change (calls from +/// subsequent instances). +#[derive(Clone)] +pub struct SharedTokenBucket { + shared_bandwidth: SharedBandwidthManager, + rate_limiter: AsyncTokenBucket, + need_to_notify_parent: bool, +} + +impl SharedTokenBucket { + pub fn new(rate: NonZeroRatePerSecond) -> Self { + let token_bucket = TokenBucket::new(rate); + let sleep_until = TokioSleepUntil; + let rate_limiter = AsyncTokenBucket::new(token_bucket, sleep_until); + Self::new_internal(rate, rate_limiter) + } +} + +impl SharedTokenBucket { + fn new_internal(rate: NonZeroRatePerSecond, rate_limiter: AsyncTokenBucket) -> Self { + Self { + shared_bandwidth: SharedBandwidthManager::new(rate), + rate_limiter, + need_to_notify_parent: false, + } + } + + fn request_bandwidth(&mut self) -> NonZeroRatePerSecond { + self.need_to_notify_parent = true; + self.shared_bandwidth.request_bandwidth() + } + + fn notify_idle(&mut self) { + if self.need_to_notify_parent { + self.shared_bandwidth.notify_idle(); + self.need_to_notify_parent = false; + } + } + + pub async fn rate_limit(mut self, requested: u64) -> Self + where + TP: TimeProvider + Send, + SU: SleepUntil + Send, + { + let rate = self.request_bandwidth(); + self.rate_limiter.set_rate(rate); + + self.rate_limiter.rate_limit(requested); + + loop { + futures::select! { + _ = self.rate_limiter.wait().fuse() => { + self.notify_idle(); + return self; + }, + rate = self.shared_bandwidth.bandwidth_changed().fuse() => { + self.rate_limiter.set_rate(rate); + }, } } - self.available -= requested; - self.available = min(self.available, self.token_limit()); - None } +} - fn token_limit(&self) -> usize { - self.rate_per_second +impl Drop for SharedTokenBucket { + fn drop(&mut self) { + self.notify_idle(); } } #[cfg(test)] mod tests { - use std::time::{Duration, Instant}; + use std::{ + cmp::{max, min}, + iter::repeat, + sync::Arc, + task::Poll, + thread, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + }; + + use futures::{ + future::{pending, poll_fn, BoxFuture, Future}, + pin_mut, + stream::FuturesOrdered, + StreamExt, + }; + use parking_lot::Mutex; - use super::TokenBucket; + use super::{SharedBandwidthManager, SleepUntil, TimeProvider, TokenBucket}; + use crate::token_bucket::{AsyncTokenBucket, NonZeroRatePerSecond, SharedTokenBucket}; - #[test] - fn token_bucket_sanity_check() { - let limit_per_second = 10; + impl TimeProvider for F + where + F: Fn() -> Instant, + { + fn now(&self) -> Instant { + self() + } + } + + impl TimeProvider for Arc> { + fn now(&self) -> Instant { + self.as_ref().now() + } + } + + #[tokio::test] + async fn basic_checks_of_shared_bandwidth_manager() { + let rate = 10.try_into().expect("10 > 0 qed"); + let mut bandwidth_share = SharedBandwidthManager::new(rate); + let mut cloned_bandwidth_share = bandwidth_share.clone(); + let mut another_cloned_bandwidth_share = cloned_bandwidth_share.clone(); + + // only one consumer, so it should get the whole bandwidth + assert_eq!(bandwidth_share.request_bandwidth(), rate); + + // since other instances did not request bandwidth, they should not receive a
notification that it has changed + let poll_result = poll_fn(|cx| { + let future = cloned_bandwidth_share.bandwidth_changed(); + pin_mut!(future); + Poll::Ready(Future::poll(future, cx)) + }) + .await; + assert_eq!(poll_result, Poll::Pending); + + let poll_result = poll_fn(|cx| { + let future = another_cloned_bandwidth_share.bandwidth_changed(); + pin_mut!(future); + Poll::Ready(Future::poll(future, cx)) + }) + .await; + assert_eq!(poll_result, Poll::Pending); + + // two consumers should equally divide the bandwidth + let rate = 5.try_into().expect("5 > 0 qed"); + assert_eq!(cloned_bandwidth_share.request_bandwidth(), rate); + assert_eq!(bandwidth_share.bandwidth_changed().await, rate); + + // similarly when there are three of them + let bandwidth: u64 = another_cloned_bandwidth_share.request_bandwidth().into(); + let another_bandwidth: u64 = bandwidth_share.bandwidth_changed().await.into(); + let yet_another_bandwidth: u64 = cloned_bandwidth_share.bandwidth_changed().await.into(); + + assert!((3..4).contains(&bandwidth)); + assert!((3..4).contains(&another_bandwidth)); + assert!((3..4).contains(&yet_another_bandwidth)); + + assert!((9..10).contains(&(bandwidth + another_bandwidth + yet_another_bandwidth))); + + // all consumers should be notified after one of them become idle + let rate = 5.try_into().expect("5 > 0 qed"); + another_cloned_bandwidth_share.notify_idle(); + assert_eq!(cloned_bandwidth_share.bandwidth_changed().await, rate); + assert_eq!(bandwidth_share.bandwidth_changed().await, rate); + } + + /// Allows to treat [TokenBucket] and [SharedTokenBucket] in similar fashion in our tests. + trait RateLimiter: Sized { + async fn rate_limit(self, requested: u64) -> (Self, Option); + } + + impl RateLimiter for TokenBucket + where + TP: TimeProvider, + { + async fn rate_limit(mut self, requested: u64) -> (Self, Option) { + let delay = TokenBucket::rate_limit(&mut self, requested); + (self, delay) + } + } + + type TracingRateLimiter = SharedTokenBucket; + + impl RateLimiter for TracingRateLimiter + where + TP: TimeProvider + Send, + { + async fn rate_limit(mut self, requested: u64) -> (Self, Option) { + let last_sleep_deadline = self.rate_limiter.sleep_until.last_deadline.clone(); + let time_before = *last_sleep_deadline.lock(); + self = self.rate_limit(requested).await; + let time_after = *last_sleep_deadline.lock(); + ( + self, + (time_before != time_after).then_some(time_after).flatten(), + ) + } + } + + impl From<(NonZeroRatePerSecond, TP, SU)> for TokenBucket + where + TP: TimeProvider, + { + fn from((rate, time_provider, _): (NonZeroRatePerSecond, TP, SU)) -> Self { + TokenBucket::new_internal(rate, time_provider) + } + } + + impl From<(NonZeroRatePerSecond, TP, SU)> for SharedTokenBucket + where + TP: TimeProvider, + { + fn from((rate, time_provider, sleep_until): (NonZeroRatePerSecond, TP, SU)) -> Self { + let token_bucket = TokenBucket::new_internal(rate, time_provider); + let rate_limiter = AsyncTokenBucket::new(token_bucket, sleep_until); + Self::new_internal(rate, rate_limiter) + } + } + + #[derive(Clone)] + struct SharedTracingSleepUntil { + pub last_deadline: Arc>>, + } + + impl SharedTracingSleepUntil { + pub fn new() -> Self { + Self { + last_deadline: Arc::new(Mutex::new(None)), + } + } + } + + impl SleepUntil for SharedTracingSleepUntil { + async fn sleep_until(&mut self, instant: Instant) { + let mut last_instant = self.last_deadline.lock(); + *last_instant = max(*last_instant, Some(instant)); + } + } + + #[tokio::test] + async fn rate_limiter_sanity_check() { + 
token_bucket_sanity_check_test::>().await; + token_bucket_sanity_check_test::>().await + } + + async fn token_bucket_sanity_check_test() + where + RL: RateLimiter + + From<( + NonZeroRatePerSecond, + Arc>, + SharedTracingSleepUntil, + )>, + { + let limit_per_second = 10.try_into().expect("10 > 0 qed"); let now = Instant::now(); - let mut rate_limiter = TokenBucket::new_with_now(limit_per_second, now); + let time_to_return = Arc::new(parking_lot::RwLock::new(now)); + let time_provider = time_to_return.clone(); + let time_provider: Box = + Box::new(move || *time_provider.read()); + let rate_limiter = RL::from(( + limit_per_second, + Arc::new(time_provider), + SharedTracingSleepUntil::new(), + )); - assert_eq!( - rate_limiter.rate_limit(9, now + Duration::from_secs(1)), - None - ); + *time_to_return.write() = now + Duration::from_secs(1); + let (rate_limiter, deadline) = rate_limiter.rate_limit(9).await; + assert!(deadline.is_none()); - assert!(rate_limiter - .rate_limit(12, now + Duration::from_secs(1)) - .is_some()); + *time_to_return.write() = now + Duration::from_secs(1); + let (rate_limiter, deadline) = rate_limiter.rate_limit(12).await; + assert!(deadline.is_some()); - assert_eq!( - rate_limiter.rate_limit(8, now + Duration::from_secs(3)), - None - ); + *time_to_return.write() = now + Duration::from_secs(3); + let (_, deadline) = rate_limiter.rate_limit(8).await; + assert!(deadline.is_none()); + } + + #[tokio::test] + async fn no_slowdown_while_within_rate_limit() { + no_slowdown_while_within_rate_limit_test::>().await; + no_slowdown_while_within_rate_limit_test::>().await; } - #[test] - fn no_slowdown_while_within_rate_limit() { - let limit_per_second = 10; + async fn no_slowdown_while_within_rate_limit_test() + where + RL: RateLimiter + + From<( + NonZeroRatePerSecond, + Arc>, + SharedTracingSleepUntil, + )>, + { + let limit_per_second = 10.try_into().expect("10 > 0 qed"); let now = Instant::now(); - let mut rate_limiter = TokenBucket::new_with_now(limit_per_second, now); + let time_to_return = Arc::new(parking_lot::RwLock::new(now)); + let time_provider = time_to_return.clone(); + let time_provider: Box = + Box::new(move || *time_provider.read()); + let sleep_until = SharedTracingSleepUntil::new(); + let rate_limiter = RL::from((limit_per_second, Arc::new(time_provider), sleep_until)); - assert_eq!( - rate_limiter.rate_limit(9, now + Duration::from_secs(1)), - None - ); - assert_eq!( - rate_limiter.rate_limit(5, now + Duration::from_secs(2)), - None - ); - assert_eq!( - rate_limiter.rate_limit(1, now + Duration::from_secs(3)), - None - ); - assert_eq!( - rate_limiter.rate_limit(9, now + Duration::from_secs(3)), - None - ); + *time_to_return.write() = now + Duration::from_secs(1); + let (rate_limiter, deadline) = rate_limiter.rate_limit(9).await; + assert_eq!(deadline, None); + + *time_to_return.write() = now + Duration::from_secs(2); + let (rate_limiter, deadline) = rate_limiter.rate_limit(5).await; + assert_eq!(deadline, None); + + *time_to_return.write() = now + Duration::from_secs(3); + let (rate_limiter, deadline) = rate_limiter.rate_limit(1).await; + assert_eq!(deadline, None); + + *time_to_return.write() = now + Duration::from_secs(3); + let (_, deadline) = rate_limiter.rate_limit(9).await; + assert_eq!(deadline, None); + } + + #[tokio::test] + async fn slowdown_when_limit_reached_token_bucket() { + slowdown_when_limit_reached_test::>().await; + slowdown_when_limit_reached_test::>().await } - #[test] - fn slowdown_when_limit_reached() { - let limit_per_second = 10; + async fn 
slowdown_when_limit_reached_test() + where + RL: RateLimiter + + From<( + NonZeroRatePerSecond, + Arc>, + SharedTracingSleepUntil, + )>, + { + let limit_per_second = 10.try_into().expect("10 > 0 qed"); let now = Instant::now(); - let mut rate_limiter = TokenBucket::new_with_now(limit_per_second, now); + let time_to_return = Arc::new(parking_lot::RwLock::new(now)); + let time_provider = time_to_return.clone(); + let time_provider: Box = + Box::new(move || *time_provider.read()); + let rate_limiter = RL::from(( + limit_per_second, + Arc::new(time_provider), + SharedTracingSleepUntil::new(), + )); - assert_eq!(rate_limiter.rate_limit(10, now), None); + *time_to_return.write() = now; + let (rate_limiter, deadline) = rate_limiter.rate_limit(10).await; + assert_eq!(deadline, Some(now + Duration::from_secs(1))); // we should wait some time after reaching the limit - assert!(rate_limiter.rate_limit(1, now).is_some()); + *time_to_return.write() = now + Duration::from_secs(1); + let (rate_limiter, deadline) = rate_limiter.rate_limit(1).await; + assert!(deadline.is_some()); + *time_to_return.write() = now + Duration::from_secs(1); + let (_, deadline) = rate_limiter.rate_limit(19).await; assert_eq!( - rate_limiter.rate_limit(19, now), - Some(Duration::from_secs(2)), + deadline, + Some(now + Duration::from_secs(3)), "we should wait exactly 2 seconds" ); } - #[test] - fn buildup_tokens_but_no_more_than_limit() { - let limit_per_second = 10; + #[tokio::test] + async fn buildup_tokens_but_no_more_than_limit_of_token_bucket() { + buildup_tokens_but_no_more_than_limit_test::>().await; + buildup_tokens_but_no_more_than_limit_test::>().await + } + + async fn buildup_tokens_but_no_more_than_limit_test() + where + RL: RateLimiter + + From<( + NonZeroRatePerSecond, + Arc>, + SharedTracingSleepUntil, + )>, + { + let limit_per_second = 10.try_into().expect("10 > 0 qed"); let now = Instant::now(); - let mut rate_limiter = TokenBucket::new_with_now(limit_per_second, now); + let time_to_return = Arc::new(parking_lot::RwLock::new(now)); + let time_provider = time_to_return.clone(); + let time_provider: Box = + Box::new(move || *time_provider.read()); + let rate_limiter = RL::from(( + limit_per_second, + time_provider.into(), + SharedTracingSleepUntil::new(), + )); - assert_eq!( - rate_limiter.rate_limit(10, now + Duration::from_secs(2)), - None - ); + *time_to_return.write() = now + Duration::from_secs(2); + let (rate_limiter, deadline) = rate_limiter.rate_limit(10).await; + assert_eq!(deadline, None); + *time_to_return.write() = now + Duration::from_secs(10); + let (rate_limiter, deadline) = rate_limiter.rate_limit(40).await; assert_eq!( - rate_limiter.rate_limit(40, now + Duration::from_secs(10)), - Some(Duration::from_secs(3)), + deadline, + Some(now + Duration::from_secs(10) + Duration::from_secs(3)), ); + + *time_to_return.write() = now + Duration::from_secs(11); + let (_, deadline) = rate_limiter.rate_limit(40).await; assert_eq!( - rate_limiter.rate_limit(40, now + Duration::from_secs(11)), - Some(Duration::from_secs(6)) + deadline, + Some(now + Duration::from_secs(11) + Duration::from_secs(6)) ); } - #[test] - fn multiple_calls_buildup_wait_time() { - let limit_per_second = 10; + #[tokio::test] + async fn multiple_calls_buildup_wait_time() { + multiple_calls_buildup_wait_time_test::>().await; + multiple_calls_buildup_wait_time_test::>().await + } + + async fn multiple_calls_buildup_wait_time_test() + where + RL: RateLimiter + + From<( + NonZeroRatePerSecond, + Arc>, + SharedTracingSleepUntil, + )>, + { + let 
limit_per_second = 10.try_into().expect("10 > 0 qed"); let now = Instant::now(); - let mut rate_limiter = TokenBucket::new_with_now(limit_per_second, now); + let time_to_return = Arc::new(parking_lot::RwLock::new(now)); + let time_provider = time_to_return.clone(); + let time_provider: Box = + Box::new(move || *time_provider.read()); + let rate_limiter = RL::from(( + limit_per_second, + time_provider.into(), + SharedTracingSleepUntil::new(), + )); + + *time_to_return.write() = now + Duration::from_secs(3); + let (rate_limiter, deadline) = rate_limiter.rate_limit(10).await; + assert_eq!(deadline, None); + *time_to_return.write() = now + Duration::from_secs(3); + let (rate_limiter, deadline) = rate_limiter.rate_limit(10).await; + assert_eq!(deadline, Some(now + Duration::from_secs(4))); + + *time_to_return.write() = now + Duration::from_secs(3); + let (rate_limiter, deadline) = rate_limiter.rate_limit(10).await; assert_eq!( - rate_limiter.rate_limit(10, now + Duration::from_secs(3)), - None + deadline, + Some(now + Duration::from_secs(4) + Duration::from_secs(1)) ); + *time_to_return.write() = now + Duration::from_secs(3); + let (_, deadline) = rate_limiter.rate_limit(50).await; assert_eq!( - rate_limiter.rate_limit(10, now + Duration::from_secs(3)), - None + deadline, + Some(now + Duration::from_secs(4) + Duration::from_secs(6)) ); + } - assert_eq!( - rate_limiter.rate_limit(10, now + Duration::from_secs(3)), - Some(Duration::from_secs(1)) + struct SleepUntilAfterChange { + wrapped: SU, + last_value: Option, + changes_counter: usize, + } + + /// It allows to wait for a change of the allocated bandwidth, i.e. it can wait for two (or more) calls of + /// [SleepUntil::sleep_until] that uses two different values for the `instant` argument of + /// [SleepUntil::sleep_until(instant)]. 
+ impl SleepUntilAfterChange { + pub fn new(sleep_until: SU) -> Self { + Self { + wrapped: sleep_until, + last_value: None, + changes_counter: 0, + } + } + + pub fn set_number_of_changes_to_wait(&mut self, changes_counter: usize) { + self.changes_counter = changes_counter; + self.last_value = None; + } + } + + impl Clone for SleepUntilAfterChange + where + SU: Clone, + { + fn clone(&self) -> Self { + Self { + wrapped: self.wrapped.clone(), + last_value: None, + changes_counter: self.changes_counter, + } + } + } + + impl SleepUntil for SleepUntilAfterChange + where + SU: SleepUntil + Send, + { + async fn sleep_until(&mut self, instant: Instant) { + let last_value = self.last_value.get_or_insert(instant); + if *last_value != instant { + self.changes_counter = self.changes_counter.saturating_sub(1); + } + if self.changes_counter == 0 { + self.wrapped.sleep_until(instant).await + } else { + pending().await + } + } + } + + #[tokio::test] + async fn two_peers_can_share_bandwidth() { + let limit_per_second = 10.try_into().expect("10 > 0 qed"); + let initial_time = Instant::now(); + let time_to_return = Arc::new(Mutex::new(initial_time)); + + let current_time = time_to_return.clone(); + let current_time_clone = time_to_return.clone(); + + let time_provider: Arc> = + Arc::new(Box::new(move || *time_to_return.lock())); + + let sleep_until = SharedTracingSleepUntil::new(); + let last_deadline = sleep_until.last_deadline.clone(); + let last_deadline_clone = last_deadline.clone(); + let sleep_until = SleepUntilAfterChange::new(sleep_until); + + let barrier = Arc::new(tokio::sync::RwLock::new(tokio::sync::Barrier::new(2))); + let second_barrier = barrier.clone(); + + let mut rate_limiter = + SharedTokenBucket::<_, _>::from((limit_per_second, time_provider, sleep_until)); + let mut rate_limiter_cloned = rate_limiter.clone(); + + let total_data_sent = thread::scope(|s| { + let first_handle = s.spawn(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async move { + barrier.read().await.wait().await; + + rate_limiter = rate_limiter.rate_limit(11).await; + rate_limiter + .rate_limiter + .sleep_until + .set_number_of_changes_to_wait(1); + + { + let last_deadline = last_deadline.lock(); + let time = *current_time.lock(); + *current_time.lock() = last_deadline.unwrap_or(time); + } + + barrier.read().await.wait().await; + + rate_limiter.rate_limit(30).await; + + { + let last_deadline = last_deadline.lock(); + let time = *current_time.lock(); + *current_time.lock() = last_deadline.unwrap_or(time); + } + }); + 11 + 30 + }); + + let second_handle = s.spawn(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + second_barrier.read().await.wait().await; + + rate_limiter_cloned = rate_limiter_cloned.rate_limit(13).await; + rate_limiter_cloned + .rate_limiter + .sleep_until + .set_number_of_changes_to_wait(1); + + { + let last_deadline = last_deadline_clone.lock(); + let time = *current_time_clone.lock(); + *current_time_clone.lock() = last_deadline.unwrap_or(time); + } + + second_barrier.read().await.wait().await; + + rate_limiter_cloned.rate_limit(25).await; + + { + let last_deadline = last_deadline_clone.lock(); + let time = *current_time_clone.lock(); + *current_time_clone.lock() = last_deadline.unwrap_or(time); + } + }); + 13 + 25 + }); + let total_data_sent: u128 = first_handle + .join() + .expect("first thread should finish without errors") + + 
+            let total_data_sent: u128 = first_handle
+                .join()
+                .expect("first thread should finish without errors")
+                + second_handle
+                    .join()
+                    .expect("second thread should finish without errors");
+
+            total_data_sent
+        });
+        let duration = last_deadline_clone.lock().expect("we should sleep a bit") - initial_time;
+        let rate = total_data_sent * 1000 / duration.as_millis();
+        assert!(
+            rate.abs_diff(10) <= 5,
+            "calculated bandwidth should be within some error bounds: rate = {rate}; duration = {duration:?}"
+        );
+    }
-        assert_eq!(
-            rate_limiter.rate_limit(50, now + Duration::from_secs(3)),
-            Some(Duration::from_secs(6))
+    #[tokio::test]
+    async fn single_peer_can_use_whole_bandwidth() {
+        let limit_per_second = 10.try_into().expect("10 > 0 qed");
+        let now = Instant::now();
+        let time_to_return = Arc::new(parking_lot::RwLock::new(now));
+        let time_provider = time_to_return.clone();
+        let time_provider: Arc<Box<dyn Fn() -> Instant + Send + Sync>> =
+            Arc::new(Box::new(move || *time_provider.read()));
+
+        let rate_limiter = TracingRateLimiter::<_>::from((
+            limit_per_second,
+            time_provider,
+            SharedTracingSleepUntil::new(),
+        ));
+
+        let rate_limiter_cloned = rate_limiter.clone();
+
+        let (rate_limiter, deadline) = RateLimiter::rate_limit(rate_limiter, 5).await;
+        assert_eq!(deadline, Some(now + Duration::from_millis(500)));
+        let (_, deadline) = RateLimiter::rate_limit(rate_limiter_cloned, 5).await;
+        assert_eq!(deadline, None);
+
+        *time_to_return.write() = now + Duration::from_millis(1500);
+
+        let (_, deadline) = RateLimiter::rate_limit(rate_limiter, 10).await;
+        assert_eq!(deadline, None);
+    }
+
+    #[tokio::test]
+    async fn peers_receive_at_least_one_token_per_second() {
+        let limit_per_second = 1.try_into().expect("1 > 0 qed");
+        let now = Instant::now();
+        let time_to_return = Arc::new(parking_lot::RwLock::new(now));
+        let time_provider = time_to_return.clone();
+        let time_provider: Arc<Box<dyn Fn() -> Instant + Send + Sync>> =
+            Arc::new(Box::new(move || *time_provider.read()));
+
+        let rate_limiter = TracingRateLimiter::<_>::from((
+            limit_per_second,
+            time_provider,
+            SharedTracingSleepUntil::new(),
+        ));
+
+        *time_to_return.write() = now + Duration::from_secs(1);
+
+        let rate_limiter_cloned = rate_limiter.clone();
+
+        let (rate_limiter, deadline) = RateLimiter::rate_limit(rate_limiter, 1).await;
+        assert_eq!(deadline, None);
+
+        let (rate_limiter_cloned, deadline) = RateLimiter::rate_limit(rate_limiter_cloned, 1).await;
+        assert_eq!(deadline, None);
+
+        *time_to_return.write() = now + Duration::from_secs(2);
+
+        let (_, deadline) = RateLimiter::rate_limit(rate_limiter, 1).await;
+        assert_eq!(deadline, None);
+        let (_, deadline) = RateLimiter::rate_limit(rate_limiter_cloned, 2).await;
+        assert_eq!(deadline, Some(now + Duration::from_secs(3)));
+    }
+
+    /// Synchronizes all instances of [TokenBucket] using [tokio::sync::Barrier]. It should allow all such
+    /// instances to recognize the presence of all the other peers that also allocated some bandwidth and then
+    /// recalculate their own share.
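+    /// It does so by waiting on the shared barrier inside [SleepUntil::sleep_until] before actually sleeping,
+    /// so every limiter taking part in a batch has already requested its share of the bandwidth before any of
+    /// them starts waiting for its deadline. The in-progress barrier future is kept in `to_wait`, so a dropped
+    /// `sleep_until` call can resume the wait instead of registering with the barrier again.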
+    struct SleepUntilWithBarrier<SU> {
+        wrapped: SU,
+        barrier: Arc<tokio::sync::RwLock<tokio::sync::Barrier>>,
+        initial_counter: u64,
+        counter: u64,
+        // This is to overcome the lack of cancellation safety of [Barrier::wait()] -
+        // implementations of [SleepUntil::sleep_until()] should be cancellation safe.
+        to_wait: Option<BoxFuture<'static, ()>>,
+        id: u64,
+    }
+
+    impl<SU> Clone for SleepUntilWithBarrier<SU>
+    where
+        SU: Clone,
+    {
+        fn clone(&self) -> Self {
+            Self {
+                wrapped: self.wrapped.clone(),
+                barrier: self.barrier.clone(),
+                initial_counter: self.initial_counter,
+                counter: self.counter,
+                to_wait: None,
+                id: self.id + 1,
+            }
+        }
+    }
+
+    impl<SU> SleepUntilWithBarrier<SU> {
+        pub fn new(
+            sleep_until: SU,
+            barrier: Arc<tokio::sync::RwLock<tokio::sync::Barrier>>,
+            how_many_times_to_use_barrier: u64,
+        ) -> Self {
+            Self {
+                wrapped: sleep_until,
+                barrier,
+                initial_counter: how_many_times_to_use_barrier,
+                counter: how_many_times_to_use_barrier,
+                to_wait: None,
+                id: 0,
+            }
+        }
+
+        pub fn reset(&mut self) {
+            self.counter = self.initial_counter;
+            self.to_wait = None;
+        }
+
+        pub async fn wait(&mut self) {
+            while self.counter > 0 {
+                self.to_wait
+                    .get_or_insert_with(|| {
+                        let barrier = self.barrier.clone();
+                        Box::pin(async move {
+                            barrier.read().await.wait().await;
+                        })
+                    })
+                    .await;
+                self.to_wait = None;
+                self.counter -= 1;
+            }
+        }
+    }
+
+    impl<SU> SleepUntil for SleepUntilWithBarrier<SU>
+    where
+        SU: SleepUntil + Send,
+    {
+        async fn sleep_until(&mut self, instant: Instant) {
+            self.wait().await;
+            self.wrapped.sleep_until(instant).await;
+        }
+    }
+
+    #[tokio::test]
+    async fn average_bandwidth_should_be_within_some_reasonable_bounds() {
+        use rand::{
+            distributions::{Distribution, Uniform},
+            seq::SliceRandom,
+            SeedableRng,
+        };
+
+        let mut test_state = Vec::new();
+
+        let rate_limit = 4 * 1024 * 1024;
+        let limit_per_second = rate_limit.try_into().expect("(4 * 1024 * 1024) > 0 qed");
+
+        let seed = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("back to the future")
+            .as_secs();
+        let mut rand_generator = rand::rngs::StdRng::seed_from_u64(seed);
+
+        let data_generator = Uniform::from((rate_limit + 1)..100 * rate_limit);
+        let limiters_count = Uniform::from(10..=128).sample(&mut rand_generator);
+        let batch_generator = Uniform::from(2..=limiters_count);
+
+        let initial_time = Instant::now();
+        let time_to_return = Arc::new(parking_lot::RwLock::new(initial_time));
+        let time_provider = time_to_return.clone();
+        let time_provider: Arc<Box<dyn Fn() -> Instant + Send + Sync>> =
+            Arc::new(Box::new(move || *time_provider.read()));
+
+        let test_sleep_until_shared = SharedTracingSleepUntil::new();
+        let last_deadline = test_sleep_until_shared.last_deadline.clone();
+
+        let barrier = Arc::new(tokio::sync::RwLock::new(tokio::sync::Barrier::new(0)));
+        let how_many_times_should_stop_on_barrier = 1;
+        let test_sleep_until_with_barrier = SleepUntilWithBarrier::new(
+            test_sleep_until_shared,
+            barrier.clone(),
+            how_many_times_should_stop_on_barrier,
+        );
+        let sleep_until_after_time_change =
+            SleepUntilAfterChange::new(test_sleep_until_with_barrier);
+        let rate_limiter = SharedTokenBucket::<_, _>::from((
+            limit_per_second,
+            time_provider,
+            sleep_until_after_time_change,
+        ));
+
+        let mut rate_limiters: Vec<_> = repeat(())
+            .scan((0usize, rate_limiter), |(id, rate_limiter), _| {
+                let new_rate_limiter = rate_limiter.clone();
+                let new_state = rate_limiter.clone();
+                let limiter_id = *id;
+                *rate_limiter = new_state;
+                *id += 1;
+                Some((limiter_id, Some(new_rate_limiter)))
+            })
+            .take(limiters_count)
+            .collect();
+
+        let mut total_data_scheduled = 0;
+        let mut total_number_of_calls = 0;
+        while total_number_of_calls < 1000 {
+            let batch_size = batch_generator.sample(&mut rand_generator);
+
+            total_number_of_calls += batch_size;
+            *barrier.write().await = tokio::sync::Barrier::new(batch_size);
+
+            rate_limiters.shuffle(&mut rand_generator);
+
+            let mut batch_data = 0;
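+            // Intuition for the bound asserted below: in the worst case the `batch_size` limiters finish one
+            // after another, so a batch can observe roughly rate_limit * (1 + 1/2 + ... + 1/n), which is about
+            // rate_limit * (ln n + 1); e.g. for a batch of 8 limiters `((8_f64).ln() + 1.0).trunc() = 3.0`,
+            // i.e. up to three times `rate_limit`.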
+            let start_time = *time_to_return.read();
+            let mut batch_state = Vec::new();
+            let mut batch_test: FuturesOrdered<_> = rate_limiters[0..batch_size]
+                .iter_mut()
+                .zip((0..batch_size).rev())
+                .map(|((selected_limiter_id, selected_rate_limiter), idx)| {
+                    let data_read = data_generator.sample(&mut rand_generator);
+
+                    let mut rate_limiter = selected_rate_limiter
+                        .take()
+                        .expect("we should be able to retrieve a rate-limiter");
+
+                    // The last instance won't be notified - its bandwidth will not change until some other instance finishes.
+                    rate_limiter
+                        .rate_limiter
+                        .sleep_until
+                        .set_number_of_changes_to_wait(min(1, idx));
+                    rate_limiter.rate_limiter.sleep_until.wrapped.reset();
+
+                    let rate_task = SharedTokenBucket::rate_limit(rate_limiter, data_read);
+
+                    batch_state.push((*selected_limiter_id, data_read));
+
+                    total_data_scheduled += u128::from(data_read);
+                    batch_data += data_read;
+
+                    async move {
+                        let rate_limiter = rate_task.await;
+
+                        (rate_limiter, selected_rate_limiter)
+                    }
+                })
+                .collect();
+
+            test_state.push(batch_state);
+
+            while let Some((rate_limiter, store)) = batch_test.next().await {
+                *store = Some(rate_limiter);
+            }
+
+            let current_time = max(
+                *time_to_return.read(),
+                (*last_deadline.lock()).unwrap_or(*time_to_return.read()),
+            );
+
+            let batch_time: u64 = max((current_time - start_time).as_millis(), 1000)
+                .try_into()
+                .expect("something wrong with our time calculations");
+            let rate = batch_data * 1000 / batch_time;
+            let abs_rate_diff = rate.abs_diff(rate_limit);
+            // In the worst case, the utilized bandwidth can reach `bandwidth * (1 + 1/2 + ... + 1/n) ≈ bandwidth * (ln n + O(1))`.
+            let max_possible_bandwidth =
+                rate_limit as f64 * ((batch_size as f64).ln() + 1_f64).trunc();
+            assert!(
+                abs_rate_diff <= max_possible_bandwidth as u64,
+                "Used bandwidth should oscillate close to {rate_limit} b/s, but got {rate} b/s instead. Total data sent: {total_data_scheduled}; Test data: {test_state:?}"
+            );
+
+            *time_to_return.write() = current_time;
+        }
     }
 }