Skip to content

Commit

Permalink
bench: Support reading channel configuration from file
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Nov 29, 2023
1 parent ff89bd5 commit 8d5a919
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 53 deletions.
161 changes: 113 additions & 48 deletions bench/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@
#include <tll/logger.h>
#include <tll/util/argparse.h>
#include <tll/util/bench.h>
#include <tll/util/ownedmsg.h>

#include <tll/channel/prefix.h>

template <typename R, typename... Args>
[[nodiscard]]
R fail(R err, tll::logger::format_string<Args...> format, Args && ... args)
{
fmt::print(format, std::forward<Args>(args)...);
return err;
}

using namespace tll::bench;

class Echo : public tll::channel::Base<Echo>
Expand All @@ -30,7 +39,7 @@ int post(tll::Channel * c, const tll_msg_t * msg)
return c->post(msg);
}

std::unique_ptr<tll::Channel> prepare(tll::channel::Context &ctx, std::string_view url, bool callback, size_t &counter)
std::unique_ptr<tll::Channel> prepare(tll::channel::Context &ctx, const tll::Channel::Url &url, bool callback, size_t &counter)
{
auto c = ctx.channel(url);
if (!c)
Expand Down Expand Up @@ -59,15 +68,15 @@ std::unique_ptr<tll::Channel> prepare(tll::channel::Context &ctx, std::string_vi
return c;
}

int timeit_post(tll::channel::Context &ctx, std::string_view url, bool callback, unsigned count, const tll_msg_t *msg)
int timeit_post(tll::channel::Context &ctx, const tll::Channel::Url &url, bool callback, unsigned count, const tll_msg_t *msg)
{
size_t counter = 0;
auto c = prepare(ctx, url, callback, counter);

if (!c.get())
return -1;

timeit(count, url, post, c.get(), msg);
timeit(count, tll::conv::to_string(url), post, c.get(), msg);
if (callback && !counter)
fmt::print("Callback was added but not called\n");
return 0;
Expand All @@ -85,7 +94,7 @@ std::vector<tll::Channel *> process_list(tll::Channel * c)
return r;
}

int timeit_process(tll::channel::Context &ctx, std::string_view url, bool callback, unsigned count)
int timeit_process(tll::channel::Context &ctx, const tll::Channel::Url &url, bool callback, unsigned count)
{
size_t counter = 0;
auto c = prepare(ctx, url, callback, counter);
Expand All @@ -103,28 +112,61 @@ int timeit_process(tll::channel::Context &ctx, std::string_view url, bool callba
return -1;
}

timeit(count, url, tll_channel_process, list[0], 0, 0);
timeit(count, tll::conv::to_string(url), tll_channel_process, list[0], 0, 0);
if (callback && !counter)
fmt::print("Callback was added but not called\n");
return 0;
}

std::optional<tll::util::OwnedMessage> payload_read(tll::channel::Context &ctx, const tll::Channel::Url &url)
{
auto c = ctx.channel(url);
if (!c)
return fail(std::nullopt, "Failed to create payload channel {}\n");

struct Callback
{
tll::util::OwnedMessage msg;

int callback(const tll::Channel *, const tll_msg_t *m)
{
msg = m;
return 0;
}
};
Callback cb;
c->callback_add(&cb, TLL_MESSAGE_MASK_DATA);
c->open();
for (auto i = 0; i < 10; i++) {
c->process();
if (cb.msg.size)
break;
}
if (!cb.msg.size)
return fail(std::nullopt, "Failed to read data from payload channel\n");

return std::move(cb.msg);
}

int main(int argc, char *argv[])
{
tll::Logger::set("tll", tll::Logger::Warning, true);

tll::util::ArgumentParser parser("url [--module=module]");

std::vector<std::string> url;
std::vector<tll::Channel::Url> curl;
std::vector<std::string> modules;
std::string payload_channel;
std::string config_file;
bool callback = false;
bool process = false;
unsigned count = 10000000;
unsigned msgsize = 1024;
unsigned msgsize = 0;
int msgid = 0;

parser.add_argument({"URL"}, "channel url", &url);
parser.add_argument({"--config"}, "read benchmark configuration from file", &config_file);
parser.add_argument({"-m", "--module"}, "load channel modules", &modules);
parser.add_argument({"-c", "--callback"}, "add callback", &callback);
parser.add_argument({"--process"}, "run process benchmark", &process);
Expand All @@ -141,67 +183,90 @@ int main(int argc, char *argv[])
return 1;
}

std::vector<char> buf;
buf.resize(msgsize);

auto ctx = tll::channel::Context(tll::Config());

tll::util::OwnedMessage msg;

for (auto &m : modules) {
if (ctx.load(m, "channel_module")) {
fmt::print("Failed to load module {}\n", m);
return 1;
}
if (ctx.load(m))
return fail(1, "Failed to load module {}\n", m);
}

tll_msg_t msg = { TLL_MESSAGE_DATA };

if (payload_channel.size()) {
auto c = ctx.channel(payload_channel);
if (!c) {
fmt::print("Failed to create payload channel {}\n", payload_channel);
return 1;
}
buf.resize(0);
struct Callback
{
std::vector<char> * buf;
tll_msg_t * msg;

int callback(const tll::Channel *, const tll_msg_t *m)
{
buf->resize(m->size);
memcpy(buf->data(), m->data, m->size);
msg->msgid = m->msgid;
return 0;
if (config_file.size()) {
auto cfg = tll::Config::load("yaml://" + config_file);
if (!cfg)
return fail(1, "Failed to load config {}\n", config_file);

tll::Channel::Url lurl;
lurl.set("tll.proto", "loader");
lurl.set("tll.internal", "yes");
lurl.set("name", "loader");
if (auto mcfg = cfg->sub("module"))
lurl.set("module", mcfg->copy());
if (auto acfg = cfg->sub("alias"))
lurl.set("alias", acfg->copy());
if (!ctx.channel(lurl))
return fail(1, "Failed to load channel modules\n");

auto reader = tll::make_props_reader(*cfg);
if (msgsize == 0)
msgsize = reader.getT("msgsize", msgsize);
if (msgid == 0)
msgid = reader.getT("msgid", msgid);

if (!reader)
return fail(1, "Invalid config parameters: {}", reader.error());

if (auto url = cfg->getT("payload", tll::Channel::Url()); url) {
if (url->proto() != "") {
if (auto r = payload_read(ctx, *url); !r)
return 1;
else
std::swap(msg, *r);
}
};
Callback cb = { &buf, &msg };
c->callback_add(&cb, TLL_MESSAGE_MASK_DATA);
c->open();
for (auto i = 0; i < 10; i++) {
c->process();
if (buf.size())
break;
} else
return fail(1, "Invalid preload url in config: {}", url.error());

for (auto &[p, _] : cfg->browse("channel.*", true)) {
auto r = cfg->getT<tll::Channel::Url>(p);
if (!r)
return fail(1, "Failed to load channel url from config: {}", r.error());
curl.push_back(*r);
}
if (buf.empty()) {
fmt::print("Failed to read data from payload channel {}\n", payload_channel);
}

if (payload_channel.size()) {
auto url = tll::Channel::Url::load(payload_channel);
if (!url)
return fail(1, "Failed to parse payload url {}\n", payload_channel);

if (auto r = payload_read(ctx, *url); !r)
return 1;
}
else
std::swap(msg, *r);
}

msg.data = buf.data();
msg.size = buf.size();
if (!msgsize)
msgsize = 1024;
if (!msg.data)
msg.resize(msgsize);
if (msgid)
msg.msgid = msgid;

ctx.reg(&Echo::impl);
ctx.reg(&Prefix::impl);

if (url.empty())
if (url.empty() && curl.empty())
url = {"null://", "prefix+null://", "echo://", "prefix+echo://"};

timeit_post(ctx, "null://;name=prewarm", true, count, &msg);
for (auto & u : url) {
auto r = tll::Channel::Url::parse(u);
if (!r)
return fail(1, "Invalid url '{}': {}\n", u, r.error());
curl.push_back(*r);
}
tll::bench::prewarm(100ms);
for (auto & u : curl) {
if (process)
timeit_process(ctx, u, callback, count);
else
Expand Down
22 changes: 17 additions & 5 deletions src/tll/util/ownedmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ class OwnedMessage : public tll_msg_t

~OwnedMessage()
{
if (data)
delete [] (char *) data;
data = nullptr;
reset();
}

OwnedMessage & operator = (OwnedMessage rhs)
Expand All @@ -52,11 +50,25 @@ class OwnedMessage : public tll_msg_t
return *this;
}

void reset()
{
if (data)
delete [] (char *) data;
data = nullptr;
size = 0;
}

void resize(size_t size)
{
reset();
this->data = new char[size];
this->size = size;
}

static OwnedMessage * allocate(size_t size)
{
auto m = new OwnedMessage;
m->data = new char[size];
m->size = size;
m->resize(size);
return m;
}

Expand Down

0 comments on commit 8d5a919

Please sign in to comment.