Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime and static type checks #639

Merged
merged 14 commits into from
Jan 10, 2025
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@
"ros.distro": "iron",
"search.useIgnoreFiles": false,
"python.autoComplete.extraPaths": [
"/opt/openrobots/lib/python3.10/site-packages",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, but VS code always adds it and I dont want to make a special rule.

"/opt/ros/iron/lib/python3.10/site-packages"
],
"python.analysis.extraPaths": [
"/opt/openrobots/lib/python3.10/site-packages",
"/opt/ros/iron/lib/python3.10/site-packages"
],
"cmake.configureOnOpen": false,
Expand Down
19 changes: 12 additions & 7 deletions bitbots_behavior/bitbots_blackboard/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()

find_package(bio_ik_msgs REQUIRED)
find_package(ament_cmake REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(rclpy REQUIRED)
find_package(tf2 REQUIRED)
find_package(bio_ik_msgs REQUIRED)
find_package(bitbots_docs REQUIRED)
find_package(bitbots_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(rclpy REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(std_msgs REQUIRED)
find_package(tf2_geometry_msgs REQUIRED)
find_package(std_srvs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(bitbots_docs REQUIRED)
find_package(tf2 REQUIRED)
find_package(tf2_geometry_msgs REQUIRED)

set(INCLUDE_DIRS
${bio_ik_msgs_INCLUDE_DIRS}
Expand Down Expand Up @@ -76,6 +76,11 @@ ament_export_dependencies(geometry_msgs)
ament_export_dependencies(bitbots_docs)
ament_export_include_directories(${INCLUDE_DIRS})

if(BUILD_TESTING)
find_package(ament_cmake_mypy REQUIRED)
ament_mypy(CONFIG_FILE "${CMAKE_CURRENT_LIST_DIR}/mypy.ini")
endif()

ament_python_install_package(${PROJECT_NAME})

ament_package()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Setting up runtime type checking for this package
from beartype.claw import beartype_this_package

beartype_this_package()
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class BodyBlackboard:
def __init__(self, node: Node, tf_buffer: tf2.Buffer):
def __init__(self, node: Node, tf_buffer: tf2.BufferInterface):
# References
self.node = node
self.tf_buffer = tf_buffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING

from bitbots_utils.utils import nobeartype
from rclpy.node import Node

if TYPE_CHECKING:
Expand All @@ -9,6 +10,7 @@
class AbstractBlackboardCapsule:
"""Abstract class for blackboard capsules."""

def __init__(self, node: Node, blackboard: Optional["BodyBlackboard"] = None):
@nobeartype
def __init__(self, node: Node, blackboard: "BodyBlackboard"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is nobeartype needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has something to do with circular imports. The blackboard is imported for static type checking, but importing it for dynamic type checking leads to a circular dependency during runtime. Afaik bear type has no mechanism to handle this.

self._node = node
self._blackboard = blackboard
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, node, blackboard):
self.calc_base_costmap()
self.calc_gradients()

def robot_callback(self, msg: RobotArray):
def robot_callback(self, msg: RobotArray) -> None:
"""
Callback with new robot detections
"""
Expand All @@ -77,10 +77,11 @@ def robot_callback(self, msg: RobotArray):
# Publish debug costmap
self.publish_costmap()

def publish_costmap(self):
def publish_costmap(self) -> None:
"""
Publishes the costmap for rviz
"""
assert self.costmap is not None, "Costmap is not initialized"
# Normalize costmap to match the rviz color scheme in a good way
normalized_costmap = (
(255 - ((self.costmap - np.min(self.costmap)) / (np.max(self.costmap) - np.min(self.costmap))) * 255 / 2.1)
Expand Down Expand Up @@ -131,7 +132,7 @@ def get_pass_regions(self) -> np.ndarray:
# Smooth obstacle map
return gaussian_filter(costmap, pass_smooth)

def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
def field_2_costmap_coord(self, x: float, y: float) -> Tuple[int, int]:
"""
Converts a field position to the corresponding indices for the costmap.

Expand All @@ -153,10 +154,11 @@ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
)
return idx_x, idx_y

def calc_gradients(self):
def calc_gradients(self) -> None:
"""
Recalculates the gradient map based on the current costmap.
"""
assert self.base_costmap is not None, "Base costmap is not initialized"
gradient = np.gradient(self.base_costmap)
norms = np.linalg.norm(gradient, axis=0)

Expand Down Expand Up @@ -186,7 +188,7 @@ def cost_at_relative_xy(self, x: float, y: float) -> float:

return self.get_cost_at_field_position(point.point.x, point.point.y)

def calc_base_costmap(self):
def calc_base_costmap(self) -> None:
"""
Builds the base costmap based on the behavior parameters.
This costmap includes a gradient towards the enemy goal and high costs outside the playable area
Expand All @@ -203,8 +205,8 @@ def calc_base_costmap(self):

# Create Grid
grid_x, grid_y = np.mgrid[
0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j,
0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j,
0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j, # type: ignore[misc]
0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j, # type: ignore[misc]
]

fix_points: List[Tuple[Tuple[float, float], float]] = []
Expand Down Expand Up @@ -278,7 +280,8 @@ def calc_base_costmap(self):
)

# Smooth the costmap to get more continuous gradients
self.base_costmap = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
base_costmap: np.ndarray = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
self.base_costmap = base_costmap
self.costmap = self.base_costmap.copy()

def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, float]:
Expand All @@ -287,6 +290,7 @@ def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, flo
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
assert self.gradient_map is not None, "Gradient map is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return -self.gradient_map[0][idx_x, idx_y], -self.gradient_map[1][idx_x, idx_y]

Expand All @@ -296,10 +300,11 @@ def get_cost_at_field_position(self, x: float, y: float) -> float:
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
assert self.costmap is not None, "Costmap is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return self.costmap[idx_x, idx_y]

def get_gradient_direction_at_field_position(self, x: float, y: float):
def get_gradient_direction_at_field_position(self, x: float, y: float) -> float:
"""
Returns the gradient direction at the given position
:param x: Field coordinate in the x direction
Expand All @@ -318,7 +323,9 @@ def get_gradient_direction_at_field_position(self, x: float, y: float):
grad = self.get_gradient_at_field_position(x, y)
return math.atan2(grad[1], grad[0])

def get_cost_of_kick_relative(self, x: float, y: float, direction: float, kick_length: float, angular_range: float):
def get_cost_of_kick_relative(
self, x: float, y: float, direction: float, kick_length: float, angular_range: float
) -> float:
"""
Returns the cost of a kick at the given position and direction in base footprint frame
:param x: Field coordinate in the x direction
Expand Down Expand Up @@ -356,6 +363,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
:param kick_length: The length of the kick
:param angular_range: The angular range of the kick
"""
assert self.costmap is not None, "Costmap is not initialized"

# create a mask in the size of the costmap consisting of 8-bit values initialized as 0
mask = Image.new("L", (self.costmap.shape[1], self.costmap.shape[0]))
Expand Down Expand Up @@ -386,7 +394,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
# This should contribute way less than the max and should have an impact if the max values are similar in all directions.
return masked_costmap.max() * 0.75 + masked_costmap.min() * 0.25

def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float):
def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float) -> float:
"""
Returns the cost of the kick at the current position
:param direction: The direction of the kick
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from bitbots_utils.utils import get_parameters_from_other_node
from game_controller_hl_interfaces.msg import GameState

Expand All @@ -9,96 +11,86 @@ class GameStatusCapsule(AbstractBlackboardCapsule):

def __init__(self, node, blackboard=None):
super().__init__(node, blackboard)
self.team_id = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.team_id: int = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.gamestate = GameState()
self.last_update = 0
self.unpenalized_time = 0
self.last_goal_from_us_time = -86400
self.last_goal_time = -86400
self.free_kick_kickoff_team = None

def is_game_state_equals(self, value):
assert value in [
GameState.GAMESTATE_PLAYING,
GameState.GAMESTATE_FINISHED,
GameState.GAMESTATE_INITIAL,
GameState.GAMESTATE_READY,
GameState.GAMESTATE_SET,
]
return value == self.get_gamestate()

def get_gamestate(self):
self.last_update: float = 0.0
self.unpenalized_time: float = 0.0
self.last_goal_from_us_time = -86400.0
self.last_goal_time = -86400.0
self.free_kick_kickoff_team: Optional[bool] = None

def get_gamestate(self) -> int:
return self.gamestate.game_state

def get_secondary_state(self):
def get_secondary_state(self) -> int:
return self.gamestate.secondary_state

def get_secondary_state_mode(self):
def get_secondary_state_mode(self) -> int:
return self.gamestate.secondary_state_mode

def get_secondary_team(self):
def get_secondary_team(self) -> int:
return self.gamestate.secondary_state_team

def has_kickoff(self):
def has_kickoff(self) -> bool:
return self.gamestate.has_kick_off

def has_penalty_kick(self):
def has_penalty_kick(self) -> bool:
return (
self.gamestate.secondary_state == GameState.STATE_PENALTYKICK
or self.gamestate.secondary_state == GameState.STATE_PENALTYSHOOT
) and self.gamestate._secondary_state_team == self.team_id

def get_own_goals(self):
def get_our_goals(self) -> int:
return self.gamestate.own_score

def get_opp_goals(self):
def get_opp_goals(self) -> int:
return self.gamestate.rival_score

def get_seconds_since_own_goal(self):
def get_seconds_since_own_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_from_us_time

def get_seconds_since_any_goal(self):
def get_seconds_since_any_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_time

def get_seconds_remaining(self):
def get_seconds_remaining(self) -> float:
# Time from the message minus time passed since receiving it
return max(
self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0
self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0.0
)

def get_secondary_seconds_remaining(self):
def get_secondary_seconds_remaining(self) -> float:
"""Seconds remaining for things like kickoff"""
# Time from the message minus time passed since receiving it
return max(
self.gamestate.secondary_seconds_remaining
- (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update),
0,
0.0,
)

def get_seconds_since_last_drop_ball(self):
def get_seconds_since_last_drop_ball(self) -> Optional[float]:
"""Returns the seconds since the last drop in"""
if self.gamestate.drop_in_time == -1:
return None
else:
# Time from the message plus seconds passed since receiving it
return self.gamestate.drop_in_time + (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update)

def get_seconds_since_unpenalized(self):
def get_seconds_since_unpenalized(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.unpenalized_time

def get_is_penalized(self):
def get_is_penalized(self) -> bool:
return self.gamestate.penalized

def received_gamestate(self):
return self.last_update != 0
def received_gamestate(self) -> bool:
return self.last_update != 0.0

def get_team_id(self):
def get_team_id(self) -> int:
return self.team_id

def get_red_cards(self):
def get_red_cards(self) -> int:
return self.gamestate.team_mates_with_red_card

def gamestate_callback(self, gamestate_msg: GameState):
def gamestate_callback(self, gamestate_msg: GameState) -> None:
if self.gamestate.penalized and not gamestate_msg.penalized:
self.unpenalized_time = self._node.get_clock().now().nanoseconds / 1e9

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class KickCapsule(AbstractBlackboardCapsule):
is_currently_kicking: bool = False

__connected: bool = False
__action_client: ActionClient = None
__action_client: Optional[ActionClient] = None

class WalkKickTargets(Flag):
"""
Expand All @@ -43,22 +43,22 @@ def __init__(self, node, blackboard):
self.walk_kick_pub = self._node.create_publisher(Bool, "/kick", 1)
# self.connect_dynamic_kick() Do not connect if dynamic_kick is disabled

def walk_kick(self, target: WalkKickTargets):
def walk_kick(self, target: WalkKickTargets) -> None:
"""
Kick the ball while walking
:param target: Target for the walk kick (e.g. left or right foot)
"""
self.walk_kick_pub.publish(Bool(data=target.value))

def connect_dynamic_kick(self):
def connect_dynamic_kick(self) -> None:
topic = self._blackboard.config["dynamic_kick"]["topic"]
self.__action_client = ActionClient(self._node, Kick, topic, callback_group=ReentrantCallbackGroup())
self.__connected = self.__action_client.wait_for_server(self._blackboard.config["dynamic_kick"]["wait_time"])

if not self.__connected:
self._node.get_logger().error(f"No dynamic_kick server running on {topic}")

def dynamic_kick(self, goal: Kick.Goal):
def dynamic_kick(self, goal: Kick.Goal) -> None:
"""
:param goal: Goal to kick to
:type goal: KickGoal
Expand Down
Loading
Loading