diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp index 0de01fd4b0..d389cbf1e0 100644 --- a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -50,7 +50,6 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable RCLCPP_PUBLIC ExecutorNotifyWaitable(ExecutorNotifyWaitable & other); - RCLCPP_PUBLIC ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other); diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 5c562a61ff..0624f92c62 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -101,6 +101,23 @@ class SubscriptionIntraProcess virtual ~SubscriptionIntraProcess() = default; + void + add_to_wait_set(rcl_wait_set_t & wait_set) override + { + // This block is necessary when the guard condition wakes the wait set, but + // the intra process waitable was not handled before the wait set is waited + // on again. + // Basically we're keeping the guard condition triggered so long as there is + // data in the buffer. + if (this->buffer_->has_data()) { + // If there is data still to be processed, indicate to the + // executor or waitset by triggering the guard condition. + this->trigger_guard_condition(); + } + // Let the parent classes handle the rest of the work: + return SubscriptionIntraProcessBufferT::add_to_wait_set(wait_set); + } + std::shared_ptr take_data() override { @@ -149,7 +166,7 @@ class SubscriptionIntraProcess typename std::enable_if::value, void>::type execute_impl(const std::shared_ptr & data) { - if (!data) { + if (nullptr == data) { return; } diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 28e929c495..1da372abfd 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -17,7 +17,6 @@ #include #include -#include #include "rcl/allocator.h" @@ -121,8 +120,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } } for (size_t i = 0; i < waitable_handles_.size(); ++i) { - if (waitable_handles_[i]->is_ready(*wait_set)) { - waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i])); + if (!waitable_handles_[i]->is_ready(*wait_set)) { + waitable_handles_[i].reset(); } } @@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy timer_handles_.end() ); - waitable_handles_.clear(); + waitable_handles_.erase( + std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), + waitable_handles_.end() + ); } bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override @@ -390,9 +392,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy rclcpp::AnyExecutable & any_exec, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { - auto & waitable_handles = waitable_triggered_handles_; - auto it = waitable_handles.begin(); - while (it != waitable_handles.end()) { + auto it = waitable_handles_.begin(); + while (it != waitable_handles_.end()) { std::shared_ptr & waitable = *it; if (waitable) { // Find the group for this handle and see if it can be serviced @@ -400,7 +401,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group) { // Group was not found, meaning the waitable is not valid... // Remove it from the ready list and continue looking - it = waitable_handles.erase(it); + it = waitable_handles_.erase(it); continue; } if (!group->can_be_taken_from().load()) { @@ -413,11 +414,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy any_exec.waitable = waitable; any_exec.callback_group = group; any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); - waitable_handles.erase(it); + waitable_handles_.erase(it); return; } // Else, the waitable is no longer valid, remove it and continue - it = waitable_handles.erase(it); + it = waitable_handles_.erase(it); } } @@ -498,8 +499,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy VectorRebind> timer_handles_; VectorRebind> waitable_handles_; - VectorRebind> waitable_triggered_handles_; - std::shared_ptr allocator_; }; diff --git a/rclcpp/src/rclcpp/event_handler.cpp b/rclcpp/src/rclcpp/event_handler.cpp index 6232e5b0d9..630bc26d33 100644 --- a/rclcpp/src/rclcpp/event_handler.cpp +++ b/rclcpp/src/rclcpp/event_handler.cpp @@ -15,6 +15,7 @@ #include #include +#include "rcl/error_handling.h" #include "rcl/event.h" #include "rclcpp/event_handler.hpp" diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index a95bc3797c..09789a13ff 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -48,6 +48,11 @@ void ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set) { std::lock_guard lock(guard_condition_mutex_); + + // Note: no guard conditions need to be re-triggered, since the guard + // conditions in this class are not tracking a stateful condition, but instead + // only serve to interrupt the wait set when new information is available to + // consider. for (auto weak_guard_condition : this->notify_guard_conditions_) { auto guard_condition = weak_guard_condition.lock(); if (!guard_condition) {continue;} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index c2e6b2bfe4..d52c94b1d2 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -566,6 +566,11 @@ if(TARGET test_thread_safe_synchronization) target_link_libraries(test_thread_safe_synchronization ${PROJECT_NAME} ${test_msgs_TARGETS}) endif() +ament_add_gtest(test_intra_process_waitable waitables/test_intra_process_waitable.cpp) +if(TARGET test_intra_process_waitable) + target_link_libraries(test_intra_process_waitable ${PROJECT_NAME} ${test_msgs_TARGETS}) +endif() + ament_add_gtest(test_rosout_qos test_rosout_qos.cpp) if(TARGET test_rosout_qos) target_link_libraries(test_rosout_qos ${PROJECT_NAME} rcl::rcl rmw::rmw) diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index d25b3490c2..452228645b 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -60,57 +60,6 @@ class TestWaitable : public rclcpp::Waitable void execute(const std::shared_ptr &) override {} }; -static bool test_waitable_result2 = false; - -class TestWaitable2 : public rclcpp::Waitable -{ -public: - explicit TestWaitable2(rcl_publisher_t * pub_ptr) - : pub_ptr_(pub_ptr), - pub_event_(rcl_get_zero_initialized_event()) - { - EXPECT_EQ( - rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED), - RCL_RET_OK); - } - - ~TestWaitable2() - { - EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK); - } - - void add_to_wait_set(rcl_wait_set_t & wait_set) override - { - EXPECT_EQ(rcl_wait_set_add_event(&wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK); - } - - bool is_ready(const rcl_wait_set_t &) override - { - return test_waitable_result2; - } - - std::shared_ptr - take_data() override - { - return nullptr; - } - - void execute(const std::shared_ptr & data) override - { - (void) data; - } - - size_t get_number_of_ready_events() override - { - return 1; - } - -private: - rcl_publisher_t * pub_ptr_; - rcl_event_t pub_event_; - size_t wait_set_event_index_; -}; - struct RclWaitSetSizes { size_t size_of_subscriptions = 0; @@ -705,129 +654,20 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) { } TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) { + auto node1 = std::make_shared("waitable_node", "ns"); + auto node2 = std::make_shared("waitable_node2", "ns"); + rclcpp::Waitable::SharedPtr waitable1 = std::make_shared(); + rclcpp::Waitable::SharedPtr waitable2 = std::make_shared(); + node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); + node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); return result; }; - { - auto node1 = std::make_shared( - "waitable_node", "ns", - rclcpp::NodeOptions() - .start_parameter_event_publisher(false) - .start_parameter_services(false)); - - rclcpp::PublisherOptions pub_options; - pub_options.use_default_callbacks = false; - - auto pub1 = node1->create_publisher( - "test_topic_1", rclcpp::QoS(10), pub_options); - - auto waitable1 = - std::make_shared(pub1->get_publisher_handle().get()); - node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); - - auto basic_node = create_node_with_disabled_callback_groups("basic_node"); - WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - basic_node->for_each_callback_group( - [basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_nodes.insert( - std::pair( - group_ptr, - basic_node->get_node_base_interface())); - }); - node1->for_each_callback_group( - [node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_nodes.insert( - std::pair( - group_ptr, - node1->get_node_base_interface())); - }); - allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - ASSERT_EQ( - rcl_wait_set_init( - &wait_set, - allocator_memory_strategy()->number_of_ready_subscriptions(), - allocator_memory_strategy()->number_of_guard_conditions(), - allocator_memory_strategy()->number_of_ready_timers(), - allocator_memory_strategy()->number_of_ready_clients(), - allocator_memory_strategy()->number_of_ready_services(), - allocator_memory_strategy()->number_of_ready_events(), - rclcpp::contexts::get_global_default_context()->get_rcl_context().get(), - allocator_memory_strategy()->get_allocator()), - RCL_RET_OK); - - ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set)); - - ASSERT_EQ( - rcl_wait( - &wait_set, - std::chrono::duration_cast(std::chrono::milliseconds(100)) - .count()), - RCL_RET_OK); - test_waitable_result2 = true; - allocator_memory_strategy()->remove_null_handles(&wait_set); - - rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes); - EXPECT_EQ(result.node_base, node1->get_node_base_interface()); - test_waitable_result2 = false; - - EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK); - } - - { - auto node2 = std::make_shared( - "waitable_node2", "ns", - rclcpp::NodeOptions() - .start_parameter_services(false) - .start_parameter_event_publisher(false)); - - rclcpp::PublisherOptions pub_options; - pub_options.use_default_callbacks = false; - - auto pub2 = node2->create_publisher( - "test_topic_2", rclcpp::QoS(10), pub_options); - - auto waitable2 = - std::make_shared(pub2->get_publisher_handle().get()); - node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); - - auto basic_node2 = std::make_shared( - "basic_node2", "ns", - rclcpp::NodeOptions() - .start_parameter_services(false) - .start_parameter_event_publisher(false)); - WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes; - basic_node2->for_each_callback_group( - [basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_uncollected_nodes.insert( - std::pair( - group_ptr, - basic_node2->get_node_base_interface())); - }); - node2->for_each_callback_group( - [node2, - &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_uncollected_nodes.insert( - std::pair( - group_ptr, - node2->get_node_base_interface())); - }); - - rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes); - EXPECT_EQ(failed_result.node_base, nullptr); - } + EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity)); } TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) { diff --git a/rclcpp/test/rclcpp/test_memory_strategy.cpp b/rclcpp/test/rclcpp/test_memory_strategy.cpp index 348b7f0143..4c166ebb88 100644 --- a/rclcpp/test/rclcpp/test_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/test_memory_strategy.cpp @@ -37,6 +37,7 @@ class TestWaitable : public rclcpp::Waitable public: void add_to_wait_set(rcl_wait_set_t &) override {} bool is_ready(const rcl_wait_set_t &) override {return true;} + std::shared_ptr take_data() override {return nullptr;} void execute(const std::shared_ptr &) override {} }; diff --git a/rclcpp/test/rclcpp/wait_set_policies/test_static_storage.cpp b/rclcpp/test/rclcpp/wait_set_policies/test_static_storage.cpp index 55573cc11b..f3e94b6aa0 100644 --- a/rclcpp/test/rclcpp/wait_set_policies/test_static_storage.cpp +++ b/rclcpp/test/rclcpp/wait_set_policies/test_static_storage.cpp @@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable public: TestWaitable() : is_ready_(false) {} + void add_to_wait_set(rcl_wait_set_t &) override {} bool is_ready(const rcl_wait_set_t &) override {return is_ready_;} diff --git a/rclcpp/test/rclcpp/wait_set_policies/test_storage_policy_common.cpp b/rclcpp/test/rclcpp/wait_set_policies/test_storage_policy_common.cpp index cd44918236..eaf3a866d1 100644 --- a/rclcpp/test/rclcpp/wait_set_policies/test_storage_policy_common.cpp +++ b/rclcpp/test/rclcpp/wait_set_policies/test_storage_policy_common.cpp @@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable public: TestWaitable() : is_ready_(false), add_to_wait_set_(false) {} + void add_to_wait_set(rcl_wait_set_t &) override { if (!add_to_wait_set_) { diff --git a/rclcpp/test/rclcpp/wait_set_policies/test_thread_safe_synchronization.cpp b/rclcpp/test/rclcpp/wait_set_policies/test_thread_safe_synchronization.cpp index 7554f56270..e6a034832b 100644 --- a/rclcpp/test/rclcpp/wait_set_policies/test_thread_safe_synchronization.cpp +++ b/rclcpp/test/rclcpp/wait_set_policies/test_thread_safe_synchronization.cpp @@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable public: TestWaitable() : is_ready_(false) {} + void add_to_wait_set(rcl_wait_set_t &) override {} bool is_ready(const rcl_wait_set_t &) override {return is_ready_;} diff --git a/rclcpp/test/rclcpp/waitables/test_intra_process_waitable.cpp b/rclcpp/test/rclcpp/waitables/test_intra_process_waitable.cpp new file mode 100644 index 0000000000..723e80513b --- /dev/null +++ b/rclcpp/test/rclcpp/waitables/test_intra_process_waitable.cpp @@ -0,0 +1,46 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/rclcpp.hpp" +#include "test_msgs/msg/empty.hpp" + +#include "./waitable_test_helpers.hpp" + +class TestIntraProcessWaitable : public ::testing::Test +{ +protected: + static void SetUpTestCase() {rclcpp::init(0, nullptr);} + static void TearDownTestCase() {rclcpp::shutdown();} +}; + +TEST_F(TestIntraProcessWaitable, test_that_waitable_stays_ready_after_second_wait) { + auto node = std::make_shared( + "test_node", + rclcpp::NodeOptions().use_intra_process_comms(true)); + + using test_msgs::msg::Empty; + auto sub = node->create_subscription("test_topic", 10, [](const Empty &) {}); + auto pub = node->create_publisher("test_topic", 10); + + auto make_sub_intra_process_waitable_ready = [pub]() { + pub->publish(Empty()); + }; + + rclcpp::test::waitables::do_test_that_waitable_stays_ready_after_second_wait( + sub->get_intra_process_waitable(), + make_sub_intra_process_waitable_ready, + true /* expected_to_stay_ready */); +} diff --git a/rclcpp/test/rclcpp/waitables/waitable_test_helpers.hpp b/rclcpp/test/rclcpp/waitables/waitable_test_helpers.hpp new file mode 100644 index 0000000000..49e074d5b0 --- /dev/null +++ b/rclcpp/test/rclcpp/waitables/waitable_test_helpers.hpp @@ -0,0 +1,117 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__WAITABLES__WAITABLE_TEST_HELPERS_HPP_ +#define RCLCPP__WAITABLES__WAITABLE_TEST_HELPERS_HPP_ + +#include + +#include +#include +#include + +#include + +namespace rclcpp +{ +namespace test +{ +namespace waitables +{ + +/// Test that a given waitable is ready after a second wait. +/** + * The purpose of this test is to check that a waitable will remain ready + * on subsequent wait calls, if that is the expected behavior. + * Not all waitables should remain ready after a wait call, which can be + * expressed in the expected_to_stay_ready argument which defaults to true. + * If set to false, it will check that it is not ready after a second wait, as + * well as some other parts of the test. + * + * The given waitable should: + * + * - not be ready initially + * - not be ready after being waited on (and timing out) + * - should become ready after the make_waitable_ready method is called + * - may or may not be ready at this point + * - should be ready after waiting on it, within the wait_timeout + * - should be ready still after a second wait (unless expected_to_stay_ready = false) + * - if expected_to_stay_ready, should become not ready after a take_data/execute + */ +template +void +do_test_that_waitable_stays_ready_after_second_wait( + const std::shared_ptr & waitable, + std::function make_waitable_ready, + bool expected_to_stay_ready = true, + std::chrono::nanoseconds wait_timeout = std::chrono::seconds(5)) +{ + rclcpp::WaitSet wait_set; + wait_set.add_waitable(waitable); + + // not ready initially + EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set())) + << "waitable is unexpectedly ready before waiting"; + + // not ready after a wait that timesout + { + auto wait_result = wait_set.wait(std::chrono::seconds(0)); + EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Timeout) + << "wait set did not timeout as expected"; + EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set())) + << "waitable is unexpectedly ready after waiting, but before making ready"; + } + + // make it ready and wait on it + make_waitable_ready(); + { + auto wait_result = wait_set.wait(wait_timeout); + EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Ready) + << "wait set was not ready after the waitable should have been made ready"; + EXPECT_TRUE(waitable->is_ready(wait_set.get_rcl_wait_set())) + << "waitable is unexpectedly not ready after making it ready and waiting"; + } + + // wait again, and see that it is ready as expected or not expected + { + auto wait_result = wait_set.wait(std::chrono::seconds(0)); + if (expected_to_stay_ready) { + EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Ready) + << "wait set was not ready on a second wait on the waitable"; + EXPECT_TRUE(waitable->is_ready(wait_set.get_rcl_wait_set())) + << "waitable unexpectedly not ready after second wait"; + } else { + EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Timeout) + << "wait set did not time out after the waitable should have no longer been ready"; + EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set())) + << "waitable was ready after waiting a second time, which was not expected"; + } + } + + // if expected_to_stay_ready, check that take_data/execute makes it not ready + if (expected_to_stay_ready) { + waitable->execute(waitable->take_data()); + auto wait_result = wait_set.wait(std::chrono::seconds(0)); + EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Timeout) + << "wait set did not time out after the waitable should have no longer been ready"; + EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set())) + << "waitable was unexpectedly ready after a take_data and execute"; + } +} + +} // namespace waitables +} // namespace test +} // namespace rclcpp + +#endif // RCLCPP__WAITABLES__WAITABLE_TEST_HELPERS_HPP_