Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to set various cartesian path service parameters #58

39 changes: 36 additions & 3 deletions examples/ex_pose_goal.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ def main():
# Declare parameters for position and orientation
node.declare_parameter("position", [0.5, 0.0, 0.25])
node.declare_parameter("quat_xyzw", [1.0, 0.0, 0.0, 0.0])
node.declare_parameter("cartesian", False)
node.declare_parameter("synchronous", True)
# If non-positive, don't cancel. Only used if synchronous is False
node.declare_parameter("cancel_after_secs", 0.0)
# Planner ID
node.declare_parameter("planner_id", "RRTConnectkConfigDefault")
# Declare parameters for cartesian planning
node.declare_parameter("cartesian", False)
node.declare_parameter("cartesian_max_step", 0.0025)
node.declare_parameter("cartesian_fraction_threshold", 0.0)
node.declare_parameter("cartesian_jump_threshold", 0.0)
node.declare_parameter("cartesian_avoid_collisions", False)

# Create callback group that allows execution of callbacks in parallel without restrictions
callback_group = ReentrantCallbackGroup()
Expand Down Expand Up @@ -62,17 +67,45 @@ def main():
# Get parameters
position = node.get_parameter("position").get_parameter_value().double_array_value
quat_xyzw = node.get_parameter("quat_xyzw").get_parameter_value().double_array_value
cartesian = node.get_parameter("cartesian").get_parameter_value().bool_value
synchronous = node.get_parameter("synchronous").get_parameter_value().bool_value
cancel_after_secs = (
node.get_parameter("cancel_after_secs").get_parameter_value().double_value
)
cartesian = node.get_parameter("cartesian").get_parameter_value().bool_value
cartesian_max_step = (
node.get_parameter("cartesian_max_step").get_parameter_value().double_value
)
cartesian_fraction_threshold = (
node.get_parameter("cartesian_fraction_threshold")
.get_parameter_value()
.double_value
)
cartesian_jump_threshold = (
node.get_parameter("cartesian_jump_threshold")
.get_parameter_value()
.double_value
)
cartesian_avoid_collisions = (
node.get_parameter("cartesian_avoid_collisions")
.get_parameter_value()
.bool_value
)

# Set parameters for cartesian planning
moveit2.cartesian_avoid_collisions = cartesian_avoid_collisions
moveit2.cartesian_jump_threshold = cartesian_jump_threshold

# Move to pose
node.get_logger().info(
f"Moving to {{position: {list(position)}, quat_xyzw: {list(quat_xyzw)}}}"
)
moveit2.move_to_pose(position=position, quat_xyzw=quat_xyzw, cartesian=cartesian)
moveit2.move_to_pose(
position=position,
quat_xyzw=quat_xyzw,
cartesian=cartesian,
cartesian_max_step=cartesian_max_step,
cartesian_fraction_threshold=cartesian_fraction_threshold,
)
if synchronous:
# Note: the same functionality can be achieved by setting
# `synchronous:=false` and `cancel_after_secs` to a negative value.
Expand Down
79 changes: 74 additions & 5 deletions pymoveit2/moveit2.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def move_to_pose(
weight_position: float = 1.0,
cartesian: bool = False,
weight_orientation: float = 1.0,
cartesian_max_step: float = 0.0025,
cartesian_fraction_threshold: float = 0.0,
):
"""
Plan and execute motion based on previously set goals. Optional arguments can be
Expand Down Expand Up @@ -428,6 +430,8 @@ def move_to_pose(
weight_position=weight_position,
weight_orientation=weight_orientation,
cartesian=cartesian,
max_step=cartesian_max_step,
cartesian_fraction_threshold=cartesian_fraction_threshold,
)
)

Expand Down Expand Up @@ -500,12 +504,18 @@ def plan(
weight_joint_position: float = 1.0,
start_joint_state: Optional[Union[JointState, List[float]]] = None,
cartesian: bool = False,
max_step: float = 0.0025,
cartesian_fraction_threshold: float = 0.0,
) -> Optional[JointTrajectory]:
"""
Call plan_async and wait on future
"""
future = self.plan_async(
**{key: value for key, value in locals().items() if key != "self"}
**{
key: value
for key, value in locals().items()
if key not in ["self", "cartesian_fraction_threshold"]
}
)

if future is None:
Expand All @@ -516,7 +526,11 @@ def plan(
while not future.done():
rate.sleep()

return self.get_trajectory(future, cartesian=cartesian)
return self.get_trajectory(
future,
cartesian=cartesian,
cartesian_fraction_threshold=cartesian_fraction_threshold,
)

def plan_async(
self,
Expand All @@ -537,6 +551,7 @@ def plan_async(
weight_joint_position: float = 1.0,
start_joint_state: Optional[Union[JointState, List[float]]] = None,
cartesian: bool = False,
max_step: float = 0.0025,
) -> Optional[Future]:
"""
Plan motion based on previously set goals. Optional arguments can be passed in to
Expand Down Expand Up @@ -634,9 +649,10 @@ def plan_async(
# Plan trajectory asynchronously by service call
if cartesian:
future = self._plan_cartesian_path(
max_step=max_step,
frame_id=pose_stamped.header.frame_id
if pose_stamped is not None
else frame_id
else frame_id,
)
else:
# Use service
Expand All @@ -649,11 +665,17 @@ def plan_async(
return future

def get_trajectory(
self, future: Future, cartesian: bool = False
self,
future: Future,
cartesian: bool = False,
cartesian_fraction_threshold: float = 0.0,
) -> Optional[JointTrajectory]:
"""
Takes in a future returned by plan_async and returns the trajectory if the future is done
and planning was successful, else None.

For cartesian plans, the plan is rejected if the fraction of the path that was completed is
less than `cartesian_fraction_threshold`.
"""
if not future.done():
self._node.get_logger().warn(
Expand All @@ -666,7 +688,14 @@ def get_trajectory(
# Cartesian
if cartesian:
if MoveItErrorCodes.SUCCESS == res.error_code.val:
return res.solution.joint_trajectory
if res.fraction >= cartesian_fraction_threshold:
return res.solution.joint_trajectory
else:
self._node.get_logger().warn(
f"Planning failed! Cartesian planner completed {res.fraction} "
f"of the trajectory, less than the threshold {cartesian_fraction_threshold}."
)
return None
AndrejOrsula marked this conversation as resolved.
Show resolved Hide resolved
else:
self._node.get_logger().warn(
f"Planning failed! Error code: {res.error_code.val}."
Expand Down Expand Up @@ -2121,6 +2150,14 @@ def __init_compute_ik(self):
def follow_joint_trajectory_action_client(self) -> str:
return self.__follow_joint_trajectory_action_client

@property
def end_effector_name(self) -> str:
return self.__end_effector_name

@property
def base_link_name(self) -> str:
return self.__base_link_name

@property
def joint_names(self) -> List[str]:
return self.__joint_names
Expand Down Expand Up @@ -2176,6 +2213,38 @@ def allowed_planning_time(self) -> float:
def allowed_planning_time(self, value: float):
self.__move_action_goal.request.allowed_planning_time = value

@property
def cartesian_avoid_collisions(self) -> bool:
return self.__cartesian_path_request.request.avoid_collisions

@cartesian_avoid_collisions.setter
def cartesian_avoid_collisions(self, value: bool):
self.__cartesian_path_request.avoid_collisions = value

@property
def cartesian_jump_threshold(self) -> float:
return self.__cartesian_path_request.request.jump_threshold

@cartesian_jump_threshold.setter
def cartesian_jump_threshold(self, value: float):
self.__cartesian_path_request.jump_threshold = value

@property
def cartesian_prismatic_jump_threshold(self) -> float:
return self.__cartesian_path_request.request.prismatic_jump_threshold

@cartesian_prismatic_jump_threshold.setter
def cartesian_prismatic_jump_threshold(self, value: float):
self.__cartesian_path_request.prismatic_jump_threshold = value

@property
def cartesian_revolute_jump_threshold(self) -> float:
return self.__cartesian_path_request.request.revolute_jump_threshold

@cartesian_revolute_jump_threshold.setter
def cartesian_revolute_jump_threshold(self, value: float):
self.__cartesian_path_request.revolute_jump_threshold = value

@property
def pipeline_id(self) -> int:
return self.__move_action_goal.request.pipeline_id
Expand Down
Loading