Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable limited use of asyncio with executors #971

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -159,6 +159,7 @@ if(BUILD_TESTING)
test/test_action_client.py
test/test_action_graph.py
test/test_action_server.py
test/test_asyncio_interop.py
test/test_callback_group.py
test/test_client.py
test/test_clock.py
49 changes: 35 additions & 14 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
@@ -190,6 +190,27 @@ def create_task(self, callback: Union[Callable, Coroutine], *args, **kwargs) ->
# Task inherits from Future
return task

def call_soon(self, callback, *args) -> Task:
"""
Add a callback or coroutine to be executed during :meth:`spin`.

Arguments to this function are passed to the callback.

:param callback: A callback to be run in the executor.
:return: A Task which the executor will execute.
"""
if self._is_shutdown:
raise ShutdownException()

if not isinstance(callback, Task):
callback = Task(callback, args, None, executor=self)

with self._tasks_lock:
self._tasks.append((callback, None, None))
self._guard.trigger()

return callback

def shutdown(self, timeout_sec: float = None) -> bool:
"""
Stop executing callbacks and wait for their completion.
@@ -432,12 +453,9 @@ async def handler(entity, gc, is_shutdown, work_tracker):
gc.trigger()
except InvalidHandle:
pass
task = Task(
return Task(
handler, (entity, self._guard, self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._tasks.append((task, entity, node))
return task

def can_execute(self, entity: WaitableEntityType) -> bool:
"""
@@ -481,16 +499,19 @@ def _wait_for_ready_callbacks(
# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
if (not task.executing() and not task.done() and
(node is None or node in nodes_to_use)):
yielded_work = True
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
tasks = self._tasks
# Tasks that need to be executed again will add themselves back to the executor
self._tasks = []
while tasks:
task_trio = tasks.pop()
task, entity, node = task_trio
if node is None or node in nodes_to_use:
yielded_work = True
yield task_trio
else:
# Asked not to execute these tasks, so don't do them yet
with self._tasks_lock:
self._tasks.append(task_trio)

# Gather entities that can be waited on
subscriptions: List[Subscription] = []
102 changes: 55 additions & 47 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import inspect
import sys
import threading
@@ -52,11 +53,12 @@ def __del__(self):
file=sys.stderr)

def __await__(self):
# Yield if the task is not finished
while not self._done:
yield
if not self._done:
yield self
return self.result()

__iter__ = __await__

def cancel(self):
"""Request cancellation of the running task if it is not done already."""
with self._lock:
@@ -142,7 +144,7 @@ def _schedule_or_invoke_done_callbacks(self):
if executor is not None:
# Have the executor take care of the callbacks
for callback in callbacks:
executor.create_task(callback, self)
executor.call_soon(callback, self)
else:
# No executor, call right away
for callback in callbacks:
@@ -176,7 +178,7 @@ def add_done_callback(self, callback):
if self._done:
executor = self._executor()
if executor is not None:
executor.create_task(callback, self)
executor.call_soon(callback, self)
else:
invoke = True
else:
@@ -199,6 +201,8 @@ class Task(Future):

def __init__(self, handler, args=None, kwargs=None, executor=None):
super().__init__(executor=executor)
if executor is None:
raise RuntimeError('Task requires an executor')
# _handler is either a normal function or a coroutine
self._handler = handler
# Arguments passed into the function
@@ -212,62 +216,66 @@ def __init__(self, handler, args=None, kwargs=None, executor=None):
self._handler = handler(*args, **kwargs)
self._args = None
self._kwargs = None
# True while the task is being executed
self._executing = False
# Lock acquired to prevent task from executing in parallel with itself
self._task_lock = threading.Lock()

def __call__(self):
def __call__(self, future=None):
"""
Run or resume a task.

This attempts to execute a handler. If the handler is a coroutine it will attempt to
await it. If there are done callbacks it will schedule them with the executor.

The return value of the handler is stored as the task result.

This function must not be called in parallel with itself.

:param future: do not use
"""
if self._done or self._executing or not self._task_lock.acquire(blocking=False):
if self._done:
return
try:
if self._done:
return
self._executing = True

if inspect.iscoroutine(self._handler):
# Execute a coroutine
try:
self._handler.send(None)
except StopIteration as e:
# The coroutine finished; store the result
self._handler.close()
self.set_result(e.value)
self._complete_task()
except Exception as e:
self.set_exception(e)
self._complete_task()
else:
# Execute a normal function
try:
self.set_result(self._handler(*self._args, **self._kwargs))
except Exception as e:
self.set_exception(e)
if inspect.iscoroutine(self._handler):
# Execute a coroutine
try:
result = self._handler.send(None)
if isinstance(result, Future):
# Wait for an rclpy future to complete
result.add_done_callback(self)
elif asyncio.isfuture(result):
# Get the event loop of this thread (raises RuntimeError if there isn't one)
event_loop = asyncio.get_running_loop()
# Make sure we're in the same thread as the future's event loop.
# TODO(sloretz) is asyncio.Future.get_loop() thread-safe?
if result.get_loop() is not event_loop:
raise RuntimeError('Cannot await asyncio future from a different thread')
# Resume this task when the asyncio future completes
result.add_done_callback(lambda _: self._executor().call_soon(self))
elif result is None:
# Wait for one iteration if a bare yield is used
self._executor().call_soon(self)
else:
# What is this intermediate value?
# Could be a different async library's coroutine
# Could be a generator yielded a value
raise RuntimeError(f'Coroutine yielded unexpected value: {result}')
except StopIteration as e:
# Coroutine or generator returning a result
self._handler.close()
self.set_result(e.value)
self._complete_task()

self._executing = False
finally:
self._task_lock.release()
except Exception as e:
# Coroutine or generator raising an exception
self._handler.close()
self.set_exception(e)
self._complete_task()
else:
# Execute a normal function
try:
self.set_result(self._handler(*self._args, **self._kwargs))
except Exception as e:
self.set_exception(e)
self._complete_task()

def _complete_task(self):
"""Cleanup after task finished."""
self._handler = None
self._args = None
self._kwargs = None

def executing(self):
"""
Check if the task is currently being executed.

:return: True if the task is currently executing.
:rtype: bool
"""
return self._executing
66 changes: 66 additions & 0 deletions rclpy/test/test_asyncio_interop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import time

import pytest

import rclpy
from rclpy.executors import SingleThreadedExecutor


MAX_TEST_TIME = 5.0
TIME_FUDGE_FACTOR = 0.2


@pytest.fixture
def node_and_executor():
rclpy.init()
node = rclpy.create_node('test_asyncio_interop')
executor = SingleThreadedExecutor()
executor.add_node(node)
yield node, executor
executor.shutdown()
node.destroy_node()
rclpy.shutdown()


def test_sleep_in_event_loop(node_and_executor):
node, executor = node_and_executor

expected_sleep_time = 0.5
sleep_time = None

async def cb():
nonlocal sleep_time
start = time.monotonic()
await asyncio.sleep(expected_sleep_time)
end = time.monotonic()
sleep_time = end - start

guard = node.create_guard_condition(cb)
guard.trigger()

async def spin():
nonlocal sleep_time
start = time.monotonic()
while not sleep_time and MAX_TEST_TIME > time.monotonic() - start:
executor.spin_once(timeout_sec=0)
# Don't use 100% CPU
await asyncio.sleep(0.01)

asyncio.run(spin())
assert sleep_time >= expected_sleep_time
assert abs(expected_sleep_time - sleep_time) <= expected_sleep_time * TIME_FUDGE_FACTOR
21 changes: 0 additions & 21 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import threading
import time
import unittest
@@ -157,25 +156,15 @@ def test_execute_coroutine_timer(self):
executor.add_node(self.node)

called1 = False
called2 = False

async def coroutine():
nonlocal called1
nonlocal called2
called1 = True
await asyncio.sleep(0)
called2 = True

tmr = self.node.create_timer(0.1, coroutine)
try:
executor.spin_once(timeout_sec=1.23)
self.assertTrue(called1)
self.assertFalse(called2)

called1 = False
executor.spin_once(timeout_sec=0)
self.assertFalse(called1)
self.assertTrue(called2)
finally:
self.node.destroy_timer(tmr)

@@ -185,26 +174,16 @@ def test_execute_coroutine_guard_condition(self):
executor.add_node(self.node)

called1 = False
called2 = False

async def coroutine():
nonlocal called1
nonlocal called2
called1 = True
await asyncio.sleep(0)
called2 = True

gc = self.node.create_guard_condition(coroutine)
try:
gc.trigger()
executor.spin_once(timeout_sec=0)
self.assertTrue(called1)
self.assertFalse(called2)

called1 = False
executor.spin_once(timeout_sec=1)
self.assertFalse(called1)
self.assertTrue(called2)
finally:
self.node.destroy_guard_condition(gc)

10 changes: 7 additions & 3 deletions rclpy/test/test_guard_condition.py
Original file line number Diff line number Diff line change
@@ -26,15 +26,19 @@ def setUpClass(cls):
rclpy.init(context=cls.context)
cls.node = rclpy.create_node(
'TestGuardCondition', namespace='/rclpy/test', context=cls.context)
cls.executor = SingleThreadedExecutor(context=cls.context)
cls.executor.add_node(cls.node)

@classmethod
def tearDownClass(cls):
cls.executor.shutdown()
cls.node.destroy_node()
rclpy.shutdown(context=cls.context)

def setUp(self):
self.executor = SingleThreadedExecutor(context=self.context)
self.executor.add_node(self.node)

def tearDown(self):
self.executor.shutdown()

def test_trigger(self):
called = False

Loading