Skip to content

Commit

Permalink
rate: Multiple buckets in one channel
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Nov 27, 2024
1 parent ed89488 commit 69c944c
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 40 deletions.
70 changes: 70 additions & 0 deletions python/test/test_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# vim: sts=4 sw=4 et

import tll.channel as C
from tll.config import Config
from tll.error import TLLError
from tll.test_util import Accum
from tll.chrono import TimePoint
Expand Down Expand Up @@ -180,3 +181,72 @@ def test_rate_messages(context):
with pytest.raises(TLLError): c.post(b'x' * 128)
time.sleep(0.01)
c.post(b'x' * 128)

def test_rate_buckets(context):
cfg = Config.load(
f'''yamls://
tll.proto: rate+null
name: rate
dump: frame
speed: 1kb
max-window: 1kb
initial: 1kb
bucket.packets:
speed: 100b
max-window: 2b
initial: 2b
unit: message
''')
c = context.Channel(cfg)

c.open()
c.post(b'x' * 1024)
with pytest.raises(TLLError): c.post(b'x' * 128)
time.sleep(0.01)
c.post(b'x' * 128)

c.close()
c.open()
c.post(b'x' * 128)
c.post(b'x' * 128)
with pytest.raises(TLLError): c.post(b'x' * 128)

def test_rate_buckets_input(context):
cfg = Config.load(
f'''yamls://
tll.proto: rate+direct
name: rate
dump: frame
rate.dir: in
speed: 1kb
max-window: 1kb
initial: 1kb
bucket.packets:
speed: 100b
max-window: 2b
initial: 2b
unit: message
''')
c = Accum(cfg, context = context)
client = context.Channel('direct://', name='client', master=c)

c.open()
client.open()
client.post(b'x' * 1024)
assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Suspend
time.sleep(0.01)
c.children[-1].process()
assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Zero
c.post(b'x' * 128)

client.close()
c.close()
c.open()
client.open()

client.post(b'x' * 128)
client.post(b'x' * 128)
assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Suspend
time.sleep(0.01)
c.children[-1].process()
assert (c.children[0].dcaps & c.DCaps.Suspend) == c.DCaps.Zero
107 changes: 73 additions & 34 deletions src/channel/rate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,16 @@ TLL_DEFINE_IMPL(Rate);

int Rate::_init(const Channel::Url &url, tll::Channel *master)
{
using namespace std::chrono;

if (auto r = Base::_init(url, master); r)
return r;

auto reader = channel_props_reader(url);

auto interval = reader.getT<rate::Settings::fseconds>("interval", 1s);
_conf.unit = reader.getT("unit", Unit::Byte, {{"byte", Unit::Byte}, {"message", Unit::Message}});
_conf.speed = reader.getT<tll::util::SizeT<double>>("speed");
_conf.limit = reader.getT<tll::util::Size>("max-window", 16 * 1024);
_conf.initial = reader.getT<tll::util::Size>("initial", _conf.limit / 2);

if (!reader)
return _log.fail(EINVAL, "Invalid url: {}", reader.error());

if (interval.count() == 0)
return _log.fail(EINVAL, "Zero interval");

_conf.speed /= interval.count();
if (auto r = _parse_bucket(url); r)
return r;

if (_conf.speed == 0) return _log.fail(EINVAL, "Zero speed");
if (_conf.limit <= 0) return _log.fail(EINVAL, "Invalid window size: {}", _conf.limit);
for (auto & [n, cfg] : url.browse("bucket.*", true)) {
if (auto r = _parse_bucket(cfg); r)
return _log.fail(EINVAL, "Failed to init bucket {}", n);
}

if ((internal.caps & tll::caps::InOut) == 0)
internal.caps |= tll::caps::Output;
Expand All @@ -71,13 +58,47 @@ int Rate::_init(const Channel::Url &url, tll::Channel *master)
return 0;
}

int Rate::_parse_bucket(const tll::ConstConfig &cfg)
{
using namespace std::chrono;

auto reader = channel_props_reader(cfg);

auto interval = reader.getT<rate::Settings::fseconds>("interval", 1s);
rate::Settings _conf;

_conf.unit = reader.getT("unit", Unit::Byte, {{"byte", Unit::Byte}, {"message", Unit::Message}});
_conf.speed = reader.getT<tll::util::SizeT<double>>("speed");
_conf.limit = reader.getT<tll::util::Size>("max-window", 16 * 1024);
_conf.initial = reader.getT<tll::util::Size>("initial", _conf.limit / 2);

if (!reader)
return _log.fail(EINVAL, "Invalid url: {}", reader.error());

if (interval.count() == 0)
return _log.fail(EINVAL, "Zero interval");

_conf.speed /= interval.count();

if (_conf.speed == 0) return _log.fail(EINVAL, "Zero speed");
if (_conf.limit <= 0) return _log.fail(EINVAL, "Invalid window size: {}", _conf.limit);

_buckets.emplace_back(Bucket { .conf = _conf });

return 0;
}

int Rate::_on_timer(const tll_msg_t *)
{
auto empty = _bucket.empty();
auto now = tll::time::now();
_bucket.update(_conf, now);
if (empty == _bucket.empty()) {
_rearm(_bucket.next(_conf, now));
tll::duration next = {};
for (auto & b : _buckets) {
b.update(b.conf, now);
if (b.empty())
next = std::max(next, b.next(b.conf, now));
}
if (next.count()) {
_rearm(next);
return 0;
}

Expand All @@ -94,14 +115,20 @@ int Rate::_on_data(const tll_msg_t * msg)
if (internal.caps & tll::caps::Output)
return Base::_on_data(msg);

const size_t size = _conf.unit == Unit::Byte ? msg->size : 1;
auto now = tll::time::now();
_bucket.update(_conf, now);
tll::duration next = {};
for (auto & b : _buckets) {
const size_t size = b.conf.unit == Unit::Byte ? msg->size : 1;
b.update(b.conf, now);

_bucket.consume(size);
b.consume(size);

if (_bucket.empty()) {
if (_rearm(_bucket.next(_conf, now)))
if (b.empty())
next = std::max(next, b.next(b.conf, now));
}

if (next.count() != 0) {
if (_rearm(next))
return _log.fail(EINVAL, "Failed to rearm timer");
_child->suspend();
}
Expand All @@ -116,20 +143,32 @@ int Rate::_post(const tll_msg_t *msg, int flags)
if (!(internal.caps & tll::caps::Output))
return _child->post(msg, flags);

const size_t size = _conf.unit == Unit::Byte ? msg->size : 1;
auto now = tll::time::now();
_bucket.update(_conf, now);
bool empty = false;
for (auto & b : _buckets) {
b.update(b.conf, now);

empty = empty || b.empty();
}

if (_bucket.empty() && !(flags & TLL_POST_URGENT))
if (empty && !(flags & TLL_POST_URGENT))
return EAGAIN;

if (auto r = _child->post(msg, flags); r)
return r;

_bucket.consume(size);
tll::duration next = {};
for (auto & b : _buckets) {
const size_t size = b.conf.unit == Unit::Byte ? msg->size : 1;

b.consume(size);

if (b.empty())
next = std::max(next, b.next(b.conf, now));
}

if (_bucket.empty()) {
if (_rearm(_bucket.next(_conf, now)))
if (next.count()) {
if (_rearm(next))
return _log.fail(EINVAL, "Failed to rearm timer");
_callback_control(tcp_client_scheme::WriteFull::meta_id());
}
Expand Down
12 changes: 9 additions & 3 deletions src/channel/rate.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ class Rate : public tll::channel::Prefix<Rate>
{
std::unique_ptr<tll::Channel> _timer;

rate::Settings _conf;
rate::Bucket _bucket;
struct Bucket : public rate::Bucket
{
rate::Settings conf;
};
std::vector<Bucket> _buckets;

public:
using Base = tll::channel::Prefix<Rate>;
Expand All @@ -34,7 +37,8 @@ class Rate : public tll::channel::Prefix<Rate>
int _init(const Channel::Url &url, tll::Channel *master);
int _open(const tll::ConstConfig &cfg)
{
_bucket.reset();
for (auto & b: _buckets)
b.reset();
_timer->open();
return Base::_open(cfg);
}
Expand All @@ -59,6 +63,8 @@ class Rate : public tll::channel::Prefix<Rate>
int _on_timer(const tll_msg_t *msg);
int _rearm(const tll::duration &dt);

int _parse_bucket(const tll::ConstConfig &cfg);

void _callback_control(int msgid)
{
tll_msg_t msg = { TLL_MESSAGE_CONTROL };
Expand Down
33 changes: 30 additions & 3 deletions src/channel/rate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ Difference from classical token bucket algorithm is that even if message does no
is handled. This makes implementation more effective - channel does not need to save message until
it is possible to pass it.

When bucket is fully drained then child channel is suspended in input mode or ``WriteFull`` control
message is generated in output mode. After at least one token appears in the bucket - ``WriteReady``
control messages is generated or child is resumed.
One ore more buckets (depending on initialization parameters) are used to check if it is possible to
send or receive next message. When at least one bucket is fully drained then child channel is
suspended in input mode or ``WriteFull`` control message is generated in output mode. After at least
one token appears in all buckets - ``WriteReady`` control messages is generated or child is resumed.

Implementation is available in header only mode for C++ and can be included from
``tll/channel/rate.h`` file.
Expand All @@ -42,6 +43,11 @@ Input is affected only in read-only mode (``r`` or ``in``). Read-write or write-
output stream. To limit input stream on bidirectional channel ``rate.dir=in`` parameter should be
used.

Bucket settings
~~~~~~~~~~~~~~~

Following parameters are used to describe bucket settings:

``speed=<SIZE>`` - mandatory parameter, no default. Add ``speed`` number of bytes into bucket each
``interval`` (see below). With default interval value it is equivalent to ``speed`` number of bytes
per second. Granularity of this parameter is 1 byte, even if ``100mbit`` notation can be used, see
Expand All @@ -57,6 +63,11 @@ with ``initial`` number of tokens.
``unit={byte|message}`` - default ``byte``. Use data size in bytes or number of message as rate
tokens.

Any amount of additional buckets can be defined using ``bucket.*`` subtrees. Each subtree holds
parameters described above: mandatory ``speed`` and optional ``max-window``, ``interval`` and
others. For example ``bucket.a: { speed: 10kb }`` and ``bucket.b: { speed: 100b, unit: message }``
declare 2 additional buckets.

Control messages
----------------

Expand Down Expand Up @@ -86,6 +97,22 @@ be posted):

rate+null://;speed=1b;interval=10s;initial=0b;max-window=4b

Allow sending to UDP 100 messages per second (with burst of 10 messages) with total bandwith limited
to 100kbit (with burst of 64kb), note that ``unit: message`` setting still require ``b`` suffix for
speed/size parameters:

.. code-block:: yaml
server:
tll.proto: rate+udp
tll.host: HOST:PORT
udp.mode: client
speed: 100kbit
max-window: 64kb
bucket.messages:
speed: 100b
max-window: 10b
unit: message
See also
--------
Expand Down

0 comments on commit 69c944c

Please sign in to comment.