From 1fbf91bff5df23c703318bab36036fc602bf7005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Wed, 13 Nov 2024 23:32:41 +0100 Subject: [PATCH] Added a verbose `share()` method to `SharedRateLimiter` (and to other types in its hierarchy). It is more readable this way - previous version was using `Clone`, which could be confusing regarding if bandwidth was shared or each node uses separate rate. --- clique/src/rate_limiting.rs | 27 ++++++--- finality-aleph/src/network/build/mod.rs | 4 +- finality-aleph/src/network/build/transport.rs | 27 ++++++--- finality-aleph/src/nodes.rs | 6 +- rate-limiter/src/lib.rs | 2 +- rate-limiter/src/rate_limiter.rs | 12 +++- rate-limiter/src/token_bucket.rs | 55 +++++++++++-------- 7 files changed, 86 insertions(+), 47 deletions(-) diff --git a/clique/src/rate_limiting.rs b/clique/src/rate_limiting.rs index a3424ab715..bfdbe9cd4f 100644 --- a/clique/src/rate_limiting.rs +++ b/clique/src/rate_limiting.rs @@ -1,4 +1,4 @@ -use rate_limiter::{RateLimitedAsyncRead, SharingRateLimiter}; +use rate_limiter::{RateLimitedAsyncRead, SharedRateLimiter}; use crate::{ConnectionInfo, Data, Dialer, Listener, PeerAddressInfo, Splittable, Splitted}; @@ -12,14 +12,25 @@ where } /// Implementation of the [Dialer] trait governing all returned [Dialer::Connection] instances by a rate-limiting wrapper. -#[derive(Clone)] pub struct RateLimitingDialer { dialer: D, - rate_limiter: SharingRateLimiter, + rate_limiter: SharedRateLimiter, +} + +impl Clone for RateLimitingDialer +where + D: Clone, +{ + fn clone(&self) -> Self { + Self { + dialer: self.dialer.clone(), + rate_limiter: self.rate_limiter.share(), + } + } } impl RateLimitingDialer { - pub fn new(dialer: D, rate_limiter: SharingRateLimiter) -> Self { + pub fn new(dialer: D, rate_limiter: SharedRateLimiter) -> Self { Self { dialer, rate_limiter, @@ -45,7 +56,7 @@ where let connection = self.dialer.connect(address).await?; let (sender, receiver) = connection.split(); Ok(Splitted( - RateLimitedAsyncRead::new(receiver, self.rate_limiter.clone()), + RateLimitedAsyncRead::new(receiver, self.rate_limiter.share()), sender, )) } @@ -54,11 +65,11 @@ where /// Implementation of the [Listener] trait governing all returned [Listener::Connection] instances by a rate-limiting wrapper. pub struct RateLimitingListener { listener: L, - rate_limiter: SharingRateLimiter, + rate_limiter: SharedRateLimiter, } impl RateLimitingListener { - pub fn new(listener: L, rate_limiter: SharingRateLimiter) -> Self { + pub fn new(listener: L, rate_limiter: SharedRateLimiter) -> Self { Self { listener, rate_limiter, @@ -81,7 +92,7 @@ where let connection = self.listener.accept().await?; let (sender, receiver) = connection.split(); Ok(Splitted( - RateLimitedAsyncRead::new(receiver, self.rate_limiter.clone()), + RateLimitedAsyncRead::new(receiver, self.rate_limiter.share()), sender, )) } diff --git a/finality-aleph/src/network/build/mod.rs b/finality-aleph/src/network/build/mod.rs index 250b40d167..3c61111d39 100644 --- a/finality-aleph/src/network/build/mod.rs +++ b/finality-aleph/src/network/build/mod.rs @@ -1,7 +1,7 @@ use std::sync::{atomic::AtomicBool, Arc}; use log::error; -use rate_limiter::SharingRateLimiter; +use rate_limiter::SharedRateLimiter; use sc_client_api::Backend; use sc_network::{ config::{NetworkConfiguration, ProtocolId}, @@ -83,7 +83,7 @@ where setup_base_protocol::(genesis_hash); let network_rate_limit = network_config.substrate_network_bit_rate; - let rate_limiter = SharingRateLimiter::new(network_rate_limit.into()); + let rate_limiter = SharedRateLimiter::new(network_rate_limit.into()); let transport_builder = |config| transport::build_transport(rate_limiter, config); let ( diff --git a/finality-aleph/src/network/build/transport.rs b/finality-aleph/src/network/build/transport.rs index 403dddd2e2..af7641473f 100644 --- a/finality-aleph/src/network/build/transport.rs +++ b/finality-aleph/src/network/build/transport.rs @@ -1,13 +1,13 @@ use libp2p::{core::muxing::StreamMuxer, PeerId, Transport}; -use rate_limiter::{FuturesRateLimitedAsyncReadWrite, SharingRateLimiter}; +use rate_limiter::{FuturesRateLimitedAsyncReadWrite, SharedRateLimiter}; struct RateLimitedStreamMuxer { - rate_limiter: SharingRateLimiter, + rate_limiter: SharedRateLimiter, stream_muxer: SM, } impl RateLimitedStreamMuxer { - pub fn new(stream_muxer: SM, rate_limiter: SharingRateLimiter) -> Self { + pub fn new(stream_muxer: SM, rate_limiter: SharedRateLimiter) -> Self { Self { rate_limiter, stream_muxer, @@ -36,7 +36,7 @@ where self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let rate_limiter = self.rate_limiter.clone(); + let rate_limiter = self.rate_limiter.share(); self.inner().poll_inbound(cx).map(|result| { result.map(|substream| FuturesRateLimitedAsyncReadWrite::new(substream, rate_limiter)) }) @@ -46,7 +46,7 @@ where self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let rate_limiter = self.rate_limiter.clone(); + let rate_limiter = self.rate_limiter.share(); self.inner().poll_outbound(cx).map(|result| { result.map(|substream| FuturesRateLimitedAsyncReadWrite::new(substream, rate_limiter)) }) @@ -68,7 +68,7 @@ where } pub fn build_transport( - rate_limiter: SharingRateLimiter, + rate_limiter: SharedRateLimiter, config: sc_network::transport::NetworkConfig, ) -> impl Transport< Output = ( @@ -79,6 +79,19 @@ pub fn build_transport( ListenerUpgrade = impl Send, Error = impl Send, > + Send { + struct ClonableSharedRateLimiter(SharedRateLimiter); + impl ClonableSharedRateLimiter { + fn share(&self) -> SharedRateLimiter { + self.0.share() + } + } + impl Clone for ClonableSharedRateLimiter { + fn clone(&self) -> Self { + Self(self.share()) + } + } + let rate_limiter = ClonableSharedRateLimiter(rate_limiter); + sc_network::transport::build_transport( config.keypair, config.memory_only, @@ -88,7 +101,7 @@ pub fn build_transport( .map(move |(peer_id, stream_muxer), _| { ( peer_id, - RateLimitedStreamMuxer::new(stream_muxer, rate_limiter), + RateLimitedStreamMuxer::new(stream_muxer, rate_limiter.share()), ) }) } diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index 1e6b8eac7f..917febbbb7 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -6,7 +6,7 @@ use log::{debug, error}; use network_clique::{RateLimitingDialer, RateLimitingListener, Service, SpawnHandleT}; use pallet_aleph_runtime_api::AlephSessionApi; use primitives::TransactionHash; -use rate_limiter::SharingRateLimiter; +use rate_limiter::SharedRateLimiter; use sc_client_api::Backend; use sc_keystore::{Keystore, LocalKeystore}; use sc_transaction_pool_api::TransactionPool; @@ -109,8 +109,8 @@ where .expect("we should have working networking"); let alephbft_rate_limiter = - SharingRateLimiter::new(rate_limiter_config.alephbft_network_bit_rate.into()); - let dialer = RateLimitingDialer::new(dialer, alephbft_rate_limiter.clone()); + SharedRateLimiter::new(rate_limiter_config.alephbft_network_bit_rate.into()); + let dialer = RateLimitingDialer::new(dialer, alephbft_rate_limiter.share()); let listener = RateLimitingListener::new(listener, alephbft_rate_limiter); let (validator_network_service, validator_network) = Service::new( diff --git a/rate-limiter/src/lib.rs b/rate-limiter/src/lib.rs index 6356cfd7a9..238a829a24 100644 --- a/rate-limiter/src/lib.rs +++ b/rate-limiter/src/lib.rs @@ -7,7 +7,7 @@ use futures::{future::BoxFuture, ready, FutureExt}; use rate_limiter::RateLimiterFacade; use tokio::io::AsyncRead; -pub use crate::{rate_limiter::SharingRateLimiter, token_bucket::SharedTokenBucket}; +pub use crate::{rate_limiter::SharedRateLimiter, token_bucket::SharedTokenBucket}; const LOG_TARGET: &str = "rate-limiter"; diff --git a/rate-limiter/src/rate_limiter.rs b/rate-limiter/src/rate_limiter.rs index 965d873b8c..df62c69411 100644 --- a/rate-limiter/src/rate_limiter.rs +++ b/rate-limiter/src/rate_limiter.rs @@ -4,7 +4,7 @@ use futures::future::pending; use crate::{token_bucket::SharedTokenBucket, RatePerSecond}; -pub type SharingRateLimiter = RateLimiterFacade; +pub type SharedRateLimiter = RateLimiterFacade; #[derive(PartialEq, Eq, Debug, Copy, Clone)] pub enum Deadline { @@ -21,7 +21,6 @@ impl From for Option { } } -#[derive(Clone)] pub enum RateLimiterFacade { NoTraffic, RateLimiter(SharedTokenBucket), @@ -45,4 +44,13 @@ impl RateLimiterFacade { ), } } + + pub fn share(&self) -> Self { + match self { + RateLimiterFacade::NoTraffic => RateLimiterFacade::NoTraffic, + RateLimiterFacade::RateLimiter(shared_token_bucket) => { + RateLimiterFacade::RateLimiter(shared_token_bucket.share()) + } + } + } } diff --git a/rate-limiter/src/token_bucket.rs b/rate-limiter/src/token_bucket.rs index dd46e72bf8..fefdcd2314 100644 --- a/rate-limiter/src/token_bucket.rs +++ b/rate-limiter/src/token_bucket.rs @@ -188,16 +188,6 @@ pub struct SharedBandwidthManager { already_requested: Option, } -impl Clone for SharedBandwidthManager { - fn clone(&self) -> Self { - Self { - max_rate: self.max_rate, - peers_count: self.peers_count.clone(), - already_requested: None, - } - } -} - impl SharedBandwidthManager { /// Constructs a new instance of [SharedBandwidthManager] configured with a given rate that will be shared between all /// calling consumers (clones of this instance). @@ -209,6 +199,14 @@ impl SharedBandwidthManager { } } + pub fn share(&self) -> Self { + Self { + max_rate: self.max_rate, + peers_count: self.peers_count.clone(), + already_requested: None, + } + } + fn calculate_bandwidth(&mut self, active_children: Option) -> NonZeroRatePerSecond { let active_children = active_children.unwrap_or_else(|| self.peers_count.load(Ordering::Acquire)); @@ -263,7 +261,10 @@ struct AsyncTokenBucket { sleep_until: SU, } -impl AsyncTokenBucket { +impl AsyncTokenBucket +where + TP: TimeProvider, +{ /// Constructs an instance of [AsyncTokenBucket] using given [TokenBucket] /// and implementation of the [SleepUntil] trait. pub fn new(token_bucket: TokenBucket, sleep_until: SU) -> Self { @@ -273,12 +274,7 @@ impl AsyncTokenBucket { sleep_until, } } -} -impl AsyncTokenBucket -where - TP: TimeProvider, -{ /// Accounts `requested` units. A next call to [AsyncTokenBucket::wait] will /// account these units while calculating necessary delay. pub fn rate_limit(&mut self, requested: u64) { @@ -317,7 +313,6 @@ where /// 1/n) ≈ bandwidth * (ln n + O(1))`. This can happen when each instance of [TokenBucket] tries to spend slightly more data /// than its initially acquired bandwidth, but small enough so none of them other instances receives a notification about /// ongoing bandwidth change. -#[derive(Clone)] pub struct SharedTokenBucket { shared_bandwidth: SharedBandwidthManager, rate_limiter: AsyncTokenBucket, @@ -344,6 +339,18 @@ impl SharedTokenBucket { } } + pub fn share(&self) -> Self + where + TP: Clone, + SU: Clone, + { + Self { + shared_bandwidth: self.shared_bandwidth.share(), + rate_limiter: self.rate_limiter.clone(), + need_to_notify_parent: false, + } + } + fn request_bandwidth(&mut self) -> NonZeroRatePerSecond { self.need_to_notify_parent = true; self.shared_bandwidth.request_bandwidth() @@ -428,8 +435,8 @@ mod tests { async fn basic_checks_of_shared_bandwidth_manager() { let rate = 10.try_into().expect("10 > 0 qed"); let mut bandwidth_share = SharedBandwidthManager::new(rate); - let mut cloned_bandwidth_share = bandwidth_share.clone(); - let mut another_cloned_bandwidth_share = cloned_bandwidth_share.clone(); + let mut cloned_bandwidth_share = bandwidth_share.share(); + let mut another_cloned_bandwidth_share = cloned_bandwidth_share.share(); // only one consumer, so it should get whole bandwidth assert_eq!(bandwidth_share.request_bandwidth(), rate); @@ -844,7 +851,7 @@ mod tests { let mut rate_limiter = SharedTokenBucket::<_, _>::from((limit_per_second, time_provider, sleep_until)); - let mut rate_limiter_cloned = rate_limiter.clone(); + let mut rate_limiter_cloned = rate_limiter.share(); let total_data_sent = thread::scope(|s| { let first_handle = s.spawn(|| { @@ -946,7 +953,7 @@ mod tests { SharedTracingSleepUntil::new(), )); - let rate_limiter_cloned = rate_limiter.clone(); + let rate_limiter_cloned = rate_limiter.share(); let (rate_limiter, deadline) = RateLimiter::rate_limit(rate_limiter, 5).await; assert_eq!(deadline, Some(now + Duration::from_millis(500))); @@ -976,7 +983,7 @@ mod tests { *time_to_return.write() = now + Duration::from_secs(1); - let rate_limiter_cloned = rate_limiter.clone(); + let rate_limiter_cloned = rate_limiter.share(); let (rate_limiter, deadline) = RateLimiter::rate_limit(rate_limiter, 1).await; assert_eq!(deadline, None); @@ -1117,8 +1124,8 @@ mod tests { let mut rate_limiters: Vec<_> = repeat(()) .scan((0usize, rate_limiter), |(id, rate_limiter), _| { - let new_rate_limiter = rate_limiter.clone(); - let new_state = rate_limiter.clone(); + let new_rate_limiter = rate_limiter.share(); + let new_state = rate_limiter.share(); let limiter_id = *id; *rate_limiter = new_state; *id += 1;