Skip to content

Commit

Permalink
Executor executes the tasks in FIFO order. (#1304)
Browse files Browse the repository at this point in the history
* Executor executes the tasks in FIFO order.

Signed-off-by: Tomoya Fujita <[email protected]>

* comment fix to use warning doc section.

Signed-off-by: Tomoya Fujita <[email protected]>

---------

Signed-off-by: Tomoya Fujita <[email protected]>
  • Loading branch information
fujitatomoya authored Jun 26, 2024
1 parent afb1387 commit 43198cb
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
5 changes: 4 additions & 1 deletion rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ def create_task(self, callback: Union[Callable, Coroutine], *args, **kwargs) ->
Arguments to this function are passed to the callback.
.. warning:: Created task is queued in the executor in FIFO order,
but users should not rely on the task execution order.
:param callback: A callback to be run in the executor.
"""
task = Task(callback, args, kwargs, executor=self)
Expand Down Expand Up @@ -569,7 +572,7 @@ def _wait_for_ready_callbacks(
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
for task, entity, node in tasks:
if (not task.executing() and not task.done() and
(node is None or node in nodes_to_use)):
yielded_work = True
Expand Down
42 changes: 34 additions & 8 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def func():
self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

def test_create_task_dependent_coroutines(self):
def test_create_task_fifo_order(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)
Expand All @@ -281,25 +281,51 @@ async def coro1():
future1 = executor.create_task(coro1)

async def coro2():
nonlocal future1
await future1
return 'Sentinel Result 2'

future2 = executor.create_task(coro2)

# Coro2 is newest task, so it gets to await future1 in this spin
executor.spin_once(timeout_sec=0)
# Coro1 execs in this spin
# Coro1 is the 1st task, so it gets executed in this spin
executor.spin_once(timeout_sec=0)
self.assertTrue(future1.done())
self.assertEqual('Sentinel Result 1', future1.result())
self.assertFalse(future2.done())

# Coro2 passes the await step here (timeout change forces new generator)
executor.spin_once(timeout_sec=1)
# Coro2 is the next in the queue, so it gets executed in this spin
executor.spin_once(timeout_sec=0)
self.assertTrue(future2.done())
self.assertEqual('Sentinel Result 2', future2.result())

def test_create_task_dependent_coroutines(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

async def coro1():
nonlocal future2
await future2
return 'Sentinel Result 1'

future1 = executor.create_task(coro1)

async def coro2():
return 'Sentinel Result 2'

future2 = executor.create_task(coro2)

# Coro1 is the 1st task, so it gets to await future2 in this spin
executor.spin_once(timeout_sec=0)
# Coro2 execs in this spin
executor.spin_once(timeout_sec=0)
self.assertFalse(future1.done())
self.assertTrue(future2.done())
self.assertEqual('Sentinel Result 2', future2.result())

# Coro1 passes the await step here (timeout change forces new generator)
executor.spin_once(timeout_sec=1)
self.assertTrue(future1.done())
self.assertEqual('Sentinel Result 1', future1.result())

def test_create_task_during_spin(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand Down

0 comments on commit 43198cb

Please sign in to comment.