Skip to content

Commit

Permalink
Also free entities created on behalf of graph cache
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Boasson <[email protected]>
  • Loading branch information
eboasson committed Apr 16, 2020
1 parent ce59290 commit 81b6c61
Showing 1 changed file with 69 additions and 89 deletions.
158 changes: 69 additions & 89 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ bool operator<(dds_builtintopic_guid_t const & a, dds_builtintopic_guid_t const
static rmw_ret_t discovery_thread_stop(rmw_dds_common::Context & context);
static bool dds_qos_to_rmw_qos(const dds_qos_t * dds_qos, rmw_qos_profile_t * qos_policies);

static rmw_publisher_t * create_publisher(
dds_entity_t dds_ppant, dds_entity_t dds_pub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_publisher_options_t * publisher_options
);
static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher);

static rmw_subscription_t * create_subscription(
dds_entity_t dds_ppant, dds_entity_t dds_pub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options
);
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);

static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl);
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);

struct CddsDomain;
struct CddsWaitset;

Expand Down Expand Up @@ -244,7 +263,17 @@ struct rmw_context_impl_t
~rmw_context_impl_t()
{
discovery_thread_stop(common);
if (dds_delete(ppant) < 0) {
common.graph_cache.clear_on_change_callback();
if (common.graph_guard_condition) {
destroy_guard_condition(common.graph_guard_condition);
}
if (common.pub) {
destroy_publisher(common.pub);
}
if (common.sub) {
destroy_subscription(common.sub);
}
if (ppant > 0 && dds_delete(ppant) < 0) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy domain in destructor\n");
}
Expand Down Expand Up @@ -329,25 +358,6 @@ static void clean_waitset_caches();
static void check_for_blocked_requests(CddsClient & client);
#endif

static rmw_publisher_t * create_publisher(
dds_entity_t dds_ppant, dds_entity_t dds_pub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_publisher_options_t * publisher_options
);
static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher);

static rmw_subscription_t * create_subscription(
dds_entity_t dds_ppant, dds_entity_t dds_pub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options
);
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);

static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl);
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);

#ifndef WIN32
/* TODO(allenh1): check for Clang */
#pragma GCC visibility push (default)
Expand Down Expand Up @@ -618,14 +628,14 @@ static void discovery_thread(rmw_context_impl_t * impl)
dds_delete(ws);
}

static rmw_ret_t discovery_thread_start(rmw_context_t * context)
static rmw_ret_t discovery_thread_start(rmw_context_impl_t * impl)
{
auto common_context = &context->impl->common;
auto common_context = &impl->common;
common_context->thread_is_running.store(true);
common_context->listener_thread_gc = create_guard_condition(context->impl);
common_context->listener_thread_gc = create_guard_condition(impl);
if (common_context->listener_thread_gc) {
try {
common_context->listener_thread = std::thread(discovery_thread, context->impl);
common_context->listener_thread = std::thread(discovery_thread, impl);
return RMW_RET_OK;
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to create std::thread: %s", exc.what());
Expand All @@ -648,23 +658,24 @@ static rmw_ret_t discovery_thread_start(rmw_context_t * context)

static rmw_ret_t discovery_thread_stop(rmw_dds_common::Context & common_context)
{
common_context.thread_is_running.exchange(false);
rmw_ret_t rmw_ret = rmw_trigger_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
}
try {
common_context.listener_thread.join();
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to join std::thread: %s", exc.what());
return RMW_RET_ERROR;
} catch (...) {
RMW_SET_ERROR_MSG("Failed to join std::thread");
return RMW_RET_ERROR;
}
rmw_ret = destroy_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
if (common_context.thread_is_running.exchange(false)) {
rmw_ret_t rmw_ret = rmw_trigger_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
}
try {
common_context.listener_thread.join();
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to join std::thread: %s", exc.what());
return RMW_RET_ERROR;
} catch (...) {
RMW_SET_ERROR_MSG("Failed to join std::thread");
return RMW_RET_ERROR;
}
rmw_ret = destroy_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
}
}
return RMW_RET_OK;
}
Expand Down Expand Up @@ -880,6 +891,12 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
return RMW_RET_BAD_ALLOC;
}

/* "impl"'s destructor relies on these being initialized properly */
impl->common.thread_is_running.store(false);
impl->common.graph_guard_condition = nullptr;
impl->common.pub = nullptr;
impl->common.sub = nullptr;

/* Take domains_lock and hold it until after the participant creation succeeded or
failed: otherwise there is a race with rmw_destroy_node deleting the last participant
and tearing down the domain for versions of Cyclone that implement the original
Expand Down Expand Up @@ -961,76 +978,42 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
/* Create RMW publisher/subscription/guard condition used by rmw_dds_common
discovery */
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
std::unique_ptr<rmw_publisher_t, std::function<void(rmw_publisher_t *)>>
publisher(
create_publisher(
impl->common.pub = create_publisher(
impl->ppant, impl->dds_pub,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"ros_discovery_info",
&pubsub_qos,
&publisher_options),
[&](rmw_publisher_t * pub) {
if (RMW_RET_OK != destroy_publisher(pub)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy publisher after function: '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
});
if (publisher == nullptr) {
&publisher_options);
if (impl->common.pub == nullptr) {
return RMW_RET_ERROR;
}

rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
subscription_options.ignore_local_publications = true;
// FIXME: keyed topics => KEEP_LAST and depth 1.
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
std::unique_ptr<rmw_subscription_t, std::function<void(rmw_subscription_t *)>>
subscription(
create_subscription(
impl->common.sub = create_subscription(
impl->ppant, impl->dds_sub,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"ros_discovery_info",
&pubsub_qos,
&subscription_options),
[&](rmw_subscription_t * sub) {
if (RMW_RET_OK != destroy_subscription(sub)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy subscription after function: '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
});
if (subscription == nullptr) {
&subscription_options);
if (impl->common.sub == nullptr) {
return RMW_RET_ERROR;
}

std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
create_guard_condition(impl.get()),
[&](rmw_guard_condition_t * gc) {
if (RMW_RET_OK != destroy_guard_condition(gc)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to destroy guard condition after function: '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
});
if (!graph_guard_condition) {
impl->common.graph_guard_condition = create_guard_condition(impl.get());
if (impl->common.graph_guard_condition == nullptr) {
return RMW_RET_BAD_ALLOC;
}

impl->common.graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = impl->common.graph_guard_condition]() {
static_cast<void>(rmw_trigger_guard_condition(guard_condition));
});

impl->common.pub = publisher.get();
impl->common.sub = subscription.get();
impl->common.graph_guard_condition = graph_guard_condition.get();
get_entity_gid(impl->ppant, impl->common.gid);
impl->common.graph_cache.add_participant(impl->common.gid, context->options.enclave);
impl->common.thread_is_running.store(false);
impl->common.listener_thread_gc = nullptr;

context->impl = impl.get();

// One could also use a set of listeners instead of a thread for maintaining the graph cache:
// - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock
Expand All @@ -1039,14 +1022,11 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
// updates and triggering a guard condition, and so that should be safe.
// however, the graph cache updates could be expensive, and so performing those operations on
// the thread receiving data from the network may not be wise.
if ((ret = discovery_thread_start(context)) != RMW_RET_OK) {
if ((ret = discovery_thread_start(impl.get())) != RMW_RET_OK) {
return ret;
}

graph_guard_condition.release();
publisher.release();
subscription.release();
impl.release();
context->impl = impl.release();
return RMW_RET_OK;
}

Expand Down

0 comments on commit 81b6c61

Please sign in to comment.