Skip to content

Commit

Permalink
driver only
Browse files Browse the repository at this point in the history
  • Loading branch information
mojomex committed Jul 22, 2024
1 parent 30d580a commit 2342cef
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
#endif
#include "boost_tcp_driver/http_client_driver.hpp"
#include "boost_tcp_driver/tcp_driver.hpp"
#include "boost_udp_driver/udp_driver.hpp"
#include "nebula_common/hesai/hesai_common.hpp"
#include "nebula_common/hesai/hesai_status.hpp"
#include "nebula_common/util/expected.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp_receiver.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp"

#include <rclcpp/rclcpp.hpp>
Expand Down Expand Up @@ -127,14 +127,14 @@ class HesaiHwInterface

typedef nebula::util::expected<std::vector<uint8_t>, ptc_error_t> ptc_cmd_result_t;

std::unique_ptr<::drivers::common::IoContext> cloud_io_context_;
std::shared_ptr<boost::asio::io_context> m_owned_ctx;
std::unique_ptr<::drivers::udp_driver::UdpDriver> cloud_udp_driver_;
std::shared_ptr<::drivers::tcp_driver::TcpDriver> tcp_driver_;
std::shared_ptr<const HesaiSensorConfiguration> sensor_configuration_;
std::function<void(std::vector<uint8_t> & buffer)>
std::function<void(std::vector<uint8_t> && buffer)>
cloud_packet_callback_; /**This function pointer is called when the scan is complete*/

connections::UdpReceiver udp_driver_;

std::mutex mtx_inflight_tcp_request_;

int target_model_no;
Expand Down Expand Up @@ -208,7 +208,7 @@ class HesaiHwInterface

/// @brief Callback function to receive the Cloud Packet data from the UDP Driver
/// @param buffer Buffer containing the data received from the UDP socket
void ReceiveSensorPacketCallback(std::vector<uint8_t> & buffer);
void ReceiveSensorPacketCallback(std::vector<uint8_t> && buffer);
/// @brief Starting the interface that handles UDP streams
/// @return Resulting status
Status SensorInterfaceStart();
Expand All @@ -231,7 +231,7 @@ class HesaiHwInterface
/// @brief Registering callback for PandarScan
/// @param scan_callback Callback function
/// @return Resulting status
Status RegisterScanCallback(std::function<void(std::vector<uint8_t> &)> scan_callback);
Status RegisterScanCallback(std::function<void(std::vector<uint8_t> &&)> scan_callback);
/// @brief Getting data with PTC_COMMAND_GET_LIDAR_CALIBRATION
/// @return Resulting status
std::string GetLidarCalibrationString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp"

#include <utility>

// #define WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE

#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
Expand All @@ -16,10 +18,11 @@ namespace nebula
namespace drivers
{
HesaiHwInterface::HesaiHwInterface()
: cloud_io_context_{new ::drivers::common::IoContext(1)},
m_owned_ctx{new boost::asio::io_context(1)},
cloud_udp_driver_{new ::drivers::udp_driver::UdpDriver(*cloud_io_context_)},
tcp_driver_{new ::drivers::tcp_driver::TcpDriver(m_owned_ctx)}
: m_owned_ctx{new boost::asio::io_context(1)},
tcp_driver_{new ::drivers::tcp_driver::TcpDriver(m_owned_ctx)},
udp_driver_("255.255.255.255", 2368, [&](auto && data) {
ReceiveSensorPacketCallback(std::forward<decltype(data)>(data));
})
{
}
HesaiHwInterface::~HesaiHwInterface()
Expand Down Expand Up @@ -140,46 +143,20 @@ Status HesaiHwInterface::SetSensorConfiguration(

Status HesaiHwInterface::SensorInterfaceStart()
{
try {
std::cout << "Starting UDP server on: " << *sensor_configuration_ << std::endl;
cloud_udp_driver_->init_receiver(
sensor_configuration_->host_ip, sensor_configuration_->data_port);
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("init ok");
#endif
cloud_udp_driver_->receiver()->open();
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("open ok");
#endif
cloud_udp_driver_->receiver()->bind();
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("bind ok");
#endif

cloud_udp_driver_->receiver()->asyncReceive(
std::bind(&HesaiHwInterface::ReceiveSensorPacketCallback, this, std::placeholders::_1));
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("async receive set");
#endif
} catch (const std::exception & ex) {
Status status = Status::UDP_CONNECTION_ERROR;
std::cerr << status << sensor_configuration_->sensor_ip << ","
<< sensor_configuration_->data_port << std::endl;
return status;
}
return Status::OK;
}

Status HesaiHwInterface::RegisterScanCallback(
std::function<void(std::vector<uint8_t> &)> scan_callback)
std::function<void(std::vector<uint8_t> &&)> scan_callback)
{
cloud_packet_callback_ = std::move(scan_callback);
return Status::OK;
}

void HesaiHwInterface::ReceiveSensorPacketCallback(std::vector<uint8_t> & buffer)
void HesaiHwInterface::ReceiveSensorPacketCallback(std::vector<uint8_t> && buffer)
{
cloud_packet_callback_(buffer);
if (!cloud_packet_callback_) return;
cloud_packet_callback_(std::move(buffer));
}
Status HesaiHwInterface::SensorInterfaceStop()
{
Expand Down
8 changes: 0 additions & 8 deletions nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

#include "nebula_decoders/nebula_decoders_hesai/hesai_driver.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp"
#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"
#include "nebula_ros/common/watchdog_timer.hpp"

#include <nebula_common/hesai/hesai_common.hpp>
#include <nebula_common/nebula_common.hpp>
#include <nebula_common/point_types.hpp>
#include <rclcpp/rclcpp.hpp>

#include "nebula_msgs/msg/nebula_packet.hpp"
Expand All @@ -33,9 +31,6 @@
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

namespace nebula
Expand Down Expand Up @@ -104,9 +99,6 @@ class HesaiDecoderWrapper
std::shared_ptr<drivers::HesaiDriver> driver_ptr_{};
std::mutex mtx_driver_ptr_;

mt_queue<std::tuple<drivers::NebulaPointCloudPtr, double>> pc_queue_;
std::thread pub_thread_;

rclcpp::Publisher<pandar_msgs::msg::PandarScan>::SharedPtr packets_pub_{};
pandar_msgs::msg::PandarScan::UniquePtr current_scan_msg_{};

Expand Down
7 changes: 4 additions & 3 deletions nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "nebula_common/hesai/hesai_common.hpp"
#include "nebula_common/nebula_common.hpp"
#include "nebula_common/nebula_status.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp"
#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"
#include "nebula_ros/hesai/decoder_wrapper.hpp"
Expand Down Expand Up @@ -63,7 +64,7 @@ class HesaiRosWrapper final : public rclcpp::Node
Status StreamStart();

private:
void ReceiveCloudPacketCallback(std::vector<uint8_t> & packet);
void ReceiveCloudPacketCallback(std::vector<uint8_t> && packet);

void ReceiveScanMessageCallback(std::unique_ptr<pandar_msgs::msg::PandarScan> scan_msg);

Expand All @@ -83,9 +84,9 @@ class HesaiRosWrapper final : public rclcpp::Node
std::shared_ptr<const nebula::drivers::HesaiSensorConfiguration> sensor_cfg_ptr_{};

/// @brief Stores received packets that have not been processed yet by the decoder thread
// mt_queue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
mt_queue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
/// @brief Thread to isolate decoding from receiving
// std::thread decoder_thread_;
std::thread decoder_thread_;

rclcpp::Subscription<pandar_msgs::msg::PandarScan>::SharedPtr packets_sub_{};

Expand Down
122 changes: 56 additions & 66 deletions nebula_ros/src/hesai/decoder_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

#include <rclcpp/logging.hpp>

#include <thread>
#include <utility>

#pragma clang diagnostic ignored "-Wbitwise-instead-of-logical"
namespace nebula
{
Expand All @@ -22,8 +19,7 @@ HesaiDecoderWrapper::HesaiDecoderWrapper(
: status_(nebula::Status::NOT_INITIALIZED),
logger_(parent_node->get_logger().get_child("HesaiDecoder")),
hw_interface_(hw_interface),
sensor_cfg_(config),
pc_queue_(1)
sensor_cfg_(config)
{
if (!config) {
throw std::runtime_error("HesaiDecoderWrapper cannot be instantiated without a valid config!");
Expand Down Expand Up @@ -84,53 +80,6 @@ HesaiDecoderWrapper::HesaiDecoderWrapper(
RCLCPP_WARN_THROTTLE(
logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline");
});

pub_thread_ = std::thread([&]() {
while (true) {
auto pointcloud_ts = pc_queue_.pop();
auto pointcloud = std::get<0>(pointcloud_ts);

cloud_watchdog_->update();

// Publish scan message only if it has been written to
if (current_scan_msg_ && !current_scan_msg_->packets.empty()) {
packets_pub_->publish(std::move(current_scan_msg_));
current_scan_msg_ = std::make_unique<pandar_msgs::msg::PandarScan>();
}

if (
nebula_points_pub_->get_subscription_count() > 0 ||
nebula_points_pub_->get_intra_process_subscription_count() > 0) {
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_);
}
if (
aw_points_base_pub_->get_subscription_count() > 0 ||
aw_points_base_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_cloud_xyzi =
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_);
}
if (
aw_points_ex_pub_->get_subscription_count() > 0 ||
aw_points_ex_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(
pointcloud, std::get<1>(pointcloud_ts));
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_);
}
}
});
}

void HesaiDecoderWrapper::OnConfigChange(
Expand Down Expand Up @@ -277,19 +226,19 @@ void HesaiDecoderWrapper::ProcessCloudPacket(
static size_t packet_count = 0;
packet_count++;
// Accumulate packets for recording only if someone is subscribed to the topic (for performance)
// if (
// hw_interface_ && (packets_pub_->get_subscription_count() > 0 ||
// packets_pub_->get_intra_process_subscription_count() > 0)) {
// if (current_scan_msg_->packets.size() == 0) {
// current_scan_msg_->header.stamp = packet_msg->stamp;
// }

// pandar_msgs::msg::PandarPacket pandar_packet_msg{};
// pandar_packet_msg.stamp = packet_msg->stamp;
// pandar_packet_msg.size = packet_msg->data.size();
// std::copy(packet_msg->data.begin(), packet_msg->data.end(), pandar_packet_msg.data.begin());
// current_scan_msg_->packets.emplace_back(pandar_packet_msg);
// }
if (
hw_interface_ && (packets_pub_->get_subscription_count() > 0 ||
packets_pub_->get_intra_process_subscription_count() > 0)) {
if (current_scan_msg_->packets.size() == 0) {
current_scan_msg_->header.stamp = packet_msg->stamp;
}

pandar_msgs::msg::PandarPacket pandar_packet_msg{};
pandar_packet_msg.stamp = packet_msg->stamp;
pandar_packet_msg.size = packet_msg->data.size();
std::copy(packet_msg->data.begin(), packet_msg->data.end(), pandar_packet_msg.data.begin());
current_scan_msg_->packets.emplace_back(std::move(pandar_packet_msg));
}

std::tuple<nebula::drivers::NebulaPointCloudPtr, double> pointcloud_ts{};
nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr;
Expand All @@ -308,7 +257,48 @@ void HesaiDecoderWrapper::ProcessCloudPacket(
return;
}

pc_queue_.try_push(std::move(pointcloud_ts));
cloud_watchdog_->update();
RCLCPP_INFO(
logger_, "### %4lu / %4d (lost %7.3f%%)", packet_count, 3600, 100. - (packet_count / 36.));
packet_count = 0;

// Publish scan message only if it has been written to
if (current_scan_msg_ && !current_scan_msg_->packets.empty()) {
packets_pub_->publish(std::move(current_scan_msg_));
current_scan_msg_ = std::make_unique<pandar_msgs::msg::PandarScan>();
}

if (
nebula_points_pub_->get_subscription_count() > 0 ||
nebula_points_pub_->get_intra_process_subscription_count() > 0) {
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_);
}
if (
aw_points_base_pub_->get_subscription_count() > 0 ||
aw_points_base_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_cloud_xyzi =
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_);
}
if (
aw_points_ex_pub_->get_subscription_count() > 0 ||
aw_points_ex_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(
pointcloud, std::get<1>(pointcloud_ts));
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_);
}
}

void HesaiDecoderWrapper::PublishCloud(
Expand Down
Loading

0 comments on commit 2342cef

Please sign in to comment.