Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft: avoid race condition in action client #1125

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 135 additions & 127 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import time
import uuid
import weakref

import queue
from action_msgs.msg import GoalStatus
from action_msgs.srv import CancelGoal

Expand All @@ -30,7 +30,7 @@
from rclpy.waitable import NumberOfEntities, Waitable

from unique_identifier_msgs.msg import UUID

WAIT_TIMEOUT = 15.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never used?


class ClientGoalHandle():
"""Goal handle for working with Action Clients."""
Expand Down Expand Up @@ -160,7 +160,6 @@ def __init__(
feedback_sub_qos_profile.get_c_qos_profile(),
status_sub_qos_profile.get_c_qos_profile()
)

self._is_ready = False

# key: UUID in bytes, value: weak reference to ClientGoalHandle
Expand All @@ -177,6 +176,11 @@ def __init__(
self._result_sequence_number_to_goal_id = {}
# key: UUID in bytes, value: callback function
self._feedback_callbacks = {}
self._data_lock = threading.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we can introduce the threading lock against the specific member data object, instead of having giant lock in the ActionClient. that can be also more performative.

see https://github.com/ros2/rclcpp/blob/e2965831d51e9be03470cb07f8721012afcade9b/rclcpp_action/src/client.cpp#L112-L119

self._goal_event = threading.Event()
self._result_event = threading.Event()
self._cancel_event = threading.Event()
Comment on lines +180 to +182
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never used?



callback_group.add_entity(self)
self._node.add_waitable(self)
Expand Down Expand Up @@ -283,73 +287,74 @@ async def execute(self, taken_data):
This will set results for Future objects for any received service responses and
call any user-defined callbacks (e.g. feedback).
"""
if 'goal' in taken_data:
sequence_number, goal_response = taken_data['goal']
if sequence_number in self._goal_sequence_number_to_goal_id:
goal_handle = ClientGoalHandle(
self,
self._goal_sequence_number_to_goal_id[sequence_number],
goal_response)

if goal_handle.accepted:
goal_uuid = bytes(goal_handle.goal_id.uuid)
with self._data_lock:
if 'goal' in taken_data:
sequence_number, goal_response = taken_data['goal']
if sequence_number in self._goal_sequence_number_to_goal_id:
goal_handle = ClientGoalHandle(
self,
self._goal_sequence_number_to_goal_id[sequence_number],
goal_response)

if goal_handle.accepted:
goal_uuid = bytes(goal_handle.goal_id.uuid)
if goal_uuid in self._goal_handles:
raise RuntimeError(
'Two goals were accepted with the same ID ({})'.format(goal_handle))
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)

self._pending_goal_requests[sequence_number].set_result(goal_handle)
else:
self._node.get_logger().warning(
'Ignoring unexpected goal response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'cancel' in taken_data:
sequence_number, cancel_response = taken_data['cancel']
if sequence_number in self._pending_cancel_requests:
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
else:
self._node.get_logger().warning(
'Ignoring unexpected cancel response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'result' in taken_data:
sequence_number, result_response = taken_data['result']
if sequence_number in self._pending_result_requests:
self._pending_result_requests[sequence_number].set_result(result_response)
else:
self._node.get_logger().warning(
'Ignoring unexpected result response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'feedback' in taken_data:
feedback_msg = taken_data['feedback']
goal_uuid = bytes(feedback_msg.goal_id.uuid)
# Call a registered callback if there is one
if goal_uuid in self._feedback_callbacks:
await await_or_execute(self._feedback_callbacks[goal_uuid], feedback_msg)

if 'status' in taken_data:
# Update the status of all goal handles maintained by this Action Client
for status_msg in taken_data['status'].status_list:
goal_uuid = bytes(status_msg.goal_info.goal_id.uuid)
status = status_msg.status

if goal_uuid in self._goal_handles:
raise RuntimeError(
'Two goals were accepted with the same ID ({})'.format(goal_handle))
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)

self._pending_goal_requests[sequence_number].set_result(goal_handle)
else:
self._node.get_logger().warning(
'Ignoring unexpected goal response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'cancel' in taken_data:
sequence_number, cancel_response = taken_data['cancel']
if sequence_number in self._pending_cancel_requests:
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
else:
self._node.get_logger().warning(
'Ignoring unexpected cancel response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'result' in taken_data:
sequence_number, result_response = taken_data['result']
if sequence_number in self._pending_result_requests:
self._pending_result_requests[sequence_number].set_result(result_response)
else:
self._node.get_logger().warning(
'Ignoring unexpected result response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'feedback' in taken_data:
feedback_msg = taken_data['feedback']
goal_uuid = bytes(feedback_msg.goal_id.uuid)
# Call a registered callback if there is one
if goal_uuid in self._feedback_callbacks:
await await_or_execute(self._feedback_callbacks[goal_uuid], feedback_msg)

if 'status' in taken_data:
# Update the status of all goal handles maintained by this Action Client
for status_msg in taken_data['status'].status_list:
goal_uuid = bytes(status_msg.goal_info.goal_id.uuid)
status = status_msg.status

if goal_uuid in self._goal_handles:
goal_handle = self._goal_handles[goal_uuid]()
if goal_handle is not None:
goal_handle._status = status
# Remove "done" goals from the list
if (GoalStatus.STATUS_SUCCEEDED == status or
GoalStatus.STATUS_CANCELED == status or
GoalStatus.STATUS_ABORTED == status):
goal_handle = self._goal_handles[goal_uuid]()
if goal_handle is not None:
goal_handle._status = status
# Remove "done" goals from the list
if (GoalStatus.STATUS_SUCCEEDED == status or
GoalStatus.STATUS_CANCELED == status or
GoalStatus.STATUS_ABORTED == status):
del self._goal_handles[goal_uuid]
else:
# Weak reference is None
del self._goal_handles[goal_uuid]
else:
# Weak reference is None
del self._goal_handles[goal_uuid]

def get_num_entities(self):
"""Return number of each type of entity used in the wait set."""
Expand Down Expand Up @@ -430,30 +435,31 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
the Goal type of the provided action when the service was
constructed.
"""
if not isinstance(goal, self._action_type.Goal):
raise TypeError()

request = self._action_type.Impl.SendGoalService.Request()
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))

if feedback_callback is not None:
# TODO(jacobperron): Move conversion function to a general-use package
goal_uuid = bytes(request.goal_id.uuid)
self._feedback_callbacks[goal_uuid] = feedback_callback

future = Future()
self._pending_goal_requests[sequence_number] = future
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

return future
with self._data_lock:
if not isinstance(goal, self._action_type.Goal):
raise TypeError()

request = self._action_type.Impl.SendGoalService.Request()
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))

if feedback_callback is not None:
# TODO(jacobperron): Move conversion function to a general-use package
goal_uuid = bytes(request.goal_id.uuid)
self._feedback_callbacks[goal_uuid] = feedback_callback

future = Future()
self._pending_goal_requests[sequence_number] = future
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

return future

def _cancel_goal(self, goal_handle):
"""
Expand Down Expand Up @@ -488,24 +494,25 @@ def _cancel_goal_async(self, goal_handle):
:return: a Future instance that completes when the cancel request has been processed.
:rtype: :class:`rclpy.task.Future` instance
"""
if not isinstance(goal_handle, ClientGoalHandle):
raise TypeError(
'Expected type ClientGoalHandle but received {}'.format(type(goal_handle)))

cancel_request = CancelGoal.Request()
cancel_request.goal_info.goal_id = goal_handle.goal_id
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
with self._data_lock:
if not isinstance(goal_handle, ClientGoalHandle):
raise TypeError(
'Expected type ClientGoalHandle but received {}'.format(type(goal_handle)))

cancel_request = CancelGoal.Request()
cancel_request.goal_info.goal_id = goal_handle.goal_id
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))

future = Future()
self._pending_cancel_requests[sequence_number] = future
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)
future = Future()
self._pending_cancel_requests[sequence_number] = future
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)

return future
return future

def _get_result(self, goal_handle):
"""
Expand Down Expand Up @@ -540,25 +547,26 @@ def _get_result_async(self, goal_handle):
:return: a Future instance that completes when the get result request has been processed.
:rtype: :class:`rclpy.task.Future` instance
"""
if not isinstance(goal_handle, ClientGoalHandle):
raise TypeError(
'Expected type ClientGoalHandle but received {}'.format(type(goal_handle)))

result_request = self._action_type.Impl.GetResultService.Request()
result_request.goal_id = goal_handle.goal_id
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending result request'.format(sequence_number))

future = Future()
self._pending_result_requests[sequence_number] = future
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)

return future
with self._data_lock:
if not isinstance(goal_handle, ClientGoalHandle):
raise TypeError(
'Expected type ClientGoalHandle but received {}'.format(type(goal_handle)))

result_request = self._action_type.Impl.GetResultService.Request()
result_request.goal_id = goal_handle.goal_id
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending result request'.format(sequence_number))

future = Future()
self._pending_result_requests[sequence_number] = future
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)

return future

def server_is_ready(self):
"""
Expand Down