Skip to content

Commit

Permalink
wip: channel: Ref guarded methods
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 28, 2024
1 parent 1f1e107 commit b2915ab
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/channel/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -669,11 +669,22 @@ tll_config_t * tll_channel_config(tll_channel_t *c) { if (!c->internal) return 0
const tll_channel_list_t * tll_channel_children(const tll_channel_t *c) { if (!c->internal) return 0; return c->internal->children; }
tll_channel_context_t * tll_channel_context(const tll_channel_t *c) { return tll_channel_context_ref(c->context); }

void tll_channel_free_real(tll_channel_t *c);
void tll_channel_free(tll_channel_t *c)
{
if (!c) return;
if (--c->internal->ref == 0)
return tll_channel_free_real(c);
tll_logger_printf(c->internal->logger, TLL_LOGGER_INFO, "Delete later %d", c->internal->ref);
}

void tll_channel_free_real(tll_channel_t *c)
{
std::string_view name = tll_channel_name(c);

if (c->internal->flags)
return;
c->internal->flags |= 1u;
auto state = tll_channel_state(c);
if (state != TLL_STATE_DESTROY && state != TLL_STATE_CLOSED)
tll_channel_close(c, 1);
Expand Down Expand Up @@ -701,10 +712,26 @@ void tll_channel_free(tll_channel_t *c)
delete c;
}

struct ChannelRef
{
const tll_channel_t * _ptr = nullptr;
ChannelRef(const tll_channel_t * ptr) : _ptr(ptr)
{
++_ptr->internal->ref;
//tll_logger_printf(_ptr->internal->logger, TLL_LOGGER_WARNING, "Increase ref %d", _ptr->internal->ref);
}
~ChannelRef() {
//tll_logger_printf(_ptr->internal->logger, TLL_LOGGER_WARNING, "Decrease ref %d", _ptr->internal->ref);
if (--_ptr->internal->ref == 0)
tll_channel_free_real(const_cast<tll_channel_t *>(_ptr));
}
};

int tll_channel_process(tll_channel_t *c, long timeout, int flags)
{
if (!c || !c->impl || !c->internal) return EINVAL;
if (!tll::dcaps::need_process(c->internal->dcaps)) return EAGAIN;
ChannelRef ref(c);
return (*c->impl->process)(c, timeout, flags);
}

Expand All @@ -713,6 +740,7 @@ int tll_channel_post(tll_channel_t *c, const tll_msg_t *msg, int flags)
if (!c || !c->impl) return EINVAL;
if (c->internal->dump)
tll_channel_log_msg(c, "tll.channel.impl", TLL_LOGGER_INFO, c->internal->dump, msg, "Post", 4);
ChannelRef ref(c);
auto r = (*c->impl->post)(c, msg, flags);
if (r) {
tll_channel_log_msg(c, "tll.channel.impl", TLL_LOGGER_ERROR, TLL_MESSAGE_LOG_FRAME, msg, "Failed to post", -1);
Expand Down Expand Up @@ -786,6 +814,7 @@ int tll_channel_resume(tll_channel_t *c)
#define FORWARD_SAFE_ERR(func, err, c, ...) \
{ \
if (!c || !c->impl || !c->impl->func) return err; \
ChannelRef ref(c); \
return (*c->impl->func)(c, ##__VA_ARGS__); \
}

Expand All @@ -794,6 +823,7 @@ int tll_channel_resume(tll_channel_t *c)
int tll_channel_open(tll_channel_t *c, const char *str, size_t len)
{
if (!c || !c->impl || !c->impl->open) return EINVAL;
ChannelRef ref(c);
if (!str || !len) {
tll::Config cfg;
return (*c->impl->open)(c, cfg);
Expand All @@ -809,6 +839,7 @@ int tll_channel_open(tll_channel_t *c, const char *str, size_t len)
int tll_channel_open_cfg(tll_channel_t *c, const tll_config_t *cfg)
{
if (!c || !c->impl || !c->impl->open) return EINVAL;
ChannelRef ref(c);
if (!cfg) {
tll::Config cfg;
return (*c->impl->open)(c, cfg);
Expand Down
1 change: 1 addition & 0 deletions src/channel/impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void tll_channel_internal_init_v1(tll_channel_internal_t *ptr)
memset(ptr, 0, offsetof(tll_channel_internal_t, reserved) + sizeof(ptr->reserved));
ptr->version = TLL_CHANNEL_INTERNAL_V1;
ptr->fd = -1;
ptr->ref = 1;
}

#if !(defined(__linux__) || defined(__FreeBSD__))
Expand Down
3 changes: 3 additions & 0 deletions src/tll/channel/impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ typedef struct tll_channel_internal_t
tll_stat_block_t * stat;

tll_logger_t * logger;

unsigned ref;
unsigned flags;
intptr_t reserved[4];
} tll_channel_internal_t;

Expand Down
54 changes: 54 additions & 0 deletions test/test_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,3 +616,57 @@ TEST(Channel, ReopenInternal)
reopen.on_state(Closed, now + 1ns);
ASSERT_EQ(reopen.next - now, 1ns);
}

TEST(Channel, DeleteLater)
{
auto ctx = tll::channel::Context(tll::Config());

ASSERT_EQ(ctx.reg(&Echo::impl), 0);

auto c0 = ctx.channel("echo://;name=echo");
auto c1 = ctx.channel("zero://;name=zero");
auto c2 = ctx.channel("null://;name=null");

ASSERT_NE(ctx.get("echo"), nullptr);
ASSERT_NE(ctx.get("zero"), nullptr);
ASSERT_NE(ctx.get("null"), nullptr);

auto deleter = [](const tll_channel_t * c, const tll_msg_t * m, void * user)
{
auto flag = (bool *) user;
fmt::print("Flag for {}: {}\n", tll_channel_name(c), *flag);
if (*flag)
return 0;
*flag = true;
tll_channel_free(const_cast<tll_channel_t *>(c));
return 0;
};

bool f0 = false, f1 = false, f2 = false;

c0->callback_add(deleter, &f0, TLL_MESSAGE_MASK_DATA);
c1->callback_add(deleter, &f1, TLL_MESSAGE_MASK_DATA);
c2->callback_add(deleter, &f2, TLL_MESSAGE_MASK_STATE);

c0->open();
ASSERT_NE(ctx.get("echo"), nullptr);
c0->process();
ASSERT_EQ(c0->state(), tll::state::Active);

c1->open();
ASSERT_EQ(c1->state(), tll::state::Active);
ASSERT_NE(ctx.get("zero"), nullptr);

c2->open();
ASSERT_EQ(ctx.get("null"), nullptr);
c2.release();

tll_msg_t msg = { .type = TLL_MESSAGE_DATA, .msgid = 10 };
ASSERT_EQ(c0->post(&msg), 0);
ASSERT_EQ(ctx.get("echo"), nullptr);
c0.release();

c1->process();
ASSERT_EQ(ctx.get("zero"), nullptr);
c1.release();
}

0 comments on commit b2915ab

Please sign in to comment.