diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index f22cca0d1..8608a4642 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -14,72 +14,113 @@ import threading import time +from types import TracebackType from typing import Any +from typing import Callable +from typing import Dict +from typing import Generic +from typing import Optional +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING from typing import TypedDict +from typing import TypeVar + import uuid import weakref from action_msgs.msg import GoalStatus +from action_msgs.msg._goal_status_array import GoalStatusArray from action_msgs.srv import CancelGoal - +from builtin_interfaces.msg import Time from rclpy.executors import await_or_execute from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.qos import qos_profile_action_status_default from rclpy.qos import qos_profile_services_default from rclpy.qos import QoSProfile from rclpy.task import Future +from rclpy.type_support import Action from rclpy.type_support import check_for_type_support +from rclpy.type_support import FeedbackMessage +from rclpy.type_support import FeedbackT +from rclpy.type_support import GetResultServiceResponse +from rclpy.type_support import GoalT +from rclpy.type_support import ResultT +from rclpy.type_support import SendGoalServiceResponse from rclpy.waitable import NumberOfEntities, Waitable - from unique_identifier_msgs.msg import UUID +if TYPE_CHECKING: + from rclpy.node import Node + from rclpy.callback_groups import CallbackGroup + from typing_extensions import Unpack, TypeAlias + + ClientGoalHandleDictResultT = TypeVar('ClientGoalHandleDictResultT') + ClientGoalHandleDictFeedbackT = TypeVar('ClientGoalHandleDictFeedbackT') + + class ClientGoalHandleDict(TypedDict, + Generic[ClientGoalHandleDictResultT, ClientGoalHandleDictFeedbackT], + total=False): + goal: Tuple[int, SendGoalServiceResponse] + cancel: Tuple[int, CancelGoal.Response] + result: Tuple[int, GetResultServiceResponse[ClientGoalHandleDictResultT]] + feedback: FeedbackMessage[ClientGoalHandleDictFeedbackT] + status: GoalStatusArray +else: + ClientGoalHandleDict: 'TypeAlias' = Dict[str, object] + + +T = TypeVar('T') -class ClientGoalHandleDict(TypedDict, total=False): - goal: Any - cancel: Any - result: Any - feedback: Any - status: Any +class SendGoalKWargs(TypedDict): + feedback_callback: Optional[Callable[[FeedbackT], None]] + goal_uuid: Optional[UUID] -class ClientGoalHandle(): + +class ClientGoalHandle(Generic[GoalT, ResultT, FeedbackT]): """Goal handle for working with Action Clients.""" - def __init__(self, action_client, goal_id, goal_response): + def __init__(self, action_client: 'ActionClient[GoalT, ResultT, FeedbackT]', + goal_id: UUID, goal_response: SendGoalServiceResponse): self._action_client = action_client self._goal_id = goal_id self._goal_response = goal_response self._status = GoalStatus.STATUS_UNKNOWN - def __eq__(self, other): + def __eq__(self, other: object) -> bool: + if not isinstance(other, ClientGoalHandle): + return False return self._goal_id == other.goal_id - def __ne__(self, other): + def __ne__(self, other: object) -> bool: + if not isinstance(other, ClientGoalHandle): + return True return self._goal_id != other.goal_id - def __repr__(self): + def __repr__(self) -> str: return 'ClientGoalHandle '.format( self.goal_id.uuid, self.accepted, self.status) @property - def goal_id(self): + def goal_id(self) -> UUID: return self._goal_id @property - def stamp(self): + def stamp(self) -> Time: return self._goal_response.stamp @property - def accepted(self): + def accepted(self) -> bool: return self._goal_response.accepted @property - def status(self): + def status(self) -> int: return self._status - def cancel_goal(self): + def cancel_goal(self) -> Optional[CancelGoal.Response]: """ Send a cancel request for the goal and wait for the response. @@ -89,7 +130,7 @@ def cancel_goal(self): """ return self._action_client._cancel_goal(self) - def cancel_goal_async(self): + def cancel_goal_async(self) -> Future[CancelGoal.Response]: """ Asynchronous request for the goal be canceled. @@ -98,7 +139,7 @@ def cancel_goal_async(self): """ return self._action_client._cancel_goal_async(self) - def get_result(self): + def get_result(self) -> Optional[GetResultServiceResponse[ResultT]]: """ Request the result for the goal and wait for the response. @@ -108,7 +149,7 @@ def get_result(self): """ return self._action_client._get_result(self) - def get_result_async(self): + def get_result_async(self) -> Future[GetResultServiceResponse[ResultT]]: """ Asynchronously request the goal result. @@ -118,22 +159,23 @@ def get_result_async(self): return self._action_client._get_result_async(self) -class ActionClient(Waitable[ClientGoalHandleDict]): +class ActionClient(Generic[GoalT, ResultT, FeedbackT], + Waitable['ClientGoalHandleDict[ResultT, FeedbackT]']): """ROS Action client.""" def __init__( self, - node, - action_type, - action_name, + node: 'Node', + action_type: Type[Action[GoalT, ResultT, FeedbackT]], + action_name: str, *, - callback_group=None, - goal_service_qos_profile=qos_profile_services_default, - result_service_qos_profile=qos_profile_services_default, - cancel_service_qos_profile=qos_profile_services_default, - feedback_sub_qos_profile=QoSProfile(depth=10), - status_sub_qos_profile=qos_profile_action_status_default - ): + callback_group: 'Optional[CallbackGroup]' = None, + goal_service_qos_profile: QoSProfile = qos_profile_services_default, + result_service_qos_profile: QoSProfile = qos_profile_services_default, + cancel_service_qos_profile: QoSProfile = qos_profile_services_default, + feedback_sub_qos_profile: QoSProfile = QoSProfile(depth=10), + status_sub_qos_profile: QoSProfile = qos_profile_action_status_default + ) -> None: """ Create an ActionClient. @@ -174,28 +216,34 @@ def __init__( self._is_ready = False # key: UUID in bytes, value: weak reference to ClientGoalHandle - self._goal_handles = {} + self._goal_handles: Dict[bytes, + weakref.ReferenceType[ClientGoalHandle[GoalT, + ResultT, + FeedbackT]]] = {} # key: goal request sequence_number, value: Future for goal response - self._pending_goal_requests = {} + self._pending_goal_requests: Dict[int, Future[ClientGoalHandle[GoalT, + ResultT, + FeedbackT]]] = {} # key: goal request sequence_number, value: UUID - self._goal_sequence_number_to_goal_id = {} + self._goal_sequence_number_to_goal_id: Dict[int, UUID] = {} # key: cancel request sequence number, value: Future for cancel response - self._pending_cancel_requests = {} + self._pending_cancel_requests: Dict[int, Future[CancelGoal.Response]] = {} # key: result request sequence number, value: Future for result response - self._pending_result_requests = {} + self._pending_result_requests: Dict[int, Future[GetResultServiceResponse[ResultT]]] = {} # key: result request sequence_number, value: UUID - self._result_sequence_number_to_goal_id = {} + self._result_sequence_number_to_goal_id: Dict[int, UUID] = {} # key: UUID in bytes, value: callback function - self._feedback_callbacks = {} + self._feedback_callbacks: Dict[bytes, Callable[[FeedbackT], None]] = {} callback_group.add_entity(self) self._node.add_waitable(self) self._logger = self._node.get_logger().get_child('action_client') - def _generate_random_uuid(self): + def _generate_random_uuid(self) -> UUID: return UUID(uuid=list(uuid.uuid4().bytes)) - def _remove_pending_request(self, future, pending_requests): + def _remove_pending_request(self, future: Future[T], pending_requests: Dict[int, Future[T]] + ) -> Optional[int]: """ Remove a future from the list of pending requests. @@ -219,15 +267,18 @@ def _remove_pending_request(self, future, pending_requests): return seq return None - def _remove_pending_goal_request(self, future): + def _remove_pending_goal_request(self, + future: Future[ClientGoalHandle[GoalT, ResultT, FeedbackT]] + ) -> None: seq = self._remove_pending_request(future, self._pending_goal_requests) if seq in self._goal_sequence_number_to_goal_id: del self._goal_sequence_number_to_goal_id[seq] - def _remove_pending_cancel_request(self, future): + def _remove_pending_cancel_request(self, future: Future[CancelGoal.Response]) -> None: self._remove_pending_request(future, self._pending_cancel_requests) - def _remove_pending_result_request(self, future): + def _remove_pending_result_request(self, future: Future[GetResultServiceResponse[ResultT]] + ) -> None: seq = self._remove_pending_request(future, self._pending_result_requests) if seq in self._result_sequence_number_to_goal_id: goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid) @@ -237,7 +288,7 @@ def _remove_pending_result_request(self, future): del self._feedback_callbacks[goal_uuid] # Start Waitable API - def is_ready(self, wait_set): + def is_ready(self, wait_set: _rclpy.WaitSet) -> bool: """Return True if one or more entities are ready in the wait set.""" ready_entities = self._client_handle.is_ready(wait_set) self._is_feedback_ready = ready_entities[0] @@ -247,9 +298,9 @@ def is_ready(self, wait_set): self._is_result_response_ready = ready_entities[4] return any(ready_entities) - def take_data(self) -> ClientGoalHandleDict: + def take_data(self) -> 'ClientGoalHandleDict[ResultT, FeedbackT]': """Take stuff from lower level so the wait set doesn't immediately wake again.""" - data: ClientGoalHandleDict = {} + data: 'ClientGoalHandleDict[ResultT, FeedbackT]' = {} if self._is_goal_response_ready: taken_data = self._client_handle.take_goal_response( self._action_type.Impl.SendGoalService.Response) @@ -287,7 +338,7 @@ def take_data(self) -> ClientGoalHandleDict: return data - async def execute(self, taken_data: ClientGoalHandleDict) -> None: + async def execute(self, taken_data: 'ClientGoalHandleDict[ResultT, FeedbackT]') -> None: """ Execute work after data has been taken from a ready wait set. @@ -350,9 +401,9 @@ async def execute(self, taken_data: ClientGoalHandleDict) -> None: status = status_msg.status if goal_uuid in self._goal_handles: - goal_handle = self._goal_handles[goal_uuid]() - if goal_handle is not None: - goal_handle._status = status + status_goal_handle = self._goal_handles[goal_uuid]() + if status_goal_handle is not None: + status_goal_handle._status = status # Remove "done" goals from the list if (GoalStatus.STATUS_SUCCEEDED == status or GoalStatus.STATUS_CANCELED == status or @@ -362,24 +413,26 @@ async def execute(self, taken_data: ClientGoalHandleDict) -> None: # Weak reference is None del self._goal_handles[goal_uuid] - def get_num_entities(self): + def get_num_entities(self) -> NumberOfEntities: """Return number of each type of entity used in the wait set.""" num_entities = self._client_handle.get_num_entities() return NumberOfEntities(*num_entities) - def add_to_wait_set(self, wait_set): + def add_to_wait_set(self, wait_set: _rclpy.WaitSet) -> None: """Add entities to wait set.""" self._client_handle.add_to_waitset(wait_set) - def __enter__(self): - return self._client_handle.__enter__() + def __enter__(self) -> None: + self._client_handle.__enter__() - def __exit__(self, t, v, tb): + def __exit__(self, t: Optional[Type[BaseException]], v: Optional[BaseException], + tb: Optional[TracebackType]) -> None: self._client_handle.__exit__(t, v, tb) # End Waitable API - def send_goal(self, goal, **kwargs): + def send_goal(self, goal: GoalT, **kwargs: 'Unpack[SendGoalKWargs]' + ) -> Optional[GetResultServiceResponse[ResultT]]: """ Send a goal and wait for the result. @@ -403,7 +456,7 @@ def send_goal(self, goal, **kwargs): event = threading.Event() - def unblock(future): + def unblock(future: Future[Any]) -> None: nonlocal event event.set() @@ -411,16 +464,25 @@ def unblock(future): send_goal_future.add_done_callback(unblock) event.wait() - if send_goal_future.exception() is not None: - raise send_goal_future.exception() + exception = send_goal_future.exception() + if exception is not None: + raise exception goal_handle = send_goal_future.result() + if not isinstance(goal_handle, ClientGoalHandle): + raise TypeError( + 'Expected type ClientGoalHandle but received {}'.format(type(goal_handle))) result = self._get_result(goal_handle) return result - def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None): + def send_goal_async( + self, + goal: GoalT, + feedback_callback: Optional[Callable[[FeedbackT], None]] = None, + goal_uuid: Optional[UUID] = None + ) -> Future[ClientGoalHandle[GoalT, ResultT, FeedbackT]]: """ Send a goal and asynchronously get the result. @@ -457,7 +519,7 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None): goal_uuid = bytes(request.goal_id.uuid) self._feedback_callbacks[goal_uuid] = feedback_callback - future = Future() + future: Future[ClientGoalHandle[GoalT, ResultT, FeedbackT]] = Future() self._pending_goal_requests[sequence_number] = future self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id future.add_done_callback(self._remove_pending_goal_request) @@ -466,7 +528,8 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None): return future - def _cancel_goal(self, goal_handle): + def _cancel_goal(self, goal_handle: ClientGoalHandle[GoalT, ResultT, FeedbackT] + ) -> Optional[CancelGoal.Response]: """ Send a cancel request for an active goal and wait for the response. @@ -478,7 +541,7 @@ def _cancel_goal(self, goal_handle): """ event = threading.Event() - def unblock(future): + def unblock(future: Future[Any]) -> None: nonlocal event event.set() @@ -486,11 +549,15 @@ def unblock(future): future.add_done_callback(unblock) event.wait() - if future.exception() is not None: - raise future.exception() + exception = future.exception() + if exception is not None: + raise exception return future.result() - def _cancel_goal_async(self, goal_handle): + def _cancel_goal_async( + self, + goal_handle: ClientGoalHandle[GoalT, ResultT, FeedbackT] + ) -> Future[CancelGoal.Response]: """ Send a cancel request for an active goal and asynchronously get the result. @@ -510,7 +577,7 @@ def _cancel_goal_async(self, goal_handle): raise RuntimeError( 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) - future = Future() + future: Future[CancelGoal.Response] = Future() self._pending_cancel_requests[sequence_number] = future future.add_done_callback(self._remove_pending_cancel_request) # Add future so executor is aware @@ -518,7 +585,8 @@ def _cancel_goal_async(self, goal_handle): return future - def _get_result(self, goal_handle): + def _get_result(self, goal_handle: ClientGoalHandle[GoalT, ResultT, FeedbackT] + ) -> Optional[GetResultServiceResponse[ResultT]]: """ Request the result for an active goal and wait for the response. @@ -530,7 +598,7 @@ def _get_result(self, goal_handle): """ event = threading.Event() - def unblock(future): + def unblock(future: Future[Any]) -> None: nonlocal event event.set() @@ -538,11 +606,13 @@ def unblock(future): future.add_done_callback(unblock) event.wait() - if future.exception() is not None: - raise future.exception() + exception = future.exception() + if exception is not None: + raise exception return future.result() - def _get_result_async(self, goal_handle): + def _get_result_async(self, goal_handle: ClientGoalHandle[GoalT, ResultT, FeedbackT] + ) -> Future[GetResultServiceResponse[ResultT]]: """ Request the result for an active goal asynchronously. @@ -562,7 +632,7 @@ def _get_result_async(self, goal_handle): raise RuntimeError( 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) - future = Future() + future: Future[GetResultServiceResponse[ResultT]] = Future() self._pending_result_requests[sequence_number] = future self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id future.add_done_callback(self._remove_pending_result_request) @@ -571,7 +641,7 @@ def _get_result_async(self, goal_handle): return future - def server_is_ready(self): + def server_is_ready(self) -> bool: """ Check if there is an action server ready to process requests from this client. @@ -580,7 +650,7 @@ def server_is_ready(self): with self._node.handle: return self._client_handle.is_action_server_available() - def wait_for_server(self, timeout_sec=None): + def wait_for_server(self, timeout_sec: Optional[float] = None) -> bool: """ Wait for an action sever to be ready. @@ -601,7 +671,7 @@ def wait_for_server(self, timeout_sec=None): return self.server_is_ready() - def destroy(self): + def destroy(self) -> None: """Destroy the underlying action client handle.""" self._client_handle.destroy_when_not_in_use() self._node.remove_waitable(self) diff --git a/rclpy/rclpy/action/server.py b/rclpy/rclpy/action/server.py index 02e498678..81b311616 100644 --- a/rclpy/rclpy/action/server.py +++ b/rclpy/rclpy/action/server.py @@ -17,18 +17,44 @@ import threading import traceback -from typing import Any, TypedDict +from types import TracebackType +from typing import (Any, Callable, Dict, Generic, Literal, Optional, Tuple, Type, + TYPE_CHECKING, TypedDict, TypeVar) -from action_msgs.msg import GoalInfo, GoalStatus +from action_msgs.msg import GoalInfo, GoalStatus +from action_msgs.srv._cancel_goal import CancelGoal from rclpy.executors import await_or_execute from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.qos import qos_profile_action_status_default from rclpy.qos import qos_profile_services_default from rclpy.qos import QoSProfile from rclpy.task import Future -from rclpy.type_support import check_for_type_support +from rclpy.type_support import (Action, check_for_type_support, FeedbackMessage, FeedbackT, + GetResultServiceRequest, GetResultServiceResponse, GoalT, ResultT, + SendGoalServiceRequest) from rclpy.waitable import NumberOfEntities, Waitable +from unique_identifier_msgs.msg import UUID + + +if TYPE_CHECKING: + from typing_extensions import TypeAlias + + from rclpy.callback_groups import CallbackGroup + from rclpy.node import Node + + ServerGoalHandleDictGoalT = TypeVar('ServerGoalHandleDictGoalT') + + class ServerGoalHandleDict(TypedDict, + Generic[ServerGoalHandleDictGoalT], + total=False): + goal: Tuple['_rclpy.rmw_request_id_t', SendGoalServiceRequest[ServerGoalHandleDictGoalT]] + cancel: Tuple['_rclpy.rmw_request_id_t', CancelGoal.Request] + result: Tuple['_rclpy.rmw_request_id_t', GetResultServiceRequest] + expired: Tuple[GoalInfo, ...] +else: + ServerGoalHandleDict: 'TypeAlias' = Dict[str, object] + # Re-export exception defined in _rclpy C extension. RCLError = _rclpy.RCLError @@ -51,17 +77,15 @@ class CancelResponse(Enum): GoalEvent = _rclpy.GoalEvent -class ServerGoalHandleDict(TypedDict, total=False): - goal: Any - cancel: Any - result: Any - expired: Any - - -class ServerGoalHandle: +class ServerGoalHandle(Generic[GoalT, ResultT, FeedbackT]): """Goal handle for working with Action Servers.""" - def __init__(self, action_server, goal_info, goal_request): + def __init__( + self, + action_server: 'ActionServer[GoalT, ResultT, FeedbackT]', + goal_info: GoalInfo, + goal_request: GoalT + ) -> None: """ Accept a new goal with the given action server. @@ -81,39 +105,43 @@ def __init__(self, action_server, goal_info, goal_request): self._cancel_requested = False self._lock = threading.Lock() - def __eq__(self, other): - return self.goal_id == other.goal_id + def __eq__(self, other: object) -> bool: + if isinstance(other, ServerGoalHandle): + return self.goal_id == other.goal_id + return False - def __ne__(self, other): - return self.goal_id != other.goal_id + def __ne__(self, other: object) -> bool: + if isinstance(other, ServerGoalHandle): + return self.goal_id != other.goal_id + return True @property - def request(self): + def request(self) -> GoalT: return self._goal_request @property - def goal_id(self): + def goal_id(self) -> UUID: return self._goal_info.goal_id @property - def is_active(self): + def is_active(self) -> bool: with self._lock: if self._goal_handle is None: return False return self._goal_handle.is_active() @property - def is_cancel_requested(self): + def is_cancel_requested(self) -> bool: return GoalStatus.STATUS_CANCELING == self.status @property - def status(self): + def status(self) -> int: with self._lock: if self._goal_handle is None: return GoalStatus.STATUS_UNKNOWN return self._goal_handle.get_status() - def _update_state(self, event): + def _update_state(self, event: _rclpy.GoalEvent) -> None: with self._lock: # Ignore updates for already destructed goal handles if self._goal_handle is None: @@ -129,7 +157,11 @@ def _update_state(self, event): if not self._goal_handle.is_active(): self._action_server.notify_goal_done() - def execute(self, execute_callback=None): + def execute( + self, + execute_callback: Optional[Callable[['ServerGoalHandle[GoalT, ResultT, FeedbackT]'], + ResultT]] = None + ) -> None: # It's possible that there has been a request to cancel the goal prior to executing. # In this case we want to avoid the illegal state transition to EXECUTING # but still call the users execute callback to let them handle canceling the goal. @@ -137,7 +169,7 @@ def execute(self, execute_callback=None): self._update_state(_rclpy.GoalEvent.EXECUTE) self._action_server.notify_execute(self, execute_callback) - def publish_feedback(self, feedback): + def publish_feedback(self, feedback: FeedbackMessage[FeedbackT]) -> None: if not isinstance(feedback, self._action_server.action_type.Feedback): raise TypeError() @@ -155,16 +187,16 @@ def publish_feedback(self, feedback): # Publish self._action_server._handle.publish_feedback(feedback_message) - def succeed(self): + def succeed(self) -> None: self._update_state(_rclpy.GoalEvent.SUCCEED) - def abort(self): + def abort(self) -> None: self._update_state(_rclpy.GoalEvent.ABORT) - def canceled(self): + def canceled(self) -> None: self._update_state(_rclpy.GoalEvent.CANCELED) - def destroy(self): + def destroy(self) -> None: with self._lock: if self._goal_handle is None: return @@ -172,42 +204,47 @@ def destroy(self): self._goal_handle = None -def default_handle_accepted_callback(goal_handle): +def default_handle_accepted_callback(goal_handle: ServerGoalHandle[Any, Any, Any]) -> None: """Execute the goal.""" goal_handle.execute() -def default_goal_callback(goal_request): +def default_goal_callback( + goal_request: SendGoalServiceRequest[Any] +) -> Literal[GoalResponse.ACCEPT]: """Accept all goals.""" return GoalResponse.ACCEPT -def default_cancel_callback(cancel_request): +def default_cancel_callback(cancel_request: CancelGoal.Request) -> Literal[CancelResponse.REJECT]: """No cancellations.""" return CancelResponse.REJECT -class ActionServer(Waitable[ServerGoalHandleDict]): +class ActionServer(Generic[GoalT, ResultT, FeedbackT], Waitable['ServerGoalHandleDict[GoalT]']): """ROS Action server.""" def __init__( self, - node, - action_type, - action_name, - execute_callback, + node: 'Node', + action_type: Type[Action[GoalT, ResultT, FeedbackT]], + action_name: str, + execute_callback: Callable[[ServerGoalHandle[GoalT, ResultT, FeedbackT]], ResultT], *, - callback_group=None, - goal_callback=default_goal_callback, - handle_accepted_callback=default_handle_accepted_callback, - cancel_callback=default_cancel_callback, - goal_service_qos_profile=qos_profile_services_default, - result_service_qos_profile=qos_profile_services_default, - cancel_service_qos_profile=qos_profile_services_default, - feedback_pub_qos_profile=QoSProfile(depth=10), - status_pub_qos_profile=qos_profile_action_status_default, - result_timeout=10 - ): + callback_group: 'Optional[CallbackGroup]' = None, + goal_callback: Callable[[CancelGoal.Request], GoalResponse] = default_goal_callback, + handle_accepted_callback: Callable[[ServerGoalHandle[GoalT, + ResultT, + FeedbackT]], + None] = default_handle_accepted_callback, + cancel_callback: Callable[[CancelGoal.Request], CancelResponse] = default_cancel_callback, + goal_service_qos_profile: QoSProfile = qos_profile_services_default, + result_service_qos_profile: QoSProfile = qos_profile_services_default, + cancel_service_qos_profile: QoSProfile = qos_profile_services_default, + feedback_pub_qos_profile: QoSProfile = QoSProfile(depth=10), + status_pub_qos_profile: QoSProfile = qos_profile_action_status_default, + result_timeout: int = 10 + ) -> None: """ Create an ActionServer. @@ -263,16 +300,20 @@ def __init__( ) # key: UUID in bytes, value: GoalHandle - self._goal_handles = {} + self._goal_handles: Dict[bytes, ServerGoalHandle[GoalT, ResultT, FeedbackT]] = {} # key: UUID in bytes, value: Future - self._result_futures = {} + self._result_futures: Dict[bytes, Future[GetResultServiceResponse[ResultT]]] = {} callback_group.add_entity(self) self._node.add_waitable(self) self._logger = self._node.get_logger().get_child('action_server') - async def _execute_goal_request(self, request_header_and_message): + async def _execute_goal_request( + self, + request_header_and_message: Tuple['_rclpy.rmw_request_id_t', + SendGoalServiceRequest[GoalT]] + ) -> None: request_header, goal_request = request_header_and_message goal_uuid = goal_request.goal_id goal_info = GoalInfo() @@ -333,7 +374,11 @@ async def _execute_goal_request(self, request_header_and_message): # Provide the user a reference to the goal handle await await_or_execute(self._handle_accepted_callback, goal_handle) - async def _execute_goal(self, execute_callback, goal_handle): + async def _execute_goal( + self, + execute_callback: Callable[[ServerGoalHandle[GoalT, ResultT, FeedbackT]], ResultT], + goal_handle: ServerGoalHandle[GoalT, ResultT, FeedbackT] + ) -> None: goal_uuid = goal_handle.goal_id.uuid self._logger.debug('Executing goal with ID {0}'.format(goal_uuid)) @@ -361,7 +406,10 @@ async def _execute_goal(self, execute_callback, goal_handle): result_response.result = execute_result self._result_futures[bytes(goal_uuid)].set_result(result_response) - async def _execute_cancel_request(self, request_header_and_message): + async def _execute_cancel_request( + self, + request_header_and_message: Tuple['_rclpy.rmw_request_id_t', CancelGoal.Request] + ) -> None: request_header, cancel_request = request_header_and_message self._logger.debug('Cancel request received: {0}'.format(cancel_request)) @@ -403,7 +451,10 @@ async def _execute_cancel_request(self, request_header_and_message): except RCLError: self._logger.warn('Failed to send cancel response (the client may have gone away)') - async def _execute_get_result_request(self, request_header_and_message): + async def _execute_get_result_request( + self, + request_header_and_message: Tuple['_rclpy.rmw_request_id_t', GetResultServiceRequest] + ) -> None: request_header, result_request = request_header_and_message goal_uuid = result_request.goal_id.uuid @@ -425,7 +476,7 @@ async def _execute_get_result_request(self, request_header_and_message): self._result_futures[bytes(goal_uuid)].add_done_callback( functools.partial(self._send_result_response, request_header)) - async def _execute_expire_goals(self, expired_goals): + async def _execute_expire_goals(self, expired_goals: Tuple[GoalInfo, ...]) -> None: for goal in expired_goals: goal_uuid = bytes(goal.goal_id.uuid) self._goal_handles[goal_uuid].destroy() @@ -433,7 +484,11 @@ async def _execute_expire_goals(self, expired_goals): self.remove_future(self._result_futures[goal_uuid]) del self._result_futures[goal_uuid] - def _send_result_response(self, request_header, future): + def _send_result_response( + self, + request_header: '_rclpy.rmw_request_id_t', + future: Future[GetResultServiceResponse[ResultT]] + ) -> None: try: # If the client goes away anytime before this, sending the result response may fail. # Catch the exception here and go on so we don't crash. @@ -442,11 +497,11 @@ def _send_result_response(self, request_header, future): self._logger.warn('Failed to send result response (the client may have gone away)') @property - def action_type(self): + def action_type(self) -> Type[Action[GoalT, ResultT, FeedbackT]]: return self._action_type # Start Waitable API - def is_ready(self, wait_set): + def is_ready(self, wait_set: _rclpy.WaitSet) -> bool: """Return True if one or more entities are ready in the wait set.""" with self._lock: ready_entities = self._handle.is_ready(wait_set) @@ -456,9 +511,9 @@ def is_ready(self, wait_set): self._is_goal_expired = ready_entities[3] return any(ready_entities) - def take_data(self) -> ServerGoalHandleDict: + def take_data(self) -> 'ServerGoalHandleDict[GoalT]': """Take stuff from lower level so the wait set doesn't immediately wake again.""" - data: ServerGoalHandleDict = {} + data: 'ServerGoalHandleDict[GoalT]' = {} if self._is_goal_request_ready: with self._lock: taken_data = self._handle.take_goal_request( @@ -492,7 +547,7 @@ def take_data(self) -> ServerGoalHandleDict: return data - async def execute(self, taken_data: ServerGoalHandleDict) -> None: + async def execute(self, taken_data: 'ServerGoalHandleDict[GoalT]') -> None: """ Execute work after data has been taken from a ready wait set. @@ -511,7 +566,7 @@ async def execute(self, taken_data: ServerGoalHandleDict) -> None: if 'expired' in taken_data: await self._execute_expire_goals(taken_data['expired']) - def get_num_entities(self): + def get_num_entities(self) -> NumberOfEntities: """Return number of each type of entity used in the wait set.""" num_entities = self._handle.get_num_entities() return NumberOfEntities( @@ -521,20 +576,26 @@ def get_num_entities(self): num_entities[3], num_entities[4]) - def add_to_wait_set(self, wait_set): + def add_to_wait_set(self, wait_set: _rclpy.WaitSet) -> None: """Add entities to wait set.""" with self._lock: self._handle.add_to_waitset(wait_set) - def __enter__(self): - return self._handle.__enter__() + def __enter__(self) -> None: + self._handle.__enter__() - def __exit__(self, t, v, tb): + def __exit__(self, t: Optional[Type[BaseException]], + v: Optional[BaseException], tb: Optional[TracebackType]) -> None: self._handle.__exit__(t, v, tb) # End Waitable API - def notify_execute(self, goal_handle, execute_callback): + def notify_execute( + self, + goal_handle: ServerGoalHandle[GoalT, ResultT, FeedbackT], + execute_callback: Optional[Callable[[ServerGoalHandle[GoalT, ResultT, FeedbackT]], + ResultT]] + ) -> None: # Use provided callback, defaulting to a previously registered callback if execute_callback is None: if self._execute_callback is None: @@ -542,13 +603,18 @@ def notify_execute(self, goal_handle, execute_callback): execute_callback = self._execute_callback # Schedule user callback for execution - self._node.executor.create_task(self._execute_goal, execute_callback, goal_handle) + if self._node.executor: + self._node.executor.create_task(self._execute_goal, execute_callback, goal_handle) - def notify_goal_done(self): + def notify_goal_done(self) -> None: with self._lock: self._handle.notify_goal_done() - def register_handle_accepted_callback(self, handle_accepted_callback): + def register_handle_accepted_callback( + self, + handle_accepted_callback: Optional[Callable[[ + ServerGoalHandle[GoalT, ResultT, FeedbackT]], None]] + ) -> None: """ Register a callback for handling newly accepted goals. @@ -569,7 +635,10 @@ def register_handle_accepted_callback(self, handle_accepted_callback): handle_accepted_callback = default_handle_accepted_callback self._handle_accepted_callback = handle_accepted_callback - def register_goal_callback(self, goal_callback): + def register_goal_callback( + self, + goal_callback: Optional[Callable[[SendGoalServiceRequest[GoalT]], GoalResponse]] + ) -> None: """ Register a callback for handling new goal requests. @@ -588,7 +657,10 @@ def register_goal_callback(self, goal_callback): goal_callback = default_goal_callback self._goal_callback = goal_callback - def register_cancel_callback(self, cancel_callback): + def register_cancel_callback( + self, + cancel_callback: Optional[Callable[[CancelGoal.Request], CancelResponse]] + ) -> None: """ Register a callback for handling cancel requests. @@ -607,7 +679,10 @@ def register_cancel_callback(self, cancel_callback): cancel_callback = default_cancel_callback self._cancel_callback = cancel_callback - def register_execute_callback(self, execute_callback): + def register_execute_callback( + self, + execute_callback: Callable[[ServerGoalHandle[GoalT, ResultT, FeedbackT]], ResultT] + ) -> None: """ Register a callback for executing action goals. @@ -626,7 +701,7 @@ def register_execute_callback(self, execute_callback): raise TypeError('Failed to register goal execution callback: not callable') self._execute_callback = execute_callback - def destroy(self): + def destroy(self) -> None: """Destroy the underlying action server handle.""" for goal_handle in self._goal_handles.values(): goal_handle.destroy() diff --git a/rclpy/rclpy/impl/_rclpy_pybind11.pyi b/rclpy/rclpy/impl/_rclpy_pybind11.pyi index 002b4e4b2..564ad87f3 100644 --- a/rclpy/rclpy/impl/_rclpy_pybind11.pyi +++ b/rclpy/rclpy/impl/_rclpy_pybind11.pyi @@ -18,13 +18,20 @@ from enum import Enum from types import TracebackType from typing import Any, Generic, Literal, overload, Sequence, TypeAlias, TypedDict, TypeVar + +from action_msgs.msg import GoalInfo +from action_msgs.msg._goal_status_array import GoalStatusArray +from action_msgs.srv._cancel_goal import CancelGoal from rclpy.clock import JumpHandle from rclpy.clock_type import ClockType from rclpy.duration import Duration from rclpy.impl import service_introspection from rclpy.parameter import Parameter from rclpy.subscription import MessageInfo -from rclpy.type_support import MsgT, Srv, SrvEventT, SrvRequestT, SrvResponseT +from rclpy.type_support import (Action, FeedbackMessage, FeedbackT, GetResultServiceRequest, + GetResultServiceResponse, GoalT, MsgT, ResultT, + SendGoalServiceRequest, SendGoalServiceResponse, Srv, SrvRequestT, + SrvResponseT) T = TypeVar('T') @@ -44,21 +51,14 @@ class rcl_time_point_t: class rmw_service_info_t: - - @property - def source_timestamp(self) -> int: ... - - @property - def received_timestamp(self) -> int: ... - - @property - def request_id(self) -> rmw_request_id_t: ... + source_timestamp: int + received_timestamp: int + request_id: rmw_request_id_t class rmw_request_id_t: - - @property - def sequence_number(self) -> int: ... + writer_guid: list[int] + sequence_number: int class Destroyable: @@ -72,9 +72,9 @@ class Destroyable: """Destroy the rcl object as soon as it's not actively being used.""" -class Client(Destroyable, Generic[SrvRequestT, SrvResponseT, SrvEventT]): +class Client(Destroyable, Generic[SrvRequestT, SrvResponseT]): - def __init__(self, node: Node, srv_type: type[Srv[SrvRequestT, SrvResponseT, SrvEventT]], + def __init__(self, node: Node, srv_type: type[Srv[SrvRequestT, SrvResponseT]], srv_name: str, pyqos_profile: rmw_qos_profile_t) -> None: ... @property @@ -105,9 +105,9 @@ class Client(Destroyable, Generic[SrvRequestT, SrvResponseT, SrvEventT]): """Configure whether introspection is enabled.""" -class Service(Destroyable, Generic[SrvRequestT, SrvResponseT, SrvEventT]): +class Service(Destroyable, Generic[SrvRequestT, SrvResponseT]): - def __init__(self, node: Node, pysrv_type: type[Srv[SrvRequestT, SrvResponseT, SrvEventT]], + def __init__(self, node: Node, pysrv_type: type[Srv[SrvRequestT, SrvResponseT]], name: str, pyqos_profile: rmw_qos_profile_t) -> None: ... @property @@ -290,6 +290,7 @@ rmw_offered_qos_incompatible_event_status_t = rmw_qos_incompatible_event_status_ class RCLError(BaseException): + def __init__(self, error_text: str) -> None: ... @@ -692,6 +693,188 @@ class WaitSet(Destroyable): """Wait until timeout is reached or event happened.""" +class ActionClient(Generic[GoalT, ResultT, FeedbackT], Destroyable): + + def __init__( + self, + node: Node, + pyaction_type: type[Action[GoalT, ResultT, FeedbackT]], + goal_service_qos: rmw_qos_profile_t, + result_service_qos: rmw_qos_profile_t, + cancel_service_qos: rmw_qos_profile_t, + feedback_service_qos: rmw_qos_profile_t, + status_topci_qos: rmw_qos_profile_t + ) -> None: ... + + @property + def pointer(self) -> int: + """Get the address of the entity as an integer.""" + + def take_goal_response(self, pymsg_type: type[SendGoalServiceResponse] + ) -> tuple[int, SendGoalServiceResponse] | tuple[None, None]: + """Take an action goal response.""" + + def send_result_request(self, pyrequest: GetResultServiceRequest) -> int: + """Send an action result requst.""" + + def take_cancel_response(self, pymsg_type: type[CancelGoal.Response] + ) -> tuple[int, CancelGoal.Response] | tuple[None, None]: + """Take an action cancel response.""" + + def take_feedback(self, pymsg_type: type[FeedbackMessage[FeedbackT]] + ) -> FeedbackMessage[FeedbackT] | None: + """Take a feedback message from a given action client.""" + + def send_cancel_request(self: CancelGoal.Request) -> int: + """Send an action cancel request.""" + + def send_goal_request(self: SendGoalServiceRequest[GoalT]) -> int: + """Send an action goal request.""" + + def take_result_response( + self, + pymsg_type: type[GetResultServiceResponse[ResultT]] + ) -> tuple[int, GetResultServiceResponse[ResultT]] | tuple[None, None]: + """Take an action result response.""" + + def get_num_entities(self) -> tuple[int, int, int, int, int]: + """Get the number of wait set entities that make up an action entity.""" + + def is_action_server_available(self) -> bool: + """Check if an action server is available for the given action client.""" + + def add_to_waitset(self, waitset: WaitSet) -> None: + """Add an action entity to a wait set.""" + + def is_ready(self) -> bool: + """Check if an action entity has any ready wait set entities.""" + + def take_status(self, pymsg_type: type[GoalStatusArray]) -> GoalStatusArray | None: + """Take an action status response.""" + + +class GoalEvent(Enum): + _value_: int + EXECUTE = ... + CANCEL_GOAL = ... + SUCCEED = ... + ABORT = ... + CANCELED = ... + + +class ActionServer(Generic[GoalT, ResultT, FeedbackT], Destroyable): + + def __init__( + self, + node: Node, + rclpy_clock: Clock, + pyaction_type: type[Action[GoalT, ResultT, FeedbackT]], + action_name: str, + goal_service_qos: rmw_qos_profile_t, + result_service_qos: rmw_qos_profile_t, + cancel_service_qos: rmw_qos_profile_t, + feedback_topic_qos: rmw_qos_profile_t, + status_topic_qos: rmw_qos_profile_t, + result_timeout: float + ) -> None: ... + + @property + def pointer(self) -> int: + """Get the address of the entity as an integer.""" + + def take_goal_request( + self, + pymsg_type: type[SendGoalServiceRequest[GoalT]] + ) -> tuple[rmw_request_id_t, SendGoalServiceRequest[GoalT]] | tuple[None, None]: + """Take an action goal request.""" + + def send_goal_response( + self, + header: rmw_request_id_t, + pyresponse: SendGoalServiceResponse + ) -> None: + """Send an action goal response.""" + + def send_result_response( + self, + header: rmw_request_id_t, + pyresponse: GetResultServiceResponse[ResultT] + ) -> None: + """Send an action result response.""" + + def take_cancel_request( + self, + pymsg_type: type[CancelGoal.Request] + ) -> tuple[rmw_request_id_t, CancelGoal.Request] | tuple[None, None]: + """Take an action cancel request.""" + + def take_result_request( + self, + pymsg_type: type[GetResultServiceRequest] + ) -> tuple[rmw_request_id_t, GetResultServiceRequest] | tuple[None, None]: + """Take an action result request.""" + + def send_cancel_response( + self, + header: rmw_request_id_t, + pyresponse: int + ) -> None: + """Send an action cancel response.""" + + def publish_feedback( + self, + pymsg: FeedbackMessage[FeedbackT] + ) -> None: + """Publish a feedback message from a given action server.""" + + def publish_status(self) -> None: + """Publish a status message from a given action server.""" + + def notify_goal_done(self) -> None: + """Notify goal is done.""" + + def goal_exists(self, pygoal_info: GoalInfo) -> bool: + """Check is a goal exists in the server.""" + + def process_cancel_request( + self, + pycancel_request: CancelGoal.Request, + pycancel_response_tpye: type[CancelGoal.Response] + ) -> CancelGoal.Response: + """Process a cancel request.""" + + def expire_goals(self, max_num_goals: int) -> tuple[GoalInfo, ...]: + """Expired goals.""" + + def get_num_entities(self) -> tuple[int, int, int, int, int]: + """Get the number of wait set entities that make up an action entity.""" + + def is_ready(self, wait_set: WaitSet) -> tuple[bool, bool, bool, bool]: + """Check if an action entity has any ready wait set entities.""" + + def add_to_waitset(self, wait_set: WaitSet) -> None: + """Add an action entity to a wait set.""" + + +class ActionGoalHandle: + + def __init__(self, action_server: ActionServer, pygoal_info_msg: GoalInfo) -> None: + ... + + @property + def pointer(self) -> int: + """Get the address of the entity as an integer.""" + + def get_status(self) -> GoalEvent: + """Get the status of a goal.""" + + def update_goal_state(self, event: GoalEvent) -> None: + """Update a goal state.""" + + def is_active(self) -> bool: + """Check if a goal is active.""" + + class RCLError(RuntimeError): pass diff --git a/rclpy/rclpy/type_support.py b/rclpy/rclpy/type_support.py index 263c36756..ad3bc7a99 100644 --- a/rclpy/rclpy/type_support.py +++ b/rclpy/rclpy/type_support.py @@ -12,9 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import NoReturn, Optional, Protocol, Type, TypeVar, Union +from typing import Any, ClassVar, Iterable, Optional, Protocol, Type, TypeVar, Union + +from action_msgs.msg._goal_status_array import GoalStatusArray +from action_msgs.srv._cancel_goal import CancelGoal +from builtin_interfaces.msg import Time from rclpy.exceptions import NoTypeSupportImportedException +from service_msgs.msg._service_event_info import ServiceEventInfo +from typing_extensions import TypeAlias +from unique_identifier_msgs.msg import UUID class PyCapsule(Protocol): @@ -30,7 +37,7 @@ class PyCapsule(Protocol): class CommonMsgSrvMetaClass(ProtocolType): """Shared attributes between messages and services.""" - _TYPE_SUPPORT: Optional[PyCapsule] + _TYPE_SUPPORT: ClassVar[Optional[PyCapsule]] @classmethod def __import_type_support__(cls) -> None: @@ -40,10 +47,10 @@ def __import_type_support__(cls) -> None: class MsgMetaClass(CommonMsgSrvMetaClass): """Generic Message Metaclass Alias.""" - _CREATE_ROS_MESSAGE: Optional[PyCapsule] - _CONVERT_FROM_PY: Optional[PyCapsule] - _CONVERT_TO_PY: Optional[PyCapsule] - _DESTROY_ROS_MESSAGE: Optional[PyCapsule] + _CREATE_ROS_MESSAGE: ClassVar[Optional[PyCapsule]] + _CONVERT_FROM_PY: ClassVar[Optional[PyCapsule]] + _CONVERT_TO_PY: ClassVar[Optional[PyCapsule]] + _DESTROY_ROS_MESSAGE: ClassVar[Optional[PyCapsule]] class Msg(Protocol, metaclass=MsgMetaClass): @@ -58,21 +65,82 @@ class Msg(Protocol, metaclass=MsgMetaClass): SrvResponseT = TypeVar('SrvResponseT', bound=Msg) +class EventMessage(Msg, Protocol[SrvRequestT, SrvResponseT]): + info: ServiceEventInfo + request: Iterable[SrvRequestT] + response: Iterable[SrvResponseT] + + class Srv(Protocol[SrvRequestT, SrvResponseT], metaclass=CommonMsgSrvMetaClass): """Generic Service Type Alias.""" Request: Type[SrvRequestT] Response: Type[SrvResponseT] - Event: Type[Msg] + Event: Type[EventMessage[SrvRequestT, SrvResponseT]] + + +GoalT = TypeVar('GoalT', bound=Msg) +ResultT = TypeVar('ResultT', bound=Msg) +FeedbackT = TypeVar('FeedbackT', bound=Msg) + + +class SendGoalServiceRequest(Msg, Protocol[GoalT]): + goal_id: UUID + goal: GoalT + + +class SendGoalServiceResponse(Msg, Protocol): + accepted: bool + stamp: Time + + +SendGoalService: TypeAlias = Srv[SendGoalServiceRequest[GoalT], SendGoalServiceResponse] + + +class GetResultServiceRequest(Msg, Protocol): + goal_id: UUID + + +class GetResultServiceResponse(Msg, Protocol[ResultT]): + status: int + result: ResultT + + +GetResultService: TypeAlias = Srv[GetResultServiceRequest, GetResultServiceResponse[ResultT]] + + +class FeedbackMessage(Msg, Protocol[FeedbackT]): + goal_id: UUID + feedback: FeedbackT + + +class Impl(Protocol[GoalT, ResultT, FeedbackT]): + + SendGoalService: Type[SendGoalService[GoalT]] + GetResultService: Type[GetResultService[ResultT]] + FeedbackMessage: Type[FeedbackMessage[FeedbackT]] + CancelGoalService: ClassVar[Type[CancelGoal]] + GoalStatusMessage: ClassVar[Type[GoalStatusArray]] + + +class Action(Protocol[GoalT, + ResultT, + FeedbackT], + metaclass=CommonMsgSrvMetaClass): + Goal: Type[GoalT] + Result: Type[ResultT] + Feedback: Type[FeedbackT] - def __init__(self) -> NoReturn: ... + Impl: Type[Impl[GoalT, ResultT, FeedbackT]] # Can be used if https://github.com/python/typing/issues/548 ever gets approved. -SrvT = TypeVar('SrvT', bound=Type[Srv]) +SrvT = TypeVar('SrvT', bound=Srv[Any, Any]) +ActionT = TypeVar('ActionT', bound=Action[Any, Any, Any]) -def check_for_type_support(msg_or_srv_type: Type[Union[Msg, Srv]]) -> None: +def check_for_type_support(msg_or_srv_type: Type[Union[Msg, Srv[Any, Any], + Action[Any, Any, Any]]]) -> None: try: ts = msg_or_srv_type._TYPE_SUPPORT except AttributeError as e: @@ -104,7 +172,7 @@ def check_is_valid_msg_type(msg_type: Type[Msg]) -> None: ) from None -def check_is_valid_srv_type(srv_type: Type[Srv]) -> None: +def check_is_valid_srv_type(srv_type: Type[Srv[Any, Any]]) -> None: check_for_type_support(srv_type) try: assert None not in ( diff --git a/rclpy/src/rclpy/action_server.cpp b/rclpy/src/rclpy/action_server.cpp index 230d5599f..0241bd0f2 100644 --- a/rclpy/src/rclpy/action_server.cpp +++ b/rclpy/src/rclpy/action_server.cpp @@ -400,7 +400,7 @@ define_action_server(py::object module) "Send an action cancel response.") .def( "publish_feedback", &ActionServer::publish_feedback, - " Publish a feedback message from a given action server.") + "Publish a feedback message from a given action server.") .def( "publish_status", &ActionServer::publish_status, "Publish a status message from a given action server.")