Skip to content

Commit

Permalink
Fix perception race conditions, web app streaming, and F/T QoS (#151)
Browse files Browse the repository at this point in the history
* Addressed race condition where SegmentFromPoint checks image publishers before republisher has advertised them

* Make face_detection_img a CompressedImage

* Match FT sensor QoS

* Address race condition in getting topic type in FaceDetection

* Allow republisher to set rate
  • Loading branch information
amalnanavati authored Jan 10, 2024
1 parent 8f2dfb3 commit aac525e
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 14 deletions.
5 changes: 3 additions & 2 deletions ada_feeding/ada_feeding/idioms/ft_thresh_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
# Standard imports

# Third-part_y imports
from geometry_msgs.msg import WrenchStamped
import py_trees
from py_trees.blackboard import Blackboard
import py_trees_ros
from geometry_msgs.msg import WrenchStamped
import rclpy

# Local imports

Expand Down Expand Up @@ -103,7 +104,7 @@ def is_satisfied(stamped: WrenchStamped, _: WrenchStamped):
name=name + " FTSubscriber",
topic_name=ft_topic,
topic_type=WrenchStamped,
qos_profile=(py_trees_ros.utilities.qos_profile_unlatched()),
qos_profile=rclpy.qos.QoSPresetProfiles.SENSOR_DATA.value,
blackboard_variables={
ft_absolute_key: None,
},
Expand Down
24 changes: 20 additions & 4 deletions ada_feeding_perception/ada_feeding_perception/face_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def __init__(
# Currently, RVIZ2 doesn't support visualization of CompressedImage
# (other than as part of a Camera). Hence, for vizualization purposes
# this must be an Image. https://github.com/ros2/rviz/issues/738
self.publisher_image = self.create_publisher(Image, "~/face_detection_img", 1)
self.publisher_image = self.create_publisher(
CompressedImage, "~/face_detection_img/compressed", 1
)

# Create an instance of the Face Detection Cascade Classifier
self.detector = cv2.CascadeClassifier(self.face_model_path)
Expand All @@ -143,8 +145,15 @@ def __init__(
self.latest_img_msg = None
self.latest_img_msg_lock = threading.Lock()
image_topic = "~/image"
try:
image_type = get_img_msg_type(image_topic, self)
except ValueError as err:
self.get_logger().error(
f"Error getting type of image topic. Defaulting to CompressedImage. {err}"
)
image_type = CompressedImage
self.img_subscription = self.create_subscription(
get_img_msg_type(image_topic, self),
image_type,
image_topic,
self.camera_callback,
1,
Expand All @@ -155,9 +164,16 @@ def __init__(
self.depth_buffer = collections.deque(maxlen=depth_buffer_size)
self.depth_buffer_lock = threading.Lock()
aligned_depth_topic = "~/aligned_depth"
try:
aligned_depth_type = get_img_msg_type(aligned_depth_topic, self)
except ValueError as err:
self.get_logger().error(
f"Error getting type of depth image topic. Defaulting to Image. {err}"
)
aligned_depth_type = Image
# Subscribe to the depth image
self.depth_subscription = self.create_subscription(
get_img_msg_type(aligned_depth_topic, self),
aligned_depth_type,
aligned_depth_topic,
self.depth_callback,
1,
Expand Down Expand Up @@ -667,7 +683,7 @@ def run(self) -> None:

# Publish the face detection image
self.publisher_image.publish(
cv2_image_to_ros_msg(image_bgr, compress=False, encoding="bgr8")
cv2_image_to_ros_msg(image_bgr, compress=True, encoding="bgr8")
)

# Publish 3d point
Expand Down
38 changes: 33 additions & 5 deletions ada_feeding_perception/ada_feeding_perception/republisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
"""

# Standard imports
import math
import os
import time
from typing import Any, Callable, List, Tuple

# Third-party imports
Expand Down Expand Up @@ -54,6 +56,7 @@ def __init__(self) -> None:
self.from_topics,
topic_type_strs,
to_topics,
target_rates,
post_processors_strs,
mask_relative_path,
temporal_window_size,
Expand Down Expand Up @@ -115,7 +118,7 @@ def __init__(self) -> None:
self.topic_types.append(import_from_string(topic_type_str))

# For each topic, create a callback, publisher, and subscriber
num_topics = min(len(self.from_topics), len(self.topic_types), len(to_topics))
num_topics = min(len(self.from_topics), len(self.topic_types), len(to_topics), len(target_rates))
self.callbacks = []
self.pubs = []
self.subs = []
Expand Down Expand Up @@ -147,7 +150,7 @@ def __init__(self) -> None:
)

# Create the callback
callback = self.create_callback(i, post_processor_fns)
callback = self.create_callback(i, post_processor_fns, target_rates[i])
self.callbacks.append(callback)

# Create the publisher
Expand All @@ -172,7 +175,7 @@ def __init__(self) -> None:
def load_parameters(
self,
) -> Tuple[
List[str], List[str], List[str], List[List[str]], str, int, int, int, int
List[str], List[str], List[str], List[str], List[List[str]], str, int, int, int, int
]:
"""
Load the parameters for the republisher.
Expand Down Expand Up @@ -235,6 +238,17 @@ def load_parameters(
),
)

# Read the target rates
target_rates = self.declare_parameter(
"target_rates",
descriptor=ParameterDescriptor(
name="target_rates",
type=ParameterType.PARAMETER_INTEGER_ARRAY,
description="Target rates (hz) for the republications.",
read_only=True,
),
)

# Read the post-processing functions
post_processors = self.declare_parameter(
"post_processors",
Expand Down Expand Up @@ -344,11 +358,15 @@ def load_parameters(
to_topics_retval = to_topics.value
if to_topics_retval is None:
to_topics_retval = []
target_rates_retval = target_rates.value
if target_rates_retval is None:
target_rates_retval = []

return (
from_topics_retval,
topic_types_retval,
to_topics.value,
target_rates_retval,
post_processors_retval,
mask_relative_path.value,
temporal_window_size.value,
Expand All @@ -358,7 +376,7 @@ def load_parameters(
)

def create_callback(
self, i: int, post_processors: List[Callable[[Any], Any]]
self, i: int, post_processors: List[Callable[[Any], Any]], target_rate: float,
) -> Callable:
"""
Create the callback for the subscriber.
Expand All @@ -370,12 +388,19 @@ def create_callback(
post_processor : List[Callable[[Any], Any]]
The post-processing functions to apply to the message before republishing.
Each must take in a message and return a message of the same type.
target_rate : float
the target rate for the publication
Returns
-------
callback : Callable
The callback for the subscriber.
"""
if target_rate <= 0:
interval = -math.inf
else:
interval = 1.0/target_rate
last_published_time = None

def callback(msg: Any):
"""
Expand All @@ -386,12 +411,15 @@ def callback(msg: Any):
msg : Any
The message from the subscriber.
"""
nonlocal last_published_time
# self.get_logger().info(
# f"Received message on topic {i} {self.from_topics[i]}"
# )
for post_processor in post_processors:
msg = post_processor(msg)
self.pubs[i].publish(msg)
if (last_published_time is None or time.time() - last_published_time >= interval):
self.pubs[i].publish(msg)
last_published_time = time.time()

return callback

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,15 @@ def __init__(self) -> None:
self.latest_depth_img_msg = None
self.latest_depth_img_msg_lock = threading.Lock()
aligned_depth_topic = "~/aligned_depth"
try:
aligned_depth_type = get_img_msg_type(aligned_depth_topic, self)
except ValueError as err:
self.get_logger().error(
f"Error getting type of depth image topic. Defaulting to Image. {err}"
)
aligned_depth_type = Image
self.depth_image_subscriber = self.create_subscription(
get_img_msg_type(aligned_depth_topic, self),
aligned_depth_type,
aligned_depth_topic,
self.depth_image_callback,
1,
Expand All @@ -112,8 +119,15 @@ def __init__(self) -> None:
self.latest_img_msg = None
self.latest_img_msg_lock = threading.Lock()
image_topic = "~/image"
try:
image_type = get_img_msg_type(image_topic, self)
except ValueError as err:
self.get_logger().error(
f"Error getting type of image topic. Defaulting to CompressedImage. {err}"
)
image_type = CompressedImage
self.image_subscriber = self.create_subscription(
get_img_msg_type(image_topic, self),
image_type,
image_topic,
self.image_callback,
1,
Expand Down
19 changes: 19 additions & 0 deletions ada_feeding_perception/config/republisher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,43 @@ republisher:
# The name of the topics to republish from
from_topics:
- /camera/color/image_raw/compressed
- /local/camera/color/image_raw/compressed
- /camera/color/camera_info
- /camera/aligned_depth_to_color/image_raw
- /camera/aligned_depth_to_color/camera_info
- /local/camera/aligned_depth_to_color/image_raw
- /face_detection_img/compressed

# The types of topics to republish from
topic_types:
- sensor_msgs.msg.CompressedImage
- sensor_msgs.msg.CompressedImage
- sensor_msgs.msg.CameraInfo
- sensor_msgs.msg.Image
- sensor_msgs.msg.CameraInfo
- sensor_msgs.msg.Image
- sensor_msgs.msg.CompressedImage

# The name of the topics to republish to
to_topics:
- /local/camera/color/image_raw/compressed
- /local/camera/color/image_raw/compressed/low_hz
- /local/camera/color/camera_info
- /local/camera/aligned_depth_to_color/image_raw
- /local/camera/aligned_depth_to_color/camera_info
- /local/camera/aligned_depth_to_color/depth_octomap
- /face_detection_img/compressed/low_hz

# The target rates (Hz) for the republished topics. Rates <= 0 will publish
# every message.
target_rates:
- 0
- 3
- 0
- 0
- 0
- 0
- 3

# The names of a post-processing functions to apply to the message before
# republishing it. Current options are:
Expand All @@ -42,7 +59,9 @@ republisher:
- ""
- ""
- ""
- ""
- threshold,mask,temporal,spatial # Apply filters to the depth image for the Octomap
- ""
# The binary mask used for "mask" post-processing. This mask will get scaled,
# possibly disproportionately, to the same of the image.
mask_relative_path: model/fork_handle_mask.png
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def generate_launch_description():
# points have been set to 0.
face_detection_remappings = [
("~/face_detection", "/face_detection"),
("~/face_detection_img", "/face_detection_img"),
("~/face_detection_img/compressed", "/face_detection_img/compressed"),
("~/toggle_face_detection", "/toggle_face_detection"),
(
"~/aligned_depth",
Expand Down

0 comments on commit aac525e

Please sign in to comment.