Skip to content

Commit

Permalink
wip: logger: Write logs in separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 20, 2024
1 parent db846f6 commit 3ea4bc2
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 32 deletions.
42 changes: 42 additions & 0 deletions src/logger/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef _LOGGER_COMMON_H
#define _LOGGER_COMMON_H

#include "tll/logger.h"
#include "tll/logger/impl.h"
#include "tll/util/refptr.h"
#include "tll/util/time.h"

#include <mutex>

namespace tll::logger {

struct tll_logger_obj_t : tll::util::refbase_t<tll_logger_obj_t, 0>
{
const char * name = "";
void * obj = nullptr;
tll_logger_impl_t * impl = nullptr;

~tll_logger_obj_t()
{
if (obj && impl && impl->log_free)
impl->log_free(impl, name, obj);
}

auto log(tll::time_point ts, tll_logger_level_t level, std::string_view body)
{
return (*impl->log)(ts.time_since_epoch().count(), name, level, body.data(), body.size(), obj);
}
};

struct Logger : public tll_logger_t, public tll::util::refbase_t<Logger>
{
void destroy();

std::mutex lock;
std::string name;
tll::util::refptr_t<tll_logger_obj_t> impl;
};

} // namespace tll::logger

#endif//_LOGGER_COMMON_H
54 changes: 29 additions & 25 deletions src/logger/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
#include "tll/logger.h"
#include "tll/logger/impl.h"
#include "tll/util/refptr.h"
#include "tll/util/size.h"
#include "tll/util/string.h"
#include "tll/util/time.h"

#include <atomic>
#include <map>
#include <mutex>
#include <set>
Expand All @@ -22,8 +22,9 @@
#include <stdio.h>
#include <stdarg.h>

#include "logger/common.h"
#include "logger/config.h"

#include "logger/thread.h"
#include "logger/util.h"

#ifdef WITH_SPDLOG
Expand All @@ -32,34 +33,14 @@

namespace tll::logger {

struct tll_logger_obj_t : tll::util::refbase_t<tll_logger_obj_t, 0>
{
const char * name = "";
void * obj = nullptr;
tll_logger_impl_t * impl = nullptr;

~tll_logger_obj_t()
{
if (obj && impl && impl->log_free)
impl->log_free(impl, name, obj);
}
};

struct Logger : public tll_logger_t, public tll::util::refbase_t<Logger>
{
void destroy();

std::mutex lock;
std::string name;
tll::util::refptr_t<tll_logger_obj_t> impl;
};

struct logger_context_t
{
std::shared_mutex lock;
typedef std::unique_lock<std::shared_mutex> wlock_t;
typedef std::shared_lock<std::shared_mutex> rlock_t;

std::unique_ptr<Thread> _thread;

std::map<std::string_view, Logger *, std::less<>> _loggers;
std::map<std::string, tll_logger_level_t, std::less<>> _levels_prefix;
std::map<std::string, tll_logger_level_t, std::less<>> _levels;
Expand All @@ -68,6 +49,12 @@ struct logger_context_t
static tll_logger_impl_t stdio;
tll_logger_impl_t * impl = &stdio;

~logger_context_t()
{
// Wait for thread to finish if it was spawned
_thread.reset();
}

Logger * init(std::string_view name)
{
{
Expand Down Expand Up @@ -213,6 +200,20 @@ struct logger_context_t

int configure(const tll::ConstConfig &cfg)
{
auto thread = cfg.getT<bool>("thread");
if (!thread) { // Missing or invalid value, do nothing
} else if (*thread) {
auto size = cfg.getT<tll::util::Size>("ring-size").value_or(128 * 1024);
std::unique_ptr<Thread> tmp { new Thread() };
if (tmp->init(size))
return 0;

std::swap(_thread, tmp);
if (tmp)
tmp->stop();
} else if (!*thread)
_thread.reset();

if (auto levels = cfg.sub("levels"); levels) {
std::set<std::string> skip;
for (auto & [k, v] : levels->browse("**", true)) {
Expand Down Expand Up @@ -320,7 +321,10 @@ int tll_logger_log(tll_logger_t * l, tll_logger_level_t level, const char * buf,

auto ts = tll::time::now();

return (*impl->impl->log)(ts.time_since_epoch().count(), impl->name, level, buf, len, impl->obj);
if (tll::logger::context._thread) {
return tll::logger::context._thread->push(log, ts, level, {buf, len});
} else
return log->impl->log(ts, level, {buf, len});
}

tll_logger_buf_t * tll_logger_tls_buf()
Expand Down
133 changes: 133 additions & 0 deletions src/logger/thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#ifndef _LOGGER_THREAD_H
#define _LOGGER_THREAD_H

#include "tll/cppring.h"
#include "tll/logger.h"
#include "tll/util/time.h"

#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

#include <cstdlib>
#include <mutex>
#include <thread>

#include "logger/common.h"

namespace tll::logger {

struct Header
{
uint16_t level = TLL_LOGGER_DEBUG;
tll::logger::Logger * logger = nullptr;
tll::time_point timestamp;
};

struct Thread
{
std::mutex _lock;
std::shared_ptr<tll::Ring> _ring;
int _fd = -1;
bool _stop = false;
std::thread _thread;

~Thread()
{
stop();

if (_thread.joinable())
_thread.join();
_thread = {};

if (_fd != -1)
close(_fd);
_fd = -1;
}

int init(size_t size)
{
_fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK | EFD_CLOEXEC);
if (_fd == -1)
return EINVAL;
_ring.reset(tll::Ring::allocate(size).release());
if (!_ring)
return EINVAL;

_thread = std::thread([](Thread * self) { self->run(); }, this);
return 0;
}

void stop()
{
_stop = true;
eventfd_write(_fd, 1);
}

void run()
{
std::shared_ptr<Ring> ring = _ring;
pollfd pfd = { .fd = _fd, .events = POLLIN };
while (!_stop || !ring->empty()) {
if (!ring->empty()) {
step();
continue;
}

auto r = poll(&pfd, 1, 1000);
if (r == 1)
step();
else if (r < 0)
return;
else if (_stop)
return;
}
}

void step()
{
const void * data;
size_t size;
if (_ring->read(&data, &size))
return;

eventfd_t v;
eventfd_read(_fd, &v);

if (size < sizeof(Header))
return;

auto header = (const Header *) data;

std::string_view body((const char *) (header + 1), size - sizeof(Header));

{
std::unique_lock<std::mutex> lck(header->logger->lock);
header->logger->impl->log(header->timestamp, (tll_logger_level_t) header->level, body);
}
header->logger->unref();
_ring->shift();
}

int push(Logger * log, tll::time_point ts, tll_logger_level_t level, std::string_view body)
{
std::unique_lock<std::mutex> lock(_lock);
void * data;
if (_ring->write_begin(&data, sizeof(Header) + body.size())) {
_lock.unlock();
return log->impl->log(ts, level, body);
}
auto header = (Header *) data;
header->logger = log->ref();
header->timestamp = ts;
header->level = level;
memcpy(header + 1, body.data(), body.size());
_ring->write_end(data, sizeof(Header) + body.size());
eventfd_write(_fd, 1);
return 0;
}
};

} // namespace tll::logger

#endif//_LOGGER_THREAD_H
2 changes: 1 addition & 1 deletion src/logger/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#define _LOGGER_UTIL_H

#include "tll/logger.h"
#include "tll/util/conv.h"
#include "tll/conv/base.h"

constexpr bool icmp(char a, char b)
{
Expand Down
59 changes: 53 additions & 6 deletions test/test_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,27 @@

#include "gtest/gtest.h"

#include "tll/config.h"
#include "tll/logger.h"
#include "tll/logger/impl.h"
#include "tll/logger/prefix.h"

#include <list>
#include <mutex>
#include <thread>

struct log_map : public tll_logger_impl_t
{
using log_entry_t = std::pair<tll::Logger::level_t, std::string>;

std::map<std::string, std::list<log_entry_t>> map;
struct Object
{
std::mutex * lock = nullptr;
std::list<log_entry_t> list;
};

std::mutex lock;
std::map<std::string, Object> map;

log_map()
{
Expand All @@ -32,16 +42,18 @@ struct log_map : public tll_logger_impl_t
static int _log(long long ts, const char * category, tll_logger_level_t level, const char * data, size_t size, void * obj)
{
fmt::print(stderr, "Log: {} {} {}\n", tll::Logger::level_name(level), category, std::string_view(data, size));
auto list = static_cast<std::list<log_entry_t> *>(obj);
list->push_back({level, std::string(data, size)});
auto list = static_cast<Object *>(obj);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::unique_lock lock(*list->lock);
list->list.push_back({level, std::string(data, size)});
return 0;
}

static void * _new(tll_logger_impl_t * impl, const char * category)
{
fmt::print(stderr, "Create new logger {}\n", category);
auto self = static_cast<log_map *>(impl);
auto r = self->map.insert({category, {}});
auto r = self->map.emplace(category, Object { .lock = &self->lock });
return &r.first->second;
}

Expand Down Expand Up @@ -96,7 +108,7 @@ TEST(Logger, Set)
tll::Logger l0 { "l0" };

ASSERT_EQ(impl.map.size(), 1u);
auto & list = impl.map["l0"];
auto & list = impl.map["l0"].list;

ASSERT_EQ(l0.level(), tll::Logger::Info);
l0.debug("Debug");
Expand Down Expand Up @@ -206,7 +218,7 @@ TEST(Logger, Prefix)
ASSERT_EQ(l.level(), p0.level());

ASSERT_EQ(impl.map.size(), 1u);
auto & list = impl.map["l0"];
auto & list = impl.map["l0"].list;
l.info("l0");
ASSERT_EQ(list.back().second, "l0");

Expand Down Expand Up @@ -234,3 +246,38 @@ TEST(Logger, Prefix)
pinv.info("pinv");
pfinv.info("pfinv");
}

TEST(Logger, Thread)
{
log_map impl;
tll_logger_register(&impl);

tll::Logger l0 { "l0" };
// May be not Debug from other tests
l0.level() = tll::Logger::Debug;
ASSERT_EQ(l0.level(), tll::Logger::Debug);

ASSERT_EQ(impl.map.size(), 1u);
auto & list = impl.map["l0"];

tll::Config cfg;
cfg.set("thread", "yes");

tll::Logger::config(cfg);
for (auto i = 0u; i < 10; i++) {
l0.info("text");
}

{
std::unique_lock<std::mutex> lock(impl.lock);
ASSERT_LE(list.list.size(), 10);
}

cfg.set("thread", "no");
tll::Logger::config(cfg);

{
std::unique_lock<std::mutex> lock(impl.lock);
ASSERT_EQ(list.list.size(), 10);
}
}

0 comments on commit 3ea4bc2

Please sign in to comment.