Skip to content

Commit

Permalink
perf(robosense): move mt_queue from before decoder to after it, impro…
Browse files Browse the repository at this point in the history
…ving performance
  • Loading branch information
mojomex committed Jul 23, 2024
1 parent f77ee3b commit 3e2239f
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 65 deletions.
26 changes: 20 additions & 6 deletions nebula_ros/include/nebula_ros/robosense/decoder_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/watchdog_timer.hpp"

#include <nebula_common/nebula_common.hpp>
Expand All @@ -24,6 +25,7 @@
#include <rclcpp/rclcpp.hpp>

#include <nebula_msgs/msg/nebula_packet.hpp>
#include <nebula_msgs/msg/nebula_packets.hpp>
#include <robosense_msgs/msg/robosense_scan.hpp>

#include <chrono>
Expand Down Expand Up @@ -51,6 +53,15 @@ class RobosenseDecoderWrapper
nebula::Status Status();

private:
struct PublishData
{
nebula_msgs::msg::NebulaPackets::UniquePtr packets;
drivers::NebulaPointCloudPtr cloud;
double cloud_timestamp_s;
};

void publish(PublishData && data);

void PublishCloud(
std::unique_ptr<sensor_msgs::msg::PointCloud2> pointcloud,
const rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr & publisher);
Expand All @@ -69,17 +80,20 @@ class RobosenseDecoderWrapper

std::shared_ptr<nebula::drivers::RobosenseHwInterface> hw_interface_;
std::shared_ptr<const drivers::RobosenseSensorConfiguration> sensor_cfg_;
std::shared_ptr<const drivers::RobosenseCalibrationConfiguration> calibration_cfg_ptr_{};
std::shared_ptr<const drivers::RobosenseCalibrationConfiguration> calibration_cfg_ptr_;

std::shared_ptr<drivers::RobosenseDriver> driver_ptr_;
std::mutex mtx_driver_ptr_;

rclcpp::Publisher<robosense_msgs::msg::RobosenseScan>::SharedPtr packets_pub_{};
robosense_msgs::msg::RobosenseScan::UniquePtr current_scan_msg_{};
rclcpp::Publisher<robosense_msgs::msg::RobosenseScan>::SharedPtr packets_pub_;
nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_;

mt_queue<PublishData> publish_queue_;
std::thread pub_thread_;

rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr nebula_points_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_ex_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_base_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr nebula_points_pub_;
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_ex_pub_;
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_base_pub_;

std::shared_ptr<WatchdogTimer> cloud_watchdog_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/robosense/decoder_wrapper.hpp"
#include "nebula_ros/robosense/hw_interface_wrapper.hpp"
#include "nebula_ros/robosense/hw_monitor_wrapper.hpp"
Expand Down Expand Up @@ -83,14 +82,9 @@ class RobosenseRosWrapper final : public rclcpp::Node

nebula::Status wrapper_status_;

std::shared_ptr<const nebula::drivers::RobosenseSensorConfiguration> sensor_cfg_ptr_{};
std::shared_ptr<const nebula::drivers::RobosenseSensorConfiguration> sensor_cfg_ptr_;

/// @brief Stores received packets that have not been processed yet by the decoder thread
mt_queue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
/// @brief Thread to isolate decoding from receiving
std::thread decoder_thread_;

rclcpp::Subscription<robosense_msgs::msg::RobosenseScan>::SharedPtr packets_sub_{};
rclcpp::Subscription<robosense_msgs::msg::RobosenseScan>::SharedPtr packets_sub_;

bool launch_hw_;

Expand Down
92 changes: 56 additions & 36 deletions nebula_ros/src/robosense/decoder_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ RobosenseDecoderWrapper::RobosenseDecoderWrapper(
hw_interface_(hw_interface),
sensor_cfg_(config),
calibration_cfg_ptr_(calibration),
driver_ptr_(new drivers::RobosenseDriver(config, calibration))
driver_ptr_(new drivers::RobosenseDriver(config, calibration)),
current_scan_msg_(std::make_unique<nebula_msgs::msg::NebulaPackets>()),
publish_queue_(1)
{
status_ = driver_ptr_->GetStatus();

Expand All @@ -30,7 +32,6 @@ RobosenseDecoderWrapper::RobosenseDecoderWrapper(

// Publish packets only if HW interface is connected
if (hw_interface_) {
current_scan_msg_ = std::make_unique<robosense_msgs::msg::RobosenseScan>();
packets_pub_ = parent_node->create_publisher<robosense_msgs::msg::RobosenseScan>(
"robosense_packets", rclcpp::SensorDataQoS());
}
Expand All @@ -55,54 +56,91 @@ RobosenseDecoderWrapper::RobosenseDecoderWrapper(
logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline");
});

pub_thread_ = std::thread([&]() {
while (true) {
auto publish_data = publish_queue_.pop();
publish(std::move(publish_data));
}
});

RCLCPP_INFO(logger_, "Initialized decoder wrapper.");
}

void RobosenseDecoderWrapper::ProcessCloudPacket(
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg)
{
auto & packet = *packet_msg;

// Accumulate packets for recording only if someone is subscribed to the topic (for performance)
if (
hw_interface_ && (packets_pub_->get_subscription_count() > 0 ||
packets_pub_->get_intra_process_subscription_count() > 0)) {
if (hw_interface_) {
if (current_scan_msg_->packets.size() == 0) {
current_scan_msg_->header.stamp = packet_msg->stamp;
current_scan_msg_->header.frame_id = sensor_cfg_->frame_id;
}

robosense_msgs::msg::RobosensePacket robosense_packet_msg{};
robosense_packet_msg.stamp = packet_msg->stamp;
std::copy(packet_msg->data.begin(), packet_msg->data.end(), robosense_packet_msg.data.begin());
current_scan_msg_->packets.emplace_back(std::move(robosense_packet_msg));
packet = current_scan_msg_->packets.emplace_back(std::move(*packet_msg));
}

std::tuple<nebula::drivers::NebulaPointCloudPtr, double> pointcloud_ts{};
nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr;

{
std::lock_guard lock(mtx_driver_ptr_);
pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data);
pointcloud_ts = driver_ptr_->ParseCloudPacket(packet.data);
pointcloud = std::get<0>(pointcloud_ts);
}

if (pointcloud == nullptr) {
return;
}

publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)});
current_scan_msg_ = std::make_unique<nebula_msgs::msg::NebulaPackets>();
}

void RobosenseDecoderWrapper::OnConfigChange(
const std::shared_ptr<const nebula::drivers::RobosenseSensorConfiguration> & new_config)
{
std::lock_guard lock(mtx_driver_ptr_);
auto new_driver = std::make_shared<drivers::RobosenseDriver>(new_config, calibration_cfg_ptr_);
driver_ptr_ = new_driver;
sensor_cfg_ = new_config;
}

/// @brief Get current status of this driver
/// @return Current status
nebula::Status RobosenseDecoderWrapper::Status()
{
return status_;
}

void RobosenseDecoderWrapper::publish(PublishData && data)
{
auto pointcloud = data.cloud;
auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count());

cloud_watchdog_->update();

// Publish scan message only if it has been written to
if (current_scan_msg_ && !current_scan_msg_->packets.empty()) {
packets_pub_->publish(std::move(current_scan_msg_));
current_scan_msg_ = std::make_unique<robosense_msgs::msg::RobosenseScan>();
auto robosense_scan = std::make_unique<robosense_msgs::msg::RobosenseScan>();
robosense_scan->header = data.packets->header;

for (const auto & pkt : data.packets->packets) {
auto & robosense_pkt = robosense_scan->packets.emplace_back();
robosense_pkt.stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), robosense_pkt.data.begin());
}

packets_pub_->publish(std::move(robosense_scan));
}

if (
nebula_points_pub_->get_subscription_count() > 0 ||
nebula_points_pub_->get_intra_process_subscription_count() > 0) {
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_);
}
if (
Expand All @@ -112,39 +150,21 @@ void RobosenseDecoderWrapper::ProcessCloudPacket(
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_);
}
if (
aw_points_ex_pub_->get_subscription_count() > 0 ||
aw_points_ex_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(
pointcloud, std::get<1>(pointcloud_ts));
const auto autoware_ex_cloud =
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_);
}
}

void RobosenseDecoderWrapper::OnConfigChange(
const std::shared_ptr<const nebula::drivers::RobosenseSensorConfiguration> & new_config)
{
std::lock_guard lock(mtx_driver_ptr_);
auto new_driver = std::make_shared<drivers::RobosenseDriver>(new_config, calibration_cfg_ptr_);
driver_ptr_ = new_driver;
sensor_cfg_ = new_config;
}

/// @brief Get current status of this driver
/// @return Current status
nebula::Status RobosenseDecoderWrapper::Status()
{
return status_;
}

void RobosenseDecoderWrapper::PublishCloud(
std::unique_ptr<sensor_msgs::msg::PointCloud2> pointcloud,
const rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr & publisher)
Expand Down
18 changes: 3 additions & 15 deletions nebula_ros/src/robosense/robosense_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ namespace ros
RobosenseRosWrapper::RobosenseRosWrapper(const rclcpp::NodeOptions & options)
: rclcpp::Node("robosense_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)),
wrapper_status_(Status::NOT_INITIALIZED),
sensor_cfg_ptr_(nullptr),
packet_queue_(3000),
hw_interface_wrapper_(),
hw_monitor_wrapper_(),
decoder_wrapper_()
sensor_cfg_ptr_(nullptr)
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);

Expand All @@ -40,12 +36,6 @@ RobosenseRosWrapper::RobosenseRosWrapper(const rclcpp::NodeOptions & options)

RCLCPP_DEBUG(get_logger(), "Starting stream");

decoder_thread_ = std::thread([this]() {
while (true) {
decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop());
}
});

if (launch_hw_) {
hw_interface_wrapper_->HwInterface()->RegisterScanCallback(
std::bind(&RobosenseRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1));
Expand Down Expand Up @@ -144,7 +134,7 @@ void RobosenseRosWrapper::ReceiveScanMessageCallback(
nebula_pkt_ptr->stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data));

packet_queue_.push(std::move(nebula_pkt_ptr));
decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr));
}
}

Expand Down Expand Up @@ -281,9 +271,7 @@ void RobosenseRosWrapper::ReceiveCloudPacketCallback(std::vector<uint8_t> & pack
msg_ptr->stamp.nanosec = static_cast<int>(timestamp_ns % 1'000'000'000);
msg_ptr->data.swap(packet);

if (!packet_queue_.try_push(std::move(msg_ptr))) {
RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped");
}
decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr));
}

RCLCPP_COMPONENTS_REGISTER_NODE(RobosenseRosWrapper)
Expand Down

0 comments on commit 3e2239f

Please sign in to comment.