Skip to content

Commit

Permalink
Check if Task(Future) is canceled. (#1377)
Browse files Browse the repository at this point in the history
* Check if Task(Future) is canceled.

Signed-off-by: Tomoya Fujita <[email protected]>

* Close cancelled coroutine (#1394)

* Add FutureState

Signed-off-by: Nadav Elkabets <[email protected]>

* Close canceled coroutine

Signed-off-by: Nadav Elkabets <[email protected]>

* Fixed behavior in test

Signed-off-by: Nadav Elkabets <[email protected]>

---------

Signed-off-by: Nadav Elkabets <[email protected]>
Signed-off-by: Tomoya Fujita <[email protected]>

* address flake8 and pep257 failures.

Signed-off-by: Tomoya Fujita <[email protected]>

* Cancelled future is not done (#1397)

* Remove redundant coro.close

Signed-off-by: nadav <[email protected]>

* Only finished future is done

Signed-off-by: nadav <[email protected]>

* Add function _pending and fix checks

Signed-off-by: = <[email protected]>

* Replace check in done from pending to finished

Signed-off-by: = <[email protected]>

* Adapt test to new behavior

Signed-off-by: = <[email protected]>

* Add tests

Signed-off-by: = <[email protected]>

* Make changes within active task mutex

Signed-off-by: = <[email protected]>

---------

Signed-off-by: nadav <[email protected]>
Signed-off-by: = <[email protected]>

* keep the consistent behavior to avoid exception, and adjusted some tests accordingly.

Signed-off-by: Tomoya Fujita <[email protected]>

* revert doc section to raise the exception.

Signed-off-by: Tomoya Fujita <[email protected]>

* remove StrEnum and put logical operator in the beginning of line.

Signed-off-by: Tomoya Fujita <[email protected]>

* add more test to check Task state.

Signed-off-by: Tomoya Fujita <[email protected]>

---------

Signed-off-by: Tomoya Fujita <[email protected]>
Signed-off-by: Nadav Elkabets <[email protected]>
Signed-off-by: nadav <[email protected]>
Signed-off-by: = <[email protected]>
Co-authored-by: Nadav Elkabets <[email protected]>
Co-authored-by: Nadav Elkabets <[email protected]>
  • Loading branch information
3 people authored Jan 15, 2025
1 parent 296425c commit 9a144bf
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 19 deletions.
16 changes: 14 additions & 2 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,24 @@ def spin_until_future_complete(
future.add_done_callback(lambda x: self.wake())

if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
while (
self._context.ok()
and not future.done()
and not future.cancelled()
and not self._is_shutdown
):
self._spin_once_until_future_complete(future, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = TimeoutObject(timeout_sec)

while self._context.ok() and not future.done() and not self._is_shutdown:
while (
self._context.ok()
and not future.done()
and not future.cancelled()
and not self._is_shutdown
):
self._spin_once_until_future_complete(future, timeout_left)
now = time.monotonic()

Expand Down Expand Up @@ -653,6 +663,8 @@ def _wait_for_ready_callbacks(
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
# Get rid of any tasks that are cancelled
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].cancelled(), self._tasks))

# Gather entities that can be waited on
subscriptions: List[Subscription[Any, ]] = []
Expand Down
54 changes: 37 additions & 17 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
import inspect
import sys
import threading
Expand All @@ -36,14 +37,19 @@ def _fake_weakref() -> None:
return None


class FutureState(Enum):
"""States defining the lifecycle of a future."""

PENDING = 'PENDING'
CANCELLED = 'CANCELLED'
FINISHED = 'FINISHED'


class Future(Generic[T]):
"""Represent the outcome of a task in the future."""

def __init__(self, *, executor: Optional['Executor'] = None) -> None:
# true if the task is done or cancelled
self._done = False
# true if the task is cancelled
self._cancelled = False
self._state = FutureState.PENDING
# the final return value of the handler
self._result: Optional[T] = None
# An exception raised by the handler when called
Expand All @@ -66,15 +72,20 @@ def __del__(self) -> None:

def __await__(self) -> Generator[None, None, Optional[T]]:
# Yield if the task is not finished
while not self._done:
while self._pending():
yield
return self.result()

def _pending(self) -> bool:
return self._state == FutureState.PENDING

def cancel(self) -> None:
"""Request cancellation of the running task if it is not done already."""
with self._lock:
if not self._done:
self._cancelled = True
if not self._pending():
return

self._state = FutureState.CANCELLED
self._schedule_or_invoke_done_callbacks()

def cancelled(self) -> bool:
Expand All @@ -83,15 +94,15 @@ def cancelled(self) -> bool:
:return: True if the task was cancelled
"""
return self._cancelled
return self._state == FutureState.CANCELLED

def done(self) -> bool:
"""
Indicate if the task has finished executing.
:return: True if the task is finished or raised while it was executing
"""
return self._done
return self._state == FutureState.FINISHED

def result(self) -> Optional[T]:
"""
Expand Down Expand Up @@ -123,8 +134,8 @@ def set_result(self, result: T) -> None:
"""
with self._lock:
self._result = result
self._done = True
self._cancelled = False
self._state = FutureState.FINISHED

self._schedule_or_invoke_done_callbacks()

def set_exception(self, exception: Exception) -> None:
Expand All @@ -136,8 +147,8 @@ def set_exception(self, exception: Exception) -> None:
with self._lock:
self._exception = exception
self._exception_fetched = False
self._done = True
self._cancelled = False
self._state = FutureState.FINISHED

self._schedule_or_invoke_done_callbacks()

def _schedule_or_invoke_done_callbacks(self) -> None:
Expand Down Expand Up @@ -186,7 +197,7 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
"""
invoke = False
with self._lock:
if self._done:
if not self._pending():
assert self._executor is not None
executor = self._executor()
if executor is not None:
Expand Down Expand Up @@ -251,10 +262,14 @@ def __call__(self) -> None:
The return value of the handler is stored as the task result.
"""
if self._done or self._executing or not self._task_lock.acquire(blocking=False):
if (
not self._pending() or
self._executing or
not self._task_lock.acquire(blocking=False)
):
return
try:
if self._done:
if not self._pending():
return
self._executing = True

Expand All @@ -265,7 +280,6 @@ def __call__(self) -> None:
handler.send(None)
except StopIteration as e:
# The coroutine finished; store the result
handler.close()
self.set_result(e.value)
self._complete_task()
except Exception as e:
Expand Down Expand Up @@ -297,3 +311,9 @@ def executing(self) -> bool:
:return: True if the task is currently executing.
"""
return self._executing

def cancel(self) -> None:
if self._pending() and inspect.iscoroutine(self._handler):
self._handler.close()

super().cancel()
20 changes: 20 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,26 @@ async def coroutine():
self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

def test_create_task_coroutine_cancel(self) -> None:
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

async def coroutine():
return 'Sentinel Result'

future = executor.create_task(coroutine)
self.assertFalse(future.done())
self.assertFalse(future.cancelled())

future.cancel()
self.assertTrue(future.cancelled())

executor.spin_until_future_complete(future)
self.assertFalse(future.done())
self.assertTrue(future.cancelled())
self.assertEqual(None, future.result())

def test_create_task_normal_function(self) -> None:
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand Down
33 changes: 33 additions & 0 deletions rclpy/test/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,39 @@ def cb(fut):
f.add_done_callback(cb)
assert called

def test_set_result_on_done_future_without_exception(self) -> None:
f = Future()
f.set_result(None)
self.assertTrue(f.done())
self.assertFalse(f.cancelled())
f.set_result(None)
self.assertTrue(f.done())
self.assertFalse(f.cancelled())

def test_set_result_on_cancelled_future_without_exception(self) -> None:
f = Future()
f.cancel()
self.assertTrue(f.cancelled())
self.assertFalse(f.done())
f.set_result(None)
self.assertTrue(f.done())

def test_set_exception_on_done_future_without_exception(self) -> None:
f = Future()
f.set_result(None)
self.assertIsNone(f.exception())
f.set_exception(Exception())
f.set_result(None)
self.assertIsNotNone(f.exception())

def test_set_exception_on_cancelled_future_without_exception(self) -> None:
f = Future()
f.cancel()
self.assertTrue(f.cancelled())
self.assertIsNone(f.exception())
f.set_exception(Exception())
self.assertIsNotNone(f.exception())


if __name__ == '__main__':
unittest.main()

0 comments on commit 9a144bf

Please sign in to comment.