diff --git a/rclpy/rclpy/__init__.py b/rclpy/rclpy/__init__.py index 6e94dce51..2e85298b1 100644 --- a/rclpy/rclpy/__init__.py +++ b/rclpy/rclpy/__init__.py @@ -33,16 +33,20 @@ After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on the node. The following functions can be used to process work that is waiting to be executed: :func:`spin`, -:func:`spin_once`, and :func:`spin_until_future_complete`. +:func:`spin_once`, and :func:`spin_until_complete`. When finished with a previously initialized :class:`.Context` (ie. done using all ROS nodes associated with the context), the :func:`shutdown` function should be called. This will invalidate all entities derived from the context. """ +from typing import Callable from typing import List from typing import Optional from typing import TYPE_CHECKING +from typing import Union + +import warnings from rclpy.context import Context from rclpy.parameter import Parameter @@ -242,6 +246,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None: executor.remove_node(node) +def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None: + """ + Execute block until the context associated with the executor is shutdown or time duration pass. + + Callbacks will be executed by the provided executor. + + This function blocks. + + :param node: A node to add to the executor to check for work. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_for(duration_sec) + finally: + executor.remove_node(node) + + +def spin_until_complete( + node: 'Node', + condition: Union[Future, Callable[[], bool]], + executor: Optional['Executor'] = None, + timeout_sec: Optional[float] = None, +) -> None: + """ + Execute work until the condition is complete. + + Callbacks and other work will be executed by the provided executor until ``condition()`` or + ``future.done()`` returns ``True`` or the context associated with the executor is shutdown. + + :param node: A node to add to the executor to check for work. + :param condition: The callable or future object to wait on. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. Block until the condition is complete + if ``None`` or negative. Don't wait if 0. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_until_complete(condition, timeout_sec) + finally: + executor.remove_node(node) + + def spin_until_future_complete( node: 'Node', future: Future, @@ -260,9 +310,5 @@ def spin_until_future_complete( :param timeout_sec: Seconds to wait. Block until the future is complete if ``None`` or negative. Don't wait if 0. """ - executor = get_global_executor() if executor is None else executor - try: - executor.add_node(node) - executor.spin_until_future_complete(future, timeout_sec) - finally: - executor.remove_node(node) + warnings.warn('Deprecated in favor of spin_until_complete.') + spin_until_complete(node, future, executor, timeout_sec) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 78544c91c..b88b78e17 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -307,25 +307,37 @@ def spin(self) -> None: while self._context.ok() and not self._is_shutdown: self.spin_once() - def spin_until_future_complete( + def spin_for(self, duration_sec: Optional[float] = None) -> None: + """Execute callbacks until shutdown, or timeout.""" + self.spin_until_complete(lambda: False, duration_sec) + + def spin_until_complete( self, - future: Future, - timeout_sec: Optional[float] = None + condition: Union[Future, Callable[[], bool]], + timeout_sec: Optional[float] = None, ) -> None: - """Execute callbacks until a given future is done or a timeout occurs.""" - # Make sure the future wakes this executor when it is done - future.add_done_callback(lambda x: self.wake()) + """Execute callbacks until a given condition is done or a timeout occurs.""" + # Common conditon for safisfying both Callable and Future + finish_condition = None + if (isinstance(condition, Future)): + # Make sure the future wakes this executor when it is done + condition.add_done_callback(lambda x: self.wake()) + def finish_condition(): return condition.done() + elif (callable(condition)): + def finish_condition(): return condition() + else: + raise TypeError('Condition has to be of Future or Callable type') if timeout_sec is None or timeout_sec < 0: - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_sec) + while self._context.ok() and not finish_condition() and not self._is_shutdown: + self.spin_once_until_complete(finish_condition, timeout_sec) else: start = time.monotonic() end = start + timeout_sec timeout_left = TimeoutObject(timeout_sec) - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_left) + while self._context.ok() and not finish_condition() and not self._is_shutdown: + self.spin_once_until_complete(finish_condition, timeout_left) now = time.monotonic() if now >= end: @@ -333,6 +345,15 @@ def spin_until_future_complete( timeout_left.timeout = end - now + def spin_until_future_complete( + self, + future: Future, + timeout_sec: Optional[float] = None, + ) -> None: + """Execute callbacks until a given future is done or a timeout occurs.""" + warnings.warn('Deprecated in favor of spin_until_complete.') + self.spin_until_complete(future, timeout_sec) + def spin_once(self, timeout_sec: Optional[float] = None) -> None: """ Wait for and execute a single callback. @@ -346,6 +367,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None: """ raise NotImplementedError() + def spin_once_until_complete( + self, + condition: Union[Future, Callable[[], bool]], + timeout_sec: Optional[Union[float, TimeoutObject]] = None, + ) -> None: + """ + Wait for and execute a single callback. + + This should behave in the same way as :meth:`spin_once`. + If needed by the implementation, it should awake other threads waiting. + + :param condition: The executor will wait until this condition is done. + :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. + Don't wait if 0. + """ + raise NotImplementedError() + def spin_once_until_future_complete( self, future: Future, @@ -361,6 +399,7 @@ def spin_once_until_future_complete( :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. Don't wait if 0. """ + warnings.warn('Deprecated in favor of spin_once_until_complete.') raise NotImplementedError() def _take_timer(self, tmr): @@ -826,13 +865,21 @@ def _spin_once_impl( def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) + def spin_once_until_complete( + self, + condition: Callable[[], bool], + timeout_sec: Optional[Union[float, TimeoutObject]] = None, + ) -> None: + self._spin_once_impl(timeout_sec, condition) + def spin_once_until_future_complete( self, future: Future, timeout_sec: Optional[Union[float, TimeoutObject]] = None ) -> None: + warnings.warn('Deprecated in favor of spin_once_until_complete.') future.add_done_callback(lambda x: self.wake()) - self._spin_once_impl(timeout_sec, future.done) + self.spin_once_until_complete(future.done, timeout_sec) class MultiThreadedExecutor(Executor): @@ -898,10 +945,18 @@ def _spin_once_impl( def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) + def spin_once_until_complete( + self, + condition: Callable[[], bool], + timeout_sec: float = None, + ) -> None: + self._spin_once_impl(timeout_sec, condition) + def spin_once_until_future_complete( self, future: Future, timeout_sec: Optional[Union[float, TimeoutObject]] = None ) -> None: + warnings.warn('Deprecated in favor of spin_once_until_complete.') future.add_done_callback(lambda x: self.wake()) - self._spin_once_impl(timeout_sec, future.done) + self.spin_once_until_complete(future.done, timeout_sec) diff --git a/rclpy/rclpy/parameter_client.py b/rclpy/rclpy/parameter_client.py index e549a8698..9f75b2546 100644 --- a/rclpy/rclpy/parameter_client.py +++ b/rclpy/rclpy/parameter_client.py @@ -64,7 +64,7 @@ def __init__( Parameter('int_param', Parameter.Type.INTEGER, 88), Parameter('string/param', Parameter.Type.STRING, 'hello world').to_parameter_msg(), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() # rcl_interfaces.srv.SetParameters.Response For more on service names, see: `ROS 2 docs`_. diff --git a/rclpy/test/test_action_client.py b/rclpy/test/test_action_client.py index 6dfe71b0d..3a7f82375 100644 --- a/rclpy/test/test_action_client.py +++ b/rclpy/test/test_action_client.py @@ -159,7 +159,7 @@ def test_send_goal_async(self): try: self.assertTrue(ac.wait_for_server(timeout_sec=2.0)) future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertTrue(future.done()) goal_handle = future.result() self.assertTrue(goal_handle.accepted) @@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback after goal has been accepted self.mock_action_server.publish_feedback(goal_uuid) @@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Check the feedback was not received self.assertEqual(self.feedback, None) @@ -220,12 +220,12 @@ def test_send_goal_async_with_feedback_after_goal_result_requested(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) # Then request result goal_handle = goal_future.result() result_future = goal_handle.get_result_async() - rclpy.spin_until_future_complete(self.node, result_future, self.executor) + rclpy.spin_until_complete(self.node, result_future, self.executor) self.assertTrue(result_future.done()) # Publish feedback after goal result is requested @@ -246,14 +246,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=first_goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Send another goal, but without a feedback callback second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes)) future = ac.send_goal_async( Fibonacci.Goal(), goal_uuid=second_goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback for the second goal self.mock_action_server.publish_feedback(second_goal_uuid) @@ -277,7 +277,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback for a non-existent goal ID self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes))) @@ -298,9 +298,9 @@ def test_send_goal_multiple(self): future_0 = ac.send_goal_async(Fibonacci.Goal()) future_1 = ac.send_goal_async(Fibonacci.Goal()) future_2 = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, future_0, executor) - rclpy.spin_until_future_complete(self.node, future_1, executor) - rclpy.spin_until_future_complete(self.node, future_2, executor) + rclpy.spin_until_complete(self.node, future_0, executor) + rclpy.spin_until_complete(self.node, future_1, executor) + rclpy.spin_until_complete(self.node, future_2, executor) self.assertTrue(future_0.done()) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) @@ -326,13 +326,13 @@ def test_send_cancel_async(self): # Send a goal goal_future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) goal_handle = goal_future.result() # Cancel the goal cancel_future = goal_handle.cancel_goal_async() - rclpy.spin_until_future_complete(self.node, cancel_future, self.executor) + rclpy.spin_until_complete(self.node, cancel_future, self.executor) self.assertTrue(cancel_future.done()) self.assertEqual( cancel_future.result().goals_canceling[0].goal_id, @@ -347,13 +347,13 @@ def test_get_result_async(self): # Send a goal goal_future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) goal_handle = goal_future.result() # Get the goal result result_future = goal_handle.get_result_async() - rclpy.spin_until_future_complete(self.node, result_future, self.executor) + rclpy.spin_until_complete(self.node, result_future, self.executor) self.assertTrue(result_future.done()) finally: ac.destroy() diff --git a/rclpy/test/test_action_server.py b/rclpy/test/test_action_server.py index fa2af1e74..bfb1b6874 100644 --- a/rclpy/test/test_action_server.py +++ b/rclpy/test/test_action_server.py @@ -169,7 +169,7 @@ def handle_accepted_callback(goal_handle): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertTrue(future.result().accepted) self.assertTrue(handle_accepted_callback_triggered) action_server.destroy() @@ -199,7 +199,7 @@ def handle_accepted_callback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_msg.goal.order = goal_order future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertFalse(future.result().accepted) action_server.destroy() @@ -220,7 +220,7 @@ def goal_callback(goal): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # An invalid return type in the goal callback should translate to a rejected goal self.assertFalse(future.result().accepted) action_server.destroy() @@ -244,9 +244,9 @@ def test_multi_goal_accept(self): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) future2 = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future0, executor) - rclpy.spin_until_future_complete(self.node, future1, executor) - rclpy.spin_until_future_complete(self.node, future2, executor) + rclpy.spin_until_complete(self.node, future0, executor) + rclpy.spin_until_complete(self.node, future1, executor) + rclpy.spin_until_complete(self.node, future2, executor) self.assertTrue(future0.result().accepted) self.assertTrue(future1.result().accepted) @@ -270,8 +270,8 @@ def test_duplicate_goal(self): future0 = self.mock_action_client.send_goal(goal_msg) future1 = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future0, executor) - rclpy.spin_until_future_complete(self.node, future1, executor) + rclpy.spin_until_complete(self.node, future0, executor) + rclpy.spin_until_complete(self.node, future1, executor) # Exactly one of the goals should be accepted self.assertNotEqual(future0.result().accepted, future1.result().accepted) @@ -306,7 +306,7 @@ def cancel_callback(request): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, executor) + rclpy.spin_until_complete(self.node, goal_future, executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) @@ -315,7 +315,7 @@ def cancel_callback(request): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, executor) + rclpy.spin_until_complete(self.node, cancel_future, executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 1) assert all(cancel_result.goals_canceling[0].goal_id.uuid == goal_uuid.uuid) @@ -352,7 +352,7 @@ def cancel_callback(request): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, executor) + rclpy.spin_until_complete(self.node, goal_future, executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) @@ -361,7 +361,7 @@ def cancel_callback(request): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, executor) + rclpy.spin_until_complete(self.node, cancel_future, executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 0) @@ -398,7 +398,7 @@ def execute_callback(gh): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) send_goal_response = goal_future.result() self.assertTrue(send_goal_response.accepted) self.assertIsNotNone(server_goal_handle) @@ -410,7 +410,7 @@ def execute_callback(gh): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, self.executor) + rclpy.spin_until_complete(self.node, cancel_future, self.executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 1) @@ -421,7 +421,7 @@ def execute_callback(gh): # Get the result and exepect it to have canceled status get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result = get_result_future.result() self.assertEqual(result.status, GoalStatus.STATUS_CANCELED) self.assertEqual(server_goal_handle.status, GoalStatus.STATUS_CANCELED) @@ -447,12 +447,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() self.assertEqual(result_response.status, GoalStatus.STATUS_SUCCEEDED) self.assertEqual(result_response.result.sequence.tolist(), [1, 1, 2, 3, 5]) @@ -478,12 +478,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) self.assertEqual(result_response.result.sequence.tolist(), [1, 1, 2, 3, 5]) @@ -508,12 +508,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() # Goal status should default to STATUS_ABORTED self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) @@ -537,12 +537,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() # Goal status should default to STATUS_ABORTED self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) @@ -563,7 +563,7 @@ def test_expire_goals_none(self): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertEqual(1, len(action_server._goal_handles)) @@ -586,7 +586,7 @@ def test_expire_goals_one(self): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertEqual(1, len(action_server._goal_handles)) @@ -642,7 +642,7 @@ def execute_with_feedback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertIsNotNone(self.mock_action_client.feedback_msg) self.assertEqual( @@ -674,7 +674,7 @@ def execute_with_feedback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete( + rclpy.spin_until_complete( self.node, goal_future, self.executor) feedback_msg = self.mock_action_client.feedback_msg diff --git a/rclpy/test/test_callback_group.py b/rclpy/test/test_callback_group.py index 8a49c346a..c964b5553 100644 --- a/rclpy/test/test_callback_group.py +++ b/rclpy/test/test_callback_group.py @@ -118,12 +118,12 @@ def long_callback(msg): # Start the long running callback pub_trigger_long.publish(Empty()) # Spin until we are sure that the long running callback is running - executor.spin_until_future_complete(future_up) + executor.spin_until_complete(future_up) # Publish the short callback pub_trigger_short.publish(Empty()) # Wait until the long running callback ends # (due to the signal from the short one or a timeout) - executor.spin_until_future_complete(future_down) + executor.spin_until_complete(future_down) # Check if we were able to receive the short callback during the long running one self.assertTrue(received_short_callback_in_long_callback) finally: diff --git a/rclpy/test/test_client.py b/rclpy/test/test_client.py index 556c32e2d..b17161d97 100644 --- a/rclpy/test/test_client.py +++ b/rclpy/test/test_client.py @@ -119,8 +119,8 @@ def test_concurrent_calls_to_service(self): future1 = cli.call_async(GetParameters.Request()) future2 = cli.call_async(GetParameters.Request()) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) - rclpy.spin_until_future_complete(self.node, future1, executor=executor) - rclpy.spin_until_future_complete(self.node, future2, executor=executor) + rclpy.spin_until_complete(self.node, future1, executor=executor) + rclpy.spin_until_complete(self.node, future2, executor=executor) self.assertTrue(future1.result() is not None) self.assertTrue(future2.result() is not None) finally: @@ -167,7 +167,7 @@ def test_different_type_raises(self): future = cli.call_async(GetParameters.Request()) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) with self.assertRaises(TypeError): - rclpy.spin_until_future_complete(self.node, future, executor=executor) + rclpy.spin_until_complete(self.node, future, executor=executor) finally: self.node.destroy_client(cli) self.node.destroy_service(srv) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 1d3d8d975..2aede3e58 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -380,36 +380,104 @@ def test_executor_add_node(self): assert not executor.add_node(self.node) assert id(executor) == id(self.node.executor) - def test_executor_spin_until_future_complete_timeout(self): + def test_executor_spin_for(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) def timer_callback(): pass - timer = self.node.create_timer(0.003, timer_callback) + timer = self.node.create_timer(1.0, timer_callback) + + start = time.monotonic() + executor.spin_for(duration_sec=10.0) + end = time.monotonic() + self.assertGreaterEqual(end - start, 10.0) + + timer.cancel() + + def test_executor_spin_until_complete_timeout(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.1, timer_callback) # Timeout future = Future() self.assertFalse(future.done()) start = time.monotonic() - executor.spin_until_future_complete(future=future, timeout_sec=0.1) + executor.spin_until_complete(future, timeout_sec=1.0) end = time.monotonic() # Nothing is ever setting the future, so this should have waited - # at least 0.1 seconds. - self.assertGreaterEqual(end - start, 0.1) + # at least 1.0 seconds. + self.assertGreaterEqual(end - start, 1.0) + self.assertFalse(future.done()) + + start = time.monotonic() + executor.spin_until_complete(lambda: False, timeout_sec=1.0) + end = time.monotonic() + # condition is always false so this should have waited + # at least 1.0 seconds. + self.assertGreaterEqual(end - start, 1.0) self.assertFalse(future.done()) timer.cancel() - def test_executor_spin_until_future_complete_future_done(self): + def test_executor_spin_until_complete_condition_done(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) def timer_callback(): pass - timer = self.node.create_timer(0.003, timer_callback) + timer = self.node.create_timer(0.1, timer_callback) + + condition_var = False + + def set_condition(): + nonlocal condition_var + condition_var = True + + def condition(): + nonlocal condition_var + return condition_var + + # Condition complete timeout_sec > 0 + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=1.0) + self.assertTrue(condition()) + + # timeout_sec = None + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=None) + self.assertTrue(condition()) + + # Condition complete timeout < 0 + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=-1) + self.assertTrue(condition()) + + timer.cancel() + + def test_executor_spin_until_complete_future_done(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.1, timer_callback) def set_future_result(future): future.set_result('finished') @@ -419,7 +487,7 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=0.2) + executor.spin_until_complete(future, timeout_sec=1.0) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') @@ -428,7 +496,7 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=None) + executor.spin_until_complete(future, timeout_sec=None) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') @@ -437,27 +505,37 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=-1) + executor.spin_until_complete(future, timeout_sec=-1) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') timer.cancel() - def test_executor_spin_until_future_complete_do_not_wait(self): + def test_executor_spin_until_complete_do_not_wait(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) def timer_callback(): pass - timer = self.node.create_timer(0.003, timer_callback) + timer = self.node.create_timer(0.1, timer_callback) # Do not wait timeout_sec = 0 future = Future() self.assertFalse(future.done()) - executor.spin_until_future_complete(future=future, timeout_sec=0) + executor.spin_until_complete(future, timeout_sec=0) self.assertFalse(future.done()) + condition_var = False + + def condition(): + nonlocal condition_var + return condition_var + + self.assertFalse(condition()) + executor.spin_until_complete(condition, timeout_sec=0) + self.assertFalse(condition()) + timer.cancel() def test_executor_add_node_wakes_executor(self): @@ -529,11 +607,11 @@ def test_single_threaded_spin_once_until_future(self): future = Future(executor=executor) - # Setup a thread to spin_once_until_future_complete, which will spin + # Setup a thread to spin_once_until_complete, which will spin # for a maximum of 10 seconds. start = time.time() - thread = threading.Thread(target=executor.spin_once_until_future_complete, - args=(future, 10)) + thread = threading.Thread(target=executor.spin_once_until_complete, + args=(future.done, 10)) thread.start() # Mark the future as complete immediately @@ -557,11 +635,11 @@ def test_multi_threaded_spin_once_until_future(self): future = Future(executor=executor) - # Setup a thread to spin_once_until_future_complete, which will spin + # Setup a thread to spin_once_until_complete, which will spin # for a maximum of 10 seconds. start = time.time() - thread = threading.Thread(target=executor.spin_once_until_future_complete, - args=(future, 10)) + thread = threading.Thread(target=executor.spin_once_until_complete, + args=(future.done, 10)) thread.start() # Mark the future as complete immediately @@ -603,7 +681,7 @@ def timer2_callback(): executor.add_node(self.node) future = Future(executor=executor) - executor.spin_until_future_complete(future, 4) + executor.spin_until_complete(future, 4) assert count == 2 diff --git a/rclpy/test/test_logging_service.py b/rclpy/test/test_logging_service.py index ac0897cab..c3cfafa9c 100644 --- a/rclpy/test/test_logging_service.py +++ b/rclpy/test/test_logging_service.py @@ -91,7 +91,7 @@ def test_set_and_get_one_logging_level(self): set_level.level = test_log_level request.levels.append(set_level) future = set_client.call_async(request) - self.executor2.spin_until_future_complete(future, 10) + self.executor2.spin_until_complete(future, 10) self.assertTrue(future.done()) response = future.result() self.assertEqual(len(response.results), 1) @@ -107,7 +107,7 @@ def test_set_and_get_one_logging_level(self): request = GetLoggerLevels.Request() request.names = [test_log_name] future = get_client.call_async(request) - self.executor2.spin_until_future_complete(future, 10) + self.executor2.spin_until_complete(future, 10) self.assertTrue(future.done()) response = future.result() self.assertEqual(len(response.levels), 1) @@ -146,7 +146,7 @@ def test_set_and_get_multi_logging_level(self): self.assertTrue(set_client.wait_for_service(2)) future = set_client.call_async(request) - self.executor2.spin_until_future_complete(future, 10) + self.executor2.spin_until_complete(future, 10) self.assertTrue(future.done()) response = future.result() self.assertEqual(len(response.results), 3) @@ -163,7 +163,7 @@ def test_set_and_get_multi_logging_level(self): request = GetLoggerLevels.Request() request.names = [test_log_name1, test_log_name2, test_log_name3] future = get_client.call_async(request) - self.executor2.spin_until_future_complete(future, 10) + self.executor2.spin_until_complete(future, 10) self.assertTrue(future.done()) response = future.result() self.assertEqual(len(response.levels), 3) @@ -191,7 +191,7 @@ def test_set_logging_level_with_invalid_param(self): set_level.level = 12 request.levels.append(set_level) future = set_client.call_async(request) - self.executor2.spin_until_future_complete(future, 10) + self.executor2.spin_until_complete(future, 10) self.assertTrue(future.done()) response = future.result() self.assertEqual(len(response.results), 2) @@ -221,7 +221,7 @@ def test_set_logging_level_with_partial_invalid_param(self): set_level.level = 30 request.levels.append(set_level) future = set_client.call_async(request) - self.executor2.spin_until_future_complete(future, 10) + self.executor2.spin_until_complete(future, 10) self.assertTrue(future.done()) response = future.result() self.assertEqual(len(response.results), 3) diff --git a/rclpy/test/test_parameter_client.py b/rclpy/test/test_parameter_client.py index 6797a0f4a..1c1d3f8ea 100644 --- a/rclpy/test/test_parameter_client.py +++ b/rclpy/test/test_parameter_client.py @@ -59,7 +59,7 @@ def test_set_parameter(self): Parameter('int_param', Parameter.Type.INTEGER, 88).to_parameter_msg(), Parameter('string.param', Parameter.Type.STRING, 'hello world'), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.results) == 2 @@ -68,7 +68,7 @@ def test_set_parameter(self): def test_get_parameter(self): future = self.client.get_parameters(['int_arr_param', 'float.param..']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.values) == 2 @@ -77,14 +77,14 @@ def test_get_parameter(self): def test_list_parameters(self): future = self.client.list_parameters() - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert 'int_arr_param' in results.result.names assert 'float.param..' in results.result.names future = self.client.list_parameters(['float.param.'], 1) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert 'int_arr_param' not in results.result.names @@ -93,7 +93,7 @@ def test_list_parameters(self): def test_describe_parameters(self): future = self.client.describe_parameters(['int_arr_param']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.descriptors) == 1 @@ -102,7 +102,7 @@ def test_describe_parameters(self): def test_get_paramter_types(self): future = self.client.get_parameter_types(['int_arr_param']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.types) == 1 @@ -113,7 +113,7 @@ def test_set_parameters_atomically(self): Parameter('int_param', Parameter.Type.INTEGER, 888), Parameter('string.param', Parameter.Type.STRING, 'Hello World').to_parameter_msg(), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert results.result.successful @@ -124,7 +124,7 @@ def test_delete_parameters(self): self.target_node.declare_parameter('delete_param_dynamic', 10, descriptor=descriptor) future = self.client.delete_parameters(['delete_param']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) result = future.result() assert result is not None assert len(result.results) == 1 @@ -132,7 +132,7 @@ def test_delete_parameters(self): assert result.results[0].reason == 'Static parameter cannot be undeclared' future = self.client.delete_parameters(['delete_param_dynamic']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) result = future.result() assert result is not None assert len(result.results) == 1 @@ -150,7 +150,7 @@ def test_load_parameter_file(self): f.flush() f.close() future = self.client.load_parameter_file(f.name) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) result = future.result() assert result is not None assert len(result.results) == 2 @@ -171,7 +171,7 @@ def test_load_parameter_file_atomically(self): f.flush() f.close() future = self.client.load_parameter_file_atomically(f.name) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) result = future.result() assert result is not None assert result.result.successful @@ -184,7 +184,7 @@ def test_get_uninitialized_parameter(self): # The type in description should be STRING future = self.client.describe_parameters(['uninitialized_parameter']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.descriptors) == 1 @@ -193,7 +193,7 @@ def test_get_uninitialized_parameter(self): # The value should be empty future = self.client.get_parameters(['uninitialized_parameter']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert results.values == [] @@ -214,7 +214,7 @@ def on_new_parameter_event(msg): future = self.client.set_parameters([ Parameter('int_param', Parameter.Type.INTEGER, 88).to_parameter_msg(), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.results) == 1 @@ -227,7 +227,7 @@ def test_on_parameter_event_changed(self): future = self.client.set_parameters([ Parameter('int_param', Parameter.Type.INTEGER, 88).to_parameter_msg(), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.results) == 1 @@ -247,7 +247,7 @@ def on_changed_parameter_event(msg): future = self.client.set_parameters([ Parameter('int_param', Parameter.Type.INTEGER, 99).to_parameter_msg(), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.results) == 1 @@ -260,7 +260,7 @@ def test_on_parameter_event_deleted(self): future = self.client.set_parameters([ Parameter('int_param', Parameter.Type.INTEGER, 88).to_parameter_msg(), ]) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.results) == 1 @@ -277,7 +277,7 @@ def on_deleted_parameter_event(msg): param_event_sub = self.client.on_parameter_event(on_deleted_parameter_event) future = self.client.delete_parameters(['int_param']) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.results) == 1 diff --git a/rclpy/test/test_parameter_service.py b/rclpy/test/test_parameter_service.py index fc036e600..06ecc0857 100644 --- a/rclpy/test/test_parameter_service.py +++ b/rclpy/test/test_parameter_service.py @@ -59,7 +59,7 @@ def test_get_uninitialized_parameter(self): request = DescribeParameters.Request() request.names = ['uninitialized_parameter'] future = self.describe_parameters_client.call_async(request) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert len(results.descriptors) == 1 @@ -70,7 +70,7 @@ def test_get_uninitialized_parameter(self): request = GetParameters.Request() request.names = ['uninitialized_parameter'] future = self.get_parameter_client.call_async(request) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) results = future.result() assert results is not None assert results.values == [] diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py index 09359ae30..db1bca893 100644 --- a/rclpy/test/test_qos_event.py +++ b/rclpy/test/test_qos_event.py @@ -177,7 +177,7 @@ def warn(self, message, once=False): EmptyMsg, self.topic_name, message_callback, qos_profile_subscription) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) - rclpy.spin_until_future_complete(self.node, log_msgs_future, executor, 10.0) + rclpy.spin_until_complete(self.node, log_msgs_future, executor, 10.0) self.assertEqual( pub_log_msg, diff --git a/rclpy/test/test_rosout_subscription.py b/rclpy/test/test_rosout_subscription.py index f6332f973..214e55864 100644 --- a/rclpy/test/test_rosout_subscription.py +++ b/rclpy/test/test_rosout_subscription.py @@ -54,37 +54,37 @@ def test_parent_log(self): self.rosout_msg_name = 'test_rosout_subscription' logger = self.node.get_logger() logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) def test_child_log(self): self.rosout_msg_name = 'test_rosout_subscription.child1' logger = self.node.get_logger() logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertFalse(self.fut.done()) logger = self.node.get_logger().get_child('child1') logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) self.fut = Future() logger = self.node.get_logger().get_child('child2') logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertFalse(self.fut.done()) self.rosout_msg_name = 'test_rosout_subscription.child2' logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) def test_child_hierarchy(self): self.rosout_msg_name = 'test_rosout_subscription.child.grandchild' logger = self.node.get_logger().get_child('child').get_child('grandchild') logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) def test_first_child_removed(self): @@ -92,11 +92,11 @@ def test_first_child_removed(self): logger = self.node.get_logger().get_child('child') logger2 = self.node.get_logger().get_child('child') logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) logger = None logger2.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) def test_logger_parameter(self): @@ -107,11 +107,11 @@ def call_logger(logger): logger1 = logger logger1.info('test') call_logger(logger) - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) logger.info('test') - self.executor.spin_until_future_complete(self.fut, 3) + self.executor.spin_until_complete(self.fut, 3) self.assertTrue(self.fut.done()) def test_logger_rosout_disabled_without_exception(self): diff --git a/rclpy/test/test_service_introspection.py b/rclpy/test/test_service_introspection.py index 909a5e776..8592f3eed 100644 --- a/rclpy/test/test_service_introspection.py +++ b/rclpy/test/test_service_introspection.py @@ -70,7 +70,7 @@ def test_service_introspection_metadata(self): ServiceIntrospectionState.METADATA) future = self.cli.call_async(req) - self.executor.spin_until_future_complete(future, timeout_sec=5.0) + self.executor.spin_until_complete(future, timeout_sec=5.0) self.assertTrue(future.done()) start = time.monotonic() @@ -116,7 +116,7 @@ def test_service_introspection_contents(self): ServiceIntrospectionState.CONTENTS) future = self.cli.call_async(req) - self.executor.spin_until_future_complete(future, timeout_sec=5.0) + self.executor.spin_until_complete(future, timeout_sec=5.0) self.assertTrue(future.done()) start = time.monotonic() diff --git a/rclpy/test/test_type_description_service.py b/rclpy/test/test_type_description_service.py index e3f7a7c69..e88787975 100644 --- a/rclpy/test/test_type_description_service.py +++ b/rclpy/test/test_type_description_service.py @@ -58,7 +58,7 @@ def test_get_type_description(self): type_hash=type_hash, include_type_sources=True) future = self.get_type_description_client.call_async(request) - self.executor.spin_until_future_complete(future) + self.executor.spin_until_complete(future) response = future.result() assert response is not None assert response.successful diff --git a/rclpy/test/test_waitable.py b/rclpy/test/test_waitable.py index 3a8f28583..f9f4a4941 100644 --- a/rclpy/test/test_waitable.py +++ b/rclpy/test/test_waitable.py @@ -298,7 +298,7 @@ def tearDownClass(cls): def start_spin_thread(self, waitable): waitable.future = Future(executor=self.executor) self.thr = threading.Thread( - target=self.executor.spin_until_future_complete, args=(waitable.future,), daemon=True) + target=self.executor.spin_until_complete, args=(waitable.future,), daemon=True) self.thr.start() return self.thr