Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup of https://github.com/ros2/rclcpp/pull/2683 #2714

Open
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
#include <set>
#include <vector>

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -41,7 +42,9 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
* of this waitable has signaled the wait_set.
*/
RCLCPP_PUBLIC
explicit ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback = {});
explicit ExecutorNotifyWaitable(
std::function<void(void)> on_execute_callback = {}, const rclcpp::Context::SharedPtr & context =
rclcpp::contexts::get_global_default_context());

// Destructor
RCLCPP_PUBLIC
Expand Down Expand Up @@ -158,8 +161,17 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
std::function<void(size_t)> on_ready_callback_;

/// The collection of guard conditions to be waited on.
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
std::set<rclcpp::GuardCondition::SharedPtr> notify_guard_conditions_;

/// The indixes were our guard conditions were stored in the
/// rcl waitset
std::vector<size_t> idxs_of_added_guard_condition_;

/// set to true, if we got a pending trigger
bool needs_processing = false;

/// A guard condition needed to generate wakeups
rclcpp::GuardCondition::SharedPtr guard_condition_;
};

} // namespace executors
Expand Down
23 changes: 4 additions & 19 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
this->entities_need_rebuild_.store(true);
})),
}, options.context)),
entities_need_rebuild_(true),
collector_(notify_waitable_),
wait_set_({}, {}, {}, {}, {}, {}, options.context),
Expand All @@ -84,7 +84,9 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

wait_set_.add_waitable(notify_waitable_);
// we need to initially rebuild the collection,
// so that the notify_waitable_ is added
collect_entities();
}

Executor::~Executor()
Expand Down Expand Up @@ -730,33 +732,16 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Clear any previous wait result
this->wait_result_.reset();

// we need to make sure that callback groups don't get out of scope
// during the wait. As in jazzy, they are not covered by the DynamicStorage,
// we explicitly hold them here as a bugfix
std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;

{
std::lock_guard<std::mutex> guard(mutex_);

if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}

auto callback_groups = this->collector_.get_all_callback_groups();
cbgs.resize(callback_groups.size());
for(const auto & w_ptr : callback_groups) {
auto shr_ptr = w_ptr.lock();
if(shr_ptr) {
cbgs.push_back(std::move(shr_ptr));
}
}
}

this->wait_result_.emplace(wait_set_.wait(timeout));

// drop references to the callback groups, before trying to execute anything
cbgs.clear();

if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
Expand Down
100 changes: 67 additions & 33 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ namespace rclcpp
namespace executors
{

ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback)
: execute_callback_(on_execute_callback)
ExecutorNotifyWaitable::ExecutorNotifyWaitable(
std::function<void(void)> on_execute_callback,
const rclcpp::Context::SharedPtr & context)
: execute_callback_(on_execute_callback),
guard_condition_(std::make_shared<rclcpp::GuardCondition>(context))
{
}

Expand All @@ -30,6 +33,9 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other)
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
this->execute_callback_ = other.execute_callback_;
this->notify_guard_conditions_ = other.notify_guard_conditions_;
this->guard_condition_ = other.guard_condition_;
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
this->needs_processing = other.needs_processing;
}

ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
Expand All @@ -38,6 +44,9 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
this->execute_callback_ = other.execute_callback_;
this->notify_guard_conditions_ = other.notify_guard_conditions_;
this->guard_condition_ = other.guard_condition_;
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
this->needs_processing = other.needs_processing;
}
return *this;
}
Expand All @@ -47,21 +56,42 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

idxs_of_added_guard_condition_.clear();
idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size());

if(needs_processing) {
rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition();
size_t rcl_index;
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}

idxs_of_added_guard_condition_.push_back(rcl_index);

// we want to directly wake up any way, not need to add the other guard conditions
guard_condition_->trigger();

return;
}

// Note: no guard conditions need to be re-triggered, since the guard
// conditions in this class are not tracking a stateful condition, but instead
// only serve to interrupt the wait set when new information is available to
// consider.
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}

for (const auto & guard_condition : this->notify_guard_conditions_) {
rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL);
size_t rcl_index;
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}

idxs_of_added_guard_condition_.push_back(rcl_index);
}
}

Expand All @@ -71,27 +101,33 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

bool any_ready = false;
for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
const auto * rcl_guard_condition = wait_set.guard_conditions[ii];
for (size_t rcl_index : idxs_of_added_guard_condition_) {
if(rcl_index >= wait_set.size_of_guard_conditions) {
throw std::runtime_error(
"ExecutorNotifyWaitable::is_ready: Internal error, got index out of range");
}

const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index];

if (nullptr == rcl_guard_condition) {
continue;
}
for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
any_ready = true;
break;
}
}

any_ready = true;
needs_processing = true;
break;
}

return any_ready;
}

void
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
{
std::lock_guard<std::mutex> lock(execute_mutex_);

needs_processing = false;

this->execute_callback_();
}

Expand Down Expand Up @@ -122,11 +158,7 @@ ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> c
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = gc_callback;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
if (!gc) {
continue;
}
for (const auto & gc : notify_guard_conditions_) {
gc->set_on_trigger_callback(on_ready_callback_);
}
}
Expand All @@ -138,11 +170,7 @@ ExecutorNotifyWaitable::clear_on_ready_callback()
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = nullptr;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
if (!gc) {
continue;
}
for (const auto & gc : notify_guard_conditions_) {
gc->set_on_trigger_callback(nullptr);
}
}
Expand All @@ -159,9 +187,9 @@ void
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) {
notify_guard_conditions_.insert(weak_guard_condition);
const auto & guard_condition = weak_guard_condition.lock();
if (guard_condition && notify_guard_conditions_.count(guard_condition) == 0) {
notify_guard_conditions_.insert(guard_condition);
if (on_ready_callback_) {
guard_condition->set_on_trigger_callback(on_ready_callback_);
}
Expand All @@ -172,11 +200,17 @@ void
ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
if (notify_guard_conditions_.count(weak_guard_condition) != 0) {
notify_guard_conditions_.erase(weak_guard_condition);
auto guard_condition = weak_guard_condition.lock();
const auto & guard_condition = weak_guard_condition.lock();
if (!guard_condition) {
// If the lock did not work, the guard condition can't be
// saved in the sets, therefore we don't need to remove it
return;
}
auto it = notify_guard_conditions_.find(guard_condition);
if (it != notify_guard_conditions_.end()) {
notify_guard_conditions_.erase(it);
// If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
if (guard_condition && on_ready_callback_) {
if (on_ready_callback_) {
guard_condition->set_on_trigger_callback(nullptr);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ TEST_F(TestExecutorNotifyWaitable, wait) {
std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(on_execute_callback);

auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
auto notify_guard_condition =
node->get_node_base_interface()->get_shared_notify_guard_condition();
auto notify_guard_condition = std::make_shared<rclcpp::GuardCondition>();
EXPECT_NO_THROW(waitable->add_guard_condition(notify_guard_condition));

auto default_cbg = node->get_node_base_interface()->get_default_callback_group();
Expand All @@ -86,12 +85,15 @@ TEST_F(TestExecutorNotifyWaitable, wait) {
auto waitables = node->get_node_waitables_interface();
waitables->add_waitable(std::static_pointer_cast<rclcpp::Waitable>(waitable), default_cbg);

// notify the guard condition, this should trigger the on_execute_callback
notify_guard_condition->trigger();

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.spin_all(std::chrono::seconds(1));
EXPECT_EQ(1u, on_execute_calls);

// on_execute_callback doesn't change if the topology doesn't change
// no further trigger, therefore no further callback
executor.spin_all(std::chrono::seconds(1));
EXPECT_EQ(1u, on_execute_calls);
}