diff --git a/.vscode/settings.json b/.vscode/settings.json
index ad9463f11..aac9f7c1b 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -215,7 +215,8 @@
"regex": "cpp",
"future": "cpp",
"*.ipp": "cpp",
- "span": "cpp"
+ "span": "cpp",
+ "ranges": "cpp"
},
// Tell the ROS extension where to find the setup.bash
// This also utilizes the COLCON_WS environment variable, which needs to be set
diff --git a/bitbots_behavior/bitbots_blackboard/CMakeLists.txt b/bitbots_behavior/bitbots_blackboard/CMakeLists.txt
index be7e5dad5..c363d76a8 100644
--- a/bitbots_behavior/bitbots_blackboard/CMakeLists.txt
+++ b/bitbots_behavior/bitbots_blackboard/CMakeLists.txt
@@ -6,17 +6,17 @@ if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()
-find_package(bio_ik_msgs REQUIRED)
find_package(ament_cmake REQUIRED)
-find_package(sensor_msgs REQUIRED)
-find_package(rclpy REQUIRED)
-find_package(tf2 REQUIRED)
+find_package(bio_ik_msgs REQUIRED)
+find_package(bitbots_docs REQUIRED)
find_package(bitbots_msgs REQUIRED)
+find_package(geometry_msgs REQUIRED)
+find_package(rclpy REQUIRED)
+find_package(sensor_msgs REQUIRED)
find_package(std_msgs REQUIRED)
-find_package(tf2_geometry_msgs REQUIRED)
find_package(std_srvs REQUIRED)
-find_package(geometry_msgs REQUIRED)
-find_package(bitbots_docs REQUIRED)
+find_package(tf2 REQUIRED)
+find_package(tf2_geometry_msgs REQUIRED)
set(INCLUDE_DIRS
${bio_ik_msgs_INCLUDE_DIRS}
@@ -76,6 +76,11 @@ ament_export_dependencies(geometry_msgs)
ament_export_dependencies(bitbots_docs)
ament_export_include_directories(${INCLUDE_DIRS})
+if(BUILD_TESTING)
+ find_package(ament_cmake_mypy REQUIRED)
+ ament_mypy(CONFIG_FILE "${CMAKE_CURRENT_LIST_DIR}/mypy.ini")
+endif()
+
ament_python_install_package(${PROJECT_NAME})
ament_package()
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py
index 813f86efe..436f060f9 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py
@@ -13,7 +13,7 @@
class BodyBlackboard:
- def __init__(self, node: Node, tf_buffer: tf2.Buffer):
+ def __init__(self, node: Node, tf_buffer: tf2.BufferInterface):
# References
self.node = node
self.tf_buffer = tf_buffer
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py
index 20c418e7c..62d4cd368 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py
@@ -1,5 +1,6 @@
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
+from bitbots_utils.utils import nobeartype
from rclpy.node import Node
if TYPE_CHECKING:
@@ -9,6 +10,7 @@
class AbstractBlackboardCapsule:
"""Abstract class for blackboard capsules."""
- def __init__(self, node: Node, blackboard: Optional["BodyBlackboard"] = None):
+ @nobeartype
+ def __init__(self, node: Node, blackboard: "BodyBlackboard"):
self._node = node
self._blackboard = blackboard
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py
index bf51664f6..466381dc0 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py
@@ -53,7 +53,7 @@ def __init__(self, node, blackboard):
self.calc_base_costmap()
self.calc_gradients()
- def robot_callback(self, msg: RobotArray):
+ def robot_callback(self, msg: RobotArray) -> None:
"""
Callback with new robot detections
"""
@@ -77,10 +77,11 @@ def robot_callback(self, msg: RobotArray):
# Publish debug costmap
self.publish_costmap()
- def publish_costmap(self):
+ def publish_costmap(self) -> None:
"""
Publishes the costmap for rviz
"""
+ assert self.costmap is not None, "Costmap is not initialized"
# Normalize costmap to match the rviz color scheme in a good way
normalized_costmap = (
(255 - ((self.costmap - np.min(self.costmap)) / (np.max(self.costmap) - np.min(self.costmap))) * 255 / 2.1)
@@ -131,7 +132,7 @@ def get_pass_regions(self) -> np.ndarray:
# Smooth obstacle map
return gaussian_filter(costmap, pass_smooth)
- def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
+ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[int, int]:
"""
Converts a field position to the corresponding indices for the costmap.
@@ -153,10 +154,11 @@ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
)
return idx_x, idx_y
- def calc_gradients(self):
+ def calc_gradients(self) -> None:
"""
Recalculates the gradient map based on the current costmap.
"""
+ assert self.base_costmap is not None, "Base costmap is not initialized"
gradient = np.gradient(self.base_costmap)
norms = np.linalg.norm(gradient, axis=0)
@@ -186,7 +188,7 @@ def cost_at_relative_xy(self, x: float, y: float) -> float:
return self.get_cost_at_field_position(point.point.x, point.point.y)
- def calc_base_costmap(self):
+ def calc_base_costmap(self) -> None:
"""
Builds the base costmap based on the behavior parameters.
This costmap includes a gradient towards the enemy goal and high costs outside the playable area
@@ -203,8 +205,8 @@ def calc_base_costmap(self):
# Create Grid
grid_x, grid_y = np.mgrid[
- 0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j,
- 0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j,
+ 0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j, # type: ignore[misc]
+ 0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j, # type: ignore[misc]
]
fix_points: List[Tuple[Tuple[float, float], float]] = []
@@ -278,7 +280,8 @@ def calc_base_costmap(self):
)
# Smooth the costmap to get more continuous gradients
- self.base_costmap = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
+ base_costmap: np.ndarray = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
+ self.base_costmap = base_costmap
self.costmap = self.base_costmap.copy()
def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, float]:
@@ -287,6 +290,7 @@ def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, flo
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
+ assert self.gradient_map is not None, "Gradient map is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return -self.gradient_map[0][idx_x, idx_y], -self.gradient_map[1][idx_x, idx_y]
@@ -296,10 +300,11 @@ def get_cost_at_field_position(self, x: float, y: float) -> float:
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
+ assert self.costmap is not None, "Costmap is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return self.costmap[idx_x, idx_y]
- def get_gradient_direction_at_field_position(self, x: float, y: float):
+ def get_gradient_direction_at_field_position(self, x: float, y: float) -> float:
"""
Returns the gradient direction at the given position
:param x: Field coordinate in the x direction
@@ -318,7 +323,9 @@ def get_gradient_direction_at_field_position(self, x: float, y: float):
grad = self.get_gradient_at_field_position(x, y)
return math.atan2(grad[1], grad[0])
- def get_cost_of_kick_relative(self, x: float, y: float, direction: float, kick_length: float, angular_range: float):
+ def get_cost_of_kick_relative(
+ self, x: float, y: float, direction: float, kick_length: float, angular_range: float
+ ) -> float:
"""
Returns the cost of a kick at the given position and direction in base footprint frame
:param x: Field coordinate in the x direction
@@ -356,6 +363,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
:param kick_length: The length of the kick
:param angular_range: The angular range of the kick
"""
+ assert self.costmap is not None, "Costmap is not initialized"
# create a mask in the size of the costmap consisting of 8-bit values initialized as 0
mask = Image.new("L", (self.costmap.shape[1], self.costmap.shape[0]))
@@ -386,7 +394,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
# This should contribute way less than the max and should have an impact if the max values are similar in all directions.
return masked_costmap.max() * 0.75 + masked_costmap.min() * 0.25
- def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float):
+ def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float) -> float:
"""
Returns the cost of the kick at the current position
:param direction: The direction of the kick
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
index 58cbcab5a..0f52543a6 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
from bitbots_utils.utils import get_parameters_from_other_node
from game_controller_hl_interfaces.msg import GameState
@@ -9,73 +11,63 @@ class GameStatusCapsule(AbstractBlackboardCapsule):
def __init__(self, node, blackboard=None):
super().__init__(node, blackboard)
- self.team_id = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
+ self.team_id: int = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.gamestate = GameState()
- self.last_update = 0
- self.unpenalized_time = 0
- self.last_goal_from_us_time = -86400
- self.last_goal_time = -86400
- self.free_kick_kickoff_team = None
-
- def is_game_state_equals(self, value):
- assert value in [
- GameState.GAMESTATE_PLAYING,
- GameState.GAMESTATE_FINISHED,
- GameState.GAMESTATE_INITIAL,
- GameState.GAMESTATE_READY,
- GameState.GAMESTATE_SET,
- ]
- return value == self.get_gamestate()
-
- def get_gamestate(self):
+ self.last_update: float = 0.0
+ self.unpenalized_time: float = 0.0
+ self.last_goal_from_us_time = -86400.0
+ self.last_goal_time = -86400.0
+ self.free_kick_kickoff_team: Optional[bool] = None
+
+ def get_gamestate(self) -> int:
return self.gamestate.game_state
- def get_secondary_state(self):
+ def get_secondary_state(self) -> int:
return self.gamestate.secondary_state
- def get_secondary_state_mode(self):
+ def get_secondary_state_mode(self) -> int:
return self.gamestate.secondary_state_mode
- def get_secondary_team(self):
+ def get_secondary_team(self) -> int:
return self.gamestate.secondary_state_team
- def has_kickoff(self):
+ def has_kickoff(self) -> bool:
return self.gamestate.has_kick_off
- def has_penalty_kick(self):
+ def has_penalty_kick(self) -> bool:
return (
self.gamestate.secondary_state == GameState.STATE_PENALTYKICK
or self.gamestate.secondary_state == GameState.STATE_PENALTYSHOOT
) and self.gamestate._secondary_state_team == self.team_id
- def get_own_goals(self):
+ def get_our_goals(self) -> int:
return self.gamestate.own_score
- def get_opp_goals(self):
+ def get_opp_goals(self) -> int:
return self.gamestate.rival_score
- def get_seconds_since_own_goal(self):
+ def get_seconds_since_own_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_from_us_time
- def get_seconds_since_any_goal(self):
+ def get_seconds_since_any_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_time
- def get_seconds_remaining(self):
+ def get_seconds_remaining(self) -> float:
# Time from the message minus time passed since receiving it
return max(
- self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0
+ self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0.0
)
- def get_secondary_seconds_remaining(self):
+ def get_secondary_seconds_remaining(self) -> float:
"""Seconds remaining for things like kickoff"""
# Time from the message minus time passed since receiving it
return max(
self.gamestate.secondary_seconds_remaining
- (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update),
- 0,
+ 0.0,
)
- def get_seconds_since_last_drop_ball(self):
+ def get_seconds_since_last_drop_ball(self) -> Optional[float]:
"""Returns the seconds since the last drop in"""
if self.gamestate.drop_in_time == -1:
return None
@@ -83,22 +75,22 @@ def get_seconds_since_last_drop_ball(self):
# Time from the message plus seconds passed since receiving it
return self.gamestate.drop_in_time + (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update)
- def get_seconds_since_unpenalized(self):
+ def get_seconds_since_unpenalized(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.unpenalized_time
- def get_is_penalized(self):
+ def get_is_penalized(self) -> bool:
return self.gamestate.penalized
- def received_gamestate(self):
- return self.last_update != 0
+ def received_gamestate(self) -> bool:
+ return self.last_update != 0.0
- def get_team_id(self):
+ def get_team_id(self) -> int:
return self.team_id
- def get_red_cards(self):
+ def get_red_cards(self) -> int:
return self.gamestate.team_mates_with_red_card
- def gamestate_callback(self, gamestate_msg: GameState):
+ def gamestate_callback(self, gamestate_msg: GameState) -> None:
if self.gamestate.penalized and not gamestate_msg.penalized:
self.unpenalized_time = self._node.get_clock().now().nanoseconds / 1e9
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py
index 6d9eaf628..6ed79c92b 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py
@@ -23,7 +23,7 @@ class KickCapsule(AbstractBlackboardCapsule):
is_currently_kicking: bool = False
__connected: bool = False
- __action_client: ActionClient = None
+ __action_client: Optional[ActionClient] = None
class WalkKickTargets(Flag):
"""
@@ -43,14 +43,14 @@ def __init__(self, node, blackboard):
self.walk_kick_pub = self._node.create_publisher(Bool, "/kick", 1)
# self.connect_dynamic_kick() Do not connect if dynamic_kick is disabled
- def walk_kick(self, target: WalkKickTargets):
+ def walk_kick(self, target: WalkKickTargets) -> None:
"""
Kick the ball while walking
:param target: Target for the walk kick (e.g. left or right foot)
"""
self.walk_kick_pub.publish(Bool(data=target.value))
- def connect_dynamic_kick(self):
+ def connect_dynamic_kick(self) -> None:
topic = self._blackboard.config["dynamic_kick"]["topic"]
self.__action_client = ActionClient(self._node, Kick, topic, callback_group=ReentrantCallbackGroup())
self.__connected = self.__action_client.wait_for_server(self._blackboard.config["dynamic_kick"]["wait_time"])
@@ -58,7 +58,7 @@ def connect_dynamic_kick(self):
if not self.__connected:
self._node.get_logger().error(f"No dynamic_kick server running on {topic}")
- def dynamic_kick(self, goal: Kick.Goal):
+ def dynamic_kick(self, goal: Kick.Goal) -> None:
"""
:param goal: Goal to kick to
:type goal: KickGoal
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py
index 490eee491..90107d664 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Literal, Optional, TypeAlias
from bitbots_utils.utils import get_parameters_from_other_node
from rclpy.duration import Duration
@@ -7,6 +7,15 @@
from bitbots_blackboard.capsules import AbstractBlackboardCapsule
from bitbots_msgs.msg import Audio, HeadMode, RobotControlState
+THeadMode: TypeAlias = Literal[ # type: ignore[valid-type]
+ HeadMode.BALL_MODE,
+ HeadMode.FIELD_FEATURES,
+ HeadMode.LOOK_FORWARD,
+ HeadMode.DONT_MOVE,
+ HeadMode.BALL_MODE_PENALTY,
+ HeadMode.LOOK_FRONT,
+]
+
class MiscCapsule(AbstractBlackboardCapsule):
"""Capsule for miscellaneous functions."""
@@ -33,7 +42,10 @@ def __init__(self, node, blackboard):
# ## Tracking Part ##
#####################
- def set_head_duty(self, head_duty: int):
+ def set_head_duty(
+ self,
+ head_duty: THeadMode,
+ ) -> None:
head_duty_msg = HeadMode()
head_duty_msg.head_mode = head_duty
self.head_pub.publish(head_duty_msg)
@@ -42,7 +54,7 @@ def set_head_duty(self, head_duty: int):
# ## Robot state ##
###################
- def robot_state_callback(self, msg: RobotControlState):
+ def robot_state_callback(self, msg: RobotControlState) -> None:
self.robot_control_state = msg
def is_currently_walking(self) -> bool:
@@ -52,7 +64,7 @@ def is_currently_walking(self) -> bool:
# ## Timer ##
#############
- def start_timer(self, timer_name: str, duration_secs: int):
+ def start_timer(self, timer_name: str, duration_secs: int) -> None:
"""
Starts a timer
:param timer_name: Name of the timer
@@ -61,7 +73,7 @@ def start_timer(self, timer_name: str, duration_secs: int):
"""
self.timers[timer_name] = self._node.get_clock().now() + Duration(seconds=duration_secs)
- def end_timer(self, timer_name: str):
+ def end_timer(self, timer_name: str) -> None:
"""
Ends a timer
:param timer_name: Name of the timer
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py
index a621bf0a3..30754da64 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py
@@ -15,7 +15,7 @@
# Type of pathfinding goal relative to the ball
-class BallGoalType(Enum):
+class BallGoalType(str, Enum):
GRADIENT = "gradient"
MAP = "map"
CLOSE = "close"
@@ -26,8 +26,8 @@ class PathfindingCapsule(AbstractBlackboardCapsule):
def __init__(self, node, blackboard):
super().__init__(node, blackboard)
- self.position_threshold: str = self._node.get_parameter("pathfinding_position_threshold").value
- self.orientation_threshold: str = self._node.get_parameter("pathfinding_orientation_threshold").value
+ self.position_threshold: float = self._node.get_parameter("pathfinding_position_threshold").value
+ self.orientation_threshold: float = self._node.get_parameter("pathfinding_orientation_threshold").value
self.direct_cmd_vel_pub = self._node.create_publisher(Twist, "cmd_vel", 1)
self.pathfinding_pub = self._node.create_publisher(PoseStamped, "goal_pose", 1)
@@ -41,20 +41,20 @@ def __init__(self, node, blackboard):
self._node, "bitbots_path_planning", ["controller.orient_to_goal_distance"]
)["controller.orient_to_goal_distance"]
- def publish(self, msg: PoseStamped):
+ def publish(self, msg: PoseStamped) -> None:
"""
Sends a goal to the pathfinding.
"""
self.goal = msg
self.pathfinding_pub.publish(msg)
- def get_goal(self) -> PoseStamped:
+ def get_goal(self) -> Optional[PoseStamped]:
"""
Returns the latest goal that was send to the pathfinding.
"""
return self.goal
- def cancel_goal(self):
+ def cancel_goal(self) -> None:
"""
This function cancels the current goal of the pathfinding,
which will stop sending cmd_vel messages to the walking.
@@ -62,7 +62,7 @@ def cancel_goal(self):
"""
self.pathfinding_cancel_pub.publish(Empty())
- def cmd_vel_cb(self, msg: Twist):
+ def cmd_vel_cb(self, msg: Twist) -> None:
self.current_cmd_vel = msg
def get_current_cmd_vel(self) -> Twist:
@@ -73,7 +73,7 @@ def get_current_cmd_vel(self) -> Twist:
"""
return self.current_cmd_vel
- def stop_walk(self):
+ def stop_walk(self) -> None:
"""
This function stops the walking. It does not cancel the current goal of the
pathfinding and the walking will start again if the pathfinding sends a new message.
@@ -86,7 +86,7 @@ def stop_walk(self):
# Publish the stop command
self.direct_cmd_vel_pub.publish(msg)
- def calculate_time_to_ball(self):
+ def calculate_time_to_ball(self) -> None:
"""
Calculates the time to ball and saves it in the team data capsule.
"""
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py
index e22dafd38..3fea4f71f 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py
@@ -1,4 +1,4 @@
-from typing import Dict, List, Optional, Tuple
+from typing import List, Literal, Optional, Tuple
import numpy as np
from bitbots_utils.utils import get_parameters_from_other_node
@@ -27,7 +27,7 @@ def __init__(self, node, blackboard):
# Data
# indexed with one to match robot ids
- self.team_data: Dict[TeamData] = {}
+ self.team_data: dict[int, TeamData] = {}
for i in range(1, 7):
self.team_data[i] = TeamData()
self.times_to_ball = dict()
@@ -65,8 +65,8 @@ def __init__(self, node, blackboard):
self.action_update: float = 0.0
# Config
- self.data_timeout: float = self._node.get_parameter("team_data_timeout").value
- self.ball_max_covariance: float = self._node.get_parameter("ball_max_covariance").value
+ self.data_timeout: float = float(self._node.get_parameter("team_data_timeout").value)
+ self.ball_max_covariance: float = float(self._node.get_parameter("ball_max_covariance").value)
def is_valid(self, data: TeamData) -> bool:
"""
@@ -78,7 +78,7 @@ def is_valid(self, data: TeamData) -> bool:
and data.state != TeamData.STATE_PENALIZED
)
- def is_goalie_handling_ball(self):
+ def is_goalie_handling_ball(self) -> bool:
"""Returns true if the goalie is going to the ball."""
data: TeamData
for data in self.team_data.values():
@@ -90,7 +90,7 @@ def is_goalie_handling_ball(self):
return True
return False
- def is_team_mate_kicking(self):
+ def is_team_mate_kicking(self) -> bool:
"""Returns true if one of the players in the own team is kicking."""
data: TeamData
for data in self.team_data.values():
@@ -98,7 +98,9 @@ def is_team_mate_kicking(self):
return True
return False
- def team_rank_to_ball(self, own_ball_distance: float, count_goalies: bool = True, use_time_to_ball: bool = False):
+ def team_rank_to_ball(
+ self, own_ball_distance: float, count_goalies: bool = True, use_time_to_ball: bool = False
+ ) -> int:
"""
Returns the rank of this robot compared to the team robots concerning ball distance.
@@ -132,7 +134,7 @@ def team_rank_to_ball(self, own_ball_distance: float, count_goalies: bool = True
return rank + 1
return len(distances) + 1
- def set_action(self, action: int):
+ def set_action(self, action: int) -> None:
"""Set the action of this robot
:param action: An action from bitbots_msgs/Strategy"""
@@ -143,14 +145,11 @@ def set_action(self, action: int):
def get_action(self) -> Tuple[int, float]:
return self.strategy.action, self.action_update
- def set_role(self, role: str):
+ def set_role(self, role: Literal["goalie", "offense", "defense"]) -> None:
"""Set the role of this robot in the team
- :param role: String describing the role, possible values are:
- ['goalie', 'offense', 'defense']
+ :param role: String describing the role.
"""
- assert role in ["goalie", "offense", "defense"]
-
self.role = role
self.strategy.role = self.roles_mapping[role]
self.role_update = self._node.get_clock().now().nanoseconds / 1e9
@@ -158,8 +157,10 @@ def set_role(self, role: str):
def get_role(self) -> Tuple[int, float]:
return self.strategy.role, self.role_update
- def set_kickoff_strategy(self, strategy: int):
- assert strategy in [Strategy.SIDE_LEFT, Strategy.SIDE_MIDDLE, Strategy.SIDE_RIGHT]
+ def set_kickoff_strategy(
+ self,
+ strategy: Literal[Strategy.SIDE_LEFT, Strategy.SIDE_MIDDLE, Strategy.SIDE_RIGHT], # type: ignore[valid-type]
+ ) -> None:
self.strategy.offensive_side = strategy
self.strategy_update = self._node.get_clock().now().nanoseconds / 1e9
@@ -184,7 +185,7 @@ def is_not_goalie(team_data: TeamData) -> bool:
# Remove goalie data if needed
if not count_goalie:
- team_data_infos = filter(is_not_goalie, team_data_infos)
+ team_data_infos = filter(is_not_goalie, team_data_infos) # type: ignore[assignment]
# Count valid team data infos (aka robots with valid team data)
return sum(map(self.is_valid, team_data_infos))
@@ -194,10 +195,10 @@ def is_a_goalie(team_data: TeamData) -> bool:
return team_data.strategy.role == Strategy.ROLE_GOALIE
# Get the team data infos for all robots (ignoring the robot id/name)
- team_data_infos = self.team_data.values()
+ team_data_infos = self.team_data.values() # type: ignore[assignment]
# Remove none goalie Data
- team_data_infos = filter(is_a_goalie, team_data_infos)
+ team_data_infos = filter(is_a_goalie, team_data_infos) # type: ignore[assignment]
# Count valid team data infos (aka robots with valid team data)
return sum(map(self.is_valid, team_data_infos)) == 1
@@ -209,14 +210,14 @@ def team_data_callback(self, msg: TeamData):
# Save team data
self.team_data[msg.robot_id] = msg
- def publish_strategy(self):
+ def publish_strategy(self) -> None:
"""Publish for team comm"""
self.strategy_sender.publish(self.strategy)
def publish_time_to_ball(self):
self.time_to_ball_publisher.publish(Float32(data=self.own_time_to_ball))
- def teammate_ball_is_valid(self):
+ def teammate_ball_is_valid(self) -> bool:
"""Returns true if a teammate has seen the ball accurately enough"""
return self.get_teammate_ball() is not None
@@ -227,7 +228,7 @@ def get_teammate_ball(self) -> Optional[PointStamped]:
team_data_infos = filter(self.is_valid, self.team_data.values())
def is_ball_good_enough(team_data: TeamData) -> bool:
- return (
+ return bool(
team_data.ball_absolute.covariance[0] < self.ball_max_covariance
and team_data.ball_absolute.covariance[7] < self.ball_max_covariance
)
@@ -246,3 +247,4 @@ def get_ball_max_covariance(team_data: TeamData) -> float:
return PointStamped(
header=team_data_with_best_ball.header, point=team_data_with_best_ball.ball_absolute.pose.position
)
+ return None
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py
index 47c0ef3d8..6f382ee8c 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py
@@ -1,5 +1,5 @@
import math
-from typing import Optional, Tuple
+from typing import Tuple
import numpy as np
import tf2_ros as tf2
@@ -22,6 +22,18 @@
from bitbots_blackboard.capsules import AbstractBlackboardCapsule
+class WorldModelTFError(Exception):
+ ...
+
+
+class WorldModelPositionTFError(WorldModelTFError):
+ ...
+
+
+class WorldModelBallTFError(WorldModelTFError):
+ ...
+
+
class WorldModelCapsule(AbstractBlackboardCapsule):
"""Provides information about the world model."""
@@ -118,7 +130,7 @@ def get_ball_position_uv(self) -> Tuple[float, float]:
except tf2.ExtrapolationException as e:
self._node.get_logger().warn(str(e))
self._node.get_logger().error("Severe transformation problem concerning the ball!")
- return None
+ raise WorldModelBallTFError("Could not transform ball to base_footprint frame") from e
return ball_bfp.x, ball_bfp.y
def get_ball_distance(self) -> float:
@@ -171,38 +183,38 @@ def forget_ball(self) -> None:
# Goal #
########
- def get_map_based_opp_goal_center_uv(self):
+ def get_map_based_opp_goal_center_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_uv_from_xy(x, y)
- def get_map_based_opp_goal_center_xy(self):
+ def get_map_based_opp_goal_center_xy(self) -> Tuple[float, float]:
return self.field_length / 2, 0.0
- def get_map_based_own_goal_center_uv(self):
+ def get_map_based_own_goal_center_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_own_goal_center_xy()
return self.get_uv_from_xy(x, y)
- def get_map_based_own_goal_center_xy(self):
+ def get_map_based_own_goal_center_xy(self) -> Tuple[float, float]:
return -self.field_length / 2, 0.0
- def get_map_based_opp_goal_angle_from_ball(self):
+ def get_map_based_opp_goal_angle_from_ball(self) -> float:
ball_x, ball_y = self.get_ball_position_xy()
goal_x, goal_y = self.get_map_based_opp_goal_center_xy()
return math.atan2(goal_y - ball_y, goal_x - ball_x)
- def get_map_based_opp_goal_distance(self):
+ def get_map_based_opp_goal_distance(self) -> float:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_distance_to_xy(x, y)
- def get_map_based_opp_goal_angle(self):
+ def get_map_based_opp_goal_angle(self) -> float:
x, y = self.get_map_based_opp_goal_center_uv()
return math.atan2(y, x)
- def get_map_based_opp_goal_left_post_uv(self):
+ def get_map_based_opp_goal_left_post_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_uv_from_xy(x, y - self.goal_width / 2)
- def get_map_based_opp_goal_right_post_uv(self):
+ def get_map_based_opp_goal_right_post_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_uv_from_xy(x, y + self.goal_width / 2)
@@ -210,23 +222,21 @@ def get_map_based_opp_goal_right_post_uv(self):
# Pose #
########
- def get_current_position(self) -> Optional[Tuple[float, float, float]]:
+ def get_current_position(self) -> Tuple[float, float, float]:
"""
Returns the current position on the field as determined by the localization.
0,0,0 is the center of the field looking in the direction of the opponent goal.
:returns x,y,theta:
"""
- if not (transform := self.get_current_position_transform()):
- return None
+ transform = self.get_current_position_transform()
theta = euler_from_quaternion(numpify(transform.transform.rotation))[2]
return transform.transform.translation.x, transform.transform.translation.y, theta
- def get_current_position_pose_stamped(self) -> Optional[PoseStamped]:
+ def get_current_position_pose_stamped(self) -> PoseStamped:
"""
Returns the current position as determined by the localization as a PoseStamped
"""
- if not (transform := self.get_current_position_transform()):
- return None
+ transform = self.get_current_position_transform()
return PoseStamped(
header=transform.header,
pose=Pose(
@@ -245,13 +255,13 @@ def get_current_position_transform(self) -> TransformStamped:
)
except (tf2.LookupException, tf2.ConnectivityException, tf2.ExtrapolationException) as e:
self._node.get_logger().warn(str(e))
- return None
+ raise WorldModelPositionTFError("Could not get current position transform") from e
##########
# Common #
##########
- def get_uv_from_xy(self, x, y) -> Tuple[float, float]:
+ def get_uv_from_xy(self, x: float, y: float) -> Tuple[float, float]:
"""Returns the relativ positions of the robot to this absolute position"""
current_position = self.get_current_position()
x2 = x - current_position[0]
@@ -261,14 +271,14 @@ def get_uv_from_xy(self, x, y) -> Tuple[float, float]:
v = math.cos(theta) * y2 - math.sin(theta) * x2
return u, v
- def get_xy_from_uv(self, u, v):
+ def get_xy_from_uv(self, u: float, v: float) -> Tuple[float, float]:
"""Returns the absolute position from the given relative position to the robot"""
pos_x, pos_y, theta = self.get_current_position()
angle = math.atan2(v, u) + theta
hypotenuse = math.hypot(u, v)
return pos_x + math.sin(angle) * hypotenuse, pos_y + math.cos(angle) * hypotenuse
- def get_distance_to_xy(self, x, y):
+ def get_distance_to_xy(self, x: float, y: float) -> float:
"""Returns distance from robot to given position"""
u, v = self.get_uv_from_xy(x, y)
dist = math.hypot(u, v)
diff --git a/bitbots_behavior/bitbots_blackboard/mypy.ini b/bitbots_behavior/bitbots_blackboard/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_behavior/bitbots_blackboard/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_behavior/bitbots_blackboard/package.xml b/bitbots_behavior/bitbots_blackboard/package.xml
index 9463c26f2..4bbed0ca3 100644
--- a/bitbots_behavior/bitbots_blackboard/package.xml
+++ b/bitbots_behavior/bitbots_blackboard/package.xml
@@ -37,6 +37,10 @@
std_srvs
tf2_geometry_msgs
tf2
+ ament_mypy
+ ament_cmake_mypy
+
+ ament_cmake
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py
index e69de29bb..2b0d3a359 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py
@@ -0,0 +1,6 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py
index e879b7d2c..72068ebbd 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py
@@ -12,7 +12,7 @@
class GoToRelativePosition(AbstractActionElement):
blackboard: BodyBlackboard
- def __init__(self, blackboard, dsd, parameters: dict = None):
+ def __init__(self, blackboard, dsd, parameters):
super().__init__(blackboard, dsd, parameters)
self.point = float(parameters.get("x", 0)), float(parameters.get("y", 0)), float(parameters.get("t", 0))
self.threshold = float(parameters.get("threshold", 0.1))
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py
index 79786dd23..63ae26d99 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py
@@ -59,7 +59,7 @@ def __init__(self, blackboard, dsd, parameters):
self.max_speed = parameters.get("max_speed", 0.4)
# Check if the have a duration
- self.duration: Optional[float] = parameters.get("duration", None)
+ self.duration: Optional[float] = float(parameters.get("duration", None))
self.start_time = self.blackboard.node.get_clock().now()
def perform(self, reevaluate=False):
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py
index e69de29bb..2b0d3a359 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py
@@ -0,0 +1,6 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py
index c403adcef..3ae2f8854 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py
@@ -26,10 +26,6 @@ def perform(self, reevaluate=False):
# Get our position
current_pose = self.blackboard.world_model.get_current_position()
- # Check if we know our position
- if current_pose is None:
- return "NO"
-
# Get maximum kick angle relative to our base footprint
angle_difference_right = current_pose[2] - self.max_kick_angle
angle_difference_left = current_pose[2] + self.max_kick_angle
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py
index 3aa9147b8..54d02574a 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py
@@ -25,6 +25,9 @@ def perform(self, reevaluate=False):
ball_position = self.blackboard.world_model.get_ball_position_uv()
self.publish_debug_data("ball_position", {"u": ball_position[0], "v": ball_position[1]})
+ self.publish_debug_data(
+ "smoothing_close", f"{self.no_near_decisions} ({self.no_near_decisions / self.smoothing * 10}%)"
+ )
# Check if the ball is in the enter area
if 0 <= ball_position[0] <= self.kick_x_enter and 0 <= abs(ball_position[1]) <= self.kick_y_enter:
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py
index 4d3746787..ac29d5fa8 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py
@@ -9,7 +9,7 @@ def __init__(self, blackboard, dsd, parameters):
super().__init__(blackboard, dsd, parameters)
def perform(self, reevaluate=False):
- own_goals = self.blackboard.gamestate.get_own_goals()
+ own_goals = self.blackboard.gamestate.get_our_goals()
opp_goals = self.blackboard.gamestate.get_opp_goals()
if own_goals == opp_goals:
diff --git a/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml b/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml
index 4b2e179a5..cd65c1175 100644
--- a/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml
+++ b/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml
@@ -146,7 +146,7 @@ body_behavior:
dribble_ball_distance_threshold: 0.5
dribble_kick_angle: 0.6
- kick_decision_smoothing: 5.0
+ kick_decision_smoothing: 1.0
##################
# costmap params #
diff --git a/bitbots_behavior/bitbots_body_behavior/package.xml b/bitbots_behavior/bitbots_body_behavior/package.xml
index 6a12bc9ce..c0256d7dc 100644
--- a/bitbots_behavior/bitbots_body_behavior/package.xml
+++ b/bitbots_behavior/bitbots_body_behavior/package.xml
@@ -19,6 +19,7 @@
bitbots_blackboard
bitbots_msgs
+ bitbots_tts
bitbots_utils
dynamic_stack_decider
game_controller_hl_interfaces
@@ -29,6 +30,7 @@
soccer_vision_3d_msgs
tf_transformations
tf2
+ ament_mypy
python3-pytest
diff --git a/bitbots_behavior/bitbots_body_behavior/test/mypy.ini b/bitbots_behavior/bitbots_body_behavior/test/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_behavior/bitbots_body_behavior/test/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_behavior/bitbots_body_behavior/test/test_mypy.py b/bitbots_behavior/bitbots_body_behavior/test/test_mypy.py
new file mode 100644
index 000000000..3dacd6040
--- /dev/null
+++ b/bitbots_behavior/bitbots_body_behavior/test/test_mypy.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+
+import pytest
+from ament_mypy.main import main
+
+
+@pytest.mark.mypy
+def test_mypy():
+ rc = main(argv=["--config", str((Path(__file__).parent / "mypy.ini").resolve())])
+ assert rc == 0, "Found code style errors / warnings"
diff --git a/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py b/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py
index 2aeb11345..3c679b86c 100755
--- a/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py
+++ b/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py
@@ -29,7 +29,7 @@
)
-def provide_config(path):
+def provide_config(path: str) -> dict:
"""
reads out the yaml you are asking for with the path parameter
:param path: filepath for your yaml
@@ -113,7 +113,7 @@ def check_new_value(new_value: str, value_type: Any, valid_options: Optional[lis
return True
-def ask_for_confirmation(question) -> bool:
+def ask_for_confirmation(question: str) -> bool:
result = None
prompt = " [Y/n] "
valid = {"": True, "y": True, "n": False}
diff --git a/bitbots_misc/bitbots_tf_buffer/CMakeLists.txt b/bitbots_misc/bitbots_tf_buffer/CMakeLists.txt
deleted file mode 100644
index b4922e8d6..000000000
--- a/bitbots_misc/bitbots_tf_buffer/CMakeLists.txt
+++ /dev/null
@@ -1,53 +0,0 @@
-cmake_minimum_required(VERSION 3.9)
-project(bitbots_tf_buffer)
-
-# Add support for C++17
-if(NOT CMAKE_CXX_STANDARD)
- set(CMAKE_CXX_STANDARD 17)
-endif()
-
-# Include CheckIPOSupported module
-include(CheckIPOSupported)
-
-# Check if IPO is supported
-check_ipo_supported(RESULT ipo_supported)
-if(ipo_supported)
- set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
- # Set the number of parallel LTO jobs
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto=4")
-endif()
-
-set(PYBIND11_PYTHON_VERSION 3)
-set(PYBIND11_FINDPYTHON ON)
-find_package(ament_cmake REQUIRED)
-find_package(backward_ros REQUIRED)
-find_package(pybind11 REQUIRED)
-find_package(Python3 REQUIRED COMPONENTS Interpreter Development)
-find_package(rcl REQUIRED)
-find_package(rclcpp REQUIRED)
-find_package(ros2_python_extension REQUIRED)
-find_package(tf2 REQUIRED)
-find_package(tf2_ros REQUIRED)
-
-add_compile_options(-Wall -Wno-unused)
-
-pybind11_add_module(cpp_wrapper SHARED src/bitbots_tf_buffer.cpp)
-
-ament_target_dependencies(
- cpp_wrapper
- PUBLIC
- rclcpp
- rcl
- tf2
- tf2_ros
- ros2_python_extension)
-
-target_link_libraries(cpp_wrapper PRIVATE pybind11::module)
-
-ament_python_install_package(${PROJECT_NAME})
-
-ament_get_python_install_dir(PYTHON_INSTALL_DIR)
-
-install(TARGETS cpp_wrapper DESTINATION "${PYTHON_INSTALL_DIR}/${PROJECT_NAME}")
-
-ament_package()
diff --git a/bitbots_misc/bitbots_tf_buffer/README.md b/bitbots_misc/bitbots_tf_buffer/README.md
deleted file mode 100644
index c9d7e4bf1..000000000
--- a/bitbots_misc/bitbots_tf_buffer/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-# bitbots_tf_buffer
-
-This is a nearly drop-in replacement for `tf2_ros.Buffer` in Python. It wraps a C++ node that holds the tf buffer and listener. The interface should be the same as the original `tf2_ros.Buffer`, except that we need to pass a reference to the node to the constructor.
-
-## Usage
-
-- Replace `from tf2_ros import Buffer, TransformListener` with `from bitbots_tf_buffer import
-Buffer`.
-- Remove the `TransformListener` from the code
-- Pass a reference to the node to the constructor of `Buffer`:
-
-```python
-from bitbots_tf_buffer import Buffer
-
-class MyNode(Node):
- def __init__(self):
- super().__init__('my_node')
- self.tf_buffer = Buffer(self)
-```
diff --git a/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py b/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py
deleted file mode 100644
index 33230d76e..000000000
--- a/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from typing import Optional
-
-import tf2_ros as tf2
-from builtin_interfaces.msg import Duration as DurationMsg
-from builtin_interfaces.msg import Time as TimeMsg
-from geometry_msgs.msg import TransformStamped
-from rclpy.duration import Duration
-from rclpy.serialization import deserialize_message, serialize_message
-from rclpy.time import Time
-
-from bitbots_tf_buffer.cpp_wrapper import Buffer as CppBuffer
-
-
-class Buffer(tf2.BufferCore, tf2.BufferInterface):
- """
- Buffer class that wraps the C++ implementation of the tf2 buffer and listener.
- It spawns a new node with the suffix "_tf" to handle the C++ side of the ROS communication.
- """
-
- def __init__(self, node, cache_time: Optional[Duration] = None, *args, **kwargs):
- if cache_time is None:
- cache_time = Duration(seconds=10.0)
-
- tf2.BufferCore.__init__(self, cache_time)
- tf2.BufferInterface.__init__(self)
-
- self._impl = CppBuffer(serialize_message(Duration.to_msg(cache_time)), node)
-
- def lookup_transform(
- self, target_frame: str, source_frame: str, time: Time, timeout: Optional[Duration] = None
- ) -> TransformStamped:
- # Handle timeout as None
- timeout = timeout or Duration()
- # Call cpp implementation
- transform_str = self._impl.lookup_transform(
- target_frame,
- source_frame,
- serialize_message(time if isinstance(time, TimeMsg) else Time.to_msg(time)),
- serialize_message(timeout if isinstance(timeout, DurationMsg) else Duration.to_msg(timeout)),
- )
- return deserialize_message(transform_str, TransformStamped)
-
- def can_transform(
- self, target_frame: str, source_frame: str, time: Time, timeout: Optional[Duration] = None
- ) -> bool:
- # Handle timeout as None
- timeout = timeout or Duration()
- # Call cpp implementation
- return self._impl.can_transform(
- target_frame,
- source_frame,
- serialize_message(time if isinstance(time, TimeMsg) else Time.to_msg(time)),
- serialize_message(timeout if isinstance(timeout, DurationMsg) else Duration.to_msg(timeout)),
- )
diff --git a/bitbots_misc/bitbots_tf_buffer/package.xml b/bitbots_misc/bitbots_tf_buffer/package.xml
deleted file mode 100644
index 941051664..000000000
--- a/bitbots_misc/bitbots_tf_buffer/package.xml
+++ /dev/null
@@ -1,20 +0,0 @@
-
-
- bitbots_tf_buffer
- 1.0.0
- Drop-in replacement for tf listener to improve performance
-
- Timon Engelke
-
- MIT
-
- ament_cmake
- pybind11_vendor
- ros2_python_extension
- tf2_ros
- tf2
-
-
- ament_cmake
-
-
diff --git a/bitbots_misc/bitbots_tf_buffer/setup.py b/bitbots_misc/bitbots_tf_buffer/setup.py
deleted file mode 100644
index 0c4c1d8ee..000000000
--- a/bitbots_misc/bitbots_tf_buffer/setup.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from setuptools import setup
-
-setup(
- name="bitbots_tf_buffer",
- packages=["bitbots_tf_buffer"],
- zip_safe=True,
- keywords=["ROS"],
- license="MIT",
-)
diff --git a/bitbots_misc/bitbots_tf_buffer/src/bitbots_tf_buffer.cpp b/bitbots_misc/bitbots_tf_buffer/src/bitbots_tf_buffer.cpp
deleted file mode 100644
index ade8337c0..000000000
--- a/bitbots_misc/bitbots_tf_buffer/src/bitbots_tf_buffer.cpp
+++ /dev/null
@@ -1,115 +0,0 @@
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-namespace py = pybind11;
-
-class Buffer {
- public:
- Buffer(py::bytes duration_raw, py::object node) {
- // initialize rclcpp if not already done
- if (!rclcpp::contexts::get_global_default_context()->is_valid()) {
- rclcpp::init(0, nullptr);
- }
-
- // Register the tf2 exceptions, so they can be caught in Python as expected
- auto py_tf2_ros = py::module::import("tf2_ros");
- py::register_local_exception(py_tf2_ros, "LookupExceptionCpp",
- py_tf2_ros.attr("LookupException"));
- py::register_local_exception(py_tf2_ros, "ConnectivityExceptionCpp",
- py_tf2_ros.attr("ConnectivityException"));
- py::register_local_exception(py_tf2_ros, "ExtrapolationExceptionCpp",
- py_tf2_ros.attr("ExtrapolationException"));
- py::register_local_exception(py_tf2_ros, "InvalidArgumentExceptionCpp",
- py_tf2_ros.attr("InvalidArgumentException"));
- py::register_local_exception(py_tf2_ros, "TimeoutExceptionCpp",
- py_tf2_ros.attr("TimeoutException"));
-
- // get node name from python node object
- rcl_node_t *node_handle =
- static_cast(reinterpret_cast(node.attr("handle").attr("pointer").cast()));
- const char *node_name = rcl_node_get_name(node_handle);
- // create node with name _tf
- node_ = std::make_shared((std::string(node_name) + "_tf").c_str());
-
- // Get buffer duration from python duration
- auto duration =
- tf2_ros::fromMsg(ros2_python_extension::fromPython(duration_raw));
-
- // create subscribers
- buffer_ = std::make_shared(this->node_->get_clock(), duration);
- buffer_->setUsingDedicatedThread(true);
- listener_ = std::make_shared(*buffer_, node_, false);
-
- // create executor and start thread spinning the executor
- executor_ = std::make_shared();
- executor_->add_node(node_);
- thread_ = std::make_shared([this]() {
- while (rclcpp::ok()) {
- executor_->spin_once();
- }
- });
- }
-
- py::bytes lookup_transform(py::str target_frame, py::str source_frame, py::bytes time_raw, py::bytes timeout_raw) {
- // Convert python objects to C++ objects
- const std::string target_frame_str = target_frame.cast();
- const std::string source_frame_str = source_frame.cast();
- const rclcpp::Time time_msg{ros2_python_extension::fromPython(time_raw)};
- const rclcpp::Duration timeout{ros2_python_extension::fromPython(timeout_raw)};
-
- // Lookup transform
- auto transform = buffer_->lookupTransform(target_frame_str, source_frame_str, time_msg, timeout);
-
- // Convert C++ object back to python object
- return ros2_python_extension::toPython(transform);
- }
-
- bool can_transform(py::str target_frame, py::str source_frame, py::bytes time_raw, py::bytes timeout_raw) {
- // Convert python objects to C++ objects
- const std::string target_frame_str = target_frame.cast();
- const std::string source_frame_str = source_frame.cast();
- const rclcpp::Time time_msg{ros2_python_extension::fromPython(time_raw)};
- const rclcpp::Duration timeout{ros2_python_extension::fromPython(timeout_raw)};
- // Check if transform can be looked up
- return buffer_->canTransform(target_frame_str, source_frame_str, time_msg, timeout);
- }
-
- // destructor
- ~Buffer() {
- // the executor finishes when rclcpp is shutdown, so the thread can be joined
- rclcpp::shutdown();
- thread_->join();
- }
-
- private:
- rclcpp::Serialization time_serializer_;
- rclcpp::Serialization duration_serializer_;
- rclcpp::Serialization transform_serializer_;
-
- std::shared_ptr node_;
- std::shared_ptr buffer_;
- std::shared_ptr listener_;
- std::shared_ptr thread_;
- std::shared_ptr executor_;
-};
-
-PYBIND11_MODULE(cpp_wrapper, m) {
- py::class_>(m, "Buffer")
- .def(py::init())
- .def("lookup_transform", &Buffer::lookup_transform)
- .def("can_transform", &Buffer::can_transform);
-}
diff --git a/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py b/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py
+++ b/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py b/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py
+++ b/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py b/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
index f40d482d7..92b448418 100644
--- a/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
+++ b/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
@@ -1,4 +1,5 @@
import math
+from typing import Literal
import numpy as np
from geometry_msgs.msg import Quaternion
@@ -37,7 +38,7 @@ def sixd2quat(sixd):
return mat2quat(mat)
-def quat2fused(q, order="wxyz"):
+def quat2fused(q, order: Literal["xyzw", "wxyz"] = "wxyz"):
# Convert to numpy array if necessary
q = np.asarray(q)
@@ -46,8 +47,6 @@ def quat2fused(q, order="wxyz"):
q_xyzw = q
elif order == "wxyz":
q_xyzw = wxyz2xyzw(q)
- else:
- raise ValueError(f"Unknown quaternion order: {order}")
# Fused yaw of Quaternion
fused_yaw = 2.0 * math.atan2(
@@ -78,7 +77,7 @@ def quat2fused(q, order="wxyz"):
# Conversion: Fused angles (3D/4D) --> Quaternion (wxyz)
-def fused2quat(fused_roll, fused_pitch, fused_yaw, hemi):
+def fused2quat(fused_roll: float, fused_pitch: float, fused_yaw: float, hemi: bool) -> np.ndarray:
# Precalculate the sine values
sth = math.sin(fused_pitch)
sphi = math.sin(fused_roll)
@@ -115,7 +114,7 @@ def fused2quat(fused_roll, fused_pitch, fused_yaw, hemi):
return np.array([chalpha * chpsi, shalpha * chgampsi, shalpha * shgampsi, chalpha * shpsi]) # Order: (w,x,y,z)
-def compute_imu_orientation_from_world(robot_quat_in_world):
+def compute_imu_orientation_from_world(robot_quat_in_world) -> tuple[tuple[float, float, float], np.ndarray]:
# imu orientation has roll and pitch relative to gravity vector. yaw in world frame
# get global yaw
yrp_world_frame = quat2euler(robot_quat_in_world, axes="szxy")
@@ -126,5 +125,5 @@ def compute_imu_orientation_from_world(robot_quat_in_world):
return [rp[0], rp[1], 0], yaw_quat
-def quat_from_yaw(yaw: float) -> Quaternion:
+def quat_from_yaw(yaw: float | int) -> Quaternion:
return msgify(Quaternion, wxyz2xyzw(euler2quat(yaw, 0, 0, axes="szxy")))
diff --git a/bitbots_misc/bitbots_utils/bitbots_utils/utils.py b/bitbots_misc/bitbots_utils/bitbots_utils/utils.py
index a0e346750..2a187488b 100644
--- a/bitbots_misc/bitbots_utils/bitbots_utils/utils.py
+++ b/bitbots_misc/bitbots_utils/bitbots_utils/utils.py
@@ -4,6 +4,7 @@
import rclpy
import yaml
from ament_index_python import get_package_share_directory
+from beartype import BeartypeConf, BeartypeStrategy, beartype
from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterType as ParameterTypeMsg
from rcl_interfaces.msg import ParameterValue as ParameterValueMsg
@@ -11,8 +12,11 @@
from rclpy.node import Node
from rclpy.parameter import PARAMETER_SEPARATOR_STRING, Parameter, parameter_value_to_python
+# Create a new @nobeartype decorator disabling type-checking.
+nobeartype = beartype(conf=BeartypeConf(strategy=BeartypeStrategy.O0))
-def read_urdf(robot_name):
+
+def read_urdf(robot_name: str) -> str:
urdf = os.popen(
f"xacro {get_package_share_directory(f'{robot_name}_description')}"
f"/urdf/robot.urdf use_fake_walk:=false sim_ns:=false"
@@ -20,7 +24,7 @@ def read_urdf(robot_name):
return urdf
-def load_moveit_parameter(robot_name):
+def load_moveit_parameter(robot_name: str) -> List[ParameterMsg]:
moveit_parameters = get_parameters_from_plain_yaml(
f"{get_package_share_directory(f'{robot_name}_moveit_config')}/config/kinematics.yaml",
"robot_description_kinematics.",
@@ -40,7 +44,7 @@ def load_moveit_parameter(robot_name):
return moveit_parameters
-def get_parameters_from_ros_yaml(node_name, parameter_file, use_wildcard):
+def get_parameters_from_ros_yaml(node_name: str, parameter_file: str, use_wildcard: bool) -> List[ParameterMsg]:
# Remove leading slash and namespaces
with open(parameter_file) as f:
param_file = yaml.safe_load(f)
@@ -66,7 +70,7 @@ def get_parameters_from_ros_yaml(node_name, parameter_file, use_wildcard):
return parse_parameter_dict(namespace="", parameter_dict=param_dict)
-def get_parameters_from_plain_yaml(parameter_file, namespace=""):
+def get_parameters_from_plain_yaml(parameter_file, namespace="") -> List[ParameterMsg]:
with open(parameter_file) as f:
param_dict = yaml.safe_load(f)
return parse_parameter_dict(namespace=namespace, parameter_dict=param_dict)
@@ -180,7 +184,7 @@ def set_parameters_of_other_node(
return [res.success for res in response.results]
-def parse_parameter_dict(*, namespace, parameter_dict):
+def parse_parameter_dict(*, namespace: str, parameter_dict: dict) -> list[ParameterMsg]:
parameters = []
for param_name, param_value in parameter_dict.items():
full_param_name = namespace + param_name
diff --git a/bitbots_misc/system_monitor/system_monitor/__init__.py b/bitbots_misc/system_monitor/system_monitor/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_misc/system_monitor/system_monitor/__init__.py
+++ b/bitbots_misc/system_monitor/system_monitor/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
index 3631eb9e4..075036004 100644
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
@@ -4,8 +4,9 @@
import math
import os
from copy import deepcopy
+from dataclasses import dataclass, field
from datetime import datetime
-from typing import Optional
+from typing import Any, Optional
import regex as re
from rclpy.action import ActionClient
@@ -17,18 +18,40 @@
from bitbots_msgs.srv import AddAnimation
+@dataclass
+class Keyframe:
+ """
+ Defines a keyframe of the recorded Animation
+ """
+
+ name: str
+ duration: float
+ pause: float
+ goals: dict[str, float]
+ torque: dict[str, int]
+
+ def as_dict(self) -> dict[str, Any]:
+ return {
+ "name": self.name,
+ "duration": self.duration,
+ "pause": self.pause,
+ "goals": self.goals,
+ "torque": self.torque,
+ }
+
+
+@dataclass
class AnimationData:
"""
Defines a current status of the recorded Animation
"""
- def __init__(self):
- self.key_frames: list[dict] = []
- self.author: str = "Unknown"
- self.description: str = "Edit me!"
- self.last_edited: datetime = datetime.now().isoformat(" ")
- self.name: str = "None yet"
- self.version: int = 0
+ author: str = "Unknown"
+ key_frames: list[Keyframe] = field(default_factory=list)
+ last_edited: str = datetime.now().isoformat(" ")
+ description: str = "Edit me!"
+ version: str = "0"
+ name: str = "None yet"
class Recorder:
@@ -48,7 +71,7 @@ def __init__(
self.hcm_record_mode_client = hcm_record_mode_client
self.save_step("Initial step", frozen=True)
- def get_keyframes(self) -> list[dict]:
+ def get_keyframes(self) -> list[Keyframe]:
"""
Gets the keyframes of the current animation
@@ -56,7 +79,7 @@ def get_keyframes(self) -> list[dict]:
"""
return self.current_state.key_frames
- def get_keyframe(self, name: str) -> Optional[dict]:
+ def get_keyframe(self, name: str) -> Optional[Keyframe]:
"""
Gets the keyframe with the given name
@@ -64,7 +87,7 @@ def get_keyframe(self, name: str) -> Optional[dict]:
:return: the keyframe with the given name
"""
# Get the first keyframe with the given name or None
- return next(filter(lambda key_frame: key_frame["name"] == name, self.get_keyframes()), None)
+ return next(filter(lambda key_frame: key_frame.name == name, self.get_keyframes()), None) # type: ignore[arg-type, union-attr]
def get_keyframe_index(self, name: str) -> Optional[int]:
"""
@@ -79,20 +102,20 @@ def get_keyframe_index(self, name: str) -> Optional[int]:
return None
return self.get_keyframes().index(key_frame)
- def get_meta_data(self) -> tuple[str, int, str, str]:
+ def get_meta_data(self) -> tuple[str, str, str, str]:
"""
Returns the meta data of the current animation
"""
data = self.current_state
return data.name, data.version, data.author, data.description
- def set_meta_data(self, name: str, version: int, author: str, description: str) -> None:
+ def set_meta_data(self, name: str, version: str, author: str, description: str) -> None:
self.current_state.name = name
self.current_state.version = version
self.current_state.author = author
self.current_state.description = description
- def save_step(self, description: str, state: Optional[dict] = None, frozen: bool = False) -> None:
+ def save_step(self, description: str, state: Optional[AnimationData] = None, frozen: bool = False) -> None:
"""Save the current state of the Animation
for later restoration by the undo-command
@@ -161,8 +184,15 @@ def record(
:param override: Wether or not to override the frame at the given position
:param frozen: if True, the step cannot be undone
"""
- frame = {"name": frame_name, "duration": duration, "pause": pause, "goals": motor_pos, "torque": motor_torque}
- new_frame = deepcopy(frame)
+ new_frame = deepcopy(
+ Keyframe(
+ name=frame_name,
+ duration=duration,
+ pause=pause,
+ goals=motor_pos,
+ torque=motor_torque,
+ )
+ )
if seq_pos is None:
self.current_state.key_frames.append(new_frame)
self.save_step(f"Appending new keyframe '{frame_name}'", frozen=frozen)
@@ -172,9 +202,8 @@ def record(
else:
self.current_state.key_frames[seq_pos] = new_frame
self.save_step(f"Overriding keyframe at position {seq_pos} with '{frame_name}'", frozen=frozen)
- return True
- def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
+ def save_animation(self, path: str, file_name: Optional[str] = None) -> str:
"""Dumps all keyframe data to an animation .json file
:param path: The folder where the file should be saved
@@ -194,8 +223,8 @@ def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
# Convert the angles from radians to degrees
keyframes = deepcopy(self.current_state.key_frames)
for kf in keyframes:
- for k, v in kf["goals"].items():
- kf["goals"][k] = math.degrees(v)
+ for k, v in kf.goals.items():
+ kf.goals[k] = math.degrees(v)
# Construct the animation dictionary with meta data and keyframes
animation_dict = {
@@ -213,7 +242,7 @@ def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
return "Saving to '%s'" % path + ". Done."
- def load_animation(self, path: str) -> None:
+ def load_animation(self, path: str) -> Optional[str]:
"""Record command, load a animation '.json' file
:param path: path of the animation to load
@@ -223,23 +252,34 @@ def load_animation(self, path: str) -> None:
data = json.load(fp)
# Check if the file is a valid animation and convert the angles from degrees to radians
try:
+ keyframes: list[Keyframe] = []
for keyframe in data["keyframes"]:
- for k, v in keyframe["goals"].items():
- keyframe["goals"][k] = math.radians(v)
+ keyframes.append(
+ Keyframe(
+ name=keyframe["name"],
+ duration=keyframe["duration"],
+ pause=keyframe["pause"],
+ goals={k: math.radians(v) for k, v in keyframe["goals"].items()},
+ torque=keyframe["torque"],
+ )
+ )
except ValueError as e:
self._node.get_logger().error("Animation {} is corrupt:\n {}".format(path, e.message.partition("\n")[0]))
return "Animation {} is corrupt:\n {}".format(path, e.message.partition("\n")[0])
# Set the current state to the loaded animation
- self.current_state.key_frames = data["keyframes"]
- self.current_state.name = data["name"]
- self.current_state.version = data["version"] if "version" in data else 0
- self.current_state.last_edited = data["last_edited"] if "last_edited" in data else datetime.now().isoformat(" ")
- self.current_state.author = data["author"] if "author" in data else "Unknown"
- self.current_state.description = data["description"] if "description" in data else ""
+ self.current_state = AnimationData(
+ key_frames=keyframes,
+ name=data["name"],
+ version=data["version"] if "version" in data else "0",
+ last_edited=data["last_edited"] if "last_edited" in data else datetime.now().isoformat(" "),
+ author=data["author"] if "author" in data else "Unknown",
+ description=data["description"] if "description" in data else "",
+ )
# Save the current state
self.save_step(f"Loading of animation named '{data['name']}'")
+ return None
def play(self, from_frame: int = 0, until_frame: int = -1) -> tuple[str, bool]:
"""Plays (a range of) the current animation using the animation server
@@ -275,13 +315,13 @@ def play(self, from_frame: int = 0, until_frame: int = -1) -> tuple[str, bool]:
"last_edited": datetime.isoformat(datetime.now(), " "),
"author": self.current_state.author,
"description": self.current_state.description,
- "keyframes": deepcopy(self.current_state.key_frames),
+ "keyframes": deepcopy(list(map(Keyframe.as_dict, self.current_state.key_frames))),
}
# Convert the angles from radians to degrees
for key_frame in animation_dict["keyframes"]:
- for joint, v in key_frame["goals"].items():
- key_frame["goals"][joint] = math.degrees(v)
+ for joint, v in key_frame["goals"].items(): # type: ignore[index]
+ key_frame["goals"][joint] = math.degrees(v) # type: ignore[index]
self._node.get_logger().debug(f"Playing {len(animation_dict['keyframes'])} frames...")
@@ -317,7 +357,7 @@ def change_frame_order(self, new_order: list[str]) -> None:
"""Changes the order of the frames given an array of frame names"""
new_ordered_frames = [self.get_keyframe(frame_name) for frame_name in new_order]
assert None not in new_ordered_frames, "One of the given frame names does not exist"
- self.current_state.key_frames = new_ordered_frames
+ self.current_state.key_frames = new_ordered_frames # type: ignore[assignment]
self.save_step("Reordered frames")
def duplicate(self, frame_name: str) -> None:
@@ -329,15 +369,16 @@ def duplicate(self, frame_name: str) -> None:
assert index is not None, "The given frame name does not exist"
# Create a deep copy of the frame
new_frame = deepcopy(self.get_keyframe(frame_name))
+ assert new_frame is not None, "The given frame name does not exist (even less possible here)"
# Add suffix to the frame name if it already exists and increment the number instead of making it longer
- while new_frame["name"] in [frame["name"] for frame in self.current_state.key_frames]:
+ while new_frame.name in [frame.name for frame in self.current_state.key_frames]:
# Check if the frame name ends with "_copy_" and a number, if so increment the number
- if re.match(r".*?_copy_(\d+)", new_frame["name"]):
- new_frame["name"] = re.sub(
- r"(_copy_)(\d+)", lambda m: f"{m.group(1)}{int(m.group(2)) + 1}", new_frame["name"]
+ if re.match(r".*?_copy_(\d+)", new_frame.name):
+ new_frame.name = re.sub(
+ r"(_copy_)(\d+)", lambda m: f"{m.group(1)}{int(m.group(2)) + 1}", new_frame.name
)
else:
- new_frame["name"] = f"{frame_name}_copy_1"
+ new_frame.name = f"{frame_name}_copy_1"
# Insert the new frame after the original frame
self.current_state.key_frames.insert(index + 1, new_frame)
self.save_step(f"Duplicated Frame '{frame_name}'")
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
index d89806360..e3e6e1d70 100755
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
@@ -72,7 +72,7 @@ def __init__(self, context):
self.create_initialized_recorder()
# Initialize the window
- self._widget = QMainWindow()
+ self._widget = QMainWindow() # type: ignore
# Load XML ui definition
ui_file = os.path.join(get_package_share_directory("bitbots_animation_rqt"), "resource", "RecordUI.ui")
@@ -314,7 +314,7 @@ def connect_gui_callbacks(self) -> None:
self._widget.frameList.key_pressed.connect(self.delete)
self._widget.frameList.itemSelectionChanged.connect(self.frame_select)
- def play_walkready(self) -> None:
+ def play_walkready(self, _) -> None:
"""
Plays the walkready animation on the robot
"""
@@ -401,7 +401,7 @@ def save(self, new_location: bool = False) -> None:
else:
self._widget.statusBar.showMessage("There is nothing to save!")
- def open(self) -> None:
+ def open(self, _) -> None:
"""
Deletes all current frames and instead loads an animation from a json file
"""
@@ -453,7 +453,7 @@ def play_until(self):
Plays the animation up to a certain frame
"""
# Get the index of the selected frame
- end_index = self._recorder.get_keyframe_index(self._selected_frame) + 1
+ end_index = self._recorder.get_keyframe_index(self._selected_frame) + 1 # type: ignore[operator]
# Play the animation
status, success = self._recorder.play(until_frame=end_index)
@@ -482,7 +482,7 @@ def goto_next(self):
# Go to the frame in the UI
self._widget.frameList.setCurrentRow(next_frame_index)
- self._selected_frame = self._recorder.get_keyframes()[next_frame_index]["name"]
+ self._selected_frame = self._recorder.get_keyframes()[next_frame_index].name
self.react_to_frame_change()
def goto_init(self):
@@ -567,8 +567,10 @@ def record(self):
# Display a message
self._widget.statusBar.showMessage(f"Recorded frame {self._selected_frame}")
- # Update the frames in the UI
+ # Update the frames in the UI, but don't trigger the frame selection signal
+ self._widget.frameList.blockSignals(True)
self.update_frames()
+ self._widget.frameList.blockSignals(False)
def undo(self):
"""
@@ -661,13 +663,19 @@ def frame_select(self):
if selected_frame is not None:
# check if unrecorded changes would be lost
unrecorded_changes = []
- current_keyframe_goals = self._recorder.get_keyframe(self._selected_frame)["goals"]
+ current_keyframe = self._recorder.get_keyframe(self._selected_frame)
+ assert current_keyframe is not None, "Current keyframe not found"
for motor_name, text_field in self._motor_controller_text_fields.items():
- # Get the angle from the textfield
- angle = text_field.value()
- # compare with angles in current keyframe
- if not current_keyframe_goals[motor_name] == math.radians(angle):
+ # Check if the motor active state has changed
+ currently_active = (
+ self._motor_switcher_active_checkbox[motor_name].checkState(0) == Qt.CheckState.Checked
+ )
+ active_in_current_keyframe = motor_name in current_keyframe.goals
+ motor_active_changed = currently_active != active_in_current_keyframe
+ # Check if the motor goal has changed
+ motor_goal_changed = current_keyframe.goals.get(motor_name, 0) != math.radians(text_field.value())
+ if motor_active_changed or motor_goal_changed:
unrecorded_changes.append(motor_name)
# warn user about unrecorded changes
@@ -678,6 +686,10 @@ def frame_select(self):
sure = QMessageBox.question(self._widget, "Sure?", message, QMessageBox.Yes | QMessageBox.No)
# Cancel the open if the user does not want to discard the changes
if sure == QMessageBox.No:
+ # Update the UI to reselect the current frame (blocking signals to avoid infinite loop)
+ self._widget.frameList.blockSignals(True)
+ self._widget.frameList.setCurrentRow(self._recorder.get_keyframe_index(self._selected_frame))
+ self._widget.frameList.blockSignals(False)
return
# Update state so we have a new selected frame
self._selected_frame = selected_frame_name
@@ -691,6 +703,7 @@ def react_to_frame_change(self):
"""
# Get the selected frame
selected_frame = self._recorder.get_keyframe(self._selected_frame)
+ assert selected_frame is not None, "Selected frame not found in recorder"
# Update the name
self._widget.lineFrameName.setText(self._selected_frame)
@@ -698,28 +711,28 @@ def react_to_frame_change(self):
# Update what motors are active, what are stiff and set the angles accordingly
for motor_name in self.motors:
# Update checkbox if the motor is active or not
- active = motor_name in selected_frame["goals"]
+ active = motor_name in selected_frame.goals.keys()
self._motor_switcher_active_checkbox[motor_name].setCheckState(0, Qt.Checked if active else Qt.Unchecked)
# Update checkbox if the motor is stiff or not
- stiff = "torques" not in selected_frame or (
- motor_name in selected_frame["torques"] and bool(selected_frame["torques"][motor_name])
+ stiff = len(selected_frame.torque) == 0 or (
+ motor_name in selected_frame.torque and bool(selected_frame.torque[motor_name])
)
self._motor_controller_torque_checkbox[motor_name].setCheckState(0, Qt.Checked if stiff else Qt.Unchecked)
# Update the motor angle controls (value and active state)
if active:
self._motor_controller_text_fields[motor_name].setValue(
- round(math.degrees(selected_frame["goals"][motor_name]), 2)
+ round(math.degrees(selected_frame.goals[motor_name]), 2)
)
- self._working_angles[motor_name] = selected_frame["goals"][motor_name]
+ self._working_angles[motor_name] = selected_frame.goals[motor_name]
else:
self._motor_controller_text_fields[motor_name].setValue(0.0)
# Update the duration and pause
- self._widget.spinBoxDuration.setValue(selected_frame["duration"])
- self._widget.spinBoxPause.setValue(selected_frame["pause"])
+ self._widget.spinBoxDuration.setValue(selected_frame.duration)
+ self._widget.spinBoxPause.setValue(selected_frame.pause)
self.react_to_motor_selection()
@@ -773,7 +786,7 @@ def react_to_motor_selection(self):
def update_frames(self) -> None:
"""
- Updates the list of frames in the GUI based on the keyframes in the recorder
+ Updates the list of frames in the GUI based on the keyframes in the recorder'
"""
# Get the current state of the recorder and the index of the selected frame
keyframes = self._recorder.get_keyframes()
@@ -783,7 +796,7 @@ def update_frames(self) -> None:
if index is None:
# If it is not, select the first frame
index = 0
- self._selected_frame = keyframes[index]["name"]
+ self._selected_frame = keyframes[index].name
print(f"Selected frame {self._selected_frame}")
# Clear the list of frames
@@ -792,7 +805,7 @@ def update_frames(self) -> None:
# Add all frames to the list
for frame in keyframes:
item = QListWidgetItem()
- item.setText(frame["name"])
+ item.setText(frame.name)
self._widget.frameList.addItem(item)
# Select the correct frame
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
index f9f46c05a..6217bed78 100644
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
@@ -1,4 +1,4 @@
-from typing import Callable
+from typing import Any, Callable
from PyQt5.QtCore import QObject, Qt, pyqtSignal
from PyQt5.QtWidgets import QListWidget, QWidget
@@ -28,12 +28,12 @@ def dropEvent(self, e): # noqa: N802
# fmt: off
def keyPressEvent(self, event): # noqa: N802
# fmt: on
- if event.key() == Qt.Key_Delete:
+ if event.key() == Qt.Key_Delete: # type: ignore[attr-defined]
super().keyPressEvent(event)
self.key_pressed.emit()
- elif event.key() == Qt.Key_Up and self.currentRow() - 1 >= 0:
+ elif event.key() == Qt.Key_Up and self.currentRow() - 1 >= 0: # type: ignore[attr-defined]
self.setCurrentRow(self.currentRow() - 1)
- elif event.key() == Qt.Key_Down and self.currentRow() + 1 < self.count():
+ elif event.key() == Qt.Key_Down and self.currentRow() + 1 < self.count(): # type: ignore[attr-defined]
self.setCurrentRow(self.currentRow() + 1)
@@ -43,7 +43,7 @@ class JointStateCommunicate(QObject):
def flatten_dict(input_dict: dict, parent_key: str = "", sep: str = ".") -> dict:
"""Flatten a nested dictionary."""
- items = []
+ items: list[Any] = []
for k, v in input_dict.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
diff --git a/bitbots_motion/bitbots_animation_rqt/package.xml b/bitbots_motion/bitbots_animation_rqt/package.xml
index 40019b786..2801efdac 100644
--- a/bitbots_motion/bitbots_animation_rqt/package.xml
+++ b/bitbots_motion/bitbots_animation_rqt/package.xml
@@ -21,6 +21,7 @@
rqt_gui_py
rqt_gui
std_srvs
+ ament_mypy
python3-pytest
diff --git a/bitbots_motion/bitbots_animation_rqt/setup.py b/bitbots_motion/bitbots_animation_rqt/setup.py
index 639dc194a..d672f0aed 100644
--- a/bitbots_motion/bitbots_animation_rqt/setup.py
+++ b/bitbots_motion/bitbots_animation_rqt/setup.py
@@ -14,6 +14,7 @@
install_requires=["setuptools"],
tests_require=["pytest"],
zip_safe=True,
+ tests_require=["pytest"],
entry_points={
"console_scripts": [
"animation_gui = " + package_name + ".record_ui:main",
diff --git a/bitbots_motion/bitbots_animation_rqt/test/mypy.ini b/bitbots_motion/bitbots_animation_rqt/test/mypy.ini
new file mode 100644
index 000000000..b7b297c6a
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/test/mypy.ini
@@ -0,0 +1,6 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
+disable_error_code = attr-defined
diff --git a/bitbots_motion/bitbots_animation_rqt/test/test_mypy.py b/bitbots_motion/bitbots_animation_rqt/test/test_mypy.py
new file mode 100644
index 000000000..3dacd6040
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/test/test_mypy.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+
+import pytest
+from ament_mypy.main import main
+
+
+@pytest.mark.mypy
+def test_mypy():
+ rc = main(argv=["--config", str((Path(__file__).parent / "mypy.ini").resolve())])
+ assert rc == 0, "Found code style errors / warnings"
diff --git a/bitbots_motion/bitbots_animation_server/CMakeLists.txt b/bitbots_motion/bitbots_animation_server/CMakeLists.txt
deleted file mode 100644
index 988d53714..000000000
--- a/bitbots_motion/bitbots_animation_server/CMakeLists.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-cmake_minimum_required(VERSION 3.5)
-project(bitbots_animation_server)
-
-find_package(bitbots_docs REQUIRED)
-find_package(rclpy REQUIRED)
-find_package(ament_cmake REQUIRED)
-find_package(ament_cmake_python REQUIRED)
-
-enable_bitbots_docs()
-
-ament_python_install_package(${PROJECT_NAME})
-
-ament_package()
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
index fb525677c..55c456deb 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
@@ -44,9 +44,9 @@ def parse(info: dict) -> Animation:
This method is parsing an animation from a :class:`dict`
instance *info*, as created by :func:`as_dict`.
"""
- anim = Animation(info["name"], ())
+ anim = Animation(info["name"], [])
- keyframes = info.get("keyframes", ())
+ keyframes = info.get("keyframes", [])
anim.keyframes = [
Keyframe(
k.get("goals", {}),
@@ -59,24 +59,3 @@ def parse(info: dict) -> Animation:
]
return anim
-
-
-def as_dict(anim: Animation) -> dict:
- """
- Convert an animation to builtin python types to
- make it serializable to formats like ``json``.
- """
- return {
- "name": anim.name,
- "interpolators": {n: ip.__name__ for n, ip in anim.interpolators},
- "keyframes": [
- {
- "duration": k.duration,
- "pause": k.pause,
- "goals": k.goals,
- "torque": k.torque,
- "stabilization_functions": k.stabilization_functions,
- }
- for k in anim.keyframes
- ],
- }
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
index f7cefea55..d9c473ae7 100755
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
@@ -27,7 +27,7 @@
from bitbots_msgs.srv import AddAnimation
# A hashable definition for the UUID of the goals
-UUID = tuple[int]
+UUID = tuple[np.uint8, ...]
class AnimationNode(Node):
@@ -165,7 +165,7 @@ def finish(successful: bool) -> PlayAnimation.Result:
# Send request to make the HCM to go into animation play mode
num_tries = 0
- while rclpy.ok() and (not self.hcm_animation_mode.call(SetBool.Request(data=True)).success):
+ while rclpy.ok() and (not self.hcm_animation_mode.call(SetBool.Request(data=True)).success): # type: ignore[attr-defined]
if num_tries >= 10:
self.get_logger().error("Failed to request HCM to go into animation play mode")
return finish(successful=False)
@@ -263,7 +263,7 @@ def finish(successful: bool) -> PlayAnimation.Result:
if pose is None or (request.bounds and once and t > animator.get_keyframe_times()[request.end - 1]):
# Animation is finished, tell it to the hcm (except if it is from the hcm)
if not request.hcm:
- hcm_result = self.hcm_animation_mode.call(SetBool.Request(data=False))
+ hcm_result: SetBool.Response = self.hcm_animation_mode.call(SetBool.Request(data=False))
if not hcm_result.success:
self.get_logger().error(f"Failed to finish animation on HCM. Reason: {hcm_result.message}")
@@ -288,7 +288,7 @@ def finish(successful: bool) -> PlayAnimation.Result:
sys.exit(0)
return finish(successful=False)
- def update_current_pose(self, msg) -> None:
+ def update_current_pose(self, msg: JointState) -> None:
"""Gets the current motor positions and updates the representing pose accordingly."""
self.current_joint_states = msg
@@ -296,7 +296,7 @@ def imu_callback(self, msg: Imu) -> None:
"""Callback for the IMU data."""
self.imu_data = msg
- def send_animation(self, from_hcm: bool, pose: dict, torque: Optional[dict]):
+ def send_animation(self, from_hcm: bool, pose: dict[str, float], torque: Optional[dict[str, float]] = None) -> None:
"""Sends an animation to the hcm"""
self.hcm_publisher.publish(
AnimationMsg(
@@ -309,8 +309,8 @@ def send_animation(self, from_hcm: bool, pose: dict, torque: Optional[dict]):
positions=pose.values(),
velocities=[-1.0] * len(pose),
accelerations=[-1.0] * len(pose),
- max_currents=[np.clip((torque[joint]), 0.0, 1.0) for joint in pose]
- if torque and False # TODO
+ max_currents=[np.clip((torque[joint]), 0.0, 1.0) for joint in pose.keys()] # type: ignore[index]
+ if torque and False
else [-1.0] * len(pose), # fmt: skip
),
)
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py
index e7436e403..315f23466 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py
@@ -18,9 +18,9 @@ def __init__(self, robot_type: str):
# Get the path to the animations folder
self.basepath = abspath(os.path.join(get_package_share_directory(robot_type + "_animations"), "animations"))
- self.cache = {}
- self.files = []
- self.names = [] # Plain animation names, without filename-extension
+ self.cache: dict[str, str] = {}
+ self.files: list[str] = []
+ self.names: list[str] = [] # Plain animation names, without filename-extension
self.animpath = self._get_animpath()
def search(self, path, folders, filename=""):
@@ -121,7 +121,7 @@ def find_resource(self, name):
"""Finds a resource relative to self.basepath"""
return self.find(name)
- def _get_animpath(self):
+ def _get_animpath(self) -> list[str]:
"""
Get a list of folders in the animations/ folder.
"""
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
index b2bb84e75..6dffb7649 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
@@ -74,8 +74,8 @@ def get_positions_rad(self, time):
return ret_dict
- def get_torque(self, current_time):
- torque = {}
+ def get_torque(self, current_time) -> dict[str, float]:
+ torque: dict[str, float] = {}
if current_time < 0 or current_time > self.animation_duration:
return torque
diff --git a/bitbots_motion/bitbots_animation_server/package.xml b/bitbots_motion/bitbots_animation_server/package.xml
index 77427c8ef..3b00445bf 100644
--- a/bitbots_motion/bitbots_animation_server/package.xml
+++ b/bitbots_motion/bitbots_animation_server/package.xml
@@ -21,6 +21,7 @@
bitbots_utils
rclpy
std_msgs
+ ament_mypy
python3-pytest
diff --git a/bitbots_motion/bitbots_animation_server/setup.py b/bitbots_motion/bitbots_animation_server/setup.py
index c41689c5f..3b52c7026 100644
--- a/bitbots_motion/bitbots_animation_server/setup.py
+++ b/bitbots_motion/bitbots_animation_server/setup.py
@@ -22,6 +22,7 @@
zip_safe=True,
keywords=["ROS"],
license="MIT",
+ tests_require=["pytest"],
entry_points={
"console_scripts": [
"animation_node = bitbots_animation_server.animation_node:main",
diff --git a/bitbots_motion/bitbots_animation_server/test/mypy.ini b/bitbots_motion/bitbots_animation_server/test/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_server/test/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_motion/bitbots_animation_server/test/test_mypy.py b/bitbots_motion/bitbots_animation_server/test/test_mypy.py
new file mode 100644
index 000000000..3dacd6040
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_server/test/test_mypy.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+
+import pytest
+from ament_mypy.main import main
+
+
+@pytest.mark.mypy
+def test_mypy():
+ rc = main(argv=["--config", str((Path(__file__).parent / "mypy.ini").resolve())])
+ assert rc == 0, "Found code style errors / warnings"
diff --git a/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp b/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp
index 6909337a9..ae214c2ec 100644
--- a/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp
+++ b/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp
@@ -115,6 +115,7 @@ class DynupNode {
double start_time_ = 0;
bool server_free_ = true;
bool debug_ = false;
+ bool cancel_goal_ = false;
// TF2 related things
tf2_ros::Buffer tf_buffer_;
diff --git a/bitbots_motion/bitbots_dynup/src/dynup_node.cpp b/bitbots_motion/bitbots_dynup/src/dynup_node.cpp
index 3316a348e..a37f009f3 100644
--- a/bitbots_motion/bitbots_dynup/src/dynup_node.cpp
+++ b/bitbots_motion/bitbots_dynup/src/dynup_node.cpp
@@ -215,6 +215,10 @@ rclcpp_action::GoalResponse DynupNode::goalCb(const rclcpp_action::GoalUUID &uui
server_free_ = false;
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
} else {
+ if (goal->from_hcm) {
+ cancel_goal_ = true;
+ return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
+ }
RCLCPP_WARN(node_->get_logger(), "Dynup is busy, goal rejected!");
return rclcpp_action::GoalResponse::REJECT;
}
@@ -259,11 +263,20 @@ void DynupNode::loopEngine(int loop_rate, std::shared_ptr goal_
/* Do the loop as long as nothing cancels it */
while (rclcpp::ok()) {
rclcpp::Time startTime = node_->get_clock()->now();
+ // This is used when the cancelation is requested externally
if (goal_handle->is_canceling()) {
goal_handle->canceled(result);
RCLCPP_INFO(node_->get_logger(), "Goal canceled");
return;
}
+ // This is used when the dynup server descides to cancel itself
+ if (cancel_goal_) {
+ result->successful = false;
+ goal_handle->abort(result);
+ server_free_ = true;
+ cancel_goal_ = false;
+ return;
+ }
msg = step(getTimeDelta());
auto feedback = std::make_shared();
feedback->percent_done = engine_.getPercentDone();
@@ -282,9 +295,12 @@ void DynupNode::loopEngine(int loop_rate, std::shared_ptr goal_
if (msg.joint_names.empty()) {
continue;
}
+ msg.from_hcm = goal_handle->get_goal()->from_hcm;
joint_goal_publisher_->publish(msg);
node_->get_clock()->sleep_until(startTime + rclcpp::Duration::from_nanoseconds(1e9 / loop_rate));
}
+ // Wether we canceled the goal or we finished cleanly the cancelation request is not valid anymore
+ cancel_goal_ = false;
}
bitbots_dynup::msg::DynupPoses DynupNode::getCurrentPoses() {
diff --git a/bitbots_motion/bitbots_hcm/CMakeLists.txt b/bitbots_motion/bitbots_hcm/CMakeLists.txt
index 2c24196eb..b0581fb82 100644
--- a/bitbots_motion/bitbots_hcm/CMakeLists.txt
+++ b/bitbots_motion/bitbots_hcm/CMakeLists.txt
@@ -57,6 +57,9 @@ if(BUILD_TESTING)
ament_add_pytest_test(
hcm_py_test test/pytest PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}" APPEND_ENV
PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR})
+
+ find_package(ament_cmake_mypy REQUIRED)
+ ament_mypy(CONFIG_FILE "${CMAKE_CURRENT_LIST_DIR}/mypy.ini")
endif()
ament_python_install_package(${PROJECT_NAME})
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py
index 25bdf4bbe..61a6ccd0c 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py
@@ -1,7 +1,13 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
from dynamic_stack_decider.abstract_action_element import AbstractActionElement
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
+beartype_this_package()
+
class AbstractHCMActionElement(AbstractActionElement):
"""
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
index 8f805b1da..882ae4624 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
@@ -88,6 +88,9 @@ def animation_feedback_cb(self, msg):
self.publish_debug_data("Animation Percent Done", str(feedback.percent_done))
def animation_finished(self):
+ assert (
+ self.blackboard.animation_action_current_goal is not None
+ ), "No animation action goal set, so we cannot check if it is finished"
return (
self.blackboard.animation_action_current_goal.done()
and self.blackboard.animation_action_current_goal.result().status
@@ -202,7 +205,7 @@ def on_pop(self):
Cancel the current goal when the action is popped
"""
super().on_pop()
- if not self.animation_finished():
+ if self.blackboard.dynup_action_current_goal is not None and not self.animation_finished():
self.blackboard.dynup_action_current_goal.result().cancel_goal_async()
def start_animation(self):
@@ -231,6 +234,10 @@ def start_animation(self):
goal = Dynup.Goal()
goal.direction = self.direction
+ # This indicates that the goal is send from the hcm and therfore has prio,
+ # canceling other tasks. The published joint goals are also marked with this flag so they
+ # are handled differently in the HCM joint mutex
+ goal.from_hcm = True
self.blackboard.dynup_action_current_goal = self.blackboard.dynup_action_client.send_goal_async(
goal, feedback_callback=self.animation_feedback_cb
)
@@ -241,6 +248,9 @@ def animation_feedback_cb(self, msg):
self.publish_debug_data("Dynup Percent Done", str(feedback.percent_done))
def animation_finished(self):
+ assert (
+ self.blackboard.dynup_action_current_goal is not None
+ ), "No dynup action goal set, so we cannot check if it is finished"
return (
self.blackboard.dynup_action_current_goal.done()
and self.blackboard.dynup_action_current_goal.result().status
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py
index b58b4231e..4c2804302 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from bitbots_hcm.hcm_dsd.actions import AbstractHCMActionElement
+from bitbots_hcm.type_utils import T_RobotControlState
from bitbots_msgs.msg import RobotControlState
@@ -11,7 +12,7 @@ class AbstractRobotState(AbstractHCMActionElement, ABC):
"""
@abstractmethod
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
"""
Returns the state which should be set. This will be implemented by the subclasses.
"""
@@ -27,65 +28,65 @@ def perform(self, reevaluate=False):
class RobotStateStartup(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.STARTUP
class RobotStateControllable(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.CONTROLLABLE
class RobotStateWalking(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.WALKING
class RobotStateAnimationRunning(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.ANIMATION_RUNNING
class RobotStatePickedUp(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.PICKED_UP
class RobotStateFalling(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.FALLING
class RobotStateFallen(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.FALLEN
class RobotStateGettingUp(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.GETTING_UP
class RobotStatePenalty(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.PENALTY
class RobotStateRecord(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.RECORD
class RobotStateKicking(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.KICKING
class RobotStateHardwareProblem(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.HARDWARE_PROBLEM
class RobotStateMotorOff(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.MOTOR_OFF
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py
index 82101034e..e79e007a9 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py
@@ -1,7 +1,13 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
from dynamic_stack_decider.abstract_decision_element import AbstractDecisionElement
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
+beartype_this_package()
+
class AbstractHCMDecisionElement(AbstractDecisionElement):
"""
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
index c770b8326..c7072a5c0 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
@@ -29,6 +29,9 @@ def perform(self, reevaluate=False):
# We can safely assume that the last animation start time is set because the animation is running
# Calculate time since last animation start
+ assert (
+ self.blackboard.last_animation_start_time is not None
+ ), "Last animation start time is not set, this should be impossible"
time_delta_since_last_animation_start = (
self.blackboard.node.get_clock().now().nanoseconds / 1e9
- self.blackboard.last_animation_start_time.nanoseconds / 1e9
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py
index e36034297..2b584e439 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py
@@ -126,10 +126,14 @@ def perform(self, reevaluate=False):
else:
return "OKAY"
- if self.blackboard.previous_imu_msg is None or (
- self.blackboard.node.get_clock().now().nanoseconds
- - self.blackboard.last_different_imu_state_time.nanoseconds
- > self.blackboard.imu_timeout_duration * 1e9
+ if (
+ self.blackboard.previous_imu_msg is None
+ or self.blackboard.last_different_imu_state_time is None
+ or (
+ self.blackboard.node.get_clock().now().nanoseconds
+ - self.blackboard.last_different_imu_state_time.nanoseconds
+ > self.blackboard.imu_timeout_duration * 1e9
+ )
):
if (
self.blackboard.current_state == RobotControlState.STARTUP
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py
index 44a421875..e8f7005b1 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py
@@ -4,6 +4,7 @@
import numpy as np
from bitbots_utils.transforms import quat2fused
from rclpy.duration import Duration
+from rclpy.time import Time
from bitbots_hcm.hcm_dsd.decisions import AbstractHCMDecisionElement
@@ -32,7 +33,7 @@ def __init__(self, blackboard, dsd, parameters):
self.smoothing = self.blackboard.node.get_parameter("smooth_threshold").value
# Initialize smoothing list that stores the last results
- self.smoothing_list: list[FallDirection] = []
+ self.smoothing_list: list[tuple[Time, FallDirection]] = []
def perform(self, reevaluate=False):
"""Checks if the robot is currently falling and in which direction."""
@@ -77,7 +78,7 @@ def perform(self, reevaluate=False):
# Prune old elements from smoothing history
self.smoothing_list = list(
filter(
- lambda x: x[0] > self.blackboard.node.get_clock().now() - Duration(seconds=self.smoothing),
+ lambda x: x[0] > self.blackboard.node.get_clock().now() - Duration(seconds=self.smoothing), # type: ignore[arg-type]
self.smoothing_list,
)
)
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
index 3bbdaea14..b0ae2b7e9 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
@@ -12,6 +12,7 @@
from std_srvs.srv import Empty as EmptySrv
from std_srvs.srv import SetBool
+from bitbots_hcm.type_utils import T_RobotControlState
from bitbots_msgs.action import Dynup, PlayAnimation
from bitbots_msgs.msg import Audio, JointTorque, RobotControlState
from bitbots_msgs.srv import SetTeachingMode
@@ -22,7 +23,7 @@ def __init__(self, node: Node):
self.node = node
# Basic state
- self.current_state: RobotControlState = RobotControlState.STARTUP
+ self.current_state: T_RobotControlState = RobotControlState.STARTUP
self.stopped: bool = False
# Save start time
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
index b35ea8e71..a0bc0ec88 100755
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
@@ -24,6 +24,7 @@
from bitbots_hcm import hcm_dsd
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
+from bitbots_hcm.type_utils import T_RobotControlState
from bitbots_msgs.msg import FootPressure, RobotControlState
from bitbots_msgs.srv import ManualPenalize, SetTeachingMode
@@ -34,18 +35,15 @@ def __init__(self, use_sim_time, simulation_active, visualization_active):
node_name = "hcm_py"
# Load parameters from yaml file because this is a hacky cpp python hybrid node for performance reasons
- parameter_msgs: list(ParameterMsg) = get_parameters_from_ros_yaml(
+ parameter_msgs: list[ParameterMsg] = get_parameters_from_ros_yaml(
node_name, f"{get_package_share_directory('bitbots_hcm')}/config/hcm_wolfgang.yaml", use_wildcard=True
)
- parameters = []
- for parameter_msg in parameter_msgs:
- parameters.append(Parameter.from_parameter_msg(parameter_msg))
- if use_sim_time:
- parameters.append(Parameter("use_sim_time", type_=Parameter.Type.BOOL, value=True))
- if simulation_active:
- parameters.append(Parameter("simulation_active", type_=Parameter.Type.BOOL, value=True))
- if visualization_active:
- parameters.append(Parameter("visualization_active", type_=Parameter.Type.BOOL, value=True))
+ parameters = [
+ Parameter("use_sim_time", type_=Parameter.Type.BOOL, value=use_sim_time),
+ Parameter("simulation_active", type_=Parameter.Type.BOOL, value=simulation_active),
+ Parameter("visualization_active", type_=Parameter.Type.BOOL, value=visualization_active),
+ ]
+ parameters.extend(map(Parameter.from_parameter_msg, parameter_msgs))
# Create Python node
self.node = Node(
@@ -161,7 +159,7 @@ def diag_cb(self, msg: DiagnosticArray):
elif "/Pressure" in status.name:
self.blackboard.pressure_diag_error = status.level in (DiagnosticStatus.ERROR, DiagnosticStatus.STALE)
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
"""Returns the current state of the HCM."""
return self.blackboard.current_state
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/type_utils.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/type_utils.py
new file mode 100644
index 000000000..20bd175fc
--- /dev/null
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/type_utils.py
@@ -0,0 +1,21 @@
+from typing import Literal, TypeAlias
+
+from bitbots_msgs.msg import RobotControlState
+
+T_RobotControlState: TypeAlias = Literal[ # type: ignore[valid-type]
+ RobotControlState.CONTROLLABLE,
+ RobotControlState.FALLING,
+ RobotControlState.FALLEN,
+ RobotControlState.GETTING_UP,
+ RobotControlState.ANIMATION_RUNNING,
+ RobotControlState.STARTUP,
+ RobotControlState.SHUTDOWN,
+ RobotControlState.PENALTY,
+ RobotControlState.RECORD,
+ RobotControlState.WALKING,
+ RobotControlState.MOTOR_OFF,
+ RobotControlState.HCM_OFF,
+ RobotControlState.HARDWARE_PROBLEM,
+ RobotControlState.PICKED_UP,
+ RobotControlState.KICKING,
+]
diff --git a/bitbots_motion/bitbots_hcm/mypy.ini b/bitbots_motion/bitbots_hcm/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_motion/bitbots_hcm/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_motion/bitbots_hcm/package.xml b/bitbots_motion/bitbots_hcm/package.xml
index 9f6fd464a..9e90af219 100644
--- a/bitbots_motion/bitbots_hcm/package.xml
+++ b/bitbots_motion/bitbots_hcm/package.xml
@@ -9,7 +9,7 @@
Marc Bestmann
Hamburg Bit-Bots
-
+
MIT
Marc Bestmann
@@ -33,6 +33,7 @@
ros2_python_extension
sensor_msgs
std_msgs
+ ament_cmake_mypy
diff --git a/bitbots_motion/bitbots_hcm/src/hcm.cpp b/bitbots_motion/bitbots_hcm/src/hcm.cpp
index d82358c0d..c78574c32 100644
--- a/bitbots_motion/bitbots_hcm/src/hcm.cpp
+++ b/bitbots_motion/bitbots_hcm/src/hcm.cpp
@@ -86,12 +86,15 @@ class HCM_CPP : public rclcpp::Node {
}
void dynup_callback(const bitbots_msgs::msg::JointCommand msg) {
- if (current_state_ == bitbots_msgs::msg::RobotControlState::STARTUP ||
- current_state_ == bitbots_msgs::msg::RobotControlState::GETTING_UP ||
- current_state_ == bitbots_msgs::msg::RobotControlState::MOTOR_OFF ||
- current_state_ == bitbots_msgs::msg::RobotControlState::PICKED_UP ||
- current_state_ == bitbots_msgs::msg::RobotControlState::PENALTY ||
- current_state_ == bitbots_msgs::msg::RobotControlState::RECORD ||
+ if (msg.from_hcm and (current_state_ == bitbots_msgs::msg::RobotControlState::STARTUP ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::GETTING_UP ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::MOTOR_OFF ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::PICKED_UP ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::PENALTY ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::CONTROLLABLE)) {
+ pub_controller_command_->publish(msg);
+ }
+ if (current_state_ == bitbots_msgs::msg::RobotControlState::RECORD ||
current_state_ == bitbots_msgs::msg::RobotControlState::CONTROLLABLE) {
pub_controller_command_->publish(msg);
}
diff --git a/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py b/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py
+++ b/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_msgs/action/Dynup.action b/bitbots_msgs/action/Dynup.action
index 6e5620f2f..939261314 100644
--- a/bitbots_msgs/action/Dynup.action
+++ b/bitbots_msgs/action/Dynup.action
@@ -9,6 +9,9 @@ string DIRECTION_RISE = "rise"
string DIRECTION_DESCEND = "descend"
string DIRECTION_WALKREADY = "walkready"
+# Requests from the HCM have higher prio and are published to a different topic
+bool from_hcm
+
---
# Result definition
bool successful
diff --git a/bitbots_msgs/msg/JointCommand.msg b/bitbots_msgs/msg/JointCommand.msg
index 115e0db78..c86b230dc 100644
--- a/bitbots_msgs/msg/JointCommand.msg
+++ b/bitbots_msgs/msg/JointCommand.msg
@@ -1,4 +1,8 @@
std_msgs/Header header
+
+# Mark this message so it has prio
+bool from_hcm
+
string[] joint_names
float64[] positions
diff --git a/bitbots_navigation/bitbots_localization/config/config.yaml b/bitbots_navigation/bitbots_localization/config/config.yaml
index 22d0e5b52..13e135e94 100644
--- a/bitbots_navigation/bitbots_localization/config/config.yaml
+++ b/bitbots_navigation/bitbots_localization/config/config.yaml
@@ -3,15 +3,11 @@ bitbots_localization:
misc:
init_mode: 0
percentage_best_particles: 100
- min_motion_linear: 0.0
- min_motion_angular: 0.0
max_motion_linear: 0.5
max_motion_angular: 3.14
- filter_only_with_motion: false
ros:
line_pointcloud_topic: 'line_mask_relative_pc'
goal_topic: 'goals_simulated'
- fieldboundary_topic: 'field_boundary_relative'
particle_publishing_topic: 'pose_particles'
debug_visualization: true
particle_filter:
@@ -46,10 +42,6 @@ bitbots_localization:
goal:
factor: 0.0
out_of_field_score: 0.0
- field_boundary:
- factor: 0.0
- out_of_field_score: 0.0
confidences:
line_element: 0.01
goal_element: 0.0
- field_boundary_element: 0.0
diff --git a/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp b/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp
index 50631ad56..30ecaf993 100644
--- a/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp
+++ b/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp
@@ -5,6 +5,8 @@
#ifndef BITBOTS_LOCALIZATION_OBSERVATIONMODEL_H
#define BITBOTS_LOCALIZATION_OBSERVATIONMODEL_H
+#include
+
#include
#include
#include
@@ -12,11 +14,8 @@
#include
#include
#include
-#include
#include
#include
-#include
-#include
namespace sm = sensor_msgs;
namespace bl = bitbots_localization;
@@ -29,8 +28,7 @@ class RobotPoseObservationModel : public particle_filter::ObservationModel map_lines, std::shared_ptr