diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index bfa62612..1d480b23 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -151,7 +151,7 @@ static rmw_subscription_t * create_subscription( ); static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription); -static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl); +static rmw_guard_condition_t * create_guard_condition(); static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc); struct CddsDomain; @@ -254,6 +254,10 @@ struct rmw_context_impl_t dds_entity_t dds_pub; dds_entity_t dds_sub; + /* Participant reference count*/ + size_t node_count{0}; + std::mutex initialization_mutex; + rmw_context_impl_t() : common(), domain_id(UINT32_MAX), ppant(0) { @@ -263,33 +267,29 @@ struct rmw_context_impl_t common.pub = nullptr; common.sub = nullptr; } + + // Initializes the participant, if it wasn't done already. + // node_count is increased + rmw_ret_t + init(rmw_init_options_t * options); + + // Destroys the participant, when node_count reaches 0. + rmw_ret_t + fini(); + ~rmw_context_impl_t() { - discovery_thread_stop(common); - common.graph_cache.clear_on_change_callback(); - if (common.graph_guard_condition) { - destroy_guard_condition(common.graph_guard_condition); - } - if (common.pub) { - destroy_publisher(common.pub); - } - if (common.sub) { - destroy_subscription(common.sub); - } - if (ppant > 0 && dds_delete(ppant) < 0) { + if (0u != this->node_count) { RCUTILS_SAFE_FWRITE_TO_STDERR( - "Failed to destroy domain in destructor\n"); - } - if (domain_id != UINT32_MAX) { - std::lock_guard lock(gcdds.domains_lock); - CddsDomain & dom = gcdds.domains[domain_id]; - assert(dom.refcount > 0); - if (--dom.refcount == 0) { - static_cast(dds_delete(dom.domain_handle)); - gcdds.domains.erase(domain_id); - } + "Not all nodes were finished before finishing the context\n." + "Ensure `rcl_node_fini` is called for all nodes before `rcl_context_fini`," + "to avoid leaking.\n"); } } + +private: + void + clean_up(); }; struct CddsNode @@ -583,7 +583,7 @@ static void discovery_thread(rmw_context_impl_t * impl) dds_entity_t ws; /* deleting ppant will delete waitset as well, so there is no real need to delete the waitset here on error, but it is more hygienic */ - if ((ws = dds_create_waitset(impl->ppant)) < 0) { + if ((ws = dds_create_waitset(DDS_CYCLONEDDS_HANDLE)) < 0) { RCUTILS_SAFE_FWRITE_TO_STDERR( "ros discovery info listener thread: failed to create waitset, will shutdown ...\n"); return; @@ -637,7 +637,7 @@ static rmw_ret_t discovery_thread_start(rmw_context_impl_t * impl) { auto common_context = &impl->common; common_context->thread_is_running.store(true); - common_context->listener_thread_gc = create_guard_condition(impl); + common_context->listener_thread_gc = create_guard_condition(); if (common_context->listener_thread_gc) { try { common_context->listener_thread = std::thread(discovery_thread, impl); @@ -745,6 +745,21 @@ static bool check_create_domain(dds_domainid_t did, rmw_localhost_only_t localho } } +static +void +check_destroy_domain(dds_domainid_t domain_id) +{ + if (domain_id != UINT32_MAX) { + std::lock_guard lock(gcdds.domains_lock); + CddsDomain & dom = gcdds.domains[domain_id]; + assert(dom.refcount > 0); + if (--dom.refcount == 0) { + static_cast(dds_delete(dom.domain_handle)); + gcdds.domains.erase(domain_id); + } + } +} + #if RMW_SUPPORT_SECURITY /* Returns the full URI of a security file properly formatted for DDS */ bool get_security_file_URI( @@ -816,6 +831,7 @@ void finalize_security_file_URIs( #endif /* RMW_SUPPORT_SECURITY */ /* Attempt to set all the qos properties needed to enable DDS security */ +static rmw_ret_t configure_qos_for_security( dds_qos_t * qos, const rmw_security_options_t * security_options) @@ -861,107 +877,87 @@ rmw_ret_t configure_qos_for_security( #endif } -extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context) +rmw_ret_t +rmw_context_impl_t::init(rmw_init_options_t * options) { - rmw_ret_t ret; - - static_cast(options); - static_cast(context); - RCUTILS_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); - RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - options, - options->implementation_identifier, - eclipse_cyclonedds_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - /* domain_id = UINT32_MAX = Cyclone DDS' "default domain id".*/ - if (options->domain_id >= UINT32_MAX) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", "rmw_create_node: domain id out of range"); - return RMW_RET_INVALID_ARGUMENT; - } - const dds_domainid_t domain_id = static_cast(options->domain_id); - - context->instance_id = options->instance_id; - context->implementation_identifier = eclipse_cyclonedds_identifier; - context->impl = nullptr; - - if ((ret = rmw_init_options_copy(options, &context->options)) != RMW_RET_OK) { - return ret; - } - - std::unique_ptr impl(new(std::nothrow) rmw_context_impl_t()); - if (impl == nullptr) { - return RMW_RET_BAD_ALLOC; + std::lock_guard guard(initialization_mutex); + if (0u != this->node_count) { + // initialization has already been done + this->node_count++; + return RMW_RET_OK; } /* Take domains_lock and hold it until after the participant creation succeeded or - failed: otherwise there is a race with rmw_destroy_node deleting the last participant - and tearing down the domain for versions of Cyclone that implement the original - version of dds_create_domain that doesn't return a handle. */ - if (!check_create_domain(domain_id, options->localhost_only)) { + failed: otherwise there is a race with rmw_destroy_node deleting the last participant + and tearing down the domain for versions of Cyclone that implement the original + version of dds_create_domain that doesn't return a handle. */ + this->domain_id = static_cast(options->domain_id); + if (!check_create_domain(this->domain_id, options->localhost_only)) { return RMW_RET_ERROR; } - /* Once the domain id is set in impl, impl's destructor will take care of unref'ing - the domain */ - impl->domain_id = domain_id; - std::unique_ptr> ppant_qos(dds_create_qos(), &dds_delete_qos); if (ppant_qos == nullptr) { + this->clean_up(); return RMW_RET_BAD_ALLOC; } std::string user_data = std::string("enclave=") + std::string( - context->options.enclave) + std::string(";"); + options->enclave) + std::string(";"); dds_qset_userdata(ppant_qos.get(), user_data.c_str(), user_data.size()); if (configure_qos_for_security( ppant_qos.get(), - &context->options.security_options) != RMW_RET_OK) + &options->security_options) != RMW_RET_OK) { - if (context->options.security_options.enforce_security == RMW_SECURITY_ENFORCEMENT_ENFORCE) { + if (RMW_SECURITY_ENFORCEMENT_ENFORCE == options->security_options.enforce_security) { + this->clean_up(); return RMW_RET_ERROR; } } - impl->ppant = dds_create_participant(domain_id, ppant_qos.get(), nullptr); - if (impl->ppant < 0) { + + this->ppant = dds_create_participant(this->domain_id, ppant_qos.get(), nullptr); + if (this->ppant < 0) { + this->clean_up(); RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant"); return RMW_RET_ERROR; } /* Create readers for DDS built-in topics for monitoring discovery */ - if ((impl->rd_participant = - dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0) + if ((this->rd_participant = + dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0) { + this->clean_up(); RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSParticipant reader"); return RMW_RET_ERROR; } - if ((impl->rd_subscription = - dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0) + if ((this->rd_subscription = + dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0) { + this->clean_up(); RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSSubscription reader"); return RMW_RET_ERROR; } - if ((impl->rd_publication = - dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0) + if ((this->rd_publication = + dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0) { + this->clean_up(); RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSPublication reader"); return RMW_RET_ERROR; } - /* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers - to be created for RMW publishers/subscriptions. */ - if ((impl->dds_pub = dds_create_publisher(impl->ppant, nullptr, nullptr)) < 0) { + to be created for RMW publishers/subscriptions. */ + if ((this->dds_pub = dds_create_publisher(this->ppant, nullptr, nullptr)) < 0) { + this->clean_up(); RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher"); return RMW_RET_ERROR; } - if ((impl->dds_sub = dds_create_subscriber(impl->ppant, nullptr, nullptr)) < 0) { + if ((this->dds_sub = dds_create_subscriber(this->ppant, nullptr, nullptr)) < 0) { + this->clean_up(); RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber"); return RMW_RET_ERROR; @@ -975,15 +971,16 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; /* Create RMW publisher/subscription/guard condition used by rmw_dds_common - discovery */ + discovery */ rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options(); - impl->common.pub = create_publisher( - impl->ppant, impl->dds_pub, + this->common.pub = create_publisher( + this->ppant, this->dds_pub, rosidl_typesupport_cpp::get_message_type_support_handle(), "ros_discovery_info", &pubsub_qos, &publisher_options); - if (impl->common.pub == nullptr) { + if (this->common.pub == nullptr) { + this->clean_up(); return RMW_RET_ERROR; } @@ -991,31 +988,33 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t subscription_options.ignore_local_publications = true; // FIXME: keyed topics => KEEP_LAST and depth 1. pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL; - impl->common.sub = create_subscription( - impl->ppant, impl->dds_sub, + this->common.sub = create_subscription( + this->ppant, this->dds_sub, rosidl_typesupport_cpp::get_message_type_support_handle(), "ros_discovery_info", &pubsub_qos, &subscription_options); - if (impl->common.sub == nullptr) { + if (this->common.sub == nullptr) { + this->clean_up(); return RMW_RET_ERROR; } - impl->common.graph_guard_condition = create_guard_condition(impl.get()); - if (impl->common.graph_guard_condition == nullptr) { + this->common.graph_guard_condition = create_guard_condition(); + if (this->common.graph_guard_condition == nullptr) { + this->clean_up(); return RMW_RET_BAD_ALLOC; } - impl->common.graph_cache.set_on_change_callback( - [guard_condition = impl->common.graph_guard_condition]() { + this->common.graph_cache.set_on_change_callback( + [guard_condition = this->common.graph_guard_condition]() { rmw_ret_t ret = rmw_trigger_guard_condition(guard_condition); if (ret != RMW_RET_OK) { RMW_SET_ERROR_MSG("graph cache on_change_callback failed to trigger guard condition"); } }); - get_entity_gid(impl->ppant, impl->common.gid); - impl->common.graph_cache.add_participant(impl->common.gid, context->options.enclave); + get_entity_gid(this->ppant, this->common.gid); + this->common.graph_cache.add_participant(this->common.gid, options->enclave); // One could also use a set of listeners instead of a thread for maintaining the graph cache: // - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock @@ -1024,11 +1023,83 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t // updates and triggering a guard condition, and so that should be safe. // however, the graph cache updates could be expensive, and so performing those operations on // the thread receiving data from the network may not be wise. - if ((ret = discovery_thread_start(impl.get())) != RMW_RET_OK) { + rmw_ret_t ret; + if ((ret = discovery_thread_start(this)) != RMW_RET_OK) { + this->clean_up(); + return ret; + } + ++this->node_count; + return RMW_RET_OK; +} + +void +rmw_context_impl_t::clean_up() +{ + discovery_thread_stop(common); + common.graph_cache.clear_on_change_callback(); + if (common.graph_guard_condition) { + destroy_guard_condition(common.graph_guard_condition); + } + if (common.pub) { + destroy_publisher(common.pub); + } + if (common.sub) { + destroy_subscription(common.sub); + } + if (ppant > 0 && dds_delete(ppant) < 0) { + RCUTILS_SAFE_FWRITE_TO_STDERR( + "Failed to destroy domain in destructor\n"); + } + check_destroy_domain(domain_id); +} + +rmw_ret_t +rmw_context_impl_t::fini() +{ + std::lock_guard guard(initialization_mutex); + if (0u != --this->node_count) { + // destruction shouldn't happen yet + return RMW_RET_OK; + } + this->clean_up(); + return RMW_RET_OK; +} + +extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context) +{ + rmw_ret_t ret; + + static_cast(options); + static_cast(context); + RCUTILS_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); + RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + options, + options->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + /* domain_id = UINT32_MAX = Cyclone DDS' "default domain id".*/ + if (options->domain_id >= UINT32_MAX) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", "rmw_create_node: domain id out of range"); + return RMW_RET_INVALID_ARGUMENT; + } + + context->instance_id = options->instance_id; + context->implementation_identifier = eclipse_cyclonedds_identifier; + context->impl = nullptr; + + if ((ret = rmw_init_options_copy(options, &context->options)) != RMW_RET_OK) { return ret; } - context->impl = impl.release(); + rmw_context_impl_t * impl = new(std::nothrow) rmw_context_impl_t(); + if (nullptr == impl) { + return RMW_RET_BAD_ALLOC; + } + + context->impl = impl; return RMW_RET_OK; } @@ -1091,6 +1162,11 @@ extern "C" rmw_node_t * rmw_create_node( return nullptr; } + ret = context->impl->init(&context->options); + if (RMW_RET_OK != ret) { + return nullptr; + } + auto * node_impl = new CddsNode(); rmw_node_t * node_handle = nullptr; RET_ALLOC_X(node_impl, goto fail_node_impl); @@ -1142,6 +1218,7 @@ extern "C" rmw_node_t * rmw_create_node( fail_node_handle: delete node_impl; fail_node_impl: + context->impl->fini(); return nullptr; } @@ -1168,6 +1245,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) rmw_free(const_cast(node->name)); rmw_free(const_cast(node->namespace_)); + node->context->impl->fini(); rmw_node_free(node); delete node_impl; return result_ret; @@ -2562,11 +2640,11 @@ extern "C" rmw_ret_t rmw_take_event( /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl) +static rmw_guard_condition_t * create_guard_condition() { rmw_guard_condition_t * guard_condition_handle; auto * gcond_impl = new CddsGuardCondition(); - if ((gcond_impl->gcondh = dds_create_guardcondition(impl->ppant)) < 0) { + if ((gcond_impl->gcondh = dds_create_guardcondition(DDS_CYCLONEDDS_HANDLE)) < 0) { RMW_SET_ERROR_MSG("failed to create guardcondition"); goto fail_guardcond; } @@ -2582,7 +2660,8 @@ static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl) extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * context) { - return create_guard_condition(context->impl); + (void)context; + return create_guard_condition(); } static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle) @@ -2612,7 +2691,8 @@ extern "C" rmw_ret_t rmw_trigger_guard_condition( extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions) { - (void) max_conditions; + (void)context; + (void)max_conditions; rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); CddsWaitset * ws = nullptr; RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); @@ -2639,7 +2719,7 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t std::lock_guard lock(gcdds.lock); // Lazily create dummy guard condition if (gcdds.waitsets.size() == 0) { - if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(context->impl->ppant)) < 0) { + if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(DDS_CYCLONEDDS_HANDLE)) < 0) { RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets"); goto fail_create_dummy; }