diff --git a/python/test/test_rate.py b/python/test/test_rate.py index c838f396..84b16cb8 100644 --- a/python/test/test_rate.py +++ b/python/test/test_rate.py @@ -2,6 +2,7 @@ # vim: sts=4 sw=4 et import tll.channel as C +from tll.config import Config from tll.error import TLLError from tll.test_util import Accum from tll.chrono import TimePoint @@ -180,3 +181,72 @@ def test_rate_messages(context): with pytest.raises(TLLError): c.post(b'x' * 128) time.sleep(0.01) c.post(b'x' * 128) + +def test_rate_buckets(context): + cfg = Config.load( +f'''yamls:// +tll.proto: rate+null +name: rate +dump: frame +speed: 1kb +max-window: 1kb +initial: 1kb +bucket.packets: + speed: 100b + max-window: 2b + initial: 2b + unit: message +''') + c = context.Channel(cfg) + + c.open() + c.post(b'x' * 1024) + with pytest.raises(TLLError): c.post(b'x' * 128) + time.sleep(0.01) + c.post(b'x' * 128) + + c.close() + c.open() + c.post(b'x' * 128) + c.post(b'x' * 128) + with pytest.raises(TLLError): c.post(b'x' * 128) + +def test_rate_buckets_input(context): + cfg = Config.load( +f'''yamls:// +tll.proto: rate+direct +name: rate +dump: frame +rate.dir: in +speed: 1kb +max-window: 1kb +initial: 1kb +bucket.packets: + speed: 100b + max-window: 2b + initial: 2b + unit: message +''') + c = Accum(cfg, context = context) + client = context.Channel('direct://', name='client', master=c) + + c.open() + client.open() + client.post(b'x' * 1024) + assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Suspend + time.sleep(0.01) + c.children[-1].process() + assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Zero + c.post(b'x' * 128) + + client.close() + c.close() + c.open() + client.open() + + client.post(b'x' * 128) + client.post(b'x' * 128) + assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Suspend + time.sleep(0.01) + c.children[-1].process() + assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Zero diff --git a/src/channel/rate.cc b/src/channel/rate.cc index 870b7a47..214277c4 100644 --- a/src/channel/rate.cc +++ b/src/channel/rate.cc @@ -23,29 +23,16 @@ TLL_DEFINE_IMPL(Rate); int Rate::_init(const Channel::Url &url, tll::Channel *master) { - using namespace std::chrono; - if (auto r = Base::_init(url, master); r) return r; - auto reader = channel_props_reader(url); - - auto interval = reader.getT("interval", 1s); - _conf.unit = reader.getT("unit", Unit::Byte, {{"byte", Unit::Byte}, {"message", Unit::Message}}); - _conf.speed = reader.getT>("speed"); - _conf.limit = reader.getT("max-window", 16 * 1024); - _conf.initial = reader.getT("initial", _conf.limit / 2); - - if (!reader) - return _log.fail(EINVAL, "Invalid url: {}", reader.error()); - - if (interval.count() == 0) - return _log.fail(EINVAL, "Zero interval"); - - _conf.speed /= interval.count(); + if (auto r = _parse_bucket(url); r) + return r; - if (_conf.speed == 0) return _log.fail(EINVAL, "Zero speed"); - if (_conf.limit <= 0) return _log.fail(EINVAL, "Invalid window size: {}", _conf.limit); + for (auto & [n, cfg] : url.browse("bucket.*", true)) { + if (auto r = _parse_bucket(cfg); r) + return _log.fail(EINVAL, "Failed to init bucket {}", n); + } if ((internal.caps & tll::caps::InOut) == 0) internal.caps |= tll::caps::Output; @@ -71,13 +58,47 @@ int Rate::_init(const Channel::Url &url, tll::Channel *master) return 0; } +int Rate::_parse_bucket(const tll::ConstConfig &cfg) +{ + using namespace std::chrono; + + auto reader = channel_props_reader(cfg); + + auto interval = reader.getT("interval", 1s); + rate::Settings _conf; + + _conf.unit = reader.getT("unit", Unit::Byte, {{"byte", Unit::Byte}, {"message", Unit::Message}}); + _conf.speed = reader.getT>("speed"); + _conf.limit = reader.getT("max-window", 16 * 1024); + _conf.initial = reader.getT("initial", _conf.limit / 2); + + if (!reader) + return _log.fail(EINVAL, "Invalid url: {}", reader.error()); + + if (interval.count() == 0) + return _log.fail(EINVAL, "Zero interval"); + + _conf.speed /= interval.count(); + + if (_conf.speed == 0) return _log.fail(EINVAL, "Zero speed"); + if (_conf.limit <= 0) return _log.fail(EINVAL, "Invalid window size: {}", _conf.limit); + + _buckets.emplace_back(Bucket { .conf = _conf }); + + return 0; +} + int Rate::_on_timer(const tll_msg_t *) { - auto empty = _bucket.empty(); auto now = tll::time::now(); - _bucket.update(_conf, now); - if (empty == _bucket.empty()) { - _rearm(_bucket.next(_conf, now)); + tll::duration next = {}; + for (auto & b : _buckets) { + b.update(b.conf, now); + if (b.empty()) + next = std::max(next, b.next(b.conf, now)); + } + if (next.count()) { + _rearm(next); return 0; } @@ -94,14 +115,20 @@ int Rate::_on_data(const tll_msg_t * msg) if (internal.caps & tll::caps::Output) return Base::_on_data(msg); - const size_t size = _conf.unit == Unit::Byte ? msg->size : 1; auto now = tll::time::now(); - _bucket.update(_conf, now); + tll::duration next = {}; + for (auto & b : _buckets) { + const size_t size = b.conf.unit == Unit::Byte ? msg->size : 1; + b.update(b.conf, now); - _bucket.consume(size); + b.consume(size); - if (_bucket.empty()) { - if (_rearm(_bucket.next(_conf, now))) + if (b.empty()) + next = std::max(next, b.next(b.conf, now)); + } + + if (next.count() != 0) { + if (_rearm(next)) return _log.fail(EINVAL, "Failed to rearm timer"); _child->suspend(); } @@ -116,20 +143,32 @@ int Rate::_post(const tll_msg_t *msg, int flags) if (!(internal.caps & tll::caps::Output)) return _child->post(msg, flags); - const size_t size = _conf.unit == Unit::Byte ? msg->size : 1; auto now = tll::time::now(); - _bucket.update(_conf, now); + bool empty = false; + for (auto & b : _buckets) { + b.update(b.conf, now); + + empty = empty || b.empty(); + } - if (_bucket.empty() && !(flags & TLL_POST_URGENT)) + if (empty && !(flags & TLL_POST_URGENT)) return EAGAIN; if (auto r = _child->post(msg, flags); r) return r; - _bucket.consume(size); + tll::duration next = {}; + for (auto & b : _buckets) { + const size_t size = b.conf.unit == Unit::Byte ? msg->size : 1; + + b.consume(size); + + if (b.empty()) + next = std::max(next, b.next(b.conf, now)); + } - if (_bucket.empty()) { - if (_rearm(_bucket.next(_conf, now))) + if (next.count()) { + if (_rearm(next)) return _log.fail(EINVAL, "Failed to rearm timer"); _callback_control(tcp_client_scheme::WriteFull::meta_id()); } diff --git a/src/channel/rate.h b/src/channel/rate.h index 2353baf3..096c1836 100644 --- a/src/channel/rate.h +++ b/src/channel/rate.h @@ -17,8 +17,11 @@ class Rate : public tll::channel::Prefix { std::unique_ptr _timer; - rate::Settings _conf; - rate::Bucket _bucket; + struct Bucket : public rate::Bucket + { + rate::Settings conf; + }; + std::vector _buckets; public: using Base = tll::channel::Prefix; @@ -34,7 +37,8 @@ class Rate : public tll::channel::Prefix int _init(const Channel::Url &url, tll::Channel *master); int _open(const tll::ConstConfig &cfg) { - _bucket.reset(); + for (auto & b: _buckets) + b.reset(); _timer->open(); return Base::_open(cfg); } @@ -59,6 +63,8 @@ class Rate : public tll::channel::Prefix int _on_timer(const tll_msg_t *msg); int _rearm(const tll::duration &dt); + int _parse_bucket(const tll::ConstConfig &cfg); + void _callback_control(int msgid) { tll_msg_t msg = { TLL_MESSAGE_CONTROL }; diff --git a/src/channel/rate.rst b/src/channel/rate.rst index b11e2297..c2098b4a 100644 --- a/src/channel/rate.rst +++ b/src/channel/rate.rst @@ -27,9 +27,10 @@ Difference from classical token bucket algorithm is that even if message does no is handled. This makes implementation more effective - channel does not need to save message until it is possible to pass it. -When bucket is fully drained then child channel is suspended in input mode or ``WriteFull`` control -message is generated in output mode. After at least one token appears in the bucket - ``WriteReady`` -control messages is generated or child is resumed. +One ore more buckets (depending on initialization parameters) are used to check if it is possible to +send or receive next message. When at least one bucket is fully drained then child channel is +suspended in input mode or ``WriteFull`` control message is generated in output mode. After at least +one token appears in all buckets - ``WriteReady`` control messages is generated or child is resumed. Implementation is available in header only mode for C++ and can be included from ``tll/channel/rate.h`` file. @@ -42,6 +43,11 @@ Input is affected only in read-only mode (``r`` or ``in``). Read-write or write- output stream. To limit input stream on bidirectional channel ``rate.dir=in`` parameter should be used. +Bucket settings +~~~~~~~~~~~~~~~ + +Following parameters are used to describe bucket settings: + ``speed=`` - mandatory parameter, no default. Add ``speed`` number of bytes into bucket each ``interval`` (see below). With default interval value it is equivalent to ``speed`` number of bytes per second. Granularity of this parameter is 1 byte, even if ``100mbit`` notation can be used, see @@ -57,6 +63,11 @@ with ``initial`` number of tokens. ``unit={byte|message}`` - default ``byte``. Use data size in bytes or number of message as rate tokens. +Any amount of additional buckets can be defined using ``bucket.*`` subtrees. Each subtree holds +parameters described above: mandatory ``speed`` and optional ``max-window``, ``interval`` and +others. For example ``bucket.a: { speed: 10kb }`` and ``bucket.b: { speed: 100b, unit: message }`` +declare 2 additional buckets. + Control messages ---------------- @@ -86,6 +97,22 @@ be posted): rate+null://;speed=1b;interval=10s;initial=0b;max-window=4b +Allow sending to UDP 100 messages per second (with burst of 10 messages) with total bandwith limited +to 100kbit (with burst of 64kb), note that ``unit: message`` setting still require ``b`` suffix for +speed/size parameters: + +.. code-block:: yaml + + server: + tll.proto: rate+udp + tll.host: HOST:PORT + udp.mode: client + speed: 100kbit + max-window: 64kb + bucket.messages: + speed: 100b + max-window: 10b + unit: message See also --------