Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce EventsExecutor implementation #1391

Open
wants to merge 32 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
45f7f5f
Introduce EventsExecutor implementation (#1389)
Dec 31, 2024
18036c4
Fix explosion for reference count updates without GIL
Jan 4, 2025
ebb1ee3
Fix rclpy triggering its own deprecation warnings
Jan 7, 2025
68ecfb4
Fix ament linter complaints
Jan 7, 2025
9722f08
Switch to non-boost asio library
Jan 7, 2025
1570aac
Use internal _rclpy C++ types instead of jumping through Python hoops
Jan 9, 2025
99fdd9f
Reformat with clang-format, then uncrustify again
Jan 9, 2025
3631594
Respect init signal handling options
Jan 9, 2025
f68f995
Reconcile signal handling differences with SingleThreadedExecutor
Jan 10, 2025
447df5e
test_executor.py: Add coverage for EventsExecutor
Jan 13, 2025
4a26313
Make spin_once() only return after a user-visible callback
Jan 13, 2025
9e95b53
Add get_nodes() method
Jan 13, 2025
99f6f93
Add context management support
Jan 13, 2025
dde4442
Remove mutex protection on nodes_ member access
Jan 16, 2025
aabfc18
Fix deadlock during shutdown()
Jan 16, 2025
4a181d8
A little further on using C++ _rclpy types instead of Python interface
Jan 20, 2025
7cc7f5f
Fix issue with iterating through incomplete Tasks
Jan 20, 2025
3502872
Add support for coroutines to timer handling
Jan 21, 2025
4dd05c6
Fix test_not_lose_callback() test to destroy entities properly
Jan 21, 2025
776a1a3
Correct test that wasn't running at all, and remove EventsExecutor fr…
Jan 22, 2025
8bfbfbd
Warn on every timer added over the threshold, not just the first
bmartin427 Jan 22, 2025
126c754
Keep rcl_timer_call() tightly coupled with user callback dispatch
Jan 24, 2025
ee14711
Protect against deferred method calls happening against a deleted Clo…
Jan 24, 2025
35f4aea
Add support for new TimerInfo data passed to timer handlers
Jan 24, 2025
df3dbb7
Simplify spin_once() implementation
Jan 24, 2025
168c5cc
Fix stale Future done callbacks with spin_until_future_complete()
Jan 24, 2025
b9e5240
Use existing rclpy signal handling instead of asio
Jan 27, 2025
0959658
Replace asio timers with a dedicated timer wait thread
Jan 28, 2025
8f764df
Correct busy-looping in async callback handling
Jan 29, 2025
a9c21c6
Replace asio::io_context with a new EventsQueue object
Jan 24, 2025
c3f0714
Merge remote-tracking branch 'origin/rolling' into events_executor
Jan 29, 2025
6856054
Add EventsExecutor to new test_executor test from merge
Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ pybind11_add_module(_rclpy_pybind11 SHARED
src/rclpy/destroyable.cpp
src/rclpy/duration.cpp
src/rclpy/clock_event.cpp
src/rclpy/events_executor/delayed_event_thread.cpp
src/rclpy/events_executor/events_executor.cpp
src/rclpy/events_executor/events_queue.cpp
src/rclpy/events_executor/rcl_support.cpp
src/rclpy/events_executor/timers_manager.cpp
src/rclpy/exceptions.cpp
src/rclpy/graph.cpp
src/rclpy/guard_condition.cpp
Expand Down Expand Up @@ -182,6 +187,7 @@ if(BUILD_TESTING)
test/test_create_node.py
test/test_create_while_spinning.py
test/test_destruction.py
test/test_events_executor.py
test/test_executor.py
test/test_expand_topic_name.py
test/test_guard_condition.py
Expand Down
15 changes: 15 additions & 0 deletions rclpy/rclpy/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2024-2025 Brad Martin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .events_executor import EventsExecutor # noqa: F401
44 changes: 44 additions & 0 deletions rclpy/rclpy/experimental/events_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2024-2025 Brad Martin
# Copyright 2024 Merlin Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import faulthandler
import typing

import rclpy.executors
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
import rclpy.node


# Try to look like we inherit from the rclpy Executor for type checking purposes without
# getting any of the code from the base class.
def EventsExecutor(*, context: rclpy.Context | None = None) -> rclpy.executors.Executor:
if context is None:
context = rclpy.get_default_context()

# For debugging purposes, if anything goes wrong in C++ make sure we also get a
# Python backtrace dumped with the crash.
faulthandler.enable()

ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context))

# rclpy.Executor does this too. Note, the context itself is smart enough to check
# for bound methods, and check whether the instances they're bound to still exist at
# callback time, so we don't have to worry about tearing down this stale callback at
# destruction time.
# TODO(bmartin427) This should really be done inside of the EventsExecutor
# implementation itself, but I'm unable to figure out a pybind11 incantation that
# allows me to pass this bound method call from C++.
context.on_shutdown(ex.wake)

return ex
14 changes: 14 additions & 0 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,20 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
if invoke:
callback(self)

def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool:
"""
Remove a previously-added done callback.

Returns true if the given callback was found and removed. Always fails if the Future was
already complete.
"""
with self._lock:
try:
self._callbacks.remove(callback)
except ValueError:
return False
return True


class Task(Future[T]):
"""
Expand Down
3 changes: 3 additions & 0 deletions rclpy/src/rclpy/_rclpy_pybind11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "duration.hpp"
#include "clock_event.hpp"
#include "event_handle.hpp"
#include "events_executor/events_executor.hpp"
#include "exceptions.hpp"
#include "graph.hpp"
#include "guard_condition.hpp"
Expand Down Expand Up @@ -247,4 +248,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) {
rclpy::define_signal_handler_api(m);
rclpy::define_clock_event(m);
rclpy::define_lifecycle_api(m);

rclpy::events_executor::define_events_executor(m);
}
72 changes: 72 additions & 0 deletions rclpy/src/rclpy/events_executor/delayed_event_thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2025 Brad Martin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "events_executor/delayed_event_thread.hpp"

#include <utility>

namespace rclpy
{
namespace events_executor
{

DelayedEventThread::DelayedEventThread(EventsQueue * events_queue)
: events_queue_(events_queue), thread_([this]() {RunThread();})
{
}

DelayedEventThread::~DelayedEventThread()
{
{
std::unique_lock<std::mutex> lock(mutex_);
done_ = true;
}
cv_.notify_one();
thread_.join();
}

void DelayedEventThread::EnqueueAt(
std::chrono::steady_clock::time_point when, std::function<void()> handler)
{
{
std::unique_lock<std::mutex> lock(mutex_);
when_ = when;
handler_ = handler;
}
cv_.notify_one();
}

void DelayedEventThread::RunThread()
{
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
if (handler_) {
// Make sure we don't dispatch a stale wait if it changes while we're waiting.
const auto latched_when = when_;
if (
(std::cv_status::timeout == cv_.wait_until(lock, latched_when)) && handler_ &&
(when_ <= latched_when))
{
auto handler = std::move(handler_);
handler_ = {};
events_queue_->Enqueue(std::move(handler));
}
} else {
// Wait indefinitely until we get signaled that there's something worth looking at.
cv_.wait(lock);
}
}
}

} // namespace events_executor
} // namespace rclpy
59 changes: 59 additions & 0 deletions rclpy/src/rclpy/events_executor/delayed_event_thread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2025 Brad Martin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

#include "events_executor/events_queue.hpp"

namespace rclpy
{
namespace events_executor
{

/// This object manages posting an event handler to an EventsQueue after a specified delay. The
/// current delay may be changed or canceled at any time. This is done by way of a self-contained
/// child thread to perform the wait.
class DelayedEventThread
{
public:
/// The pointer is aliased and must live for the lifetime of this object.
explicit DelayedEventThread(EventsQueue *);
~DelayedEventThread();

/// Schedules an event handler to be enqueued at the specified time point. Replaces any previous
/// wait and handler, which will never be dispatched if it has not been already.
void EnqueueAt(std::chrono::steady_clock::time_point when, std::function<void()> handler);

/// Cancels any previously-scheduled handler.
void Cancel() {EnqueueAt({}, {});}

private:
void RunThread();

EventsQueue * const events_queue_;
std::mutex mutex_;
bool done_{};
std::condition_variable cv_;
std::chrono::steady_clock::time_point when_;
std::function<void()> handler_;
std::thread thread_;
};

} // namespace events_executor
} // namespace rclpy
Loading