Skip to content

Commit

Permalink
Ensure waitables handle guard condition retriggering (#2483)
Browse files Browse the repository at this point in the history
Signed-off-by: William Woodall <[email protected]>
Signed-off-by: Michael Carroll <[email protected]>
Co-authored-by: Michael Carroll <[email protected]>
wjwwood and mjcarroll authored Apr 8, 2024
1 parent 63c2e2e commit bfa7efa
Showing 13 changed files with 215 additions and 182 deletions.
Original file line number Diff line number Diff line change
@@ -50,7 +50,6 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
RCLCPP_PUBLIC
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);


RCLCPP_PUBLIC
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);

Original file line number Diff line number Diff line change
@@ -101,6 +101,23 @@ class SubscriptionIntraProcess

virtual ~SubscriptionIntraProcess() = default;

void
add_to_wait_set(rcl_wait_set_t & wait_set) override
{
// This block is necessary when the guard condition wakes the wait set, but
// the intra process waitable was not handled before the wait set is waited
// on again.
// Basically we're keeping the guard condition triggered so long as there is
// data in the buffer.
if (this->buffer_->has_data()) {
// If there is data still to be processed, indicate to the
// executor or waitset by triggering the guard condition.
this->trigger_guard_condition();
}
// Let the parent classes handle the rest of the work:
return SubscriptionIntraProcessBufferT::add_to_wait_set(wait_set);
}

std::shared_ptr<void>
take_data() override
{
@@ -149,7 +166,7 @@ class SubscriptionIntraProcess
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(const std::shared_ptr<void> & data)
{
if (!data) {
if (nullptr == data) {
return;
}

23 changes: 11 additions & 12 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@

#include <memory>
#include <vector>
#include <utility>

#include "rcl/allocator.h"

@@ -121,8 +120,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (waitable_handles_[i]->is_ready(*wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
if (!waitable_handles_[i]->is_ready(*wait_set)) {
waitable_handles_[i].reset();
}
}

@@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
timer_handles_.end()
);

waitable_handles_.clear();
waitable_handles_.erase(
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
waitable_handles_.end()
);
}

bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
@@ -390,17 +392,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto & waitable_handles = waitable_triggered_handles_;
auto it = waitable_handles.begin();
while (it != waitable_handles.end()) {
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
std::shared_ptr<Waitable> & waitable = *it;
if (waitable) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
if (!group) {
// Group was not found, meaning the waitable is not valid...
// Remove it from the ready list and continue looking
it = waitable_handles.erase(it);
it = waitable_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
@@ -413,11 +414,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.waitable = waitable;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
waitable_handles.erase(it);
waitable_handles_.erase(it);
return;
}
// Else, the waitable is no longer valid, remove it and continue
it = waitable_handles.erase(it);
it = waitable_handles_.erase(it);
}
}

@@ -498,8 +499,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;

VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;

std::shared_ptr<VoidAlloc> allocator_;
};

1 change: 1 addition & 0 deletions rclcpp/src/rclcpp/event_handler.cpp
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
#include <stdexcept>
#include <string>

#include "rcl/error_handling.h"
#include "rcl/event.h"

#include "rclcpp/event_handler.hpp"
5 changes: 5 additions & 0 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
@@ -48,6 +48,11 @@ void
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

// Note: no guard conditions need to be re-triggered, since the guard
// conditions in this class are not tracking a stateful condition, but instead
// only serve to interrupt the wait set when new information is available to
// consider.
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}
5 changes: 5 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -566,6 +566,11 @@ if(TARGET test_thread_safe_synchronization)
target_link_libraries(test_thread_safe_synchronization ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_intra_process_waitable waitables/test_intra_process_waitable.cpp)
if(TARGET test_intra_process_waitable)
target_link_libraries(test_intra_process_waitable ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_rosout_qos test_rosout_qos.cpp)
if(TARGET test_rosout_qos)
target_link_libraries(test_rosout_qos ${PROJECT_NAME} rcl::rcl rmw::rmw)
176 changes: 8 additions & 168 deletions rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp
Original file line number Diff line number Diff line change
@@ -60,57 +60,6 @@ class TestWaitable : public rclcpp::Waitable
void execute(const std::shared_ptr<void> &) override {}
};

static bool test_waitable_result2 = false;

class TestWaitable2 : public rclcpp::Waitable
{
public:
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
: pub_ptr_(pub_ptr),
pub_event_(rcl_get_zero_initialized_event())
{
EXPECT_EQ(
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
RCL_RET_OK);
}

~TestWaitable2()
{
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
}

void add_to_wait_set(rcl_wait_set_t & wait_set) override
{
EXPECT_EQ(rcl_wait_set_add_event(&wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
}

bool is_ready(const rcl_wait_set_t &) override
{
return test_waitable_result2;
}

std::shared_ptr<void>
take_data() override
{
return nullptr;
}

void execute(const std::shared_ptr<void> & data) override
{
(void) data;
}

size_t get_number_of_ready_events() override
{
return 1;
}

private:
rcl_publisher_t * pub_ptr_;
rcl_event_t pub_event_;
size_t wait_set_event_index_;
};

struct RclWaitSetSizes
{
size_t size_of_subscriptions = 0;
@@ -705,129 +654,20 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
}

TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
return result;
};

{
auto node1 = std::make_shared<rclcpp::Node>(
"waitable_node", "ns",
rclcpp::NodeOptions()
.start_parameter_event_publisher(false)
.start_parameter_services(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
"test_topic_1", rclcpp::QoS(10), pub_options);

auto waitable1 =
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);

auto basic_node = create_node_with_disabled_callback_groups("basic_node");
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
basic_node->for_each_callback_group(
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node->get_node_base_interface()));
});
node1->for_each_callback_group(
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node1->get_node_base_interface()));
});
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
ASSERT_EQ(
rcl_wait_set_init(
&wait_set,
allocator_memory_strategy()->number_of_ready_subscriptions(),
allocator_memory_strategy()->number_of_guard_conditions(),
allocator_memory_strategy()->number_of_ready_timers(),
allocator_memory_strategy()->number_of_ready_clients(),
allocator_memory_strategy()->number_of_ready_services(),
allocator_memory_strategy()->number_of_ready_events(),
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
allocator_memory_strategy()->get_allocator()),
RCL_RET_OK);

ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));

ASSERT_EQ(
rcl_wait(
&wait_set,
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
.count()),
RCL_RET_OK);
test_waitable_result2 = true;
allocator_memory_strategy()->remove_null_handles(&wait_set);

rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
test_waitable_result2 = false;

EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
}

{
auto node2 = std::make_shared<rclcpp::Node>(
"waitable_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
"test_topic_2", rclcpp::QoS(10), pub_options);

auto waitable2 =
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto basic_node2 = std::make_shared<rclcpp::Node>(
"basic_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
basic_node2->for_each_callback_group(
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node2->get_node_base_interface()));
});
node2->for_each_callback_group(
[node2,
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node2->get_node_base_interface()));
});

rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
EXPECT_EQ(failed_result.node_base, nullptr);
}
EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
}

TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/test_memory_strategy.cpp
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ class TestWaitable : public rclcpp::Waitable
public:
void add_to_wait_set(rcl_wait_set_t &) override {}
bool is_ready(const rcl_wait_set_t &) override {return true;}

std::shared_ptr<void> take_data() override {return nullptr;}
void execute(const std::shared_ptr<void> &) override {}
};
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable
public:
TestWaitable()
: is_ready_(false) {}

void add_to_wait_set(rcl_wait_set_t &) override {}

bool is_ready(const rcl_wait_set_t &) override {return is_ready_;}
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable
public:
TestWaitable()
: is_ready_(false), add_to_wait_set_(false) {}

void add_to_wait_set(rcl_wait_set_t &) override
{
if (!add_to_wait_set_) {
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable
public:
TestWaitable()
: is_ready_(false) {}

void add_to_wait_set(rcl_wait_set_t &) override {}

bool is_ready(const rcl_wait_set_t &) override {return is_ready_;}
46 changes: 46 additions & 0 deletions rclcpp/test/rclcpp/waitables/test_intra_process_waitable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include "rclcpp/rclcpp.hpp"
#include "test_msgs/msg/empty.hpp"

#include "./waitable_test_helpers.hpp"

class TestIntraProcessWaitable : public ::testing::Test
{
protected:
static void SetUpTestCase() {rclcpp::init(0, nullptr);}
static void TearDownTestCase() {rclcpp::shutdown();}
};

TEST_F(TestIntraProcessWaitable, test_that_waitable_stays_ready_after_second_wait) {
auto node = std::make_shared<rclcpp::Node>(
"test_node",
rclcpp::NodeOptions().use_intra_process_comms(true));

using test_msgs::msg::Empty;
auto sub = node->create_subscription<Empty>("test_topic", 10, [](const Empty &) {});
auto pub = node->create_publisher<Empty>("test_topic", 10);

auto make_sub_intra_process_waitable_ready = [pub]() {
pub->publish(Empty());
};

rclcpp::test::waitables::do_test_that_waitable_stays_ready_after_second_wait(
sub->get_intra_process_waitable(),
make_sub_intra_process_waitable_ready,
true /* expected_to_stay_ready */);
}
117 changes: 117 additions & 0 deletions rclcpp/test/rclcpp/waitables/waitable_test_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__WAITABLES__WAITABLE_TEST_HELPERS_HPP_
#define RCLCPP__WAITABLES__WAITABLE_TEST_HELPERS_HPP_

#include <gtest/gtest.h>

#include <chrono>
#include <functional>
#include <memory>

#include <rclcpp/wait_set.hpp>

namespace rclcpp
{
namespace test
{
namespace waitables
{

/// Test that a given waitable is ready after a second wait.
/**
* The purpose of this test is to check that a waitable will remain ready
* on subsequent wait calls, if that is the expected behavior.
* Not all waitables should remain ready after a wait call, which can be
* expressed in the expected_to_stay_ready argument which defaults to true.
* If set to false, it will check that it is not ready after a second wait, as
* well as some other parts of the test.
*
* The given waitable should:
*
* - not be ready initially
* - not be ready after being waited on (and timing out)
* - should become ready after the make_waitable_ready method is called
* - may or may not be ready at this point
* - should be ready after waiting on it, within the wait_timeout
* - should be ready still after a second wait (unless expected_to_stay_ready = false)
* - if expected_to_stay_ready, should become not ready after a take_data/execute
*/
template<typename WaitableT>
void
do_test_that_waitable_stays_ready_after_second_wait(
const std::shared_ptr<WaitableT> & waitable,
std::function<void()> make_waitable_ready,
bool expected_to_stay_ready = true,
std::chrono::nanoseconds wait_timeout = std::chrono::seconds(5))
{
rclcpp::WaitSet wait_set;
wait_set.add_waitable(waitable);

// not ready initially
EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set()))
<< "waitable is unexpectedly ready before waiting";

// not ready after a wait that timesout
{
auto wait_result = wait_set.wait(std::chrono::seconds(0));
EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Timeout)
<< "wait set did not timeout as expected";
EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set()))
<< "waitable is unexpectedly ready after waiting, but before making ready";
}

// make it ready and wait on it
make_waitable_ready();
{
auto wait_result = wait_set.wait(wait_timeout);
EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Ready)
<< "wait set was not ready after the waitable should have been made ready";
EXPECT_TRUE(waitable->is_ready(wait_set.get_rcl_wait_set()))
<< "waitable is unexpectedly not ready after making it ready and waiting";
}

// wait again, and see that it is ready as expected or not expected
{
auto wait_result = wait_set.wait(std::chrono::seconds(0));
if (expected_to_stay_ready) {
EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Ready)
<< "wait set was not ready on a second wait on the waitable";
EXPECT_TRUE(waitable->is_ready(wait_set.get_rcl_wait_set()))
<< "waitable unexpectedly not ready after second wait";
} else {
EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Timeout)
<< "wait set did not time out after the waitable should have no longer been ready";
EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set()))
<< "waitable was ready after waiting a second time, which was not expected";
}
}

// if expected_to_stay_ready, check that take_data/execute makes it not ready
if (expected_to_stay_ready) {
waitable->execute(waitable->take_data());
auto wait_result = wait_set.wait(std::chrono::seconds(0));
EXPECT_EQ(wait_result.kind(), rclcpp::WaitResultKind::Timeout)
<< "wait set did not time out after the waitable should have no longer been ready";
EXPECT_FALSE(waitable->is_ready(wait_set.get_rcl_wait_set()))
<< "waitable was unexpectedly ready after a take_data and execute";
}
}

} // namespace waitables
} // namespace test
} // namespace rclcpp

#endif // RCLCPP__WAITABLES__WAITABLE_TEST_HELPERS_HPP_

0 comments on commit bfa7efa

Please sign in to comment.