diff --git a/.vscode/settings.json b/.vscode/settings.json
index 707c2f422..7168aec42 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -214,16 +214,19 @@
"regex": "cpp",
"future": "cpp",
"*.ipp": "cpp",
- "span": "cpp"
+ "span": "cpp",
+ "ranges": "cpp"
},
// Tell the ROS extension where to find the setup.bash
// This also utilizes the COLCON_WS environment variable, which needs to be set
"ros.distro": "iron",
"search.useIgnoreFiles": false,
"python.autoComplete.extraPaths": [
+ "/opt/openrobots/lib/python3.10/site-packages",
"/opt/ros/iron/lib/python3.10/site-packages"
],
"python.analysis.extraPaths": [
+ "/opt/openrobots/lib/python3.10/site-packages",
"/opt/ros/iron/lib/python3.10/site-packages"
],
"cmake.configureOnOpen": false,
diff --git a/bitbots_behavior/bitbots_blackboard/CMakeLists.txt b/bitbots_behavior/bitbots_blackboard/CMakeLists.txt
index be7e5dad5..c363d76a8 100644
--- a/bitbots_behavior/bitbots_blackboard/CMakeLists.txt
+++ b/bitbots_behavior/bitbots_blackboard/CMakeLists.txt
@@ -6,17 +6,17 @@ if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()
-find_package(bio_ik_msgs REQUIRED)
find_package(ament_cmake REQUIRED)
-find_package(sensor_msgs REQUIRED)
-find_package(rclpy REQUIRED)
-find_package(tf2 REQUIRED)
+find_package(bio_ik_msgs REQUIRED)
+find_package(bitbots_docs REQUIRED)
find_package(bitbots_msgs REQUIRED)
+find_package(geometry_msgs REQUIRED)
+find_package(rclpy REQUIRED)
+find_package(sensor_msgs REQUIRED)
find_package(std_msgs REQUIRED)
-find_package(tf2_geometry_msgs REQUIRED)
find_package(std_srvs REQUIRED)
-find_package(geometry_msgs REQUIRED)
-find_package(bitbots_docs REQUIRED)
+find_package(tf2 REQUIRED)
+find_package(tf2_geometry_msgs REQUIRED)
set(INCLUDE_DIRS
${bio_ik_msgs_INCLUDE_DIRS}
@@ -76,6 +76,11 @@ ament_export_dependencies(geometry_msgs)
ament_export_dependencies(bitbots_docs)
ament_export_include_directories(${INCLUDE_DIRS})
+if(BUILD_TESTING)
+ find_package(ament_cmake_mypy REQUIRED)
+ ament_mypy(CONFIG_FILE "${CMAKE_CURRENT_LIST_DIR}/mypy.ini")
+endif()
+
ament_python_install_package(${PROJECT_NAME})
ament_package()
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py
index 813f86efe..436f060f9 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/body_blackboard.py
@@ -13,7 +13,7 @@
class BodyBlackboard:
- def __init__(self, node: Node, tf_buffer: tf2.Buffer):
+ def __init__(self, node: Node, tf_buffer: tf2.BufferInterface):
# References
self.node = node
self.tf_buffer = tf_buffer
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py
index 20c418e7c..62d4cd368 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/__init__.py
@@ -1,5 +1,6 @@
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
+from bitbots_utils.utils import nobeartype
from rclpy.node import Node
if TYPE_CHECKING:
@@ -9,6 +10,7 @@
class AbstractBlackboardCapsule:
"""Abstract class for blackboard capsules."""
- def __init__(self, node: Node, blackboard: Optional["BodyBlackboard"] = None):
+ @nobeartype
+ def __init__(self, node: Node, blackboard: "BodyBlackboard"):
self._node = node
self._blackboard = blackboard
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py
index bf51664f6..466381dc0 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/costmap_capsule.py
@@ -53,7 +53,7 @@ def __init__(self, node, blackboard):
self.calc_base_costmap()
self.calc_gradients()
- def robot_callback(self, msg: RobotArray):
+ def robot_callback(self, msg: RobotArray) -> None:
"""
Callback with new robot detections
"""
@@ -77,10 +77,11 @@ def robot_callback(self, msg: RobotArray):
# Publish debug costmap
self.publish_costmap()
- def publish_costmap(self):
+ def publish_costmap(self) -> None:
"""
Publishes the costmap for rviz
"""
+ assert self.costmap is not None, "Costmap is not initialized"
# Normalize costmap to match the rviz color scheme in a good way
normalized_costmap = (
(255 - ((self.costmap - np.min(self.costmap)) / (np.max(self.costmap) - np.min(self.costmap))) * 255 / 2.1)
@@ -131,7 +132,7 @@ def get_pass_regions(self) -> np.ndarray:
# Smooth obstacle map
return gaussian_filter(costmap, pass_smooth)
- def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
+ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[int, int]:
"""
Converts a field position to the corresponding indices for the costmap.
@@ -153,10 +154,11 @@ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
)
return idx_x, idx_y
- def calc_gradients(self):
+ def calc_gradients(self) -> None:
"""
Recalculates the gradient map based on the current costmap.
"""
+ assert self.base_costmap is not None, "Base costmap is not initialized"
gradient = np.gradient(self.base_costmap)
norms = np.linalg.norm(gradient, axis=0)
@@ -186,7 +188,7 @@ def cost_at_relative_xy(self, x: float, y: float) -> float:
return self.get_cost_at_field_position(point.point.x, point.point.y)
- def calc_base_costmap(self):
+ def calc_base_costmap(self) -> None:
"""
Builds the base costmap based on the behavior parameters.
This costmap includes a gradient towards the enemy goal and high costs outside the playable area
@@ -203,8 +205,8 @@ def calc_base_costmap(self):
# Create Grid
grid_x, grid_y = np.mgrid[
- 0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j,
- 0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j,
+ 0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j, # type: ignore[misc]
+ 0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j, # type: ignore[misc]
]
fix_points: List[Tuple[Tuple[float, float], float]] = []
@@ -278,7 +280,8 @@ def calc_base_costmap(self):
)
# Smooth the costmap to get more continuous gradients
- self.base_costmap = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
+ base_costmap: np.ndarray = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
+ self.base_costmap = base_costmap
self.costmap = self.base_costmap.copy()
def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, float]:
@@ -287,6 +290,7 @@ def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, flo
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
+ assert self.gradient_map is not None, "Gradient map is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return -self.gradient_map[0][idx_x, idx_y], -self.gradient_map[1][idx_x, idx_y]
@@ -296,10 +300,11 @@ def get_cost_at_field_position(self, x: float, y: float) -> float:
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
+ assert self.costmap is not None, "Costmap is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return self.costmap[idx_x, idx_y]
- def get_gradient_direction_at_field_position(self, x: float, y: float):
+ def get_gradient_direction_at_field_position(self, x: float, y: float) -> float:
"""
Returns the gradient direction at the given position
:param x: Field coordinate in the x direction
@@ -318,7 +323,9 @@ def get_gradient_direction_at_field_position(self, x: float, y: float):
grad = self.get_gradient_at_field_position(x, y)
return math.atan2(grad[1], grad[0])
- def get_cost_of_kick_relative(self, x: float, y: float, direction: float, kick_length: float, angular_range: float):
+ def get_cost_of_kick_relative(
+ self, x: float, y: float, direction: float, kick_length: float, angular_range: float
+ ) -> float:
"""
Returns the cost of a kick at the given position and direction in base footprint frame
:param x: Field coordinate in the x direction
@@ -356,6 +363,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
:param kick_length: The length of the kick
:param angular_range: The angular range of the kick
"""
+ assert self.costmap is not None, "Costmap is not initialized"
# create a mask in the size of the costmap consisting of 8-bit values initialized as 0
mask = Image.new("L", (self.costmap.shape[1], self.costmap.shape[0]))
@@ -386,7 +394,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
# This should contribute way less than the max and should have an impact if the max values are similar in all directions.
return masked_costmap.max() * 0.75 + masked_costmap.min() * 0.25
- def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float):
+ def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float) -> float:
"""
Returns the cost of the kick at the current position
:param direction: The direction of the kick
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
index 58cbcab5a..0f52543a6 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
from bitbots_utils.utils import get_parameters_from_other_node
from game_controller_hl_interfaces.msg import GameState
@@ -9,73 +11,63 @@ class GameStatusCapsule(AbstractBlackboardCapsule):
def __init__(self, node, blackboard=None):
super().__init__(node, blackboard)
- self.team_id = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
+ self.team_id: int = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.gamestate = GameState()
- self.last_update = 0
- self.unpenalized_time = 0
- self.last_goal_from_us_time = -86400
- self.last_goal_time = -86400
- self.free_kick_kickoff_team = None
-
- def is_game_state_equals(self, value):
- assert value in [
- GameState.GAMESTATE_PLAYING,
- GameState.GAMESTATE_FINISHED,
- GameState.GAMESTATE_INITIAL,
- GameState.GAMESTATE_READY,
- GameState.GAMESTATE_SET,
- ]
- return value == self.get_gamestate()
-
- def get_gamestate(self):
+ self.last_update: float = 0.0
+ self.unpenalized_time: float = 0.0
+ self.last_goal_from_us_time = -86400.0
+ self.last_goal_time = -86400.0
+ self.free_kick_kickoff_team: Optional[bool] = None
+
+ def get_gamestate(self) -> int:
return self.gamestate.game_state
- def get_secondary_state(self):
+ def get_secondary_state(self) -> int:
return self.gamestate.secondary_state
- def get_secondary_state_mode(self):
+ def get_secondary_state_mode(self) -> int:
return self.gamestate.secondary_state_mode
- def get_secondary_team(self):
+ def get_secondary_team(self) -> int:
return self.gamestate.secondary_state_team
- def has_kickoff(self):
+ def has_kickoff(self) -> bool:
return self.gamestate.has_kick_off
- def has_penalty_kick(self):
+ def has_penalty_kick(self) -> bool:
return (
self.gamestate.secondary_state == GameState.STATE_PENALTYKICK
or self.gamestate.secondary_state == GameState.STATE_PENALTYSHOOT
) and self.gamestate._secondary_state_team == self.team_id
- def get_own_goals(self):
+ def get_our_goals(self) -> int:
return self.gamestate.own_score
- def get_opp_goals(self):
+ def get_opp_goals(self) -> int:
return self.gamestate.rival_score
- def get_seconds_since_own_goal(self):
+ def get_seconds_since_own_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_from_us_time
- def get_seconds_since_any_goal(self):
+ def get_seconds_since_any_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_time
- def get_seconds_remaining(self):
+ def get_seconds_remaining(self) -> float:
# Time from the message minus time passed since receiving it
return max(
- self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0
+ self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0.0
)
- def get_secondary_seconds_remaining(self):
+ def get_secondary_seconds_remaining(self) -> float:
"""Seconds remaining for things like kickoff"""
# Time from the message minus time passed since receiving it
return max(
self.gamestate.secondary_seconds_remaining
- (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update),
- 0,
+ 0.0,
)
- def get_seconds_since_last_drop_ball(self):
+ def get_seconds_since_last_drop_ball(self) -> Optional[float]:
"""Returns the seconds since the last drop in"""
if self.gamestate.drop_in_time == -1:
return None
@@ -83,22 +75,22 @@ def get_seconds_since_last_drop_ball(self):
# Time from the message plus seconds passed since receiving it
return self.gamestate.drop_in_time + (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update)
- def get_seconds_since_unpenalized(self):
+ def get_seconds_since_unpenalized(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.unpenalized_time
- def get_is_penalized(self):
+ def get_is_penalized(self) -> bool:
return self.gamestate.penalized
- def received_gamestate(self):
- return self.last_update != 0
+ def received_gamestate(self) -> bool:
+ return self.last_update != 0.0
- def get_team_id(self):
+ def get_team_id(self) -> int:
return self.team_id
- def get_red_cards(self):
+ def get_red_cards(self) -> int:
return self.gamestate.team_mates_with_red_card
- def gamestate_callback(self, gamestate_msg: GameState):
+ def gamestate_callback(self, gamestate_msg: GameState) -> None:
if self.gamestate.penalized and not gamestate_msg.penalized:
self.unpenalized_time = self._node.get_clock().now().nanoseconds / 1e9
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py
index 6d9eaf628..6ed79c92b 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/kick_capsule.py
@@ -23,7 +23,7 @@ class KickCapsule(AbstractBlackboardCapsule):
is_currently_kicking: bool = False
__connected: bool = False
- __action_client: ActionClient = None
+ __action_client: Optional[ActionClient] = None
class WalkKickTargets(Flag):
"""
@@ -43,14 +43,14 @@ def __init__(self, node, blackboard):
self.walk_kick_pub = self._node.create_publisher(Bool, "/kick", 1)
# self.connect_dynamic_kick() Do not connect if dynamic_kick is disabled
- def walk_kick(self, target: WalkKickTargets):
+ def walk_kick(self, target: WalkKickTargets) -> None:
"""
Kick the ball while walking
:param target: Target for the walk kick (e.g. left or right foot)
"""
self.walk_kick_pub.publish(Bool(data=target.value))
- def connect_dynamic_kick(self):
+ def connect_dynamic_kick(self) -> None:
topic = self._blackboard.config["dynamic_kick"]["topic"]
self.__action_client = ActionClient(self._node, Kick, topic, callback_group=ReentrantCallbackGroup())
self.__connected = self.__action_client.wait_for_server(self._blackboard.config["dynamic_kick"]["wait_time"])
@@ -58,7 +58,7 @@ def connect_dynamic_kick(self):
if not self.__connected:
self._node.get_logger().error(f"No dynamic_kick server running on {topic}")
- def dynamic_kick(self, goal: Kick.Goal):
+ def dynamic_kick(self, goal: Kick.Goal) -> None:
"""
:param goal: Goal to kick to
:type goal: KickGoal
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py
index 490eee491..90107d664 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/misc_capsule.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Literal, Optional, TypeAlias
from bitbots_utils.utils import get_parameters_from_other_node
from rclpy.duration import Duration
@@ -7,6 +7,15 @@
from bitbots_blackboard.capsules import AbstractBlackboardCapsule
from bitbots_msgs.msg import Audio, HeadMode, RobotControlState
+THeadMode: TypeAlias = Literal[ # type: ignore[valid-type]
+ HeadMode.BALL_MODE,
+ HeadMode.FIELD_FEATURES,
+ HeadMode.LOOK_FORWARD,
+ HeadMode.DONT_MOVE,
+ HeadMode.BALL_MODE_PENALTY,
+ HeadMode.LOOK_FRONT,
+]
+
class MiscCapsule(AbstractBlackboardCapsule):
"""Capsule for miscellaneous functions."""
@@ -33,7 +42,10 @@ def __init__(self, node, blackboard):
# ## Tracking Part ##
#####################
- def set_head_duty(self, head_duty: int):
+ def set_head_duty(
+ self,
+ head_duty: THeadMode,
+ ) -> None:
head_duty_msg = HeadMode()
head_duty_msg.head_mode = head_duty
self.head_pub.publish(head_duty_msg)
@@ -42,7 +54,7 @@ def set_head_duty(self, head_duty: int):
# ## Robot state ##
###################
- def robot_state_callback(self, msg: RobotControlState):
+ def robot_state_callback(self, msg: RobotControlState) -> None:
self.robot_control_state = msg
def is_currently_walking(self) -> bool:
@@ -52,7 +64,7 @@ def is_currently_walking(self) -> bool:
# ## Timer ##
#############
- def start_timer(self, timer_name: str, duration_secs: int):
+ def start_timer(self, timer_name: str, duration_secs: int) -> None:
"""
Starts a timer
:param timer_name: Name of the timer
@@ -61,7 +73,7 @@ def start_timer(self, timer_name: str, duration_secs: int):
"""
self.timers[timer_name] = self._node.get_clock().now() + Duration(seconds=duration_secs)
- def end_timer(self, timer_name: str):
+ def end_timer(self, timer_name: str) -> None:
"""
Ends a timer
:param timer_name: Name of the timer
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py
index a621bf0a3..30754da64 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/pathfinding_capsule.py
@@ -15,7 +15,7 @@
# Type of pathfinding goal relative to the ball
-class BallGoalType(Enum):
+class BallGoalType(str, Enum):
GRADIENT = "gradient"
MAP = "map"
CLOSE = "close"
@@ -26,8 +26,8 @@ class PathfindingCapsule(AbstractBlackboardCapsule):
def __init__(self, node, blackboard):
super().__init__(node, blackboard)
- self.position_threshold: str = self._node.get_parameter("pathfinding_position_threshold").value
- self.orientation_threshold: str = self._node.get_parameter("pathfinding_orientation_threshold").value
+ self.position_threshold: float = self._node.get_parameter("pathfinding_position_threshold").value
+ self.orientation_threshold: float = self._node.get_parameter("pathfinding_orientation_threshold").value
self.direct_cmd_vel_pub = self._node.create_publisher(Twist, "cmd_vel", 1)
self.pathfinding_pub = self._node.create_publisher(PoseStamped, "goal_pose", 1)
@@ -41,20 +41,20 @@ def __init__(self, node, blackboard):
self._node, "bitbots_path_planning", ["controller.orient_to_goal_distance"]
)["controller.orient_to_goal_distance"]
- def publish(self, msg: PoseStamped):
+ def publish(self, msg: PoseStamped) -> None:
"""
Sends a goal to the pathfinding.
"""
self.goal = msg
self.pathfinding_pub.publish(msg)
- def get_goal(self) -> PoseStamped:
+ def get_goal(self) -> Optional[PoseStamped]:
"""
Returns the latest goal that was send to the pathfinding.
"""
return self.goal
- def cancel_goal(self):
+ def cancel_goal(self) -> None:
"""
This function cancels the current goal of the pathfinding,
which will stop sending cmd_vel messages to the walking.
@@ -62,7 +62,7 @@ def cancel_goal(self):
"""
self.pathfinding_cancel_pub.publish(Empty())
- def cmd_vel_cb(self, msg: Twist):
+ def cmd_vel_cb(self, msg: Twist) -> None:
self.current_cmd_vel = msg
def get_current_cmd_vel(self) -> Twist:
@@ -73,7 +73,7 @@ def get_current_cmd_vel(self) -> Twist:
"""
return self.current_cmd_vel
- def stop_walk(self):
+ def stop_walk(self) -> None:
"""
This function stops the walking. It does not cancel the current goal of the
pathfinding and the walking will start again if the pathfinding sends a new message.
@@ -86,7 +86,7 @@ def stop_walk(self):
# Publish the stop command
self.direct_cmd_vel_pub.publish(msg)
- def calculate_time_to_ball(self):
+ def calculate_time_to_ball(self) -> None:
"""
Calculates the time to ball and saves it in the team data capsule.
"""
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py
index e22dafd38..3fea4f71f 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/team_data_capsule.py
@@ -1,4 +1,4 @@
-from typing import Dict, List, Optional, Tuple
+from typing import List, Literal, Optional, Tuple
import numpy as np
from bitbots_utils.utils import get_parameters_from_other_node
@@ -27,7 +27,7 @@ def __init__(self, node, blackboard):
# Data
# indexed with one to match robot ids
- self.team_data: Dict[TeamData] = {}
+ self.team_data: dict[int, TeamData] = {}
for i in range(1, 7):
self.team_data[i] = TeamData()
self.times_to_ball = dict()
@@ -65,8 +65,8 @@ def __init__(self, node, blackboard):
self.action_update: float = 0.0
# Config
- self.data_timeout: float = self._node.get_parameter("team_data_timeout").value
- self.ball_max_covariance: float = self._node.get_parameter("ball_max_covariance").value
+ self.data_timeout: float = float(self._node.get_parameter("team_data_timeout").value)
+ self.ball_max_covariance: float = float(self._node.get_parameter("ball_max_covariance").value)
def is_valid(self, data: TeamData) -> bool:
"""
@@ -78,7 +78,7 @@ def is_valid(self, data: TeamData) -> bool:
and data.state != TeamData.STATE_PENALIZED
)
- def is_goalie_handling_ball(self):
+ def is_goalie_handling_ball(self) -> bool:
"""Returns true if the goalie is going to the ball."""
data: TeamData
for data in self.team_data.values():
@@ -90,7 +90,7 @@ def is_goalie_handling_ball(self):
return True
return False
- def is_team_mate_kicking(self):
+ def is_team_mate_kicking(self) -> bool:
"""Returns true if one of the players in the own team is kicking."""
data: TeamData
for data in self.team_data.values():
@@ -98,7 +98,9 @@ def is_team_mate_kicking(self):
return True
return False
- def team_rank_to_ball(self, own_ball_distance: float, count_goalies: bool = True, use_time_to_ball: bool = False):
+ def team_rank_to_ball(
+ self, own_ball_distance: float, count_goalies: bool = True, use_time_to_ball: bool = False
+ ) -> int:
"""
Returns the rank of this robot compared to the team robots concerning ball distance.
@@ -132,7 +134,7 @@ def team_rank_to_ball(self, own_ball_distance: float, count_goalies: bool = True
return rank + 1
return len(distances) + 1
- def set_action(self, action: int):
+ def set_action(self, action: int) -> None:
"""Set the action of this robot
:param action: An action from bitbots_msgs/Strategy"""
@@ -143,14 +145,11 @@ def set_action(self, action: int):
def get_action(self) -> Tuple[int, float]:
return self.strategy.action, self.action_update
- def set_role(self, role: str):
+ def set_role(self, role: Literal["goalie", "offense", "defense"]) -> None:
"""Set the role of this robot in the team
- :param role: String describing the role, possible values are:
- ['goalie', 'offense', 'defense']
+ :param role: String describing the role.
"""
- assert role in ["goalie", "offense", "defense"]
-
self.role = role
self.strategy.role = self.roles_mapping[role]
self.role_update = self._node.get_clock().now().nanoseconds / 1e9
@@ -158,8 +157,10 @@ def set_role(self, role: str):
def get_role(self) -> Tuple[int, float]:
return self.strategy.role, self.role_update
- def set_kickoff_strategy(self, strategy: int):
- assert strategy in [Strategy.SIDE_LEFT, Strategy.SIDE_MIDDLE, Strategy.SIDE_RIGHT]
+ def set_kickoff_strategy(
+ self,
+ strategy: Literal[Strategy.SIDE_LEFT, Strategy.SIDE_MIDDLE, Strategy.SIDE_RIGHT], # type: ignore[valid-type]
+ ) -> None:
self.strategy.offensive_side = strategy
self.strategy_update = self._node.get_clock().now().nanoseconds / 1e9
@@ -184,7 +185,7 @@ def is_not_goalie(team_data: TeamData) -> bool:
# Remove goalie data if needed
if not count_goalie:
- team_data_infos = filter(is_not_goalie, team_data_infos)
+ team_data_infos = filter(is_not_goalie, team_data_infos) # type: ignore[assignment]
# Count valid team data infos (aka robots with valid team data)
return sum(map(self.is_valid, team_data_infos))
@@ -194,10 +195,10 @@ def is_a_goalie(team_data: TeamData) -> bool:
return team_data.strategy.role == Strategy.ROLE_GOALIE
# Get the team data infos for all robots (ignoring the robot id/name)
- team_data_infos = self.team_data.values()
+ team_data_infos = self.team_data.values() # type: ignore[assignment]
# Remove none goalie Data
- team_data_infos = filter(is_a_goalie, team_data_infos)
+ team_data_infos = filter(is_a_goalie, team_data_infos) # type: ignore[assignment]
# Count valid team data infos (aka robots with valid team data)
return sum(map(self.is_valid, team_data_infos)) == 1
@@ -209,14 +210,14 @@ def team_data_callback(self, msg: TeamData):
# Save team data
self.team_data[msg.robot_id] = msg
- def publish_strategy(self):
+ def publish_strategy(self) -> None:
"""Publish for team comm"""
self.strategy_sender.publish(self.strategy)
def publish_time_to_ball(self):
self.time_to_ball_publisher.publish(Float32(data=self.own_time_to_ball))
- def teammate_ball_is_valid(self):
+ def teammate_ball_is_valid(self) -> bool:
"""Returns true if a teammate has seen the ball accurately enough"""
return self.get_teammate_ball() is not None
@@ -227,7 +228,7 @@ def get_teammate_ball(self) -> Optional[PointStamped]:
team_data_infos = filter(self.is_valid, self.team_data.values())
def is_ball_good_enough(team_data: TeamData) -> bool:
- return (
+ return bool(
team_data.ball_absolute.covariance[0] < self.ball_max_covariance
and team_data.ball_absolute.covariance[7] < self.ball_max_covariance
)
@@ -246,3 +247,4 @@ def get_ball_max_covariance(team_data: TeamData) -> float:
return PointStamped(
header=team_data_with_best_ball.header, point=team_data_with_best_ball.ball_absolute.pose.position
)
+ return None
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py
index 47c0ef3d8..6f382ee8c 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/world_model_capsule.py
@@ -1,5 +1,5 @@
import math
-from typing import Optional, Tuple
+from typing import Tuple
import numpy as np
import tf2_ros as tf2
@@ -22,6 +22,18 @@
from bitbots_blackboard.capsules import AbstractBlackboardCapsule
+class WorldModelTFError(Exception):
+ ...
+
+
+class WorldModelPositionTFError(WorldModelTFError):
+ ...
+
+
+class WorldModelBallTFError(WorldModelTFError):
+ ...
+
+
class WorldModelCapsule(AbstractBlackboardCapsule):
"""Provides information about the world model."""
@@ -118,7 +130,7 @@ def get_ball_position_uv(self) -> Tuple[float, float]:
except tf2.ExtrapolationException as e:
self._node.get_logger().warn(str(e))
self._node.get_logger().error("Severe transformation problem concerning the ball!")
- return None
+ raise WorldModelBallTFError("Could not transform ball to base_footprint frame") from e
return ball_bfp.x, ball_bfp.y
def get_ball_distance(self) -> float:
@@ -171,38 +183,38 @@ def forget_ball(self) -> None:
# Goal #
########
- def get_map_based_opp_goal_center_uv(self):
+ def get_map_based_opp_goal_center_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_uv_from_xy(x, y)
- def get_map_based_opp_goal_center_xy(self):
+ def get_map_based_opp_goal_center_xy(self) -> Tuple[float, float]:
return self.field_length / 2, 0.0
- def get_map_based_own_goal_center_uv(self):
+ def get_map_based_own_goal_center_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_own_goal_center_xy()
return self.get_uv_from_xy(x, y)
- def get_map_based_own_goal_center_xy(self):
+ def get_map_based_own_goal_center_xy(self) -> Tuple[float, float]:
return -self.field_length / 2, 0.0
- def get_map_based_opp_goal_angle_from_ball(self):
+ def get_map_based_opp_goal_angle_from_ball(self) -> float:
ball_x, ball_y = self.get_ball_position_xy()
goal_x, goal_y = self.get_map_based_opp_goal_center_xy()
return math.atan2(goal_y - ball_y, goal_x - ball_x)
- def get_map_based_opp_goal_distance(self):
+ def get_map_based_opp_goal_distance(self) -> float:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_distance_to_xy(x, y)
- def get_map_based_opp_goal_angle(self):
+ def get_map_based_opp_goal_angle(self) -> float:
x, y = self.get_map_based_opp_goal_center_uv()
return math.atan2(y, x)
- def get_map_based_opp_goal_left_post_uv(self):
+ def get_map_based_opp_goal_left_post_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_uv_from_xy(x, y - self.goal_width / 2)
- def get_map_based_opp_goal_right_post_uv(self):
+ def get_map_based_opp_goal_right_post_uv(self) -> Tuple[float, float]:
x, y = self.get_map_based_opp_goal_center_xy()
return self.get_uv_from_xy(x, y + self.goal_width / 2)
@@ -210,23 +222,21 @@ def get_map_based_opp_goal_right_post_uv(self):
# Pose #
########
- def get_current_position(self) -> Optional[Tuple[float, float, float]]:
+ def get_current_position(self) -> Tuple[float, float, float]:
"""
Returns the current position on the field as determined by the localization.
0,0,0 is the center of the field looking in the direction of the opponent goal.
:returns x,y,theta:
"""
- if not (transform := self.get_current_position_transform()):
- return None
+ transform = self.get_current_position_transform()
theta = euler_from_quaternion(numpify(transform.transform.rotation))[2]
return transform.transform.translation.x, transform.transform.translation.y, theta
- def get_current_position_pose_stamped(self) -> Optional[PoseStamped]:
+ def get_current_position_pose_stamped(self) -> PoseStamped:
"""
Returns the current position as determined by the localization as a PoseStamped
"""
- if not (transform := self.get_current_position_transform()):
- return None
+ transform = self.get_current_position_transform()
return PoseStamped(
header=transform.header,
pose=Pose(
@@ -245,13 +255,13 @@ def get_current_position_transform(self) -> TransformStamped:
)
except (tf2.LookupException, tf2.ConnectivityException, tf2.ExtrapolationException) as e:
self._node.get_logger().warn(str(e))
- return None
+ raise WorldModelPositionTFError("Could not get current position transform") from e
##########
# Common #
##########
- def get_uv_from_xy(self, x, y) -> Tuple[float, float]:
+ def get_uv_from_xy(self, x: float, y: float) -> Tuple[float, float]:
"""Returns the relativ positions of the robot to this absolute position"""
current_position = self.get_current_position()
x2 = x - current_position[0]
@@ -261,14 +271,14 @@ def get_uv_from_xy(self, x, y) -> Tuple[float, float]:
v = math.cos(theta) * y2 - math.sin(theta) * x2
return u, v
- def get_xy_from_uv(self, u, v):
+ def get_xy_from_uv(self, u: float, v: float) -> Tuple[float, float]:
"""Returns the absolute position from the given relative position to the robot"""
pos_x, pos_y, theta = self.get_current_position()
angle = math.atan2(v, u) + theta
hypotenuse = math.hypot(u, v)
return pos_x + math.sin(angle) * hypotenuse, pos_y + math.cos(angle) * hypotenuse
- def get_distance_to_xy(self, x, y):
+ def get_distance_to_xy(self, x: float, y: float) -> float:
"""Returns distance from robot to given position"""
u, v = self.get_uv_from_xy(x, y)
dist = math.hypot(u, v)
diff --git a/bitbots_behavior/bitbots_blackboard/mypy.ini b/bitbots_behavior/bitbots_blackboard/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_behavior/bitbots_blackboard/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_behavior/bitbots_blackboard/package.xml b/bitbots_behavior/bitbots_blackboard/package.xml
index 9463c26f2..4bbed0ca3 100644
--- a/bitbots_behavior/bitbots_blackboard/package.xml
+++ b/bitbots_behavior/bitbots_blackboard/package.xml
@@ -37,6 +37,10 @@
std_srvs
tf2_geometry_msgs
tf2
+ ament_mypy
+ ament_cmake_mypy
+
+ ament_cmake
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py
index e69de29bb..2b0d3a359 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/__init__.py
@@ -0,0 +1,6 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py
index e879b7d2c..72068ebbd 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/go_to.py
@@ -12,7 +12,7 @@
class GoToRelativePosition(AbstractActionElement):
blackboard: BodyBlackboard
- def __init__(self, blackboard, dsd, parameters: dict = None):
+ def __init__(self, blackboard, dsd, parameters):
super().__init__(blackboard, dsd, parameters)
self.point = float(parameters.get("x", 0)), float(parameters.get("y", 0)), float(parameters.get("t", 0))
self.threshold = float(parameters.get("threshold", 0.1))
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py
index 79786dd23..63ae26d99 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/actions/turn.py
@@ -59,7 +59,7 @@ def __init__(self, blackboard, dsd, parameters):
self.max_speed = parameters.get("max_speed", 0.4)
# Check if the have a duration
- self.duration: Optional[float] = parameters.get("duration", None)
+ self.duration: Optional[float] = float(parameters.get("duration", None))
self.start_time = self.blackboard.node.get_clock().now()
def perform(self, reevaluate=False):
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py
index e69de29bb..2b0d3a359 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/__init__.py
@@ -0,0 +1,6 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py
index c403adcef..3ae2f8854 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/aligned_to_goal.py
@@ -26,10 +26,6 @@ def perform(self, reevaluate=False):
# Get our position
current_pose = self.blackboard.world_model.get_current_position()
- # Check if we know our position
- if current_pose is None:
- return "NO"
-
# Get maximum kick angle relative to our base footprint
angle_difference_right = current_pose[2] - self.max_kick_angle
angle_difference_left = current_pose[2] + self.max_kick_angle
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py
index 3aa9147b8..54d02574a 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/ball_kick_area.py
@@ -25,6 +25,9 @@ def perform(self, reevaluate=False):
ball_position = self.blackboard.world_model.get_ball_position_uv()
self.publish_debug_data("ball_position", {"u": ball_position[0], "v": ball_position[1]})
+ self.publish_debug_data(
+ "smoothing_close", f"{self.no_near_decisions} ({self.no_near_decisions / self.smoothing * 10}%)"
+ )
# Check if the ball is in the enter area
if 0 <= ball_position[0] <= self.kick_x_enter and 0 <= abs(ball_position[1]) <= self.kick_y_enter:
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py
index 4d3746787..ac29d5fa8 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/behavior_dsd/decisions/current_score.py
@@ -9,7 +9,7 @@ def __init__(self, blackboard, dsd, parameters):
super().__init__(blackboard, dsd, parameters)
def perform(self, reevaluate=False):
- own_goals = self.blackboard.gamestate.get_own_goals()
+ own_goals = self.blackboard.gamestate.get_our_goals()
opp_goals = self.blackboard.gamestate.get_opp_goals()
if own_goals == opp_goals:
diff --git a/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml b/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml
index 4b2e179a5..cd65c1175 100644
--- a/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml
+++ b/bitbots_behavior/bitbots_body_behavior/config/body_behavior.yaml
@@ -146,7 +146,7 @@ body_behavior:
dribble_ball_distance_threshold: 0.5
dribble_kick_angle: 0.6
- kick_decision_smoothing: 5.0
+ kick_decision_smoothing: 1.0
##################
# costmap params #
diff --git a/bitbots_behavior/bitbots_body_behavior/package.xml b/bitbots_behavior/bitbots_body_behavior/package.xml
index b660e9c5a..ed5935edf 100644
--- a/bitbots_behavior/bitbots_body_behavior/package.xml
+++ b/bitbots_behavior/bitbots_body_behavior/package.xml
@@ -19,6 +19,7 @@
bitbots_blackboard
bitbots_msgs
+ bitbots_tts
bitbots_utils
dynamic_stack_decider
game_controller_hl_interfaces
@@ -29,6 +30,7 @@
soccer_vision_3d_msgs
tf_transformations
tf2
+ ament_mypy
diff --git a/bitbots_behavior/bitbots_body_behavior/test/mypy.ini b/bitbots_behavior/bitbots_body_behavior/test/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_behavior/bitbots_body_behavior/test/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_behavior/bitbots_body_behavior/test/test_mypy.py b/bitbots_behavior/bitbots_body_behavior/test/test_mypy.py
new file mode 100644
index 000000000..3dacd6040
--- /dev/null
+++ b/bitbots_behavior/bitbots_body_behavior/test/test_mypy.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+
+import pytest
+from ament_mypy.main import main
+
+
+@pytest.mark.mypy
+def test_mypy():
+ rc = main(argv=["--config", str((Path(__file__).parent / "mypy.ini").resolve())])
+ assert rc == 0, "Found code style errors / warnings"
diff --git a/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py b/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py
index 2aeb11345..3c679b86c 100755
--- a/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py
+++ b/bitbots_misc/bitbots_parameter_blackboard/bitbots_parameter_blackboard/game_settings.py
@@ -29,7 +29,7 @@
)
-def provide_config(path):
+def provide_config(path: str) -> dict:
"""
reads out the yaml you are asking for with the path parameter
:param path: filepath for your yaml
@@ -113,7 +113,7 @@ def check_new_value(new_value: str, value_type: Any, valid_options: Optional[lis
return True
-def ask_for_confirmation(question) -> bool:
+def ask_for_confirmation(question: str) -> bool:
result = None
prompt = " [Y/n] "
valid = {"": True, "y": True, "n": False}
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/.gitignore b/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/.gitignore
deleted file mode 100644
index ab08e78d3..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-# Ignore generate parameter library build artifacts
-bitbots_technical_challenge_vision_params.py
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/__init__.py b/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision.py b/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision.py
deleted file mode 100755
index 7c7957aad..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision/bitbots_technical_challenge_vision.py
+++ /dev/null
@@ -1,183 +0,0 @@
-from typing import Optional, Tuple
-
-import cv2
-import numpy as np
-import rclpy
-import rclpy.logging
-from ament_index_python.packages import get_package_share_directory
-from cv_bridge import CvBridge
-from rclpy.node import Node
-from sensor_msgs.msg import Image
-from soccer_vision_2d_msgs.msg import Robot, RobotArray
-
-from bitbots_technical_challenge_vision.bitbots_technical_challenge_vision_params import (
- bitbots_technical_challenge_vision,
-)
-
-
-class TechnicalChallengeVision(Node):
- def __init__(self):
- super().__init__("bitbots_technical_challenge_vision")
-
- self._package_path = get_package_share_directory("bitbots_technical_challenge_vision")
- self._cv_bridge = CvBridge()
- self._annotations_pub = self.create_publisher(RobotArray, "/robots_in_image", 10)
- self._debug_img_pub = self.create_publisher(Image, "/bitbots_technical_challenge_vision_debug_img", 10)
- self._debug_clrmp_pub_blue = self.create_publisher(
- Image, "/bitbots_technical_challenge_vision_debug_clrmp_blue", 10
- )
- self._debug_clrmp_pub_red = self.create_publisher(
- Image, "/bitbots_technical_challenge_vision_debug_clrmp_red", 10
- )
- self._img_sub = self.create_subscription(Image, "/camera/image_proc", self.image_callback, 10)
- self._param_listener = bitbots_technical_challenge_vision.ParamListener(self)
- self._params = self._param_listener.get_params()
-
- def create_robot_msg(self, x: int, y: int, w: int, h: int, t: int) -> Robot:
- """
- Creates a Robot message from a robots bounding box data and its color.
-
- :param x: bb top left x
- :param y: bb top left y
- :param w: bb width
- :param h: bb height
- :param t: robot team
- :return: robot message for that robot
- """
- robot = Robot()
-
- robot.bb.center.position.x = float(x + (w / 2))
- robot.bb.center.position.y = float(y + (h / 2))
- robot.bb.size_x = float(w)
- robot.bb.size_y = float(h)
- robot.attributes.team = t
-
- return robot
-
- def process_image(
- self, img: np.ndarray, debug_img: np.ndarray, arg
- ) -> Tuple[list[Robot], list[Robot], np.ndarray, np.ndarray]:
- """
- gets annotations from the camera image
-
- :param img: ndarray containing the camera image
- :param debug_img: copy of img debug annotations should be drawn here
- :param arg: __RosParameters object containing the dynamic parameters
- :return: [[blue_robots], [red_robots], clrmp_blue, clrmp_red]
- """
- # convert img to hsv to isolate colors better
- img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
-
- # get color maps
- blue_map = cv2.inRange(
- img,
- (arg.blue_lower_h, arg.blue_lower_s, arg.blue_lower_v),
- (arg.blue_upper_h, arg.blue_upper_s, arg.blue_upper_v),
- )
-
- red_map = cv2.inRange(
- img,
- (arg.red_lower_h, arg.red_lower_s, arg.red_lower_v),
- (arg.red_upper_h, arg.red_upper_s, arg.red_upper_v),
- )
-
- # get contours in color maps
- blue_contours, _ = cv2.findContours(blue_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
- red_contours, _ = cv2.findContours(red_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
-
- # get lists of bounding boxes
- blue_robots = []
- red_robots = []
-
- def annotate(x, y, w, h, c) -> Optional[np.ndarray]:
- if not arg.debug_mode:
- return None
- return cv2.rectangle(
- debug_img,
- (x, y),
- (x + w, y + h),
- c,
- 2,
- )
-
- for cnt in blue_contours:
- x, y, w, h = cv2.boundingRect(cnt)
- if min(w, h) >= arg.min_size and max(h, w) <= arg.max_size:
- # draw bb on debug img
- annotate(x, y, w, h, (255, 0, 0))
-
- # TODO I think 1 is for the blue team?
- blue_robots.append(self.create_robot_msg(x, y, w, h, 1))
-
- for cnt in red_contours:
- x, y, w, h = cv2.boundingRect(cnt)
- if min(w, h) >= arg.min_size and max(h, w) <= arg.max_size:
- # draw bb on debug img
- annotate(x, y, w, h, (0, 0, 255))
-
- red_robots.append(self.create_robot_msg(x, y, w, h, 2))
-
- return blue_robots, red_robots, blue_map, red_map, debug_img
-
- def image_callback(self, msg: Image):
- # get dynamic parameters
- if self._param_listener.is_old(self._params):
- self._param_listener.refresh_dynamic_parameters()
- self._params = self._param_listener.get_params()
-
- arg = self._params
-
- # set variables
- img = self._cv_bridge.imgmsg_to_cv2(img_msg=msg, desired_encoding="bgr8")
- header = msg.header
-
- if arg.debug_mode:
- debug_img = np.copy(img)
- else:
- debug_img = None
-
- # get annotations
- blue_robots, red_robots, blue_map, red_map, debug_img = self.process_image(img, debug_img, arg)
- robots = []
- robots.extend(blue_robots)
- robots.extend(red_robots)
-
- # make output message
- robot_array_message = RobotArray()
- robot_array_message.header = header
- robot_array_message.robots = robots
-
- # publish output message
- self._annotations_pub.publish(robot_array_message)
-
- if arg.debug_mode:
- # make debug image message
- debug_img_msg = self._cv_bridge.cv2_to_imgmsg(cvim=debug_img, encoding="bgr8", header=header)
-
- # publish debug image
- self._debug_img_pub.publish(debug_img_msg)
-
- # make color map messages
- clrmp_blue_img = cv2.cvtColor(blue_map, cv2.COLOR_GRAY2BGR)
- clrmp_blue_msg = self._cv_bridge.cv2_to_imgmsg(cvim=clrmp_blue_img, encoding="bgr8", header=header)
-
- clrmp_red_img = cv2.cvtColor(red_map, cv2.COLOR_GRAY2BGR)
- clrmp_red_msg = self._cv_bridge.cv2_to_imgmsg(clrmp_red_img, encoding="bgr8", header=header)
-
- # publish color map messages
- self._debug_clrmp_pub_blue.publish(clrmp_blue_msg)
- self._debug_clrmp_pub_red.publish(clrmp_red_msg)
-
-
-def main(args=None):
- rclpy.init(args=args)
- node = TechnicalChallengeVision()
- try:
- rclpy.spin(node)
- except KeyboardInterrupt:
- pass
- node.destroy_node()
-
-
-if __name__ == "__main__":
- main()
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/config/range.yaml b/bitbots_misc/bitbots_technical_challenge_vision/config/range.yaml
deleted file mode 100644
index 6716d55af..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/config/range.yaml
+++ /dev/null
@@ -1,118 +0,0 @@
-bitbots_technical_challenge_vision:
- blue_lower_h: {
- type: int,
- default_value: 92,
- description: "hue value of the lower boundary for blue obstacles",
- validation: {
- bounds<>: [0, 179]
- }
- }
- blue_upper_h: {
- type: int,
- default_value: 110,
- description: "hue value of the upper boundary for blue obstacles",
- validation: {
- bounds<>: [0, 179]
- }
- }
- blue_lower_s: {
- type: int,
- default_value: 90,
- description: "separation value of the lower boundary for blue obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- blue_upper_s: {
- type: int,
- default_value: 236,
- description: "separation value of the upper boundary for blue obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- blue_lower_v: {
- type: int,
- default_value: 0,
- description: "value value of the lower boundary for blue obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- blue_upper_v: {
- type: int,
- default_value: 255,
- description: "value value of the upper boundary for blue obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- red_lower_h: {
- type: int,
- default_value: 138,
- description: "hue value of the lower boundary for red obstacles",
- validation: {
- bounds<>: [0, 179]
- }
- }
- red_upper_h: {
- type: int,
- default_value: 179,
- description: "hue value of the upper boundary for red obstacles",
- validation: {
- bounds<>: [0, 179]
- }
- }
- red_lower_s: {
- type: int,
- default_value: 78,
- description: "separation value of the lower boundary for red obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- red_upper_s: {
- type: int,
- default_value: 255,
- description: "separation value of the upper boundary for red obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- red_lower_v: {
- type: int,
- default_value: 0,
- description: "value value of the lower boundary for red obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- red_upper_v: {
- type: int,
- default_value: 255,
- description: "value value of the upper boundary for red obstacles",
- validation: {
- bounds<>: [0, 255]
- }
- }
- min_size: {
- type: int,
- default_value: 20,
- description: "minimum size of an obstacle to be considered",
- validation: {
- gt_eq<>: 0
- }
- }
- max_size: {
- type: int,
- default_value: 400,
- description: "maximum size of an obstacle to be considered",
- validation: {
- gt_eq<>: 0
- }
- }
- debug_mode: {
- type: bool,
- default_value: false,
- description: "set true if debug images should be drawn and published",
- }
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/launch/vision.launch b/bitbots_misc/bitbots_technical_challenge_vision/launch/vision.launch
deleted file mode 100644
index 1d36dc25f..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/launch/vision.launch
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/package.xml b/bitbots_misc/bitbots_technical_challenge_vision/package.xml
deleted file mode 100644
index 9c0a53d6e..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/package.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-
-
-
- bitbots_technical_challenge_vision
- 0.0.0
- TODO: Package description
- par
- TODO: License declaration
-
- rclpy
- python3-opencv
- python3-numpy
- cv_bridge
- sensor_msgs
- soccer_vision_2d_msgs
-
- generate_parameter_library
-
- ament_copyright
- ament_flake8
- ament_pep257
- python3-pytest
-
-
- ament_python
-
-
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/resource/bitbots_technical_challenge_vision b/bitbots_misc/bitbots_technical_challenge_vision/resource/bitbots_technical_challenge_vision
deleted file mode 100644
index e69de29bb..000000000
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/setup.cfg b/bitbots_misc/bitbots_technical_challenge_vision/setup.cfg
deleted file mode 100644
index 34ccaf1e9..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/setup.cfg
+++ /dev/null
@@ -1,4 +0,0 @@
-[develop]
-script_dir=$base/lib/bitbots_technical_challenge_vision
-[install]
-install_scripts=$base/lib/bitbots_technical_challenge_vision
diff --git a/bitbots_misc/bitbots_technical_challenge_vision/setup.py b/bitbots_misc/bitbots_technical_challenge_vision/setup.py
deleted file mode 100644
index 4603d372c..000000000
--- a/bitbots_misc/bitbots_technical_challenge_vision/setup.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import glob
-
-from generate_parameter_library_py.setup_helper import generate_parameter_module
-from setuptools import find_packages, setup
-
-package_name = "bitbots_technical_challenge_vision"
-
-setup(
- name=package_name,
- version="0.0.0",
- packages=find_packages(exclude=["test"]),
- data_files=[
- ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
- ("share/" + package_name, ["package.xml"]),
- ("share/" + package_name + "/config", glob.glob("config/*.yaml")),
- ("share/" + package_name + "/launch", glob.glob("launch/*.launch")),
- ],
- install_requires=["setuptools"],
- zip_safe=True,
- maintainer="par",
- maintainer_email="paer-wiegmann@gmx.de",
- description="This Package provides a simple vision to detect the obstacles for the obstacle avoidance challenge.",
- license="MIT",
- entry_points={
- "console_scripts": [
- "bitbots_technical_challenge_vision = bitbots_technical_challenge_vision.bitbots_technical_challenge_vision:main",
- ],
- },
-)
-
-generate_parameter_module(
- "bitbots_technical_challenge_vision_params",
- "config/range.yaml",
-)
diff --git a/bitbots_misc/bitbots_teleop/scripts/teleop_keyboard.py b/bitbots_misc/bitbots_teleop/scripts/teleop_keyboard.py
index 885df006e..341518df0 100755
--- a/bitbots_misc/bitbots_teleop/scripts/teleop_keyboard.py
+++ b/bitbots_misc/bitbots_teleop/scripts/teleop_keyboard.py
@@ -20,6 +20,7 @@
from bitbots_msgs.action import Dynup, Kick
from bitbots_msgs.msg import HeadMode, JointCommand
+from bitbots_msgs.srv import SimulatorPush
msg = """
BitBots Teleop
@@ -57,6 +58,13 @@
4: Ball Mode adapted for Penalty Kick
5: Do a pattern which only looks in front of the robot
+Pushing:
+p: execute Push
+Shift-p: reset Power to 0
+ü/ä: increase/decrease power forward (x axis)
++/#: increase/decrease power left (y axis)
+SHIFT increases/decreases with factor 10
+
CTRL-C to quit
@@ -121,6 +129,9 @@ def __init__(self):
self.th = 0
self.status = 0
+ self.push_force_x = 0.0
+ self.push_force_y = 0.0
+
# Head Part
self.create_subscription(JointState, "joint_states", self.joint_state_cb, 1)
self.head_mode_pub = self.create_publisher(HeadMode, "head_mode", 1)
@@ -145,6 +156,7 @@ def __init__(self):
self.reset_robot = self.create_client(Empty, "/reset_pose")
self.reset_ball = self.create_client(Empty, "/reset_ball")
+ self.simulator_push = self.create_client(SimulatorPush, "/simulator_push")
self.frame_prefix = "" if os.environ.get("ROS_NAMESPACE") is None else os.environ.get("ROS_NAMESPACE") + "/"
@@ -308,6 +320,32 @@ def loop(self):
self.z = 0
self.a_x = -1
self.th = 0
+ elif key == "p":
+ # push robot in simulation
+ push_request = SimulatorPush.Request()
+ push_request.force.x = self.push_force_x
+ push_request.force.y = self.push_force_y
+ push_request.relative = True
+ self.simulator_push.call_async(push_request)
+ elif key == "P":
+ self.push_force_x = 0.0
+ self.push_force_y = 0.0
+ elif key == "ü":
+ self.push_force_x += 1
+ elif key == "Ü":
+ self.push_force_x += 10
+ elif key == "ä":
+ self.push_force_x -= 1
+ elif key == "Ä":
+ self.push_force_x -= 10
+ elif key == "+":
+ self.push_force_y += 1
+ elif key == "*":
+ self.push_force_y += 10
+ elif key == "#":
+ self.push_force_y -= 1
+ elif key == "'":
+ self.push_force_y -= 10
else:
self.x = 0
self.y = 0
@@ -324,7 +362,15 @@ def loop(self):
twist.linear = Vector3(x=float(self.x), y=float(self.y), z=0.0)
twist.angular = Vector3(x=float(self.a_x), y=0.0, z=float(self.th))
self.pub.publish(twist)
- state_str = f"x: {self.x} \ny: {self.y} \nturn: {self.th} \nhead mode: {self.head_mode_msg.head_mode} \n"
+ state_str = (
+ f"x: {self.x} \n"
+ f"y: {self.y} \n"
+ f"turn: {self.th} \n"
+ f"head mode: {self.head_mode_msg.head_mode} \n"
+ f"push force x (+forward/-back): {self.push_force_x} \n"
+ f"push force y (+left/-right): {self.push_force_y} "
+ )
+
for _ in range(state_str.count("\n") + 1):
sys.stdout.write("\x1b[A")
print(state_str)
diff --git a/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py b/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py
index 33230d76e..cf824a0b6 100644
--- a/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py
+++ b/bitbots_misc/bitbots_tf_buffer/bitbots_tf_buffer/__init__.py
@@ -1,6 +1,9 @@
from typing import Optional
import tf2_ros as tf2
+
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
from builtin_interfaces.msg import Duration as DurationMsg
from builtin_interfaces.msg import Time as TimeMsg
from geometry_msgs.msg import TransformStamped
@@ -10,6 +13,8 @@
from bitbots_tf_buffer.cpp_wrapper import Buffer as CppBuffer
+beartype_this_package()
+
class Buffer(tf2.BufferCore, tf2.BufferInterface):
"""
diff --git a/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py b/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py
+++ b/bitbots_misc/bitbots_tts/bitbots_tts/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py b/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py
+++ b/bitbots_misc/bitbots_utils/bitbots_utils/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py b/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
index f40d482d7..92b448418 100644
--- a/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
+++ b/bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
@@ -1,4 +1,5 @@
import math
+from typing import Literal
import numpy as np
from geometry_msgs.msg import Quaternion
@@ -37,7 +38,7 @@ def sixd2quat(sixd):
return mat2quat(mat)
-def quat2fused(q, order="wxyz"):
+def quat2fused(q, order: Literal["xyzw", "wxyz"] = "wxyz"):
# Convert to numpy array if necessary
q = np.asarray(q)
@@ -46,8 +47,6 @@ def quat2fused(q, order="wxyz"):
q_xyzw = q
elif order == "wxyz":
q_xyzw = wxyz2xyzw(q)
- else:
- raise ValueError(f"Unknown quaternion order: {order}")
# Fused yaw of Quaternion
fused_yaw = 2.0 * math.atan2(
@@ -78,7 +77,7 @@ def quat2fused(q, order="wxyz"):
# Conversion: Fused angles (3D/4D) --> Quaternion (wxyz)
-def fused2quat(fused_roll, fused_pitch, fused_yaw, hemi):
+def fused2quat(fused_roll: float, fused_pitch: float, fused_yaw: float, hemi: bool) -> np.ndarray:
# Precalculate the sine values
sth = math.sin(fused_pitch)
sphi = math.sin(fused_roll)
@@ -115,7 +114,7 @@ def fused2quat(fused_roll, fused_pitch, fused_yaw, hemi):
return np.array([chalpha * chpsi, shalpha * chgampsi, shalpha * shgampsi, chalpha * shpsi]) # Order: (w,x,y,z)
-def compute_imu_orientation_from_world(robot_quat_in_world):
+def compute_imu_orientation_from_world(robot_quat_in_world) -> tuple[tuple[float, float, float], np.ndarray]:
# imu orientation has roll and pitch relative to gravity vector. yaw in world frame
# get global yaw
yrp_world_frame = quat2euler(robot_quat_in_world, axes="szxy")
@@ -126,5 +125,5 @@ def compute_imu_orientation_from_world(robot_quat_in_world):
return [rp[0], rp[1], 0], yaw_quat
-def quat_from_yaw(yaw: float) -> Quaternion:
+def quat_from_yaw(yaw: float | int) -> Quaternion:
return msgify(Quaternion, wxyz2xyzw(euler2quat(yaw, 0, 0, axes="szxy")))
diff --git a/bitbots_misc/bitbots_utils/bitbots_utils/utils.py b/bitbots_misc/bitbots_utils/bitbots_utils/utils.py
index 74f8d356e..2a187488b 100644
--- a/bitbots_misc/bitbots_utils/bitbots_utils/utils.py
+++ b/bitbots_misc/bitbots_utils/bitbots_utils/utils.py
@@ -4,6 +4,7 @@
import rclpy
import yaml
from ament_index_python import get_package_share_directory
+from beartype import BeartypeConf, BeartypeStrategy, beartype
from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterType as ParameterTypeMsg
from rcl_interfaces.msg import ParameterValue as ParameterValueMsg
@@ -11,8 +12,11 @@
from rclpy.node import Node
from rclpy.parameter import PARAMETER_SEPARATOR_STRING, Parameter, parameter_value_to_python
+# Create a new @nobeartype decorator disabling type-checking.
+nobeartype = beartype(conf=BeartypeConf(strategy=BeartypeStrategy.O0))
-def read_urdf(robot_name):
+
+def read_urdf(robot_name: str) -> str:
urdf = os.popen(
f"xacro {get_package_share_directory(f'{robot_name}_description')}"
f"/urdf/robot.urdf use_fake_walk:=false sim_ns:=false"
@@ -20,7 +24,7 @@ def read_urdf(robot_name):
return urdf
-def load_moveit_parameter(robot_name):
+def load_moveit_parameter(robot_name: str) -> List[ParameterMsg]:
moveit_parameters = get_parameters_from_plain_yaml(
f"{get_package_share_directory(f'{robot_name}_moveit_config')}/config/kinematics.yaml",
"robot_description_kinematics.",
@@ -40,7 +44,7 @@ def load_moveit_parameter(robot_name):
return moveit_parameters
-def get_parameters_from_ros_yaml(node_name, parameter_file, use_wildcard):
+def get_parameters_from_ros_yaml(node_name: str, parameter_file: str, use_wildcard: bool) -> List[ParameterMsg]:
# Remove leading slash and namespaces
with open(parameter_file) as f:
param_file = yaml.safe_load(f)
@@ -66,7 +70,7 @@ def get_parameters_from_ros_yaml(node_name, parameter_file, use_wildcard):
return parse_parameter_dict(namespace="", parameter_dict=param_dict)
-def get_parameters_from_plain_yaml(parameter_file, namespace=""):
+def get_parameters_from_plain_yaml(parameter_file, namespace="") -> List[ParameterMsg]:
with open(parameter_file) as f:
param_dict = yaml.safe_load(f)
return parse_parameter_dict(namespace=namespace, parameter_dict=param_dict)
@@ -130,6 +134,28 @@ def get_parameters_from_other_node(
return results
+def get_parameters_from_other_node_sync(
+ own_node: Node, other_node_name: str, parameter_names: List[str], service_timeout_sec: float = 20.0
+) -> Dict:
+ """
+ Used to receive parameters from other running nodes. It does not use async internally.
+ It should not be used in callback functions, but it is a bit more reliable than the async version.
+ Returns a dict with requested parameter name as dict key and parameter value as dict value.
+ """
+ client = own_node.create_client(GetParameters, f"{other_node_name}/get_parameters")
+ ready = client.wait_for_service(timeout_sec=service_timeout_sec)
+ if not ready:
+ raise RuntimeError(f"Wait for {other_node_name} parameter service timed out")
+ request = GetParameters.Request()
+ request.names = parameter_names
+ response = client.call(request)
+
+ results = {} # Received parameter
+ for i, param in enumerate(parameter_names):
+ results[param] = parameter_value_to_python(response.values[i])
+ return results
+
+
def set_parameters_of_other_node(
own_node: Node,
other_node_name: str,
@@ -158,7 +184,7 @@ def set_parameters_of_other_node(
return [res.success for res in response.results]
-def parse_parameter_dict(*, namespace, parameter_dict):
+def parse_parameter_dict(*, namespace: str, parameter_dict: dict) -> list[ParameterMsg]:
parameters = []
for param_name, param_value in parameter_dict.items():
full_param_name = namespace + param_name
diff --git a/bitbots_misc/system_monitor/system_monitor/__init__.py b/bitbots_misc/system_monitor/system_monitor/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_misc/system_monitor/system_monitor/__init__.py
+++ b/bitbots_misc/system_monitor/system_monitor/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
index 3631eb9e4..075036004 100644
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
@@ -4,8 +4,9 @@
import math
import os
from copy import deepcopy
+from dataclasses import dataclass, field
from datetime import datetime
-from typing import Optional
+from typing import Any, Optional
import regex as re
from rclpy.action import ActionClient
@@ -17,18 +18,40 @@
from bitbots_msgs.srv import AddAnimation
+@dataclass
+class Keyframe:
+ """
+ Defines a keyframe of the recorded Animation
+ """
+
+ name: str
+ duration: float
+ pause: float
+ goals: dict[str, float]
+ torque: dict[str, int]
+
+ def as_dict(self) -> dict[str, Any]:
+ return {
+ "name": self.name,
+ "duration": self.duration,
+ "pause": self.pause,
+ "goals": self.goals,
+ "torque": self.torque,
+ }
+
+
+@dataclass
class AnimationData:
"""
Defines a current status of the recorded Animation
"""
- def __init__(self):
- self.key_frames: list[dict] = []
- self.author: str = "Unknown"
- self.description: str = "Edit me!"
- self.last_edited: datetime = datetime.now().isoformat(" ")
- self.name: str = "None yet"
- self.version: int = 0
+ author: str = "Unknown"
+ key_frames: list[Keyframe] = field(default_factory=list)
+ last_edited: str = datetime.now().isoformat(" ")
+ description: str = "Edit me!"
+ version: str = "0"
+ name: str = "None yet"
class Recorder:
@@ -48,7 +71,7 @@ def __init__(
self.hcm_record_mode_client = hcm_record_mode_client
self.save_step("Initial step", frozen=True)
- def get_keyframes(self) -> list[dict]:
+ def get_keyframes(self) -> list[Keyframe]:
"""
Gets the keyframes of the current animation
@@ -56,7 +79,7 @@ def get_keyframes(self) -> list[dict]:
"""
return self.current_state.key_frames
- def get_keyframe(self, name: str) -> Optional[dict]:
+ def get_keyframe(self, name: str) -> Optional[Keyframe]:
"""
Gets the keyframe with the given name
@@ -64,7 +87,7 @@ def get_keyframe(self, name: str) -> Optional[dict]:
:return: the keyframe with the given name
"""
# Get the first keyframe with the given name or None
- return next(filter(lambda key_frame: key_frame["name"] == name, self.get_keyframes()), None)
+ return next(filter(lambda key_frame: key_frame.name == name, self.get_keyframes()), None) # type: ignore[arg-type, union-attr]
def get_keyframe_index(self, name: str) -> Optional[int]:
"""
@@ -79,20 +102,20 @@ def get_keyframe_index(self, name: str) -> Optional[int]:
return None
return self.get_keyframes().index(key_frame)
- def get_meta_data(self) -> tuple[str, int, str, str]:
+ def get_meta_data(self) -> tuple[str, str, str, str]:
"""
Returns the meta data of the current animation
"""
data = self.current_state
return data.name, data.version, data.author, data.description
- def set_meta_data(self, name: str, version: int, author: str, description: str) -> None:
+ def set_meta_data(self, name: str, version: str, author: str, description: str) -> None:
self.current_state.name = name
self.current_state.version = version
self.current_state.author = author
self.current_state.description = description
- def save_step(self, description: str, state: Optional[dict] = None, frozen: bool = False) -> None:
+ def save_step(self, description: str, state: Optional[AnimationData] = None, frozen: bool = False) -> None:
"""Save the current state of the Animation
for later restoration by the undo-command
@@ -161,8 +184,15 @@ def record(
:param override: Wether or not to override the frame at the given position
:param frozen: if True, the step cannot be undone
"""
- frame = {"name": frame_name, "duration": duration, "pause": pause, "goals": motor_pos, "torque": motor_torque}
- new_frame = deepcopy(frame)
+ new_frame = deepcopy(
+ Keyframe(
+ name=frame_name,
+ duration=duration,
+ pause=pause,
+ goals=motor_pos,
+ torque=motor_torque,
+ )
+ )
if seq_pos is None:
self.current_state.key_frames.append(new_frame)
self.save_step(f"Appending new keyframe '{frame_name}'", frozen=frozen)
@@ -172,9 +202,8 @@ def record(
else:
self.current_state.key_frames[seq_pos] = new_frame
self.save_step(f"Overriding keyframe at position {seq_pos} with '{frame_name}'", frozen=frozen)
- return True
- def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
+ def save_animation(self, path: str, file_name: Optional[str] = None) -> str:
"""Dumps all keyframe data to an animation .json file
:param path: The folder where the file should be saved
@@ -194,8 +223,8 @@ def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
# Convert the angles from radians to degrees
keyframes = deepcopy(self.current_state.key_frames)
for kf in keyframes:
- for k, v in kf["goals"].items():
- kf["goals"][k] = math.degrees(v)
+ for k, v in kf.goals.items():
+ kf.goals[k] = math.degrees(v)
# Construct the animation dictionary with meta data and keyframes
animation_dict = {
@@ -213,7 +242,7 @@ def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
return "Saving to '%s'" % path + ". Done."
- def load_animation(self, path: str) -> None:
+ def load_animation(self, path: str) -> Optional[str]:
"""Record command, load a animation '.json' file
:param path: path of the animation to load
@@ -223,23 +252,34 @@ def load_animation(self, path: str) -> None:
data = json.load(fp)
# Check if the file is a valid animation and convert the angles from degrees to radians
try:
+ keyframes: list[Keyframe] = []
for keyframe in data["keyframes"]:
- for k, v in keyframe["goals"].items():
- keyframe["goals"][k] = math.radians(v)
+ keyframes.append(
+ Keyframe(
+ name=keyframe["name"],
+ duration=keyframe["duration"],
+ pause=keyframe["pause"],
+ goals={k: math.radians(v) for k, v in keyframe["goals"].items()},
+ torque=keyframe["torque"],
+ )
+ )
except ValueError as e:
self._node.get_logger().error("Animation {} is corrupt:\n {}".format(path, e.message.partition("\n")[0]))
return "Animation {} is corrupt:\n {}".format(path, e.message.partition("\n")[0])
# Set the current state to the loaded animation
- self.current_state.key_frames = data["keyframes"]
- self.current_state.name = data["name"]
- self.current_state.version = data["version"] if "version" in data else 0
- self.current_state.last_edited = data["last_edited"] if "last_edited" in data else datetime.now().isoformat(" ")
- self.current_state.author = data["author"] if "author" in data else "Unknown"
- self.current_state.description = data["description"] if "description" in data else ""
+ self.current_state = AnimationData(
+ key_frames=keyframes,
+ name=data["name"],
+ version=data["version"] if "version" in data else "0",
+ last_edited=data["last_edited"] if "last_edited" in data else datetime.now().isoformat(" "),
+ author=data["author"] if "author" in data else "Unknown",
+ description=data["description"] if "description" in data else "",
+ )
# Save the current state
self.save_step(f"Loading of animation named '{data['name']}'")
+ return None
def play(self, from_frame: int = 0, until_frame: int = -1) -> tuple[str, bool]:
"""Plays (a range of) the current animation using the animation server
@@ -275,13 +315,13 @@ def play(self, from_frame: int = 0, until_frame: int = -1) -> tuple[str, bool]:
"last_edited": datetime.isoformat(datetime.now(), " "),
"author": self.current_state.author,
"description": self.current_state.description,
- "keyframes": deepcopy(self.current_state.key_frames),
+ "keyframes": deepcopy(list(map(Keyframe.as_dict, self.current_state.key_frames))),
}
# Convert the angles from radians to degrees
for key_frame in animation_dict["keyframes"]:
- for joint, v in key_frame["goals"].items():
- key_frame["goals"][joint] = math.degrees(v)
+ for joint, v in key_frame["goals"].items(): # type: ignore[index]
+ key_frame["goals"][joint] = math.degrees(v) # type: ignore[index]
self._node.get_logger().debug(f"Playing {len(animation_dict['keyframes'])} frames...")
@@ -317,7 +357,7 @@ def change_frame_order(self, new_order: list[str]) -> None:
"""Changes the order of the frames given an array of frame names"""
new_ordered_frames = [self.get_keyframe(frame_name) for frame_name in new_order]
assert None not in new_ordered_frames, "One of the given frame names does not exist"
- self.current_state.key_frames = new_ordered_frames
+ self.current_state.key_frames = new_ordered_frames # type: ignore[assignment]
self.save_step("Reordered frames")
def duplicate(self, frame_name: str) -> None:
@@ -329,15 +369,16 @@ def duplicate(self, frame_name: str) -> None:
assert index is not None, "The given frame name does not exist"
# Create a deep copy of the frame
new_frame = deepcopy(self.get_keyframe(frame_name))
+ assert new_frame is not None, "The given frame name does not exist (even less possible here)"
# Add suffix to the frame name if it already exists and increment the number instead of making it longer
- while new_frame["name"] in [frame["name"] for frame in self.current_state.key_frames]:
+ while new_frame.name in [frame.name for frame in self.current_state.key_frames]:
# Check if the frame name ends with "_copy_" and a number, if so increment the number
- if re.match(r".*?_copy_(\d+)", new_frame["name"]):
- new_frame["name"] = re.sub(
- r"(_copy_)(\d+)", lambda m: f"{m.group(1)}{int(m.group(2)) + 1}", new_frame["name"]
+ if re.match(r".*?_copy_(\d+)", new_frame.name):
+ new_frame.name = re.sub(
+ r"(_copy_)(\d+)", lambda m: f"{m.group(1)}{int(m.group(2)) + 1}", new_frame.name
)
else:
- new_frame["name"] = f"{frame_name}_copy_1"
+ new_frame.name = f"{frame_name}_copy_1"
# Insert the new frame after the original frame
self.current_state.key_frames.insert(index + 1, new_frame)
self.save_step(f"Duplicated Frame '{frame_name}'")
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
index d89806360..e3e6e1d70 100755
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
@@ -72,7 +72,7 @@ def __init__(self, context):
self.create_initialized_recorder()
# Initialize the window
- self._widget = QMainWindow()
+ self._widget = QMainWindow() # type: ignore
# Load XML ui definition
ui_file = os.path.join(get_package_share_directory("bitbots_animation_rqt"), "resource", "RecordUI.ui")
@@ -314,7 +314,7 @@ def connect_gui_callbacks(self) -> None:
self._widget.frameList.key_pressed.connect(self.delete)
self._widget.frameList.itemSelectionChanged.connect(self.frame_select)
- def play_walkready(self) -> None:
+ def play_walkready(self, _) -> None:
"""
Plays the walkready animation on the robot
"""
@@ -401,7 +401,7 @@ def save(self, new_location: bool = False) -> None:
else:
self._widget.statusBar.showMessage("There is nothing to save!")
- def open(self) -> None:
+ def open(self, _) -> None:
"""
Deletes all current frames and instead loads an animation from a json file
"""
@@ -453,7 +453,7 @@ def play_until(self):
Plays the animation up to a certain frame
"""
# Get the index of the selected frame
- end_index = self._recorder.get_keyframe_index(self._selected_frame) + 1
+ end_index = self._recorder.get_keyframe_index(self._selected_frame) + 1 # type: ignore[operator]
# Play the animation
status, success = self._recorder.play(until_frame=end_index)
@@ -482,7 +482,7 @@ def goto_next(self):
# Go to the frame in the UI
self._widget.frameList.setCurrentRow(next_frame_index)
- self._selected_frame = self._recorder.get_keyframes()[next_frame_index]["name"]
+ self._selected_frame = self._recorder.get_keyframes()[next_frame_index].name
self.react_to_frame_change()
def goto_init(self):
@@ -567,8 +567,10 @@ def record(self):
# Display a message
self._widget.statusBar.showMessage(f"Recorded frame {self._selected_frame}")
- # Update the frames in the UI
+ # Update the frames in the UI, but don't trigger the frame selection signal
+ self._widget.frameList.blockSignals(True)
self.update_frames()
+ self._widget.frameList.blockSignals(False)
def undo(self):
"""
@@ -661,13 +663,19 @@ def frame_select(self):
if selected_frame is not None:
# check if unrecorded changes would be lost
unrecorded_changes = []
- current_keyframe_goals = self._recorder.get_keyframe(self._selected_frame)["goals"]
+ current_keyframe = self._recorder.get_keyframe(self._selected_frame)
+ assert current_keyframe is not None, "Current keyframe not found"
for motor_name, text_field in self._motor_controller_text_fields.items():
- # Get the angle from the textfield
- angle = text_field.value()
- # compare with angles in current keyframe
- if not current_keyframe_goals[motor_name] == math.radians(angle):
+ # Check if the motor active state has changed
+ currently_active = (
+ self._motor_switcher_active_checkbox[motor_name].checkState(0) == Qt.CheckState.Checked
+ )
+ active_in_current_keyframe = motor_name in current_keyframe.goals
+ motor_active_changed = currently_active != active_in_current_keyframe
+ # Check if the motor goal has changed
+ motor_goal_changed = current_keyframe.goals.get(motor_name, 0) != math.radians(text_field.value())
+ if motor_active_changed or motor_goal_changed:
unrecorded_changes.append(motor_name)
# warn user about unrecorded changes
@@ -678,6 +686,10 @@ def frame_select(self):
sure = QMessageBox.question(self._widget, "Sure?", message, QMessageBox.Yes | QMessageBox.No)
# Cancel the open if the user does not want to discard the changes
if sure == QMessageBox.No:
+ # Update the UI to reselect the current frame (blocking signals to avoid infinite loop)
+ self._widget.frameList.blockSignals(True)
+ self._widget.frameList.setCurrentRow(self._recorder.get_keyframe_index(self._selected_frame))
+ self._widget.frameList.blockSignals(False)
return
# Update state so we have a new selected frame
self._selected_frame = selected_frame_name
@@ -691,6 +703,7 @@ def react_to_frame_change(self):
"""
# Get the selected frame
selected_frame = self._recorder.get_keyframe(self._selected_frame)
+ assert selected_frame is not None, "Selected frame not found in recorder"
# Update the name
self._widget.lineFrameName.setText(self._selected_frame)
@@ -698,28 +711,28 @@ def react_to_frame_change(self):
# Update what motors are active, what are stiff and set the angles accordingly
for motor_name in self.motors:
# Update checkbox if the motor is active or not
- active = motor_name in selected_frame["goals"]
+ active = motor_name in selected_frame.goals.keys()
self._motor_switcher_active_checkbox[motor_name].setCheckState(0, Qt.Checked if active else Qt.Unchecked)
# Update checkbox if the motor is stiff or not
- stiff = "torques" not in selected_frame or (
- motor_name in selected_frame["torques"] and bool(selected_frame["torques"][motor_name])
+ stiff = len(selected_frame.torque) == 0 or (
+ motor_name in selected_frame.torque and bool(selected_frame.torque[motor_name])
)
self._motor_controller_torque_checkbox[motor_name].setCheckState(0, Qt.Checked if stiff else Qt.Unchecked)
# Update the motor angle controls (value and active state)
if active:
self._motor_controller_text_fields[motor_name].setValue(
- round(math.degrees(selected_frame["goals"][motor_name]), 2)
+ round(math.degrees(selected_frame.goals[motor_name]), 2)
)
- self._working_angles[motor_name] = selected_frame["goals"][motor_name]
+ self._working_angles[motor_name] = selected_frame.goals[motor_name]
else:
self._motor_controller_text_fields[motor_name].setValue(0.0)
# Update the duration and pause
- self._widget.spinBoxDuration.setValue(selected_frame["duration"])
- self._widget.spinBoxPause.setValue(selected_frame["pause"])
+ self._widget.spinBoxDuration.setValue(selected_frame.duration)
+ self._widget.spinBoxPause.setValue(selected_frame.pause)
self.react_to_motor_selection()
@@ -773,7 +786,7 @@ def react_to_motor_selection(self):
def update_frames(self) -> None:
"""
- Updates the list of frames in the GUI based on the keyframes in the recorder
+ Updates the list of frames in the GUI based on the keyframes in the recorder'
"""
# Get the current state of the recorder and the index of the selected frame
keyframes = self._recorder.get_keyframes()
@@ -783,7 +796,7 @@ def update_frames(self) -> None:
if index is None:
# If it is not, select the first frame
index = 0
- self._selected_frame = keyframes[index]["name"]
+ self._selected_frame = keyframes[index].name
print(f"Selected frame {self._selected_frame}")
# Clear the list of frames
@@ -792,7 +805,7 @@ def update_frames(self) -> None:
# Add all frames to the list
for frame in keyframes:
item = QListWidgetItem()
- item.setText(frame["name"])
+ item.setText(frame.name)
self._widget.frameList.addItem(item)
# Select the correct frame
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
index f9f46c05a..6217bed78 100644
--- a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
@@ -1,4 +1,4 @@
-from typing import Callable
+from typing import Any, Callable
from PyQt5.QtCore import QObject, Qt, pyqtSignal
from PyQt5.QtWidgets import QListWidget, QWidget
@@ -28,12 +28,12 @@ def dropEvent(self, e): # noqa: N802
# fmt: off
def keyPressEvent(self, event): # noqa: N802
# fmt: on
- if event.key() == Qt.Key_Delete:
+ if event.key() == Qt.Key_Delete: # type: ignore[attr-defined]
super().keyPressEvent(event)
self.key_pressed.emit()
- elif event.key() == Qt.Key_Up and self.currentRow() - 1 >= 0:
+ elif event.key() == Qt.Key_Up and self.currentRow() - 1 >= 0: # type: ignore[attr-defined]
self.setCurrentRow(self.currentRow() - 1)
- elif event.key() == Qt.Key_Down and self.currentRow() + 1 < self.count():
+ elif event.key() == Qt.Key_Down and self.currentRow() + 1 < self.count(): # type: ignore[attr-defined]
self.setCurrentRow(self.currentRow() + 1)
@@ -43,7 +43,7 @@ class JointStateCommunicate(QObject):
def flatten_dict(input_dict: dict, parent_key: str = "", sep: str = ".") -> dict:
"""Flatten a nested dictionary."""
- items = []
+ items: list[Any] = []
for k, v in input_dict.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
diff --git a/bitbots_motion/bitbots_animation_rqt/package.xml b/bitbots_motion/bitbots_animation_rqt/package.xml
index aec6962c0..f8af2aefb 100644
--- a/bitbots_motion/bitbots_animation_rqt/package.xml
+++ b/bitbots_motion/bitbots_animation_rqt/package.xml
@@ -21,6 +21,7 @@
rqt_gui_py
rqt_gui
std_srvs
+ ament_mypy
diff --git a/bitbots_motion/bitbots_animation_rqt/setup.py b/bitbots_motion/bitbots_animation_rqt/setup.py
index ff22687d2..45e680dd3 100644
--- a/bitbots_motion/bitbots_animation_rqt/setup.py
+++ b/bitbots_motion/bitbots_animation_rqt/setup.py
@@ -13,6 +13,7 @@
],
install_requires=["setuptools"],
zip_safe=True,
+ tests_require=["pytest"],
entry_points={
"console_scripts": [
"animation_gui = " + package_name + ".record_ui:main",
diff --git a/bitbots_motion/bitbots_animation_rqt/test/mypy.ini b/bitbots_motion/bitbots_animation_rqt/test/mypy.ini
new file mode 100644
index 000000000..b7b297c6a
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/test/mypy.ini
@@ -0,0 +1,6 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
+disable_error_code = attr-defined
diff --git a/bitbots_motion/bitbots_animation_rqt/test/test_mypy.py b/bitbots_motion/bitbots_animation_rqt/test/test_mypy.py
new file mode 100644
index 000000000..3dacd6040
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/test/test_mypy.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+
+import pytest
+from ament_mypy.main import main
+
+
+@pytest.mark.mypy
+def test_mypy():
+ rc = main(argv=["--config", str((Path(__file__).parent / "mypy.ini").resolve())])
+ assert rc == 0, "Found code style errors / warnings"
diff --git a/bitbots_motion/bitbots_animation_server/CMakeLists.txt b/bitbots_motion/bitbots_animation_server/CMakeLists.txt
deleted file mode 100644
index 988d53714..000000000
--- a/bitbots_motion/bitbots_animation_server/CMakeLists.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-cmake_minimum_required(VERSION 3.5)
-project(bitbots_animation_server)
-
-find_package(bitbots_docs REQUIRED)
-find_package(rclpy REQUIRED)
-find_package(ament_cmake REQUIRED)
-find_package(ament_cmake_python REQUIRED)
-
-enable_bitbots_docs()
-
-ament_python_install_package(${PROJECT_NAME})
-
-ament_package()
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
index fb525677c..55c456deb 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
@@ -44,9 +44,9 @@ def parse(info: dict) -> Animation:
This method is parsing an animation from a :class:`dict`
instance *info*, as created by :func:`as_dict`.
"""
- anim = Animation(info["name"], ())
+ anim = Animation(info["name"], [])
- keyframes = info.get("keyframes", ())
+ keyframes = info.get("keyframes", [])
anim.keyframes = [
Keyframe(
k.get("goals", {}),
@@ -59,24 +59,3 @@ def parse(info: dict) -> Animation:
]
return anim
-
-
-def as_dict(anim: Animation) -> dict:
- """
- Convert an animation to builtin python types to
- make it serializable to formats like ``json``.
- """
- return {
- "name": anim.name,
- "interpolators": {n: ip.__name__ for n, ip in anim.interpolators},
- "keyframes": [
- {
- "duration": k.duration,
- "pause": k.pause,
- "goals": k.goals,
- "torque": k.torque,
- "stabilization_functions": k.stabilization_functions,
- }
- for k in anim.keyframes
- ],
- }
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
index f7cefea55..d9c473ae7 100755
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
@@ -27,7 +27,7 @@
from bitbots_msgs.srv import AddAnimation
# A hashable definition for the UUID of the goals
-UUID = tuple[int]
+UUID = tuple[np.uint8, ...]
class AnimationNode(Node):
@@ -165,7 +165,7 @@ def finish(successful: bool) -> PlayAnimation.Result:
# Send request to make the HCM to go into animation play mode
num_tries = 0
- while rclpy.ok() and (not self.hcm_animation_mode.call(SetBool.Request(data=True)).success):
+ while rclpy.ok() and (not self.hcm_animation_mode.call(SetBool.Request(data=True)).success): # type: ignore[attr-defined]
if num_tries >= 10:
self.get_logger().error("Failed to request HCM to go into animation play mode")
return finish(successful=False)
@@ -263,7 +263,7 @@ def finish(successful: bool) -> PlayAnimation.Result:
if pose is None or (request.bounds and once and t > animator.get_keyframe_times()[request.end - 1]):
# Animation is finished, tell it to the hcm (except if it is from the hcm)
if not request.hcm:
- hcm_result = self.hcm_animation_mode.call(SetBool.Request(data=False))
+ hcm_result: SetBool.Response = self.hcm_animation_mode.call(SetBool.Request(data=False))
if not hcm_result.success:
self.get_logger().error(f"Failed to finish animation on HCM. Reason: {hcm_result.message}")
@@ -288,7 +288,7 @@ def finish(successful: bool) -> PlayAnimation.Result:
sys.exit(0)
return finish(successful=False)
- def update_current_pose(self, msg) -> None:
+ def update_current_pose(self, msg: JointState) -> None:
"""Gets the current motor positions and updates the representing pose accordingly."""
self.current_joint_states = msg
@@ -296,7 +296,7 @@ def imu_callback(self, msg: Imu) -> None:
"""Callback for the IMU data."""
self.imu_data = msg
- def send_animation(self, from_hcm: bool, pose: dict, torque: Optional[dict]):
+ def send_animation(self, from_hcm: bool, pose: dict[str, float], torque: Optional[dict[str, float]] = None) -> None:
"""Sends an animation to the hcm"""
self.hcm_publisher.publish(
AnimationMsg(
@@ -309,8 +309,8 @@ def send_animation(self, from_hcm: bool, pose: dict, torque: Optional[dict]):
positions=pose.values(),
velocities=[-1.0] * len(pose),
accelerations=[-1.0] * len(pose),
- max_currents=[np.clip((torque[joint]), 0.0, 1.0) for joint in pose]
- if torque and False # TODO
+ max_currents=[np.clip((torque[joint]), 0.0, 1.0) for joint in pose.keys()] # type: ignore[index]
+ if torque and False
else [-1.0] * len(pose), # fmt: skip
),
)
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py
index e7436e403..315f23466 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/resource_manager.py
@@ -18,9 +18,9 @@ def __init__(self, robot_type: str):
# Get the path to the animations folder
self.basepath = abspath(os.path.join(get_package_share_directory(robot_type + "_animations"), "animations"))
- self.cache = {}
- self.files = []
- self.names = [] # Plain animation names, without filename-extension
+ self.cache: dict[str, str] = {}
+ self.files: list[str] = []
+ self.names: list[str] = [] # Plain animation names, without filename-extension
self.animpath = self._get_animpath()
def search(self, path, folders, filename=""):
@@ -121,7 +121,7 @@ def find_resource(self, name):
"""Finds a resource relative to self.basepath"""
return self.find(name)
- def _get_animpath(self):
+ def _get_animpath(self) -> list[str]:
"""
Get a list of folders in the animations/ folder.
"""
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
index b2bb84e75..6dffb7649 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
@@ -74,8 +74,8 @@ def get_positions_rad(self, time):
return ret_dict
- def get_torque(self, current_time):
- torque = {}
+ def get_torque(self, current_time) -> dict[str, float]:
+ torque: dict[str, float] = {}
if current_time < 0 or current_time > self.animation_duration:
return torque
diff --git a/bitbots_motion/bitbots_animation_server/package.xml b/bitbots_motion/bitbots_animation_server/package.xml
index 5552f2810..a1e94693e 100644
--- a/bitbots_motion/bitbots_animation_server/package.xml
+++ b/bitbots_motion/bitbots_animation_server/package.xml
@@ -21,6 +21,7 @@
bitbots_utils
rclpy
std_msgs
+ ament_mypy
diff --git a/bitbots_motion/bitbots_animation_server/setup.py b/bitbots_motion/bitbots_animation_server/setup.py
index c93d21760..7a1b78643 100644
--- a/bitbots_motion/bitbots_animation_server/setup.py
+++ b/bitbots_motion/bitbots_animation_server/setup.py
@@ -21,6 +21,7 @@
zip_safe=True,
keywords=["ROS"],
license="MIT",
+ tests_require=["pytest"],
entry_points={
"console_scripts": [
"animation_node = bitbots_animation_server.animation_node:main",
diff --git a/bitbots_motion/bitbots_animation_server/test/mypy.ini b/bitbots_motion/bitbots_animation_server/test/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_server/test/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_motion/bitbots_animation_server/test/test_mypy.py b/bitbots_motion/bitbots_animation_server/test/test_mypy.py
new file mode 100644
index 000000000..3dacd6040
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_server/test/test_mypy.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+
+import pytest
+from ament_mypy.main import main
+
+
+@pytest.mark.mypy
+def test_mypy():
+ rc = main(argv=["--config", str((Path(__file__).parent / "mypy.ini").resolve())])
+ assert rc == 0, "Found code style errors / warnings"
diff --git a/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp b/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp
index bb563e7f9..9111452a8 100644
--- a/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp
+++ b/bitbots_motion/bitbots_dynup/include/bitbots_dynup/dynup_node.hpp
@@ -116,6 +116,7 @@ class DynupNode {
double start_time_ = 0;
bool server_free_ = true;
bool debug_ = false;
+ bool cancel_goal_ = false;
// TF2 related things
tf2_ros::Buffer tf_buffer_;
diff --git a/bitbots_motion/bitbots_dynup/src/dynup_node.cpp b/bitbots_motion/bitbots_dynup/src/dynup_node.cpp
index 69ed264c8..f05953faf 100644
--- a/bitbots_motion/bitbots_dynup/src/dynup_node.cpp
+++ b/bitbots_motion/bitbots_dynup/src/dynup_node.cpp
@@ -215,6 +215,10 @@ rclcpp_action::GoalResponse DynupNode::goalCb(const rclcpp_action::GoalUUID &uui
server_free_ = false;
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
} else {
+ if (goal->from_hcm) {
+ cancel_goal_ = true;
+ return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
+ }
RCLCPP_WARN(node_->get_logger(), "Dynup is busy, goal rejected!");
return rclcpp_action::GoalResponse::REJECT;
}
@@ -259,11 +263,20 @@ void DynupNode::loopEngine(int loop_rate, std::shared_ptr goal_
/* Do the loop as long as nothing cancels it */
while (rclcpp::ok()) {
rclcpp::Time startTime = node_->get_clock()->now();
+ // This is used when the cancelation is requested externally
if (goal_handle->is_canceling()) {
goal_handle->canceled(result);
RCLCPP_INFO(node_->get_logger(), "Goal canceled");
return;
}
+ // This is used when the dynup server descides to cancel itself
+ if (cancel_goal_) {
+ result->successful = false;
+ goal_handle->abort(result);
+ server_free_ = true;
+ cancel_goal_ = false;
+ return;
+ }
msg = step(getTimeDelta());
auto feedback = std::make_shared();
feedback->percent_done = engine_.getPercentDone();
@@ -282,9 +295,12 @@ void DynupNode::loopEngine(int loop_rate, std::shared_ptr goal_
if (msg.joint_names.empty()) {
continue;
}
+ msg.from_hcm = goal_handle->get_goal()->from_hcm;
joint_goal_publisher_->publish(msg);
node_->get_clock()->sleep_until(startTime + rclcpp::Duration::from_nanoseconds(1e9 / loop_rate));
}
+ // Wether we canceled the goal or we finished cleanly the cancelation request is not valid anymore
+ cancel_goal_ = false;
}
bitbots_dynup::msg::DynupPoses DynupNode::getCurrentPoses() {
diff --git a/bitbots_motion/bitbots_hcm/CMakeLists.txt b/bitbots_motion/bitbots_hcm/CMakeLists.txt
index 9e427ccae..4fa8c90a9 100644
--- a/bitbots_motion/bitbots_hcm/CMakeLists.txt
+++ b/bitbots_motion/bitbots_hcm/CMakeLists.txt
@@ -58,6 +58,9 @@ if(BUILD_TESTING)
ament_add_pytest_test(
hcm_py_test test/pytest PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}" APPEND_ENV
PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR})
+
+ find_package(ament_cmake_mypy REQUIRED)
+ ament_mypy(CONFIG_FILE "${CMAKE_CURRENT_LIST_DIR}/mypy.ini")
endif()
ament_python_install_package(${PROJECT_NAME})
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py
index 25bdf4bbe..61a6ccd0c 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/__init__.py
@@ -1,7 +1,13 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
from dynamic_stack_decider.abstract_action_element import AbstractActionElement
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
+beartype_this_package()
+
class AbstractHCMActionElement(AbstractActionElement):
"""
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/change_motor_power.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/change_motor_power.py
index 6b08ac85c..cdbbe7bab 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/change_motor_power.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/change_motor_power.py
@@ -21,15 +21,6 @@ def perform(self, reevaluate=False):
raise NotImplementedError
-class TurnMotorsOn(AbstractChangeMotorPower):
- def perform(self, reevaluate=False):
- if not self.blackboard.visualization_active and not self.blackboard.simulation_active:
- req = SetBool.Request()
- req.data = True
- self.blackboard.motor_switch_service.call(req)
- return self.pop()
-
-
class TurnMotorsOff(AbstractChangeMotorPower):
def perform(self, reevaluate=False):
if not self.blackboard.visualization_active and not self.blackboard.simulation_active:
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
index 8f805b1da..882ae4624 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
@@ -88,6 +88,9 @@ def animation_feedback_cb(self, msg):
self.publish_debug_data("Animation Percent Done", str(feedback.percent_done))
def animation_finished(self):
+ assert (
+ self.blackboard.animation_action_current_goal is not None
+ ), "No animation action goal set, so we cannot check if it is finished"
return (
self.blackboard.animation_action_current_goal.done()
and self.blackboard.animation_action_current_goal.result().status
@@ -202,7 +205,7 @@ def on_pop(self):
Cancel the current goal when the action is popped
"""
super().on_pop()
- if not self.animation_finished():
+ if self.blackboard.dynup_action_current_goal is not None and not self.animation_finished():
self.blackboard.dynup_action_current_goal.result().cancel_goal_async()
def start_animation(self):
@@ -231,6 +234,10 @@ def start_animation(self):
goal = Dynup.Goal()
goal.direction = self.direction
+ # This indicates that the goal is send from the hcm and therfore has prio,
+ # canceling other tasks. The published joint goals are also marked with this flag so they
+ # are handled differently in the HCM joint mutex
+ goal.from_hcm = True
self.blackboard.dynup_action_current_goal = self.blackboard.dynup_action_client.send_goal_async(
goal, feedback_callback=self.animation_feedback_cb
)
@@ -241,6 +248,9 @@ def animation_feedback_cb(self, msg):
self.publish_debug_data("Dynup Percent Done", str(feedback.percent_done))
def animation_finished(self):
+ assert (
+ self.blackboard.dynup_action_current_goal is not None
+ ), "No dynup action goal set, so we cannot check if it is finished"
return (
self.blackboard.dynup_action_current_goal.done()
and self.blackboard.dynup_action_current_goal.result().status
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py
index b58b4231e..4c2804302 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/state.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from bitbots_hcm.hcm_dsd.actions import AbstractHCMActionElement
+from bitbots_hcm.type_utils import T_RobotControlState
from bitbots_msgs.msg import RobotControlState
@@ -11,7 +12,7 @@ class AbstractRobotState(AbstractHCMActionElement, ABC):
"""
@abstractmethod
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
"""
Returns the state which should be set. This will be implemented by the subclasses.
"""
@@ -27,65 +28,65 @@ def perform(self, reevaluate=False):
class RobotStateStartup(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.STARTUP
class RobotStateControllable(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.CONTROLLABLE
class RobotStateWalking(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.WALKING
class RobotStateAnimationRunning(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.ANIMATION_RUNNING
class RobotStatePickedUp(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.PICKED_UP
class RobotStateFalling(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.FALLING
class RobotStateFallen(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.FALLEN
class RobotStateGettingUp(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.GETTING_UP
class RobotStatePenalty(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.PENALTY
class RobotStateRecord(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.RECORD
class RobotStateKicking(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.KICKING
class RobotStateHardwareProblem(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.HARDWARE_PROBLEM
class RobotStateMotorOff(AbstractRobotState):
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
return RobotControlState.MOTOR_OFF
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait.py
index e70bae830..c29996ab3 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait.py
@@ -6,6 +6,7 @@
Just waits for something (i.e. that preconditions will be fullfilled)
"""
+
from bitbots_hcm.hcm_dsd.actions import AbstractHCMActionElement
@@ -34,3 +35,16 @@ def perform(self, reevaluate=False):
# Pop if the time has elapsed
if self.blackboard.node.get_clock().now().nanoseconds / 1e9 > self.start_time + self.duration:
self.pop()
+
+
+class WaitRosControlStartDelay(Wait):
+ """
+ The motors need some time to start up,
+ the time at which we start sending commands is defined in the config of the ros_control node.
+ This action waits for that time.
+ """
+
+ def __init__(self, blackboard, dsd, parameters):
+ super().__init__(blackboard, dsd, parameters)
+ self.duration = self.blackboard.motor_start_delay
+ self.blackboard.node.get_logger().info("Waiting for the motors to start up...")
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py
index 82101034e..e79e007a9 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/__init__.py
@@ -1,7 +1,13 @@
+# Setting up runtime type checking for this package
+# We need to do this again here because the dsd imports
+# the decisions and actions from this package in a standalone way
+from beartype.claw import beartype_this_package
from dynamic_stack_decider.abstract_decision_element import AbstractDecisionElement
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
+beartype_this_package()
+
class AbstractHCMDecisionElement(AbstractDecisionElement):
"""
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
index c770b8326..c7072a5c0 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
@@ -29,6 +29,9 @@ def perform(self, reevaluate=False):
# We can safely assume that the last animation start time is set because the animation is running
# Calculate time since last animation start
+ assert (
+ self.blackboard.last_animation_start_time is not None
+ ), "Last animation start time is not set, this should be impossible"
time_delta_since_last_animation_start = (
self.blackboard.node.get_clock().now().nanoseconds / 1e9
- self.blackboard.last_animation_start_time.nanoseconds / 1e9
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py
index 2c153ba47..2b584e439 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/check_hardware.py
@@ -12,6 +12,7 @@ class CheckMotors(AbstractHCMDecisionElement):
def __init__(self, blackboard, dsd, parameters):
super().__init__(blackboard, dsd, parameters)
self.had_problem = False
+ self.power_was_off = False
def perform(self, reevaluate=False):
self.clear_debug_data()
@@ -71,8 +72,16 @@ def perform(self, reevaluate=False):
# wait for motors to connect
return "PROBLEM"
else:
- # we have to turn the motors on
- return "TURN_ON"
+ # The motors are off, so we will not complain
+ self.power_was_off = True
+ return "MOTORS_NOT_STARTED"
+
+ elif self.power_was_off:
+ # motors are now on and we can continue
+ self.blackboard.node.get_logger().info("Motors are now connected. Will resume.")
+ self.power_was_off = False
+ # But we want to perform a clean start, so we don't jump directly into the last goal position
+ return "TURN_ON"
if self.had_problem:
# had problem before, just tell that this is solved now
@@ -117,10 +126,14 @@ def perform(self, reevaluate=False):
else:
return "OKAY"
- if self.blackboard.previous_imu_msg is None or (
- self.blackboard.node.get_clock().now().nanoseconds
- - self.blackboard.last_different_imu_state_time.nanoseconds
- > self.blackboard.imu_timeout_duration * 1e9
+ if (
+ self.blackboard.previous_imu_msg is None
+ or self.blackboard.last_different_imu_state_time is None
+ or (
+ self.blackboard.node.get_clock().now().nanoseconds
+ - self.blackboard.last_different_imu_state_time.nanoseconds
+ > self.blackboard.imu_timeout_duration * 1e9
+ )
):
if (
self.blackboard.current_state == RobotControlState.STARTUP
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py
index 44a421875..e8f7005b1 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/falling.py
@@ -4,6 +4,7 @@
import numpy as np
from bitbots_utils.transforms import quat2fused
from rclpy.duration import Duration
+from rclpy.time import Time
from bitbots_hcm.hcm_dsd.decisions import AbstractHCMDecisionElement
@@ -32,7 +33,7 @@ def __init__(self, blackboard, dsd, parameters):
self.smoothing = self.blackboard.node.get_parameter("smooth_threshold").value
# Initialize smoothing list that stores the last results
- self.smoothing_list: list[FallDirection] = []
+ self.smoothing_list: list[tuple[Time, FallDirection]] = []
def perform(self, reevaluate=False):
"""Checks if the robot is currently falling and in which direction."""
@@ -77,7 +78,7 @@ def perform(self, reevaluate=False):
# Prune old elements from smoothing history
self.smoothing_list = list(
filter(
- lambda x: x[0] > self.blackboard.node.get_clock().now() - Duration(seconds=self.smoothing),
+ lambda x: x[0] > self.blackboard.node.get_clock().now() - Duration(seconds=self.smoothing), # type: ignore[arg-type]
self.smoothing_list,
)
)
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd
index fb6cbccc9..2e0b29b04 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd
@@ -1,17 +1,20 @@
#EMERGENCY_FALL
@RobotStateMotorOff, @CancelGoals, @StopWalking, @PlayAnimationFallingFront, @Wait + time:1 + r:false, @TurnMotorsOff, @Wait
+#INIT_PATTERN
+@RobotStateStartup, @SetTorque + stiff:false + r:false, @WaitRosControlStartDelay + r:false, @SetTorque + stiff:true + r:false, @PlayAnimationStartup, @PlayAnimationDynup + direction:walkready + r:false
+
-->HCM
$StartHCM
START_UP --> $Simulation
YES --> @RobotStateStartup, @PlayAnimationStartup, @PlayAnimationDynup + direction:walkready + r:false
- NO --> @RobotStateStartup, @Wait + time:1 + r:false, @PlayAnimationStartup, @PlayAnimationDynup + direction:walkready + r:false
+ NO --> #INIT_PATTERN
RUNNING --> $CheckMotors
MOTORS_NOT_STARTED --> @RobotStateStartup, @Wait
OVERLOAD --> #EMERGENCY_FALL
OVERHEAT --> #EMERGENCY_FALL
PROBLEM --> @RobotStateHardwareProblem, @WaitForMotors
- TURN_ON --> @TurnMotorsOn, @PlayAnimationDynup + direction:walkready + r:false, @Wait
+ TURN_ON --> #INIT_PATTERN
OKAY --> $RecordAnimation
RECORD_ACTIVE --> @RobotStateRecord, @Wait
FREE --> $TeachingMode
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
index 72803ba89..b0ae2b7e9 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
@@ -1,6 +1,7 @@
from typing import List, Optional
import numpy
+from bitbots_utils.utils import get_parameters_from_other_node_sync
from geometry_msgs.msg import Twist
from rclpy.action import ActionClient
from rclpy.node import Node
@@ -11,6 +12,7 @@
from std_srvs.srv import Empty as EmptySrv
from std_srvs.srv import SetBool
+from bitbots_hcm.type_utils import T_RobotControlState
from bitbots_msgs.action import Dynup, PlayAnimation
from bitbots_msgs.msg import Audio, JointTorque, RobotControlState
from bitbots_msgs.srv import SetTeachingMode
@@ -21,7 +23,7 @@ def __init__(self, node: Node):
self.node = node
# Basic state
- self.current_state: RobotControlState = RobotControlState.STARTUP
+ self.current_state: T_RobotControlState = RobotControlState.STARTUP
self.stopped: bool = False
# Save start time
@@ -32,6 +34,11 @@ def __init__(self, node: Node):
self.visualization_active: bool = self.node.get_parameter("visualization_active").value
self.pickup_accel_threshold: float = self.node.get_parameter("pick_up_accel_threshold").value
self.pressure_sensors_installed: bool = self.node.get_parameter("pressure_sensors_installed").value
+ self.motor_start_delay: int = 0
+ if not self.simulation_active: # The hardware interface is obviously not available in simulation
+ self.motor_start_delay = get_parameters_from_other_node_sync(
+ self.node, "/wolfgang_hardware_interface", ["start_delay"]
+ )["start_delay"]
# Create service clients
self.foot_zero_service = self.node.create_client(EmptySrv, "set_foot_zero")
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
index fa3ca2c45..a0bc0ec88 100755
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
@@ -24,6 +24,7 @@
from bitbots_hcm import hcm_dsd
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
+from bitbots_hcm.type_utils import T_RobotControlState
from bitbots_msgs.msg import FootPressure, RobotControlState
from bitbots_msgs.srv import ManualPenalize, SetTeachingMode
@@ -34,18 +35,15 @@ def __init__(self, use_sim_time, simulation_active, visualization_active):
node_name = "hcm_py"
# Load parameters from yaml file because this is a hacky cpp python hybrid node for performance reasons
- parameter_msgs: list(ParameterMsg) = get_parameters_from_ros_yaml(
+ parameter_msgs: list[ParameterMsg] = get_parameters_from_ros_yaml(
node_name, f"{get_package_share_directory('bitbots_hcm')}/config/hcm_wolfgang.yaml", use_wildcard=True
)
- parameters = []
- for parameter_msg in parameter_msgs:
- parameters.append(Parameter.from_parameter_msg(parameter_msg))
- if use_sim_time:
- parameters.append(Parameter("use_sim_time", type_=Parameter.Type.BOOL, value=True))
- if simulation_active:
- parameters.append(Parameter("simulation_active", type_=Parameter.Type.BOOL, value=True))
- if visualization_active:
- parameters.append(Parameter("visualization_active", type_=Parameter.Type.BOOL, value=True))
+ parameters = [
+ Parameter("use_sim_time", type_=Parameter.Type.BOOL, value=use_sim_time),
+ Parameter("simulation_active", type_=Parameter.Type.BOOL, value=simulation_active),
+ Parameter("visualization_active", type_=Parameter.Type.BOOL, value=visualization_active),
+ ]
+ parameters.extend(map(Parameter.from_parameter_msg, parameter_msgs))
# Create Python node
self.node = Node(
@@ -56,7 +54,7 @@ def __init__(self, use_sim_time, simulation_active, visualization_active):
)
# Create own executor for Python part
- multi_executor = MultiThreadedExecutor()
+ multi_executor = MultiThreadedExecutor(num_threads=10)
multi_executor.add_node(self.node)
self.spin_thread = threading.Thread(target=multi_executor.spin, args=(), daemon=True)
self.spin_thread.start()
@@ -161,7 +159,7 @@ def diag_cb(self, msg: DiagnosticArray):
elif "/Pressure" in status.name:
self.blackboard.pressure_diag_error = status.level in (DiagnosticStatus.ERROR, DiagnosticStatus.STALE)
- def get_state(self) -> RobotControlState:
+ def get_state(self) -> T_RobotControlState:
"""Returns the current state of the HCM."""
return self.blackboard.current_state
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/type_utils.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/type_utils.py
new file mode 100644
index 000000000..20bd175fc
--- /dev/null
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/type_utils.py
@@ -0,0 +1,21 @@
+from typing import Literal, TypeAlias
+
+from bitbots_msgs.msg import RobotControlState
+
+T_RobotControlState: TypeAlias = Literal[ # type: ignore[valid-type]
+ RobotControlState.CONTROLLABLE,
+ RobotControlState.FALLING,
+ RobotControlState.FALLEN,
+ RobotControlState.GETTING_UP,
+ RobotControlState.ANIMATION_RUNNING,
+ RobotControlState.STARTUP,
+ RobotControlState.SHUTDOWN,
+ RobotControlState.PENALTY,
+ RobotControlState.RECORD,
+ RobotControlState.WALKING,
+ RobotControlState.MOTOR_OFF,
+ RobotControlState.HCM_OFF,
+ RobotControlState.HARDWARE_PROBLEM,
+ RobotControlState.PICKED_UP,
+ RobotControlState.KICKING,
+]
diff --git a/bitbots_motion/bitbots_hcm/mypy.ini b/bitbots_motion/bitbots_hcm/mypy.ini
new file mode 100644
index 000000000..6b19e8374
--- /dev/null
+++ b/bitbots_motion/bitbots_hcm/mypy.ini
@@ -0,0 +1,5 @@
+# Global options:
+
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
diff --git a/bitbots_motion/bitbots_hcm/package.xml b/bitbots_motion/bitbots_hcm/package.xml
index 9f6fd464a..9e90af219 100644
--- a/bitbots_motion/bitbots_hcm/package.xml
+++ b/bitbots_motion/bitbots_hcm/package.xml
@@ -9,7 +9,7 @@
Marc Bestmann
Hamburg Bit-Bots
-
+
MIT
Marc Bestmann
@@ -33,6 +33,7 @@
ros2_python_extension
sensor_msgs
std_msgs
+ ament_cmake_mypy
diff --git a/bitbots_motion/bitbots_hcm/src/hcm.cpp b/bitbots_motion/bitbots_hcm/src/hcm.cpp
index d22a44cee..e27c66268 100644
--- a/bitbots_motion/bitbots_hcm/src/hcm.cpp
+++ b/bitbots_motion/bitbots_hcm/src/hcm.cpp
@@ -87,12 +87,15 @@ class HCM_CPP : public rclcpp::Node {
}
void dynup_callback(const bitbots_msgs::msg::JointCommand msg) {
- if (current_state_ == bitbots_msgs::msg::RobotControlState::STARTUP ||
- current_state_ == bitbots_msgs::msg::RobotControlState::GETTING_UP ||
- current_state_ == bitbots_msgs::msg::RobotControlState::MOTOR_OFF ||
- current_state_ == bitbots_msgs::msg::RobotControlState::PICKED_UP ||
- current_state_ == bitbots_msgs::msg::RobotControlState::PENALTY ||
- current_state_ == bitbots_msgs::msg::RobotControlState::RECORD ||
+ if (msg.from_hcm and (current_state_ == bitbots_msgs::msg::RobotControlState::STARTUP ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::GETTING_UP ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::MOTOR_OFF ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::PICKED_UP ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::PENALTY ||
+ current_state_ == bitbots_msgs::msg::RobotControlState::CONTROLLABLE)) {
+ pub_controller_command_->publish(msg);
+ }
+ if (current_state_ == bitbots_msgs::msg::RobotControlState::RECORD ||
current_state_ == bitbots_msgs::msg::RobotControlState::CONTROLLABLE) {
pub_controller_command_->publish(msg);
}
diff --git a/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py b/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py
index e69de29bb..4d2e15007 100644
--- a/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py
+++ b/bitbots_motion/bitbots_splines/bitbots_splines/__init__.py
@@ -0,0 +1,4 @@
+# Setting up runtime type checking for this package
+from beartype.claw import beartype_this_package
+
+beartype_this_package()
diff --git a/bitbots_msgs/CMakeLists.txt b/bitbots_msgs/CMakeLists.txt
index 61884419f..74c50a259 100644
--- a/bitbots_msgs/CMakeLists.txt
+++ b/bitbots_msgs/CMakeLists.txt
@@ -50,6 +50,7 @@ rosidl_generate_interfaces(
"srv/SetObjectPose.srv"
"srv/SetObjectPosition.srv"
"srv/SetTeachingMode.srv"
+ "srv/SimulatorPush.srv"
DEPENDENCIES
action_msgs
geometry_msgs
diff --git a/bitbots_msgs/action/Dynup.action b/bitbots_msgs/action/Dynup.action
index 6e5620f2f..939261314 100644
--- a/bitbots_msgs/action/Dynup.action
+++ b/bitbots_msgs/action/Dynup.action
@@ -9,6 +9,9 @@ string DIRECTION_RISE = "rise"
string DIRECTION_DESCEND = "descend"
string DIRECTION_WALKREADY = "walkready"
+# Requests from the HCM have higher prio and are published to a different topic
+bool from_hcm
+
---
# Result definition
bool successful
diff --git a/bitbots_msgs/msg/JointCommand.msg b/bitbots_msgs/msg/JointCommand.msg
index 115e0db78..c86b230dc 100644
--- a/bitbots_msgs/msg/JointCommand.msg
+++ b/bitbots_msgs/msg/JointCommand.msg
@@ -1,4 +1,8 @@
std_msgs/Header header
+
+# Mark this message so it has prio
+bool from_hcm
+
string[] joint_names
float64[] positions
diff --git a/bitbots_msgs/srv/SimulatorPush.srv b/bitbots_msgs/srv/SimulatorPush.srv
new file mode 100644
index 000000000..db5f753f7
--- /dev/null
+++ b/bitbots_msgs/srv/SimulatorPush.srv
@@ -0,0 +1,4 @@
+geometry_msgs/Vector3 force
+bool relative
+string robot "amy"
+---
diff --git a/bitbots_navigation/bitbots_localization/config/config.yaml b/bitbots_navigation/bitbots_localization/config/config.yaml
index 22d0e5b52..13e135e94 100644
--- a/bitbots_navigation/bitbots_localization/config/config.yaml
+++ b/bitbots_navigation/bitbots_localization/config/config.yaml
@@ -3,15 +3,11 @@ bitbots_localization:
misc:
init_mode: 0
percentage_best_particles: 100
- min_motion_linear: 0.0
- min_motion_angular: 0.0
max_motion_linear: 0.5
max_motion_angular: 3.14
- filter_only_with_motion: false
ros:
line_pointcloud_topic: 'line_mask_relative_pc'
goal_topic: 'goals_simulated'
- fieldboundary_topic: 'field_boundary_relative'
particle_publishing_topic: 'pose_particles'
debug_visualization: true
particle_filter:
@@ -46,10 +42,6 @@ bitbots_localization:
goal:
factor: 0.0
out_of_field_score: 0.0
- field_boundary:
- factor: 0.0
- out_of_field_score: 0.0
confidences:
line_element: 0.01
goal_element: 0.0
- field_boundary_element: 0.0
diff --git a/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp b/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp
index 2f50bc044..79065d1ff 100644
--- a/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp
+++ b/bitbots_navigation/bitbots_localization/include/bitbots_localization/ObservationModel.hpp
@@ -6,17 +6,15 @@
#define BITBOTS_LOCALIZATION_OBSERVATIONMODEL_H
#include
+#include
#include
#include
#include
#include
#include
-#include
#include
#include
-#include
-#include
#include "localization_parameters.hpp"
@@ -31,8 +29,7 @@ class RobotPoseObservationModel : public particle_filter::ObservationModel map_lines, std::shared_ptr