Skip to content

Commit

Permalink
wip: refcount free
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 28, 2024
1 parent 05a0927 commit 67f9721
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/channel/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,12 @@ void tll_channel_free(tll_channel_t *c)
if (!c) return;
std::string_view name = tll_channel_name(c);

if (--c->internal->ref > 0) {
tll_logger_printf(c->internal->logger, TLL_LOGGER_INFO, "Delete later %d", c->internal->ref);
return;
}
c->internal->ref += 2; // Guard against close

auto state = tll_channel_state(c);
if (state != TLL_STATE_DESTROY && state != TLL_STATE_CLOSED)
tll_channel_close(c, 1);
Expand Down Expand Up @@ -701,10 +707,25 @@ void tll_channel_free(tll_channel_t *c)
delete c;
}

struct ChannelRef
{
const tll_channel_t * _ptr = nullptr;
ChannelRef(const tll_channel_t * ptr) : _ptr(ptr)
{
tll_channel_internal_ref(_ptr->internal);
tll_logger_printf(_ptr->internal->logger, TLL_LOGGER_WARNING, "Increase ref %d", _ptr->internal->ref);
}
~ChannelRef() {
tll_logger_printf(_ptr->internal->logger, TLL_LOGGER_WARNING, "Decrease ref %d", _ptr->internal->ref);
tll_channel_internal_unref(_ptr->internal);
}
};

int tll_channel_process(tll_channel_t *c, long timeout, int flags)
{
if (!c || !c->impl || !c->internal) return EINVAL;
if (!tll::dcaps::need_process(c->internal->dcaps)) return EAGAIN;
ChannelRef ref(c);
return (*c->impl->process)(c, timeout, flags);
}

Expand All @@ -713,6 +734,7 @@ int tll_channel_post(tll_channel_t *c, const tll_msg_t *msg, int flags)
if (!c || !c->impl) return EINVAL;
if (c->internal->dump)
tll_channel_log_msg(c, "tll.channel.impl", TLL_LOGGER_INFO, c->internal->dump, msg, "Post", 4);
ChannelRef ref(c);
auto r = (*c->impl->post)(c, msg, flags);
if (r) {
tll_channel_log_msg(c, "tll.channel.impl", TLL_LOGGER_ERROR, TLL_MESSAGE_LOG_FRAME, msg, "Failed to post", -1);
Expand Down Expand Up @@ -786,6 +808,7 @@ int tll_channel_resume(tll_channel_t *c)
#define FORWARD_SAFE_ERR(func, err, c, ...) \
{ \
if (!c || !c->impl || !c->impl->func) return err; \
ChannelRef ref(c); \
return (*c->impl->func)(c, ##__VA_ARGS__); \
}

Expand All @@ -794,6 +817,7 @@ int tll_channel_resume(tll_channel_t *c)
int tll_channel_open(tll_channel_t *c, const char *str, size_t len)
{
if (!c || !c->impl || !c->impl->open) return EINVAL;
ChannelRef ref(c);
if (!str || !len) {
tll::Config cfg;
return (*c->impl->open)(c, cfg);
Expand All @@ -809,6 +833,7 @@ int tll_channel_open(tll_channel_t *c, const char *str, size_t len)
int tll_channel_open_cfg(tll_channel_t *c, const tll_config_t *cfg)
{
if (!c || !c->impl || !c->impl->open) return EINVAL;
ChannelRef ref(c);
if (!cfg) {
tll::Config cfg;
return (*c->impl->open)(c, cfg);
Expand Down
1 change: 1 addition & 0 deletions src/channel/impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void tll_channel_internal_init_v1(tll_channel_internal_t *ptr)
memset(ptr, 0, offsetof(tll_channel_internal_t, reserved) + sizeof(ptr->reserved));
ptr->version = TLL_CHANNEL_INTERNAL_V1;
ptr->fd = -1;
ptr->ref = 1;
}

#if !(defined(__linux__) || defined(__FreeBSD__))
Expand Down
13 changes: 13 additions & 0 deletions src/tll/channel/impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ typedef struct tll_channel_internal_t
tll_stat_block_t * stat;

tll_logger_t * logger;

int ref;
intptr_t reserved[4];
} tll_channel_internal_t;

Expand All @@ -133,6 +135,17 @@ void tll_channel_internal_clear(tll_channel_internal_t *ptr);
int tll_channel_internal_child_add(tll_channel_internal_t *ptr, tll_channel_t *c, const char * tag, int len);
int tll_channel_internal_child_del(tll_channel_internal_t *ptr, const tll_channel_t *c, const char * tag, int len);

static inline void tll_channel_internal_ref(const tll_channel_internal_t * in)
{
++((tll_channel_internal_t *) in)->ref;
}

static inline void tll_channel_internal_unref(const tll_channel_internal_t * in)
{
if (--((tll_channel_internal_t *) in)->ref == 0)
tll_channel_free(in->self);
}

static inline int tll_channel_callback_data(const tll_channel_internal_t * in, const tll_msg_t * msg)
{
if (in->dump)
Expand Down
40 changes: 40 additions & 0 deletions test/test_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,3 +616,43 @@ TEST(Channel, ReopenInternal)
reopen.on_state(Closed, now + 1ns);
ASSERT_EQ(reopen.next - now, 1ns);
}

TEST(Channel, DeleteLater)
{
auto ctx = tll::channel::Context(tll::Config());

ASSERT_EQ(ctx.reg(&Echo::impl), 0);

auto c0 = ctx.channel("echo://;name=echo");
auto c1 = ctx.channel("zero://;name=zero");
auto c2 = ctx.channel("null://;name=null");

ASSERT_NE(ctx.get("echo"), nullptr);
ASSERT_NE(ctx.get("zero"), nullptr);
ASSERT_NE(ctx.get("null"), nullptr);

auto deleter = [](const tll_channel_t * c, const tll_msg_t * m, void * user)
{
auto flag = (bool *) user;
fmt::println("Flag for {}: {}", tll_channel_name(c), *flag);
if (*flag)
return 0;
*flag = true;
tll_channel_free(const_cast<tll_channel_t *>(c));
return 0;
};

bool f0 = false, f1 = false, f2 = false;

c0->callback_add(deleter, &f0, TLL_MESSAGE_MASK_DATA);
c1->callback_add(deleter, &f1, TLL_MESSAGE_MASK_DATA);
c2->callback_add(deleter, &f2, TLL_MESSAGE_MASK_STATE);

c0->open();
ASSERT_NE(ctx.get("echo"), nullptr);
c1->open();
ASSERT_NE(ctx.get("zero"), nullptr);
c2->open();
ASSERT_EQ(ctx.get("null"), nullptr);
c2.release();
}

0 comments on commit 67f9721

Please sign in to comment.