diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 21868568..20812e57 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -213,26 +213,33 @@ class rmw_context_impl_s::Data final // Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. - zc_liveliness_subscriber_options_t sub_options; - zc_liveliness_subscriber_options_default(&sub_options); - z_owned_closure_sample_t callback; - z_closure(&callback, graph_sub_data_handler, nullptr, this); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - if (zc_liveliness_declare_subscriber( - z_loan(session_->_0), - &graph_subscriber_, z_loan(liveliness_ke), - z_move(callback), &sub_options) != Z_OK) - { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - throw std::runtime_error("Unable to subscribe to ROS graph updates."); - } + zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str()); + zenoh::Session::LivelinessSubscriberOptions sub_options = + zenoh::Session::LivelinessSubscriberOptions::create_default(); + sub_options.history = true; + graph_subscriber_cpp_ = session_->liveliness_declare_subscriber( + keyexpr_cpp, + [&](const zenoh::Sample& s) { + // Update the graph cache. + std::lock_guard lock(mutex_); + switch (s.get_kind()) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + graph_cache_->parse_put(std::string{s.get_keyexpr().as_string_view()}); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + graph_cache_->parse_del(std::string{s.get_keyexpr().as_string_view()}); + break; + default: + return; + } + }, + zenoh::closures::none, + std::move(sub_options), + &err); - undeclare_z_sub.cancel(); + if (err != Z_OK) { + throw std::runtime_error("unable to create zenoh subscription. "); + } } // Shutdown the Zenoh session. @@ -258,7 +265,16 @@ class rmw_context_impl_s::Data final } } - z_undeclare_subscriber(z_move(graph_subscriber_)); + zenoh::ZResult err; + std::move(graph_subscriber_cpp_).value().undeclare(&err); + if (err != Z_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } + if (shm_provider_.has_value()) { z_drop(z_move(shm_provider_.value())); } @@ -267,12 +283,6 @@ class rmw_context_impl_s::Data final // We specifically do *not* hold the mutex_ while tearing down the session; this allows us // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). } - - // // Close the zenoh session - // if (z_close(z_loan_mut(session_), NULL) != Z_OK) { - // RMW_SET_ERROR_MSG("Error while closing zenoh session"); - // return RMW_RET_ERROR; - // } session_.reset(); return RMW_RET_OK; } @@ -437,7 +447,7 @@ class rmw_context_impl_s::Data final // Graph cache. std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. - z_owned_subscriber_t graph_subscriber_; + std::optional> graph_subscriber_cpp_; // Equivalent to rmw_dds_common::Context's guard condition. // Guard condition that should be triggered when the graph changes. std::unique_ptr graph_guard_condition_;