diff --git a/nav2_docking/opennav_docking/test/test_docking_server.py b/nav2_docking/opennav_docking/test/test_docking_server.py index 43e3e2df329..f4fd05cdf38 100644 --- a/nav2_docking/opennav_docking/test/test_docking_server.py +++ b/nav2_docking/opennav_docking/test/test_docking_server.py @@ -24,9 +24,12 @@ from nav2_msgs.action import DockRobot, NavigateToPose, UndockRobot import pytest import rclpy -from rclpy.action import ActionClient, ActionServer +import rclpy.time +import rclpy.duration +from rclpy.action.client import ActionClient +from rclpy.action.server import ActionServer from sensor_msgs.msg import BatteryState -from tf2_ros import TransformBroadcaster +from tf2_ros import TransformBroadcaster, Buffer, TransformListener # This test can be run standalone with: @@ -142,9 +145,40 @@ def nav_execute_callback(self, goal_handle): result.error_code = 0 return result + def check_transform(self, timeout_sec, source_frame='odom', target_frame='base_link'): + """Try to look up the transform we're publishing""" + if self.transform_available: + return True + + try: + # Wait for transform to become available with timeout + when = rclpy.time.Time() + transform = self.tf_buffer.lookup_transform( + target_frame, + source_frame, + when, + timeout=rclpy.duration.Duration(seconds=timeout_sec) + ) + + self.node.get_logger().info('Successfully found transform:') + self.node.get_logger().info(f'Translation: [{transform.transform.translation.x}, ' + f'{transform.transform.translation.y}, ' + f'{transform.transform.translation.z}]') + self.transform_available = True + except Exception as e: + self.node.get_logger().error(f'Error looking up transform: {str(e)}') + self.transform_available = False + + return self.transform_available + def test_docking_server(self): # Publish TF for odometry self.tf_broadcaster = TransformBroadcaster(self.node) + # Create tf buffer and listener + self.tf_buffer = Buffer() + self.tf_listener = TransformListener(self.tf_buffer, self.node) + self.transform_available = False + # Create a timer to run "control loop" at 20hz self.timer = self.node.create_timer(0.05, self.timer_callback) @@ -175,12 +209,21 @@ def test_docking_server(self): 'navigate_to_pose', self.nav_execute_callback) - # Spin once so that TF is published - rclpy.spin_once(self.node, timeout_sec=0.1) + # Publish transform + self.publish() + + # Run for 2 seconds to allow tf to propogate + for i in range(20): + rclpy.spin_once(self.node, timeout_sec=0.1) + time.sleep(0.1) + + assert self.check_transform(timeout_sec=1.0), 'transform not available' # Test docking action self.action_result = [] - self.dock_action_client.wait_for_server(timeout_sec=5.0) + assert self.dock_action_client.wait_for_server(timeout_sec=5.0), \ + 'dock_robot service not available' + goal = DockRobot.Goal() goal.use_dock_id = True goal.dock_id = 'test_dock' @@ -188,7 +231,8 @@ def test_docking_server(self): goal, feedback_callback=self.action_feedback_callback) rclpy.spin_until_future_complete(self.node, future) self.goal_handle = future.result() - assert self.goal_handle.accepted + assert self.goal_handle is not None, 'goal_handle should not be None' + assert self.goal_handle.accepted, 'goal not accepted' result_future_original = self.goal_handle.get_result_async() # Run for 2 seconds