diff --git a/.github/workflows/source-build.yaml b/.github/workflows/source-build.yaml
index 802f53637..7b73ca402 100644
--- a/.github/workflows/source-build.yaml
+++ b/.github/workflows/source-build.yaml
@@ -12,5 +12,6 @@ jobs:
with:
ros_distro: 'humble'
os_name: 'ubuntu-22.04'
- ref: ${{ github.ref_name }}
+ ref: ${{ github.ref }}
vcs_repo_file_url: 'https://raw.githubusercontent.com/vortexntnu/vortex-auv/main/ros2.repos'
+ dependency_script: 'requirements.sh'
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c867df9f1..36cb74d32 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -56,7 +56,6 @@ repos:
"--select=D",
"--ignore=D100,D101,D102,D103,D104,D105,D106,D107,D401",
"--fix",
- "--unsafe-fixes"
]
stages: [pre-commit]
pass_filenames: true
diff --git a/guidance/guidance_los/CMakeLists.txt b/guidance/guidance_los/CMakeLists.txt
new file mode 100644
index 000000000..76f242166
--- /dev/null
+++ b/guidance/guidance_los/CMakeLists.txt
@@ -0,0 +1,28 @@
+cmake_minimum_required(VERSION 3.8)
+project(guidance_los)
+
+find_package(ament_cmake_python REQUIRED)
+find_package(rclpy REQUIRED)
+find_package(std_msgs REQUIRED)
+find_package(nav_msgs REQUIRED)
+find_package(geometry_msgs REQUIRED)
+find_package(vortex_msgs REQUIRED)
+
+ament_python_install_package(${PROJECT_NAME})
+
+install(
+ PROGRAMS
+ scripts/los_guidance_action_server.py
+ RENAME
+ los_guidance_action_server
+ DESTINATION lib/${PROJECT_NAME}
+)
+
+install(
+ DIRECTORY
+ launch
+ config
+ DESTINATION share/${PROJECT_NAME}
+)
+
+ament_package()
diff --git a/guidance/guidance_los/README.md b/guidance/guidance_los/README.md
new file mode 100644
index 000000000..f9bf1abc1
--- /dev/null
+++ b/guidance/guidance_los/README.md
@@ -0,0 +1 @@
+#TODO
diff --git a/guidance/guidance_los/config/los_guidance_params.yaml b/guidance/guidance_los/config/los_guidance_params.yaml
new file mode 100644
index 000000000..71c74bf95
--- /dev/null
+++ b/guidance/guidance_los/config/los_guidance_params.yaml
@@ -0,0 +1,25 @@
+los_guidance_node:
+ ros__parameters:
+ los_guidance:
+ delta_h: 2.0 # Horizontal look-ahead distance
+ delta_v: 2.0 # Vertical look-ahead distance
+ gamma_h: 1.0 # Horizontal adaptive gain
+ gamma_v: 1.0 # Vertical adaptive gain
+ M_theta: 0.5 # Bound for parameter estimates
+ epsilon: 0.1 # Padding for parameter estimates
+ nominal_speed: 1.0 # Nominal surge speed
+ min_speed: 0.1 # Minimum allowable surge speed
+ dt: 0.01 # Time step for guidance updates
+ filter:
+ omega_diag: [2.5, 2.5, 2.5] # Natural frequencies for surge, pitch, and yaw
+ zeta_diag: [0.7, 0.7, 0.7] # Damping ratios for surge, pitch, and yaw
+
+ topics:
+ publishers:
+ los_commands: '/guidance/los'
+ debug:
+ reference: '/guidance/debug/reference'
+ errors: '/guidance/debug/errors'
+ logs: '/guidance/debug/logs'
+ subscribers:
+ odometry: '/orca/odom'
diff --git a/mission/gui_auv/auv_gui/__init__.py b/guidance/guidance_los/guidance_los/__init__.py
similarity index 100%
rename from mission/gui_auv/auv_gui/__init__.py
rename to guidance/guidance_los/guidance_los/__init__.py
diff --git a/guidance/guidance_los/guidance_los/los_guidance_algorithm.py b/guidance/guidance_los/guidance_los/los_guidance_algorithm.py
new file mode 100644
index 000000000..120a0f72c
--- /dev/null
+++ b/guidance/guidance_los/guidance_los/los_guidance_algorithm.py
@@ -0,0 +1,291 @@
+from dataclasses import dataclass, field
+
+import numpy as np
+
+
+@dataclass
+class State:
+ x: float = 0.0
+ y: float = 0.0
+ z: float = 0.0
+ pitch: float = 0.0
+ yaw: float = 0.0
+ surge_vel: float = 0.0
+
+ def __add__(self, other: "State") -> "State":
+ return State(
+ x=self.x + other.x,
+ y=self.y + other.y,
+ z=self.z + other.z,
+ pitch=self.pitch + other.pitch,
+ yaw=self.yaw + other.yaw,
+ )
+
+ def __sub__(self, other: "State") -> "State":
+ return State(
+ x=self.x - other.x,
+ y=self.y - other.y,
+ z=self.z - other.z,
+ pitch=self.pitch - other.pitch,
+ yaw=self.yaw - other.yaw,
+ )
+
+ def as_los_array(self):
+ return np.array([self.surge_vel, self.pitch, self.yaw])
+
+
+@dataclass(slots=True)
+class LOSParameters:
+ """Parameters for the 3D Line-of-Sight (LOS) guidance algorithm.
+
+ This class consist the configurable parameters used in the LOS guidance system
+ for determining navigation commands in 3D space.
+
+ Attributes:
+ Δh (float): Horizontal look-ahead distance for LOS guidance [m].
+ Determines how far ahead the algorithm looks in the horizontal plane.
+ Δv (float): Vertical look-ahead distance for LOS guidance [m].
+ Determines how far ahead the algorithm looks in the vertical plane.
+ γh (float): Horizontal adaptive gain. Controls the responsiveness to horizontal cross-track errors.
+ γv (float): Vertical adaptive gain. Controls the responsiveness to vertical cross-track errors.
+ M_theta (float): Bound for parameter estimates. Constrains the maximum allowable adaptive parameter value.
+ epsilon (float): Padding added to `M_theta` to compute `M_hat_theta`, providing a soft margin for parameter constraints.
+ nominal_speed (float): Desired nominal forward speed [m/s].
+ min_speed (float): Minimum allowable forward speed [m/s] to prevent stalling.
+ dt (float): Time step for updating LOS guidance states [s].
+
+ Properties:
+ M_hat_theta (float): Computes the extended bound for parameter estimates
+ (`M_theta + epsilon`) used in adaptive parameter updates.
+
+ """
+
+ Δh: float = 2.0
+ Δv: float = 2.0
+ γh: float = 1.0
+ γv: float = 1.0
+ M_theta: float = 0.5
+ epsilon: float = 0.1
+ nominal_speed: float = 1.0
+ min_speed: float = 0.1
+ dt: float = 0.01
+
+ @property
+ def M_hat_theta(self) -> float:
+ return self.M_theta + self.epsilon
+
+
+@dataclass(slots=True)
+class FilterParameters:
+ """Parameters for third-order filter.
+
+ Attributes:
+ omega_diag: Natural frequencies for surge, pitch, and yaw [rad/s]
+ zeta_diag: Damping ratios for surge, pitch, and yaw [-]
+
+ """
+
+ omega_diag: np.ndarray = field(default_factory=lambda: np.array([1.0, 1.0, 1.0]))
+ zeta_diag: np.ndarray = field(default_factory=lambda: np.array([1.0, 1.0, 1.0]))
+
+
+class ThirdOrderLOSGuidance:
+ """Implements a 3D Line-of-Sight (LOS) guidance algorithm.
+
+ Provides control outputs for surge, pitch, and yaw to navigate towards a target,
+ incorporating adaptive parameter estimation and third-order filtering for smooth commands.
+ """
+
+ def __init__(self, los_params: LOSParameters, filter_params: FilterParameters):
+ self.los_params = los_params
+ self.filter_params = filter_params
+ self.x = np.zeros(9) # Filter state
+ self.horizontal_distance = 0.0
+ self.setup_filter_matrices()
+ self.β_c_hat = 0.0 # Estimated course correction
+ self.α_c_hat = 0.0 # Estimated altitude correction
+
+ @staticmethod
+ def quaternion_to_euler_angle(w: float, x: float, y: float, z: float) -> tuple:
+ """Function to convert quaternion to euler angles.
+
+ Parameters:
+ w: float: w component of quaternion
+ x: float: x component of quaternion
+ y: float: y component of quaternion
+ z: float: z component of quaternion
+
+ Returns:
+ phi: float: roll angle
+ theta: float: pitch angle
+ psi: float: yaw angle
+
+ """
+ y_square = y * y
+
+ t0 = +2.0 * (w * x + y * z)
+ t1 = +1.0 - 2.0 * (x * x + y_square)
+ phi = np.arctan2(t0, t1)
+
+ t2 = +2.0 * (w * y - z * x)
+ t2 = +1.0 if t2 > +1.0 else t2
+ t2 = -1.0 if t2 < -1.0 else t2
+ theta = np.arcsin(t2)
+
+ t3 = +2.0 * (w * z + x * y)
+ t4 = +1.0 - 2.0 * (y_square + z * z)
+ psi = np.arctan2(t3, t4)
+
+ return phi, theta, psi
+
+ @staticmethod
+ def ssa(angle: float) -> float:
+ """Smallest signed angle."""
+ return (angle + np.pi) % (2 * np.pi) - np.pi
+
+ def setup_filter_matrices(self):
+ omega = np.diag(self.filter_params.omega_diag)
+ zeta = np.diag(self.filter_params.zeta_diag)
+ omega_cubed = omega @ omega @ omega
+ omega_squared = omega @ omega
+
+ self.Ad = np.zeros((9, 9))
+ self.Bd = np.zeros((9, 3))
+
+ self.Ad[0:3, 3:6] = np.eye(3)
+ self.Ad[3:6, 6:9] = np.eye(3)
+ self.Ad[6:9, 0:3] = -omega_cubed
+ self.Ad[6:9, 3:6] = -(2 * zeta + np.eye(3)) @ omega_squared
+ self.Ad[6:9, 6:9] = -(2 * zeta + np.eye(3)) @ omega
+
+ self.Bd[6:9, :] = omega_cubed
+
+ def compute_angles(self, current: State, target: State) -> tuple[float, float]:
+ """Compute azimuth and elevation angles."""
+ dx = target.x - current.x
+ dy = target.y - current.y
+ dz = target.z - current.z
+
+ self.horizontal_distance = np.sqrt(dx**2 + dy**2)
+
+ # Compute azimuth angle πh
+ π_h = np.arctan2(dy, dx)
+
+ # Compute elevation angle πv
+ π_v = np.arctan2(-dz, self.horizontal_distance)
+
+ return π_h, π_v
+
+ def compute_path_frame_errors(
+ self, current: State, target: State
+ ) -> tuple[float, float, float]:
+ """Compute errors in the path-tangential frame."""
+ dx = target.x - current.x
+ dy = target.y - current.y
+ dz = target.z - current.z
+
+ π_h, π_v = self.compute_angles(current, target)
+
+ # Create rotation matrices
+ R_z = np.array(
+ [[np.cos(π_h), -np.sin(π_h), 0], [np.sin(π_h), np.cos(π_h), 0], [0, 0, 1]]
+ )
+
+ R_y = np.array(
+ [[np.cos(π_v), 0, np.sin(π_v)], [0, 1, 0], [-np.sin(π_v), 0, np.cos(π_v)]]
+ )
+
+ # Compute errors in path frame
+ error_n = np.array([dx, dy, dz])
+ error_p = (R_y.T @ R_z.T @ error_n.reshape(3, 1)).flatten()
+
+ return error_p[0], error_p[1], error_p[2]
+
+ def parameter_projection(self, param: float, error: float) -> float:
+ """Implement the parameter projection function."""
+ M = self.los_params.M_hat_theta
+
+ if abs(param) > M and param * error > 0:
+ c = min(
+ 1.0,
+ (param**2 - self.los_params.M_theta**2)
+ / (self.los_params.M_hat_theta**2 - self.los_params.M_theta**2),
+ )
+ return (1 - c) * error
+ return error
+
+ def compute_raw_los_guidance(self, current_pos: State, target_pos: State) -> State:
+ """Compute raw LOS guidance commands."""
+ # Compute angles and errors
+ π_h, π_v = self.compute_angles(current_pos, target_pos)
+ x_e, y_e, z_e = self.compute_path_frame_errors(current_pos, target_pos)
+
+ # Compute desired heading angle ψd
+ desired_yaw = π_h - self.β_c_hat - np.arctan2(y_e, self.los_params.Δh)
+
+ # Compute desired pitch angle θd
+ desired_pitch = π_v + self.α_c_hat + np.arctan2(z_e, self.los_params.Δv)
+
+ # Update parameter estimates using projection
+ β_c_dot = (
+ self.los_params.γh
+ * (self.los_params.Δh / np.sqrt(self.los_params.Δh**2 + y_e**2))
+ * self.parameter_projection(self.β_c_hat, y_e)
+ )
+
+ α_c_dot = (
+ self.los_params.γv
+ * (self.los_params.Δv / np.sqrt(self.los_params.Δv**2 + z_e**2))
+ * self.parameter_projection(self.α_c_hat, z_e)
+ )
+
+ # Update estimates
+ self.β_c_hat += β_c_dot * self.los_params.dt
+ self.α_c_hat += α_c_dot * self.los_params.dt
+
+ # Clip parameter estimates to bounds
+ self.β_c_hat = np.clip(
+ self.β_c_hat, -self.los_params.M_hat_theta, self.los_params.M_hat_theta
+ )
+ self.α_c_hat = np.clip(
+ self.α_c_hat, -self.los_params.M_hat_theta, self.los_params.M_hat_theta
+ )
+
+ # Compute desired speed based on cross-track error
+ yaw_error = self.ssa(desired_yaw - current_pos.yaw)
+ desired_speed = self.compute_desired_speed(yaw_error, self.horizontal_distance)
+
+ commands = State(surge_vel=desired_speed, pitch=desired_pitch, yaw=desired_yaw)
+
+ return commands
+
+ def compute_desired_speed(
+ self, yaw_error: float, distance_to_target: float
+ ) -> float:
+ """Compute speed command with yaw and distance-based scaling."""
+ yaw_factor = np.cos(yaw_error)
+ distance_factor = min(1.0, distance_to_target / self.los_params.Δh)
+ desired_speed = self.los_params.nominal_speed * yaw_factor * distance_factor
+ return max(self.los_params.min_speed, desired_speed)
+
+ def apply_reference_filter(self, commands: State) -> State:
+ """Apply third-order reference filter to smooth commands."""
+ x_dot = self.Ad @ self.x + self.Bd @ commands.as_los_array()
+ self.x = self.x + x_dot * self.los_params.dt
+ return State(surge_vel=self.x[0], pitch=self.x[1], yaw=self.x[2])
+
+ def compute_guidance(self, current_pos: State, target_pos: State) -> State:
+ """Compute filtered guidance commands."""
+ raw_commands = self.compute_raw_los_guidance(current_pos, target_pos)
+ filtered_commands = self.apply_reference_filter(raw_commands)
+ filtered_commands.pitch = self.ssa(filtered_commands.pitch)
+ filtered_commands.yaw = self.ssa(filtered_commands.yaw)
+ return filtered_commands
+
+ def reset_filter_state(self, current_state: State) -> None:
+ """Reset the filter and adaptive parameter states."""
+ self.x = np.zeros(9)
+ current_state_array = np.array(current_state.as_los_array(), copy=True)
+ self.x[0:3] = current_state_array
+ self.β_c_hat = 0.0
+ self.α_c_hat = 0.0
diff --git a/guidance/guidance_los/launch/guidance.launch.py b/guidance/guidance_los/launch/guidance.launch.py
new file mode 100755
index 000000000..fc1379494
--- /dev/null
+++ b/guidance/guidance_los/launch/guidance.launch.py
@@ -0,0 +1,22 @@
+import os
+
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_share = get_package_share_directory('guidance_los')
+ config_file = os.path.join(pkg_share, 'config', 'los_guidance_params.yaml')
+
+ guidance_node = Node(
+ package='guidance_los',
+ executable='los_guidance_action_server',
+ name='los_guidance_node',
+ output='screen',
+ emulate_tty=True,
+ parameters=[config_file],
+ respawn=False,
+ )
+
+ return LaunchDescription([guidance_node])
diff --git a/guidance/guidance_los/package.xml b/guidance/guidance_los/package.xml
new file mode 100644
index 000000000..9f36af504
--- /dev/null
+++ b/guidance/guidance_los/package.xml
@@ -0,0 +1,22 @@
+
+
+
+ guidance_los
+ 0.0.0
+ Package for implementing a LOS guidance action server.
+ badawi
+ MIT
+
+ ament_cmake_python
+
+ rclpy
+ std_msgs
+ nav_msgs
+ geometry_msgs
+ vortex_msgs
+ tf_transformations
+
+
+ ament_cmake
+
+
diff --git a/guidance/guidance_los/scripts/los_guidance_action_server.py b/guidance/guidance_los/scripts/los_guidance_action_server.py
new file mode 100755
index 000000000..4d6a8ac37
--- /dev/null
+++ b/guidance/guidance_los/scripts/los_guidance_action_server.py
@@ -0,0 +1,392 @@
+#!/usr/bin/env python3
+
+import threading
+
+import numpy as np
+import rclpy
+from geometry_msgs.msg import PoseStamped, Vector3Stamped
+from guidance_los.los_guidance_algorithm import (
+ FilterParameters,
+ LOSParameters,
+ State,
+ ThirdOrderLOSGuidance,
+)
+from nav_msgs.msg import Odometry
+from rclpy.action import ActionServer
+from rclpy.action.server import CancelResponse, GoalResponse, ServerGoalHandle
+from rclpy.callback_groups import ReentrantCallbackGroup
+from rclpy.executors import MultiThreadedExecutor
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
+from std_msgs.msg import String
+from vortex_msgs.action import NavigateWaypoints
+from vortex_msgs.msg import LOSGuidance
+
+best_effort_qos = QoSProfile(
+ reliability=ReliabilityPolicy.BEST_EFFORT,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=1,
+)
+
+
+class LOSActionServer(Node):
+ """ROS2 Action Server node for waypoint navigation using LOS guidance."""
+
+ def __init__(self):
+ super().__init__('los_guidance_node')
+
+ self._goal_lock = threading.Lock()
+ self.declare_parameter('update_rate', 10.0)
+ update_rate = self.get_parameter('update_rate').value
+ self.update_period = 1.0 / update_rate
+ self.filter_initialized = False
+
+ self.declare_parameter('debug_mode', False)
+ self.debug_mode = self.get_parameter('debug_mode').value
+
+ self.action_cb_group = ReentrantCallbackGroup()
+
+ self.declare_los_parameters_()
+ self._declare_topic_parameters()
+ self._initialize_publishers()
+ self._initialize_subscribers()
+ self._initialize_state()
+ self._initialize_action_server()
+
+ los_params = self.get_los_parameters_()
+ filter_params = self.get_filter_parameters_()
+
+ # Initialize guidance calculator
+ self.guidance_calculator = ThirdOrderLOSGuidance(los_params, filter_params)
+
+ def declare_los_parameters_(self):
+ """Declare all LOS guidance parameters."""
+ # Declare main LOS parameters
+ self.declare_parameter('los_guidance.h_delta_min', 1.0)
+ self.declare_parameter('los_guidance.h_delta_max', 5.0)
+ self.declare_parameter('los_guidance.h_delta_factor', 3.0)
+ self.declare_parameter('los_guidance.nominal_speed', 0.35)
+ self.declare_parameter('los_guidance.min_speed', 0.1)
+ self.declare_parameter('los_guidance.max_pitch_angle', 1.047)
+ self.declare_parameter('los_guidance.depth_gain', 10.0)
+ # self.declare_parameter('depth_integral_gain', 0.5)
+
+ # Declare filter parameters
+ self.declare_parameter('los_guidance.filter.omega_diag', [2.5, 2.5, 2.5])
+ self.declare_parameter('los_guidance.filter.zeta_diag', [0.7, 0.7, 0.7])
+
+ def get_los_parameters_(self) -> LOSParameters:
+ los_params = LOSParameters()
+
+ los_params.lookahead_distance_min = self.get_parameter(
+ 'los_guidance.h_delta_min'
+ ).value
+ los_params.lookahead_distance_max = self.get_parameter(
+ 'los_guidance.h_delta_max'
+ ).value
+ los_params.lookahead_distance_factor = self.get_parameter(
+ 'los_guidance.h_delta_factor'
+ ).value
+ los_params.nominal_speed = self.get_parameter(
+ 'los_guidance.nominal_speed'
+ ).value
+ los_params.min_speed = self.get_parameter('los_guidance.min_speed').value
+ los_params.max_pitch_angle = self.get_parameter(
+ 'los_guidance.max_pitch_angle'
+ ).value
+ los_params.depth_gain = self.get_parameter('los_guidance.depth_gain').value
+ return los_params
+
+ def get_filter_parameters_(self) -> FilterParameters:
+ filter_params = FilterParameters()
+ filter_params.omega_diag = self.get_parameter(
+ 'los_guidance.filter.omega_diag'
+ ).value
+ filter_params.zeta_diag = self.get_parameter(
+ 'los_guidance.filter.zeta_diag'
+ ).value
+ return filter_params
+
+ def _declare_topic_parameters(self):
+ # Publishers
+ self.declare_parameter('topics.publishers.los_commands', '/guidance/los')
+ self.declare_parameter(
+ 'topics.publishers.debug.reference', '/guidance/debug/reference'
+ )
+ self.declare_parameter(
+ 'topics.publishers.debug.errors', '/guidance/debug/errors'
+ )
+ self.declare_parameter('topics.publishers.debug.logs', '/guidance/debug/logs')
+
+ # Subscribers
+ self.declare_parameter('topics.subscribers.odometry', '/orca/odom')
+
+ def _initialize_publishers(self):
+ """Initialize all publishers."""
+ pub_qos_depth = 10
+
+ # Main guidance command publisher
+ los_commands_topic = self.get_parameter('topics.publishers.los_commands').value
+
+ self.get_logger().info(f"Publishing LOS commands to: {los_commands_topic}")
+
+ self.guidance_cmd_pub = self.create_publisher(
+ LOSGuidance, los_commands_topic, qos_profile=best_effort_qos
+ )
+
+ # Debug publishers
+ if self.debug_mode:
+ reference_topic = self.get_parameter(
+ 'topics.publishers.debug.reference'
+ ).value
+
+ errors_topic = self.get_parameter('topics.publishers.debug.errors').value
+
+ logs_topic = self.get_parameter('topics.publishers.debug.logs').value
+
+ self.guidance_ref_pub = self.create_publisher(
+ PoseStamped, reference_topic, pub_qos_depth
+ )
+ self.guidance_error_pub = self.create_publisher(
+ Vector3Stamped, errors_topic, pub_qos_depth
+ )
+ self.log_publisher = self.create_publisher(
+ String, logs_topic, pub_qos_depth
+ )
+
+ def _initialize_subscribers(self):
+ """Initialize subscribers."""
+ odom_topic = self.get_parameter('topics.subscribers.odometry').value
+
+ self.create_subscription(
+ Odometry,
+ odom_topic,
+ self.odom_callback,
+ qos_profile=best_effort_qos,
+ )
+
+ def _initialize_state(self):
+ """Initialize state and navigation variables."""
+ self.state: State = State()
+ self.waypoints: list[PoseStamped] = []
+ self.current_waypoint_index: int = 0
+ self.goal_handle: ServerGoalHandle = None
+ self.update_period = 0.1
+
+ def _initialize_action_server(self):
+ """Initialize the navigation action server."""
+ self._action_server = ActionServer(
+ self,
+ NavigateWaypoints,
+ 'navigate_waypoints',
+ execute_callback=self.execute_callback,
+ goal_callback=self.goal_callback,
+ cancel_callback=self.cancel_callback,
+ callback_group=self.action_cb_group,
+ )
+
+ def goal_callback(self, goal_request: NavigateWaypoints.Goal):
+ """Handle new goal requests with preemption."""
+ self.get_logger().info("Received new goal request")
+
+ # Validate waypoints exist
+ if not goal_request.waypoints:
+ self.get_logger().warn("No waypoints in request, rejecting")
+ return GoalResponse.REJECT
+
+ with self._goal_lock:
+ # If there's an active goal, preempt it
+ if self.goal_handle and self.goal_handle.is_active:
+ self.get_logger().info("Preempting current goal")
+ self.goal_handle.abort()
+
+ return GoalResponse.ACCEPT
+
+ def cancel_callback(self, goal_handle: ServerGoalHandle):
+ """Handle cancellation requests."""
+ self.get_logger().info("Received cancel request")
+
+ # with self._goal_lock:
+ # if self.goal_handle == goal_handle:
+ # # Reset navigation state
+ # self.waypoints = []
+ # self.current_waypoint_index = 0
+ # self.goal_handle = None
+
+ # # Reset guidance state if needed
+ # initial_commands = np.array([0.0, self.state.pitch, self.state.yaw])
+ # self.guidance_calculator.reset_filter_state(self.state)
+
+ # self.publish_log("Navigation canceled and state reset")
+
+ return CancelResponse.ACCEPT
+
+ def publish_log(self, message: str):
+ """Publish debug log message to ROS topic and node logger (debug mode only)."""
+ if not self.debug_mode:
+ return
+
+ msg = String()
+ msg.data = message
+ self.get_logger().info(message)
+
+ def odom_callback(self, msg: Odometry):
+ """Process odometry updates and trigger guidance calculations."""
+ orientation_q = msg.pose.pose.orientation
+ roll, pitch, yaw = self.guidance_calculator.quaternion_to_euler_angle(
+ orientation_q.w, orientation_q.x, orientation_q.y, orientation_q.z
+ )
+
+ # Update vehicle state
+ self.state.x = msg.pose.pose.position.x
+ self.state.y = msg.pose.pose.position.y
+ self.state.z = msg.pose.pose.position.z
+ self.state.yaw = yaw
+ self.state.pitch = pitch
+
+ # Initialize filter on first callback
+ if not self.filter_initialized:
+ self.guidance_calculator.reset_filter_state(self.state)
+ self.filter_initialized = True
+
+ if not self.debug_mode:
+ return
+
+ # Log current position for debugging
+ self.publish_log(
+ f"Position: x={self.state.x:.3f}, "
+ f"y={self.state.y:.3f}, "
+ f"z={self.state.z:.3f}"
+ )
+ self.publish_log(
+ f"Orientation: roll={roll:.3f}, pitch={pitch:.3f}, yaw={yaw:.3f}"
+ )
+
+ def process_guidance(self):
+ """Process guidance calculations and publish control commands."""
+ if self.current_waypoint_index >= len(self.waypoints):
+ return
+
+ # Extract position from PoseStamped waypoint
+ current_waypoint = self.waypoints[self.current_waypoint_index]
+ target_point = State(
+ x=current_waypoint.pose.position.x,
+ y=current_waypoint.pose.position.y,
+ z=current_waypoint.pose.position.z,
+ )
+ # Compute guidance commands using current state
+ commands = self.guidance_calculator.compute_guidance(self.state, target_point)
+
+ # Publish commands and monitoring data
+ self.publish_guidance(commands)
+ # self.publish_errors(target_point, depth_error)
+
+ # Log guidance status
+ # self.publish_log(
+ # f"Guidance Status: surge={commands.surge_vel:.2f}, "
+ # f"pitch={commands[1]:.2f}, yaw={commands[2]:.2f}"
+ # )
+ # self.publish_log(f"Distance to target: {distance:.2f}")
+
+ # Check if waypoint is reached (0.5m threshold)
+ if self.guidance_calculator.horizontal_distance < 0.1:
+ self.publish_log(f'Reached waypoint {self.current_waypoint_index}')
+
+ self.guidance_calculator.reset_filter_state(self.state)
+
+ self.current_waypoint_index += 1
+
+ if self.current_waypoint_index >= len(self.waypoints):
+ if self.goal_handle and self.goal_handle.is_active:
+ self.goal_handle.succeed()
+ self.goal_handle = None
+ return
+
+ # Publish progress feedback
+ if self.goal_handle and self.goal_handle.is_active:
+ feedback_msg = NavigateWaypoints.Feedback()
+ feedback_msg.current_waypoint_index = float(self.current_waypoint_index)
+ self.goal_handle.publish_feedback(feedback_msg)
+
+ def execute_callback(self, goal_handle: ServerGoalHandle):
+ """Execute waypoint navigation action."""
+ self.publish_log('Executing waypoint navigation...')
+
+ # Initialize navigation goal with lock
+ with self._goal_lock:
+ self.goal_handle = goal_handle
+ self.current_waypoint_index = 0
+ self.waypoints = goal_handle.request.waypoints
+
+ feedback = NavigateWaypoints.Feedback()
+ result = NavigateWaypoints.Result()
+
+ self.publish_log(f'Received {len(self.waypoints)} waypoints')
+
+ rate = self.create_rate(1.0 / self.update_period)
+
+ self.get_logger().info('Executing goal')
+ while rclpy.ok():
+ if not goal_handle.is_active:
+ # Preempted by another goal
+ result.success = False
+ self.guidance_calculator.reset_filter_state(self.state)
+ return result
+
+ if goal_handle.is_cancel_requested:
+ self.publish_log('Goal canceled')
+ goal_handle.canceled()
+ self.goal_handle = None
+ return NavigateWaypoints.Result(success=False)
+
+ # process guiance
+ if len(self.waypoints) > 0:
+ self.process_guidance()
+
+ if self.current_waypoint_index >= len(self.waypoints):
+ self.publish_log('All waypoints reached')
+ return NavigateWaypoints.Result(success=True)
+
+ rate.sleep()
+
+ def publish_guidance(self, commands: State):
+ """Publish the commanded surge velocity, pitch angle, and yaw angle."""
+ msg = LOSGuidance()
+ msg.surge = commands.surge_vel
+ msg.pitch = commands.pitch
+ msg.yaw = commands.yaw
+ self.guidance_cmd_pub.publish(msg)
+
+ def publish_errors(self, target_point: np.ndarray, depth_error: float):
+ """Publish position errors for monitoring (if debug mode is enabled)."""
+ if not self.debug_mode:
+ return
+
+ msg = Vector3Stamped()
+ msg.header.frame_id = "odom"
+ msg.header.stamp = self.get_clock().now().to_msg()
+ msg.vector.x = target_point[0] - self.state.x
+ msg.vector.y = target_point[1] - self.state.y
+ msg.vector.z = depth_error
+ self.guidance_error_pub.publish(msg)
+
+
+def main(args=None):
+ """Main function to initialize and run the guidance node."""
+ rclpy.init(args=args)
+ action_server = LOSActionServer()
+ executor = MultiThreadedExecutor()
+ executor.add_node(action_server)
+
+ try:
+ executor.spin()
+ except Exception as e:
+ action_server.get_logger().error(f'Error: {str(e)}')
+ finally:
+ action_server.destroy_node()
+ if rclpy.ok():
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/mission/gui_auv/auv_gui/auv_gui.py b/mission/gui_auv/auv_gui/auv_gui.py
deleted file mode 100644
index 9257c691b..000000000
--- a/mission/gui_auv/auv_gui/auv_gui.py
+++ /dev/null
@@ -1,317 +0,0 @@
-import math
-import sys
-from threading import Thread
-from typing import List, Optional
-
-import matplotlib.pyplot as plt
-import numpy as np
-import rclpy
-from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
-from nav_msgs.msg import Odometry
-from PyQt6.QtCore import QTimer
-from PyQt6.QtWidgets import (
- QApplication,
- QGridLayout,
- QLabel,
- QMainWindow,
- QTabWidget,
- QVBoxLayout,
- QWidget,
-)
-from rclpy.executors import MultiThreadedExecutor
-from rclpy.node import Node
-from std_msgs.msg import Float32
-
-# --- Quaternion to Euler angles ---
-
-
-def quaternion_to_euler(x: float, y: float, z: float, w: float) -> List[float]:
- """Convert a quaternion to Euler angles (roll, pitch, yaw).
-
- Args:
- x (float): The x component of the quaternion.
- y (float): The y component of the quaternion.
- z (float): The z component of the quaternion.
- w (float): The w component of the quaternion.
-
- Returns:
- List[float]: A list of Euler angles [roll, pitch, yaw].
- """
- # Roll (x-axis rotation)
- sinr_cosp = 2 * (w * x + y * z)
- cosr_cosp = 1 - 2 * (x * x + y * y)
- roll = np.degrees(math.atan2(sinr_cosp, cosr_cosp))
-
- # Pitch (y-axis rotation)
- sinp = 2 * (w * y - z * x)
- pitch = np.degrees(math.asin(sinp))
-
- # Yaw (z-axis rotation)
- siny_cosp = 2 * (w * z + x * y)
- cosy_cosp = 1 - 2 * (y * y + z * z)
- yaw = np.degrees(math.atan2(siny_cosp, cosy_cosp))
-
- return [roll, pitch, yaw]
-
-
-# --- GUI Node ---
-
-
-class GuiNode(Node):
- """ROS2 Node that subscribes to odometry data and stores x, y positions."""
-
- def __init__(self) -> None:
- """Initialize the GuiNode and set up the odometry subscriber."""
- super().__init__("auv_gui_node")
-
- # ROS2 parameters
- self.declare_parameter("odom_topic", "/nucleus/odom")
- self.declare_parameter("current_topic", "/auv/power_sense_module/current")
- self.declare_parameter("voltage_topic", "/auv/power_sense_module/voltage")
- self.declare_parameter("temperature_topic", "/auv/temperature")
- self.declare_parameter("pressure_topic", "/auv/pressure")
- self.declare_parameter("history_length", 30)
-
- odom_topic = self.get_parameter("odom_topic").get_parameter_value().string_value
- current_topic = (
- self.get_parameter("current_topic").get_parameter_value().string_value
- )
- voltage_topic = (
- self.get_parameter("voltage_topic").get_parameter_value().string_value
- )
- temperature_topic = (
- self.get_parameter("temperature_topic").get_parameter_value().string_value
- )
- pressure_topic = (
- self.get_parameter("pressure_topic").get_parameter_value().string_value
- )
-
- # Subscriber to the /nucleus/odom topic
- self.subscription = self.create_subscription(
- Odometry, odom_topic, self.odom_callback, 10
- )
-
- # Variables to store odometry data
- self.xpos_data: List[float] = [] # x position
- self.ypos_data: List[float] = [] # y position
- self.zpos_data: List[float] = [] # z position
-
- self.w_data: List[float] = [] # w component of the quaternion
- self.x_data: List[float] = [] # x component of the quaternion
- self.y_data: List[float] = [] # y component of the quaternion
- self.z_data: List[float] = [] # z component of the quaternion
-
- self.roll: Optional[float] = None
- self.pitch: Optional[float] = None
- self.yaw: Optional[float] = None
-
- # Subscribe to internal status topics
- self.current_subscriber = self.create_subscription(
- Float32, current_topic, self.current_callback, 5
- )
- self.voltage_subscriber = self.create_subscription(
- Float32, voltage_topic, self.voltage_callback, 5
- )
- self.temperature_subscriber = self.create_subscription(
- Float32, temperature_topic, self.temperature_callback, 5
- )
- self.pressure_subscriber = self.create_subscription(
- Float32, pressure_topic, self.pressure_callback, 5
- )
-
- # Variables for internal status
- self.current = 0.0
- self.voltage = 0.0
- self.temperature = 0.0
- self.pressure = 0.0
-
- # --- Callback functions ---
-
- def odom_callback(self, msg: Odometry) -> None:
- """Callback function that is triggered when an odometry message is received."""
- # Extract x, y, z positions from the odometry message
- x = msg.pose.pose.position.x
- y = msg.pose.pose.position.y
- z = -(msg.pose.pose.position.z)
- self.xpos_data.append(x)
- self.ypos_data.append(y)
- self.zpos_data.append(z)
-
- # Extract the quaternion components from the odometry message
- w = msg.pose.pose.orientation.w
- x = msg.pose.pose.orientation.x
- y = msg.pose.pose.orientation.y
- z = msg.pose.pose.orientation.z
- self.roll, self.pitch, self.yaw = quaternion_to_euler(x, y, z, w)
-
- # Limit the stored data for real-time plotting (avoid memory overflow)
- max_data_points = (
- self.get_parameter("history_length").get_parameter_value().integer_value
- )
- if len(self.x_data) > max_data_points:
- self.xpos_data.pop(0)
- self.ypos_data.pop(0)
- self.zpos_data.pop(0)
-
- def current_callback(self, msg: Float32) -> None:
- """Callback function that is triggered when a current message is received."""
- self.current = msg.data
-
- def voltage_callback(self, msg: Float32) -> None:
- """Callback function that is triggered when a voltage message is received."""
- self.voltage = msg.data
-
- def temperature_callback(self, msg: Float32) -> None:
- """Callback function that is triggered when a temperature message is received."""
- self.temperature = msg.data
-
- def pressure_callback(self, msg: Float32) -> None:
- """Callback function that is triggered when a pressure message is received."""
- self.pressure = msg.data
-
-
-# --- Plotting ---
-
-
-class PlotCanvas(FigureCanvas):
- """A canvas widget for plotting odometry data using matplotlib."""
-
- def __init__(self, gui_node: GuiNode, parent: Optional[QWidget] = None) -> None:
- """Initialize the PlotCanvas."""
- # Set up the 3D plot
- self.gui_node = gui_node
- self.fig = plt.figure()
- self.ax = self.fig.add_subplot(111, projection='3d')
-
- # Initialize a red dot for the current position
- (self.current_position_dot,) = self.ax.plot([], [], [], 'ro')
-
- super().__init__(self.fig)
- self.setParent(parent)
-
- # Set labels and title for the 3D plot
- self.ax.set_xlabel("X")
- self.ax.set_ylabel("Y")
- self.ax.set_zlabel("Z")
- self.ax.set_title("Position")
-
- # Initialize data lists for 3D plot
- self.x_data: List[float] = []
- self.y_data: List[float] = []
- self.z_data: List[float] = []
- (self.line,) = self.ax.plot([], [], [], 'b-')
-
- def update_plot(
- self, x_data: List[float], y_data: List[float], z_data: List[float]
- ) -> None:
- """Update the 3D plot with the latest odometry data."""
- # Convert lists to numpy arrays to ensure compatibility with the plot functions
- x_data = np.array(x_data, dtype=float)
- y_data = np.array(y_data, dtype=float)
- z_data = np.array(z_data, dtype=float)
-
- # Check if the arrays are non-empty before updating the plot
- if len(x_data) > 0 and len(y_data) > 0 and len(z_data) > 0:
- self.line.set_data(x_data, y_data)
- self.line.set_3d_properties(z_data)
-
- # Update the current position dot
- self.current_position_dot.set_data(x_data[-1:], y_data[-1:])
- self.current_position_dot.set_3d_properties(z_data[-1:])
-
- # Update the limits for the 3D plot around the latest data point
- x_latest = x_data[-1]
- y_latest = y_data[-1]
- z_latest = z_data[-1]
- margin = 2.5 # Define a margin around the latest point
-
- self.ax.set_xlim(x_latest - margin, x_latest + margin)
- self.ax.set_ylim(y_latest - margin, y_latest + margin)
- self.ax.set_zlim(z_latest - margin, z_latest + margin)
-
- self.fig.canvas.draw()
- self.fig.canvas.flush_events()
-
-
-def run_ros_node(ros_node: GuiNode, executor: MultiThreadedExecutor) -> None:
- """Run the ROS2 node in a separate thread using a MultiThreadedExecutor."""
- rclpy.spin(ros_node, executor)
-
-
-def main(args: Optional[List[str]] = None) -> None:
- """The main function to initialize ROS2 and the GUI application."""
- rclpy.init(args=args)
- ros_node = GuiNode()
- executor = MultiThreadedExecutor()
-
- # Run ROS in a separate thread
- ros_thread = Thread(target=run_ros_node, args=(ros_node, executor), daemon=True)
- ros_thread.start()
-
- # Setup the PyQt5 application and window
- app = QApplication(sys.argv)
- gui = QMainWindow()
- gui.setWindowTitle("Vortex GUI")
- gui.setGeometry(100, 100, 600, 400)
-
- # Create the tab widget
- tabs = QTabWidget()
- tabs.setTabPosition(QTabWidget.TabPosition.North)
- tabs.setMovable(True)
-
- # --- Position Tab ---
- position_widget = QWidget()
- layout = QGridLayout(position_widget) # grid layout
-
- plot_canvas = PlotCanvas(ros_node, position_widget)
- layout.addWidget(plot_canvas, 0, 0)
-
- current_pos = QLabel(parent=position_widget)
- layout.addWidget(current_pos, 0, 1)
-
- tabs.addTab(position_widget, "Position")
-
- # --- Internal Status Tab ---
- internal_widget = QWidget()
- internal_layout = QVBoxLayout(internal_widget)
-
- internal_status_label = QLabel(parent=internal_widget)
- internal_layout.addWidget(internal_status_label)
- tabs.addTab(internal_widget, "Internal")
-
- gui.setCentralWidget(tabs)
- gui.showMaximized()
-
- # Use a QTimer to update plot, current position, and internal status in the main thread
- def update_gui() -> None:
- plot_canvas.update_plot(
- ros_node.xpos_data, ros_node.ypos_data, ros_node.zpos_data
- )
- if len(ros_node.xpos_data) > 0 and ros_node.roll is not None:
- position_text = f"Current Position:\nX: {ros_node.xpos_data[-1]:.2f}\nY: {ros_node.ypos_data[-1]:.2f}\nZ: {ros_node.zpos_data[-1]:.2f}"
- orientation_text = f"Current Orientation:\nRoll: {ros_node.roll:.2f}\nPitch: {ros_node.pitch:.2f}\nYaw: {ros_node.yaw:.2f}"
- current_pos.setText(position_text + "\n\n\n" + orientation_text)
-
- # Update internal status
- internal_status_label.setText(
- f"Internal Status:\n"
- f"Current: {ros_node.current:.2f}\n"
- f"Voltage: {ros_node.voltage:.2f}\n"
- f"Temperature: {ros_node.temperature:.2f}\n"
- f"Pressure: {ros_node.pressure:.2f}"
- )
-
- # Set up the timer to call update_gui every 100ms
- timer = QTimer()
- timer.timeout.connect(update_gui)
- timer.start(100)
-
- app.exec()
-
- ros_node.destroy_node()
- rclpy.shutdown()
- sys.exit()
-
-
-if __name__ == '__main__':
- main()
diff --git a/mission/gui_auv/config/gui_params.yaml b/mission/gui_auv/config/gui_params.yaml
deleted file mode 100644
index a490595f0..000000000
--- a/mission/gui_auv/config/gui_params.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-/**:
- ros__parameters:
- gui_auv:
- odom_topic: "/nucleus/odom"
- current_topic: "/auv/power_sense_module/current"
- voltage_topic: "/auv/power_sense_module/voltage"
- temperature_topic: "/auv/temperature"
- pressure_topic: "/auv/pressure"
- history_length: 30 # seconds
diff --git a/mission/gui_auv/package.xml b/mission/gui_auv/package.xml
deleted file mode 100644
index dd89ae4cd..000000000
--- a/mission/gui_auv/package.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-
-
-
- auv-gui
- 0.0.0
- AUV GUI for ROS2
- sondre
- MIT
-
- python3-pytest
-
- rclpy
- geometry_msgs
- sensor_msgs
-
-
- ament_python
-
-
diff --git a/mission/gui_auv/setup.py b/mission/gui_auv/setup.py
deleted file mode 100644
index de9acfda1..000000000
--- a/mission/gui_auv/setup.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from setuptools import setup
-
-PACKAGE_NAME = 'auv_gui'
-
-setup(
- name=PACKAGE_NAME,
- version='0.0.0',
- packages=[PACKAGE_NAME],
- install_requires=['setuptools'],
- zip_safe=True,
- maintainer='sondre',
- maintainer_email='sondre95556888@gmail.com',
- description='AUV GUI for ROS2',
- license='MIT',
- tests_require=['pytest'],
- entry_points={
- 'console_scripts': [
- 'auv-gui = auv_gui.auv_gui:main',
- ],
- },
-)
diff --git a/requirements.sh b/requirements.sh
new file mode 100644
index 000000000..150ddec91
--- /dev/null
+++ b/requirements.sh
@@ -0,0 +1 @@
+pip install pyqt6
diff --git a/ros2.repos b/ros2.repos
index 32f842011..de8a9471e 100644
--- a/ros2.repos
+++ b/ros2.repos
@@ -2,3 +2,4 @@ repositories:
vortex_msgs:
type: git
url: https://github.com/vortexntnu/vortex-msgs.git
+ version: develop