diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 0624f92c62..9cdb1c4ebc 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -77,7 +77,8 @@ class SubscriptionIntraProcess rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + std::weak_ptr callback_lifetime) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -85,6 +86,7 @@ class SubscriptionIntraProcess topic_name, qos_profile, buffer_type), + callback_lifetime_(callback_lifetime), any_callback_(callback) { TRACETOOLS_TRACEPOINT( @@ -166,6 +168,10 @@ class SubscriptionIntraProcess typename std::enable_if::value, void>::type execute_impl(const std::shared_ptr & data) { + if (callback_lifetime_.expired()) { + return; + } + if (nullptr == data) { return; } @@ -187,6 +193,7 @@ class SubscriptionIntraProcess shared_ptr.reset(); } + std::weak_ptr callback_lifetime_; AnySubscriptionCallback any_callback_; }; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 4366cae2f9..8be42f3aa1 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -156,6 +156,29 @@ class Subscription : public SubscriptionBase "' is not allowed with 0 depth qos policy"); } + // Use std::weak_ptr owner_before trick to determine if user + // has assigned a subscription options_.callback_lifetime weak_ptr. + // https://stackoverflow.com/a/45507610 + std::weak_ptr empty; + if (!options_.callback_lifetime.owner_before(empty) && + !empty.owner_before(options_.callback_lifetime)) { + // options_.callback_lifetime was not user assigned, + // So use options_.callback_group if user assigned, + // falling back to node's default_callback_group + std::shared_ptr vsp = options_.callback_group != nullptr ? + options_.callback_group : + node_base->get_default_callback_group(); + std::weak_ptr vwp = vsp; + options_.callback_lifetime = vwp; + } + + if (options_.callback_lifetime.expired()) + { + throw std::invalid_argument( + "callback_lifetime weak_ptr for topic '" + topic_name + + "' has already expired"); + } + using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, SubscribedType, @@ -172,7 +195,8 @@ class Subscription : public SubscriptionBase context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, - resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); + resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), + options_.callback_lifetime); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), @@ -300,12 +324,15 @@ class Subscription : public SubscriptionBase now = std::chrono::system_clock::now(); } - any_callback_.dispatch(typed_message, message_info); + if (!options_.callback_lifetime.expired()) + { + any_callback_.dispatch(typed_message, message_info); - if (subscription_topic_statistics_) { - const auto nanos = std::chrono::time_point_cast(now); - const auto time = rclcpp::Time(nanos.time_since_epoch().count()); - subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); + if (subscription_topic_statistics_) { + const auto nanos = std::chrono::time_point_cast(now); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); + subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); + } } } @@ -321,12 +348,15 @@ class Subscription : public SubscriptionBase now = std::chrono::system_clock::now(); } - any_callback_.dispatch(serialized_message, message_info); + if (!options_.callback_lifetime.expired()) + { + any_callback_.dispatch(serialized_message, message_info); - if (subscription_topic_statistics_) { - const auto nanos = std::chrono::time_point_cast(now); - const auto time = rclcpp::Time(nanos.time_since_epoch().count()); - subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); + if (subscription_topic_statistics_) { + const auto nanos = std::chrono::time_point_cast(now); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); + subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); + } } } @@ -353,12 +383,15 @@ class Subscription : public SubscriptionBase now = std::chrono::system_clock::now(); } - any_callback_.dispatch(sptr, message_info); + if (!options_.callback_lifetime.expired()) + { + any_callback_.dispatch(sptr, message_info); - if (subscription_topic_statistics_) { - const auto nanos = std::chrono::time_point_cast(now); - const auto time = rclcpp::Time(nanos.time_since_epoch().count()); - subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); + if (subscription_topic_statistics_) { + const auto nanos = std::chrono::time_point_cast(now); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); + subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); + } } } @@ -449,7 +482,9 @@ class Subscription : public SubscriptionBase * It is important to save a copy of this so that the rmw payload which it * may contain is kept alive for the duration of the subscription. */ - const rclcpp::SubscriptionOptionsWithAllocator options_; + // NOTE: Had to drop const in order to set default options_.callback_lifetime + // if not set in user code. + rclcpp::SubscriptionOptionsWithAllocator options_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 0dd738ee60..295cc0be1d 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -89,6 +89,8 @@ struct SubscriptionOptionsBase QosOverridingOptions qos_overriding_options; ContentFilterOptions content_filter_options; + + std::weak_ptr callback_lifetime; }; /// Structure containing optional configuration for Subscriptions.