Skip to content

Commit

Permalink
event_handler.py types (#1340)
Browse files Browse the repository at this point in the history
* Add types to wait_for_message.py

Signed-off-by: Michael Carlstrom <[email protected]>

* Add copyright

Signed-off-by: Michael Carlstrom <[email protected]>

* re-run CI

Signed-off-by: Michael Carlstrom <[email protected]>

* re-run CI

Signed-off-by: Michael Carlstrom <[email protected]>

* move Handles into _rclpy_pybind11

Signed-off-by: Michael Carlstrom <[email protected]>

* Move Handles into type stubs:

Signed-off-by: Michael Carlstrom <[email protected]>

* Move Handles into type stubs

Signed-off-by: Michael Carlstrom <[email protected]>

* move [] into string

Signed-off-by: Michael Carlstrom <[email protected]>

* fix imports

Signed-off-by: Michael Carlstrom <[email protected]>

* remove extra line

Signed-off-by: Michael Carlstrom <[email protected]>

* puy _rclpy.Publisher in quotes

Signed-off-by: Michael Carlstrom <[email protected]>

* fix capitalization

Signed-off-by: Michael Carlstrom <[email protected]>

* Add EventHandle Constructor

Signed-off-by: Michael Carlstrom <[email protected]>

* Use RuntimeError for context

Signed-off-by: Michael Carlstrom <[email protected]>

* Add TYPE_CHECKING import

Signed-off-by: Michael Carlstrom <[email protected]>

* init

Signed-off-by: Michael Carlstrom <[email protected]>

* remove .vscode file

Signed-off-by: Michael Carlstrom <[email protected]>

* move into string

Signed-off-by: Michael Carlstrom <[email protected]>

* fix flake8

Signed-off-by: Michael Carlstrom <[email protected]>

---------

Signed-off-by: Michael Carlstrom <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
InvincibleRMC and mergify[bot] authored Jan 3, 2025
1 parent ba72a01 commit 4e8b071
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 63 deletions.
96 changes: 56 additions & 40 deletions rclpy/rclpy/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import IntEnum
from __future__ import annotations

from types import TracebackType
from typing import Any
from typing import Callable
from typing import List
from typing import Optional
from typing import Type
from typing import Union
import warnings

import rclpy
Expand All @@ -30,50 +34,62 @@
from typing_extensions import TypeAlias


QoSPublisherEventType = _rclpy.rcl_publisher_event_type_t
QoSSubscriptionEventType = _rclpy.rcl_subscription_event_type_t
QoSPublisherEventType: TypeAlias = _rclpy.rcl_publisher_event_type_t
QoSSubscriptionEventType: TypeAlias = _rclpy.rcl_subscription_event_type_t


# Payload type for Subscription Deadline callback.
QoSRequestedDeadlineMissedInfo = _rclpy.rmw_requested_deadline_missed_status_t
QoSRequestedDeadlineMissedInfo: TypeAlias = _rclpy.rmw_requested_deadline_missed_status_t

# Payload type for Subscription Liveliness callback.
QoSLivelinessChangedInfo = _rclpy.rmw_liveliness_changed_status_t
QoSLivelinessChangedInfo: TypeAlias = _rclpy.rmw_liveliness_changed_status_t

# Payload type for Subscription Message Lost callback.
QoSMessageLostInfo = _rclpy.rmw_message_lost_status_t
QoSMessageLostInfo: TypeAlias = _rclpy.rmw_message_lost_status_t

# Payload type for Subscription Incompatible QoS callback.
QoSRequestedIncompatibleQoSInfo = _rclpy.rmw_requested_qos_incompatible_event_status_t
QoSRequestedIncompatibleQoSInfo: TypeAlias = _rclpy.rmw_requested_qos_incompatible_event_status_t

# Payload type for Subscription matched callback.
QoSSubscriptionMatchedInfo = _rclpy.rmw_matched_status_t
QoSSubscriptionMatchedInfo: TypeAlias = _rclpy.rmw_matched_status_t

# Payload type for Publisher Deadline callback.
QoSOfferedDeadlineMissedInfo = _rclpy.rmw_offered_deadline_missed_status_t
QoSOfferedDeadlineMissedInfo: TypeAlias = _rclpy.rmw_offered_deadline_missed_status_t

# Payload type for Publisher Liveliness callback.
QoSLivelinessLostInfo = _rclpy.rmw_liveliness_lost_status_t
QoSLivelinessLostInfo: TypeAlias = _rclpy.rmw_liveliness_lost_status_t

# Payload type for Publisher matched callback.
QoSPublisherMatchedInfo = _rclpy.rmw_matched_status_t
QoSPublisherMatchedInfo: TypeAlias = _rclpy.rmw_matched_status_t

"""
Payload type for Publisher Incompatible QoS callback.
Mirrors rmw_offered_incompatible_qos_status_t from rmw/types.h
"""
QoSOfferedIncompatibleQoSInfo = QoSRequestedIncompatibleQoSInfo
QoSOfferedIncompatibleQoSInfo: TypeAlias = QoSRequestedIncompatibleQoSInfo

# Payload type for Incompatible Type callback.
IncompatibleTypeInfo = _rclpy.rmw_incompatible_type_status_t
IncompatibleTypeInfo: TypeAlias = _rclpy.rmw_incompatible_type_status_t


"""Raised when registering a callback for an event type that is not supported."""
UnsupportedEventTypeError = _rclpy.UnsupportedEventTypeError
UnsupportedEventTypeError: TypeAlias = _rclpy.UnsupportedEventTypeError


EventHandlerData: TypeAlias = Optional[Any]
EventHandlerData: TypeAlias = Optional[Union[
QoSRequestedDeadlineMissedInfo,
QoSLivelinessChangedInfo,
QoSMessageLostInfo,
QoSRequestedIncompatibleQoSInfo,
IncompatibleTypeInfo,
QoSSubscriptionMatchedInfo,
QoSOfferedDeadlineMissedInfo,
QoSLivelinessLostInfo,
'_rclpy.rmw_offered_qos_incompatible_event_status_t',
IncompatibleTypeInfo,
QoSPublisherMatchedInfo
]]


class EventHandler(Waitable[EventHandlerData]):
Expand All @@ -83,23 +99,23 @@ def __init__(
self,
*,
callback_group: CallbackGroup,
callback: Callable,
event_type: IntEnum,
parent_impl,
):
callback: Callable[..., None],
event_type: Union[QoSSubscriptionEventType, QoSPublisherEventType],
parent_impl: 'Union[ _rclpy.Subscription[Any], _rclpy.Publisher[Any]]',
) -> None:
# Waitable init adds self to callback_group
super().__init__(callback_group)
self.event_type = event_type
self.callback = callback

with parent_impl:
self.__event = _rclpy.EventHandle(parent_impl, event_type)
self.__event: '_rclpy.EventHandle[Any]' = _rclpy.EventHandle(parent_impl, event_type)

self._ready_to_take_data = False
self._event_index = None
self._event_index: Optional[int] = None

# Start Waitable API
def is_ready(self, wait_set):
def is_ready(self, wait_set: _rclpy.WaitSet) -> bool:
"""Return True if entities are ready in the wait set."""
if self._event_index is None:
return False
Expand All @@ -121,20 +137,25 @@ async def execute(self, taken_data: EventHandlerData) -> None:
return
await rclpy.executors.await_or_execute(self.callback, taken_data)

def get_num_entities(self):
def get_num_entities(self) -> NumberOfEntities:
"""Return number of each type of entity used."""
return NumberOfEntities(num_events=1)

def add_to_wait_set(self, wait_set):
def add_to_wait_set(self, wait_set: _rclpy.WaitSet) -> None:
"""Add entites to wait set."""
with self.__event:
self._event_index = wait_set.add_event(self.__event)

def __enter__(self):
def __enter__(self) -> None:
"""Mark event as in-use to prevent destruction while waiting on it."""
self.__event.__enter__()

def __exit__(self, t, v, tb):
def __exit__(
self,
t: Optional[Type[BaseException]],
v: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
"""Mark event as not-in-use to allow destruction after waiting on it."""
self.__event.__exit__(t, v, tb)

Expand All @@ -146,12 +167,7 @@ def destroy(self) -> None:
category=DeprecationWarning, stacklevel=2)
class QoSEventHandler(EventHandler):

def __init_subclass__(cls, **kwargs):
warnings.warn('QoSEventHandler foo is deprecated, use EventHandler instead.',
DeprecationWarning, stacklevel=2)
super().__init_subclass__(**kwargs)

def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn('QoSEventHandler is deprecated, use EventHandler instead.',
DeprecationWarning, stacklevel=2)
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -197,7 +213,7 @@ def __init__(
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
self, callback_group: CallbackGroup, subscription: '_rclpy.Subscription',
self, callback_group: CallbackGroup, subscription: '_rclpy.Subscription[Any]',
topic_name: str) -> List[EventHandler]:
with subscription:
logger = get_logger(subscription.get_logger_name())
Expand All @@ -215,7 +231,7 @@ def create_event_handlers(
incompatible_qos_callback = self.incompatible_qos
elif self.use_default_callbacks:
# Register default callback when not specified
def _default_incompatible_qos_callback(event):
def _default_incompatible_qos_callback(event: QoSRequestedIncompatibleQoSInfo) -> None:
policy_name = qos_policy_name_from_kind(event.last_policy_kind)
logger.warn(
"New publisher discovered on topic '{}', offering incompatible QoS. "
Expand Down Expand Up @@ -252,7 +268,7 @@ def _default_incompatible_qos_callback(event):
incompatible_type_callback = self.incompatible_type
elif self.use_default_callbacks:
# Register default callback when not specified
def _default_incompatible_type_callback(event):
def _default_incompatible_type_callback(event: Any) -> None:
logger.warn(
"Incompatible type on topic '{}', no messages will be sent to it."
.format(topic_name))
Expand All @@ -269,7 +285,7 @@ def _default_incompatible_type_callback(event):
pass

if self.matched:
event_handlers.append(QoSEventHandler(
event_handlers.append(EventHandler(
callback_group=callback_group,
callback=self.matched,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED,
Expand Down Expand Up @@ -315,7 +331,7 @@ def __init__(
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
self, callback_group: CallbackGroup, publisher: _rclpy.Publisher, topic_name: str,
self, callback_group: CallbackGroup, publisher: '_rclpy.Publisher[Any]', topic_name: str,
) -> List[EventHandler]:
with publisher:
logger = get_logger(publisher.get_logger_name())
Expand All @@ -340,7 +356,7 @@ def create_event_handlers(
incompatible_qos_callback = self.incompatible_qos
elif self.use_default_callbacks:
# Register default callback when not specified
def _default_incompatible_qos_callback(event):
def _default_incompatible_qos_callback(event: QoSRequestedIncompatibleQoSInfo) -> None:
policy_name = qos_policy_name_from_kind(event.last_policy_kind)
logger.warn(
"New subscription discovered on topic '{}', requesting incompatible QoS. "
Expand All @@ -363,7 +379,7 @@ def _default_incompatible_qos_callback(event):
incompatible_type_callback = self.incompatible_type
elif self.use_default_callbacks:
# Register default callback when not specified
def _default_incompatible_type_callback(event):
def _default_incompatible_type_callback(event: Any) -> None:
logger.warn(
"Incompatible type on topic '{}', no messages will be sent to it."
.format(topic_name))
Expand All @@ -380,7 +396,7 @@ def _default_incompatible_type_callback(event):
pass

if self.matched:
event_handlers.append(QoSEventHandler(
event_handlers.append(EventHandler(
callback_group=callback_group,
callback=self.matched,
event_type=QoSPublisherEventType.RCL_PUBLISHER_MATCHED,
Expand Down
116 changes: 109 additions & 7 deletions rclpy/rclpy/impl/_rclpy_pybind11.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ from __future__ import annotations

from enum import Enum
from types import TracebackType
from typing import Any, Generic, Literal, overload, Sequence, TypeAlias, TypedDict
from typing import Any, Generic, Literal, overload, Sequence, TypeAlias, TypedDict, TypeVar

from rclpy.clock import JumpHandle
from rclpy.clock_type import ClockType
Expand All @@ -27,6 +27,9 @@ from rclpy.subscription import MessageInfo
from rclpy.type_support import MsgT, Srv, SrvEventT, SrvRequestT, SrvResponseT


T = TypeVar('T')


def rclpy_remove_ros_args(pycli_args: Sequence[str]) -> list[str]:
"""Remove ROS-specific arguments from argument vector."""

Expand Down Expand Up @@ -202,20 +205,119 @@ class rcl_publisher_event_type_t(Enum):
RCL_PUBLISHER_MATCHED = ...


class EventHandle(Destroyable):
class rmw_requested_deadline_missed_status_t:
total_count: int
total_count_change: int


class rmw_liveliness_changed_status_t:
alive_count: int
not_alive_count: int
alive_count_change: int
not_alive_count_change: int


class rmw_message_lost_status_t:
total_count: int
total_count_change: int


class rmw_qos_policy_kind_e(Enum):
_value_: int
RMW_QOS_POLICY_INVALID = ...
RMW_QOS_POLICY_DURABILITY = ...
RMW_QOS_POLICY_DEADLINE = ...
RMW_QOS_POLICY_LIVELINESS = ...
RMW_QOS_POLICY_RELIABILITY = ...
RMW_QOS_POLICY_HISTORY = ...
RMW_QOS_POLICY_LIFESPAN = ...
RMW_QOS_POLICY_DEPTH = ...
RMW_QOS_POLICY_LIVELINESS_LEASE_DURATION = ...
RMW_QOS_POLICY_AVOID_ROS_NAMESPACE_CONVENTIONS = ...


rmw_qos_policy_kind_t = rmw_qos_policy_kind_e


class rmw_requested_qos_incompatible_event_status_t:
total_count: int
total_count_change: int
last_policy_kind: rmw_qos_policy_kind_t


class rmw_matched_status_s:
total_count: int
total_count_change: int
current_count: int
current_count_change: int


rmw_matched_status_t = rmw_matched_status_s


class rmw_offered_deadline_missed_status_s:
total_count: int
total_count_change: int


rmw_offered_deadline_missed_status_t = rmw_offered_deadline_missed_status_s


class rmw_liveliness_lost_status_s:
total_count: int
total_count_change: int


rmw_liveliness_lost_status_t = rmw_liveliness_lost_status_s


class rmw_incompatible_type_status_s:
total_count: int
total_count_change: int


rmw_incompatible_type_status_t = rmw_incompatible_type_status_s


class rmw_qos_incompatible_event_status_s:
total_count: int
total_count_change: int
last_policy_kind: rmw_qos_policy_kind_t


rmw_qos_incompatible_event_status_t = rmw_qos_incompatible_event_status_s
rmw_offered_qos_incompatible_event_status_t = rmw_qos_incompatible_event_status_t


class RCLError(BaseException):
def __init__(self, error_text: str) -> None: ...


class UnsupportedEventTypeError(RCLError):
pass


class EventHandle(Destroyable, Generic[T]):

@overload
def __init__(self, subcription: Subscription,
event_type: rcl_subscription_event_type_t) -> None: ...
def __init__(
self,
subcription: Subscription[Any],
event_type: rcl_subscription_event_type_t
) -> None: ...

@overload
def __init__(self, publisher: Publisher, event_type: rcl_publisher_event_type_t) -> None: ...
def __init__(
self,
subcription: Publisher[Any],
event_type: rcl_publisher_event_type_t
) -> None: ...

@property
def pointer(self) -> int:
"""Get the address of the entity as an integer."""

def take_event(self) -> Any | None:
def take_event(self) -> T | None:
"""Get pending data from a ready event."""


Expand Down Expand Up @@ -577,7 +679,7 @@ class WaitSet(Destroyable):
def add_timer(self, timer: Timer) -> int:
"""Add a timer to the wait set structure."""

def add_event(self, event: EventHandle) -> int:
def add_event(self, event: EventHandle[Any]) -> int:
"""Add an event to the wait set structure."""

def is_ready(self, entity_type: IsReadyValues, index: int) -> bool:
Expand Down
Loading

0 comments on commit 4e8b071

Please sign in to comment.