From 210a89aa656d989fcdf9cdfa1148aa6fc1f50ed1 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 11:47:29 +0900 Subject: [PATCH 01/37] feat(udp): a new UDP socket implementation Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 286 ++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100644 nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp new file mode 100644 index 000000000..c05b13f54 --- /dev/null +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -0,0 +1,286 @@ +// Copyright 2024 TIER IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace nebula::drivers::connections +{ + +class SocketError : public std::exception +{ +public: + explicit SocketError(int err_no) : what_(strerror(err_no)) {} + + explicit SocketError(const char * msg) : what_(msg) {} + + const char * what() const noexcept override { return what_.c_str(); } + +private: + std::string what_; +}; + +class UdpSocket +{ +private: + struct Endpoint + { + std::string ip; + uint16_t port; + }; + + struct MsgBuffers + { + msghdr msg{}; + iovec iov{}; + std::array control; + sockaddr_in sender_addr; + }; + + enum class State { UNINITIALIZED, INITIALIZED, BOUND, ACTIVE }; + +public: + using callback_t = std::function &, uint64_t)>; + + /** + * @brief Construct a UDP socket with timestamp measuring enabled. The minimal way to start + * receiving on the socket is `UdpSocket().init(...).bind().subscribe(...);`. + */ + UdpSocket() + { + int sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + if (sock_fd == -1) throw SocketError(errno); + + int enable = 1; + int result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)) < 0; + if (result < 0) throw SocketError((errno)); + + sock_fd_ = sock_fd; + } + + /** + * @brief Specify the host address and port for this socket to be bound to. To bind the socket, + * call the `bind()` function. + * + * @param host_ip The address to bind to. + * @param host_port The port to bind to. + */ + UdpSocket & init(const std::string & host_ip, uint16_t host_port) + { + if (state_ > State::INITIALIZED) throw SocketError("Socket must be initialized before binding"); + + host_ = {host_ip, host_port}; + state_ = State::INITIALIZED; + return *this; + } + + /** + * @brief Set the socket to drop all packets not coming from `sender_ip` and `sender_port`. + * + * @param sender_ip The only allowed sender IP. Cannot be a multicast or broadcast address. + * @param sender_port The only allowed sender port. + */ + UdpSocket & limit_to_sender(const std::string & sender_ip, uint16_t sender_port) + { + if (state_ > State::INITIALIZED) throw SocketError("Buffer size has to be set before binding"); + + sender_.emplace(Endpoint{sender_ip, sender_port}); + return *this; + } + + /** + * @brief Set the MTU this socket supports. While this can be set arbitrarily, it is best set to + * the MTU of the network interface, or to the maximum expected packet length. + * + * @param bytes The MTU size. The default value is 1500. + */ + UdpSocket & set_mtu(size_t bytes) + { + if (state_ > State::INITIALIZED) throw SocketError("Buffer size has to be set before binding"); + + buffer_size_ = bytes; + return *this; + } + + /** + * @brief Join an IP multicast group. Only one group can be joined by the socket. + * + * @param group_ip The multicast IP. It has to be in the multicast range `224.0.0.0/4` (between + * `224.0.0.0` and `239.255.255.255`). + */ + UdpSocket & join_multicast_group(const std::string & group_ip) + { + if (state_ < State::INITIALIZED) throw SocketError("Socket has to be initialized first"); + + if (state_ >= State::BOUND) + throw SocketError("Multicast groups have to be joined before binding"); + + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(group_ip.c_str()); // Multicast group address + mreq.imr_interface.s_addr = inet_addr(host_.ip.c_str()); + + int result = setsockopt( + sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast(&mreq), sizeof(mreq)); + if (result < 0) throw SocketError((errno)); + + multicast_ip_.emplace(group_ip); + return *this; + } + + /** + * @brief Bind the socket to host IP and port given in `init()`. If `join_multicast_group()` was + * called before this function, the socket will be bound to `group_ip` instead. + */ + UdpSocket & bind() + { + if (state_ < State::INITIALIZED) throw SocketError("Socket has to be initialized first"); + + if (state_ >= State::BOUND) throw SocketError("Re-binding already bound socket"); + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(host_.port); + addr.sin_addr.s_addr = multicast_ip_ ? inet_addr(multicast_ip_->c_str()) : INADDR_ANY; + + int result = ::bind(sock_fd_, (struct sockaddr *)&addr, sizeof(addr)); + if (result == -1) throw SocketError((errno)); + return *this; + } + + /** + * @brief Register a callback for processing received packets and start the receiver thread. The + * callback will be called for each received packet, and will be executed in the receive thread. + * + * @param callback The function to be executed for each received packet. + */ + UdpSocket & subscribe(callback_t && callback) + { + callback_ = std::move(callback); + launch_receiver(); + return *this; + } + + ~UdpSocket() + { + if (state_ == State::ACTIVE) state_ = State::BOUND; + if (receive_thread_.joinable()) receive_thread_.join(); + close(sock_fd_); + } + +private: + void launch_receiver() + { + assert(state_ == State::BOUND); + assert(callback_); + + state_ = State::ACTIVE; + receive_thread_ = std::thread([this]() { + std::vector buffer; + while (state_ == State::ACTIVE) { + buffer.resize(buffer_size_); + auto msg_header = make_msg_header(buffer); + + ssize_t received = recvmsg(sock_fd_, &msg_header.msg, 0); + if (received < 0) throw SocketError((errno)); + if (!is_accepted_sender(msg_header.sender_addr)) continue; + + auto timestamp_ns_opt = get_receive_timestamp(msg_header.msg); + if (!timestamp_ns_opt) continue; + + uint64_t timestamp_ns = timestamp_ns_opt.value(); + + buffer.resize(received); + callback_(buffer, timestamp_ns); + } + }); + } + + std::optional get_receive_timestamp(msghdr & msg) + { + timeval const * tv = nullptr; + for (cmsghdr * cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) { + tv = (struct timeval *)CMSG_DATA(cmsg); + break; + } + } + + if (!tv) { + return {}; + } + + uint64_t timestamp_ns = tv->tv_sec * 1'000'000'000 + tv->tv_usec * 1000; + return timestamp_ns; + } + + bool is_accepted_sender(const sockaddr_in & sender_addr) + { + if (!sender_) return true; + + std::array sender_name; + inet_ntop(AF_INET, &sender_addr.sin_addr, sender_name.data(), INET_ADDRSTRLEN); + return std::strncmp(sender_->ip.c_str(), sender_name.data(), INET_ADDRSTRLEN) == 0; + } + + MsgBuffers make_msg_header(std::vector & receive_buffer) const + { + msghdr msg{}; + iovec iov{}; + std::array control; + + sockaddr_in sender_addr; + socklen_t sender_addr_len = sizeof(sender_addr); + + iov.iov_base = receive_buffer.data(); + iov.iov_len = receive_buffer.size(); + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = control.data(); + msg.msg_controllen = control.size(); + msg.msg_name = &sender_addr; + msg.msg_namelen = sender_addr_len; + + return MsgBuffers{msg, iov, control, sender_addr}; + } + + std::atomic state_{State::UNINITIALIZED}; + int sock_fd_; + size_t buffer_size_ = 1500; + Endpoint host_; + std::optional multicast_ip_; + std::optional sender_; + std::thread receive_thread_; + callback_t callback_; +}; + +} // namespace nebula::drivers::connections From 9d8ee7c98bb122b9eff439e0c336893ad56f0eb6 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 11:50:51 +0900 Subject: [PATCH 02/37] chore(udp): more error handling and doc comments Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index c05b13f54..9335e1e8a 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -158,7 +158,8 @@ class UdpSocket /** * @brief Bind the socket to host IP and port given in `init()`. If `join_multicast_group()` was - * called before this function, the socket will be bound to `group_ip` instead. + * called before this function, the socket will be bound to `group_ip` instead. At least `init()` + * has to have been called before. */ UdpSocket & bind() { @@ -179,11 +180,16 @@ class UdpSocket /** * @brief Register a callback for processing received packets and start the receiver thread. The * callback will be called for each received packet, and will be executed in the receive thread. + * Has to be called on a bound socket (`bind()` has to have been called before). * * @param callback The function to be executed for each received packet. */ UdpSocket & subscribe(callback_t && callback) { + if (state_ < State::BOUND) throw SocketError("Socket has to be bound first"); + + if (state_ > State::BOUND) throw SocketError("Cannot re-subscribe to socket"); + callback_ = std::move(callback); launch_receiver(); return *this; From 49ba10c6e086fc6a7eca503a89cf4868c26e244c Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 11:53:52 +0900 Subject: [PATCH 03/37] chore(udp): always enable socket reuse Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 9335e1e8a..1cf6e82d2 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -84,6 +84,10 @@ class UdpSocket int result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)) < 0; if (result < 0) throw SocketError((errno)); + int reuse = 1; + result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + if (result < 0) throw SocketError((errno)); + sock_fd_ = sock_fd; } From c2819b9a0b91afed9460fc46be591126131ef607 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 11:54:16 +0900 Subject: [PATCH 04/37] chore(udp): clean up C-style code Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 1cf6e82d2..aca6131a2 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -148,12 +148,11 @@ class UdpSocket if (state_ >= State::BOUND) throw SocketError("Multicast groups have to be joined before binding"); - struct ip_mreq mreq; + ip_mreq mreq; mreq.imr_multiaddr.s_addr = inet_addr(group_ip.c_str()); // Multicast group address mreq.imr_interface.s_addr = inet_addr(host_.ip.c_str()); - int result = setsockopt( - sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast(&mreq), sizeof(mreq)); + int result = setsockopt(sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); if (result < 0) throw SocketError((errno)); multicast_ip_.emplace(group_ip); From dfb64c59f20797a0466f123b199c6c8b31d36ffd Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 14:13:39 +0900 Subject: [PATCH 05/37] chore(udp): remove unnecessary double parens Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index aca6131a2..04d7f7379 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -82,11 +82,11 @@ class UdpSocket int enable = 1; int result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)) < 0; - if (result < 0) throw SocketError((errno)); + if (result < 0) throw SocketError(errno); int reuse = 1; result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - if (result < 0) throw SocketError((errno)); + if (result < 0) throw SocketError(errno); sock_fd_ = sock_fd; } @@ -153,7 +153,7 @@ class UdpSocket mreq.imr_interface.s_addr = inet_addr(host_.ip.c_str()); int result = setsockopt(sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); - if (result < 0) throw SocketError((errno)); + if (result < 0) throw SocketError(errno); multicast_ip_.emplace(group_ip); return *this; @@ -176,7 +176,7 @@ class UdpSocket addr.sin_addr.s_addr = multicast_ip_ ? inet_addr(multicast_ip_->c_str()) : INADDR_ANY; int result = ::bind(sock_fd_, (struct sockaddr *)&addr, sizeof(addr)); - if (result == -1) throw SocketError((errno)); + if (result == -1) throw SocketError(errno); return *this; } @@ -219,7 +219,7 @@ class UdpSocket auto msg_header = make_msg_header(buffer); ssize_t received = recvmsg(sock_fd_, &msg_header.msg, 0); - if (received < 0) throw SocketError((errno)); + if (received < 0) throw SocketError(errno); if (!is_accepted_sender(msg_header.sender_addr)) continue; auto timestamp_ns_opt = get_receive_timestamp(msg_header.msg); From fb11beb8592128ecf5e9ac6cc0c62299cdf00df2 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 14:14:32 +0900 Subject: [PATCH 06/37] feat(udp): use poll to prevent blocking when there is no received data Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 04d7f7379..121d71f3e 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -14,8 +14,11 @@ #pragma once +#include + #include #include +#include #include #include @@ -68,6 +71,8 @@ class UdpSocket enum class State { UNINITIALIZED, INITIALIZED, BOUND, ACTIVE }; + static const int g_poll_timeout_ms = 10; + public: using callback_t = std::function &, uint64_t)>; @@ -88,6 +93,7 @@ class UdpSocket result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); if (result < 0) throw SocketError(errno); + poll_fd_ = {sock_fd, POLLIN, 0}; sock_fd_ = sock_fd; } @@ -215,6 +221,10 @@ class UdpSocket receive_thread_ = std::thread([this]() { std::vector buffer; while (state_ == State::ACTIVE) { + auto data_available = is_data_available(); + if (!data_available.has_value()) throw SocketError(data_available.error()); + if (!data_available.value()) continue; + buffer.resize(buffer_size_); auto msg_header = make_msg_header(buffer); @@ -251,6 +261,13 @@ class UdpSocket return timestamp_ns; } + util::expected is_data_available() + { + int status = poll(&poll_fd_, 1, g_poll_timeout_ms); + if (status < 0) return errno; + return (poll_fd_.revents & POLLIN) && (status > 0); + } + bool is_accepted_sender(const sockaddr_in & sender_addr) { if (!sender_) return true; @@ -283,7 +300,10 @@ class UdpSocket } std::atomic state_{State::UNINITIALIZED}; + int sock_fd_; + pollfd poll_fd_; + size_t buffer_size_ = 1500; Endpoint host_; std::optional multicast_ip_; From 56afd43151b925bcee078ddffba3bd658d569cde Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 14:33:59 +0900 Subject: [PATCH 07/37] chore(udp): differentiate between socket and usage-related errors Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 121d71f3e..e842d77dd 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -52,6 +53,12 @@ class SocketError : public std::exception std::string what_; }; +class UsageError : public std::runtime_error +{ +public: + explicit UsageError(const std::string & msg) : std::runtime_error(msg) {} +}; + class UdpSocket { private: @@ -106,7 +113,7 @@ class UdpSocket */ UdpSocket & init(const std::string & host_ip, uint16_t host_port) { - if (state_ > State::INITIALIZED) throw SocketError("Socket must be initialized before binding"); + if (state_ > State::INITIALIZED) throw UsageError("Socket must be initialized before binding"); host_ = {host_ip, host_port}; state_ = State::INITIALIZED; @@ -121,7 +128,7 @@ class UdpSocket */ UdpSocket & limit_to_sender(const std::string & sender_ip, uint16_t sender_port) { - if (state_ > State::INITIALIZED) throw SocketError("Buffer size has to be set before binding"); + if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); sender_.emplace(Endpoint{sender_ip, sender_port}); return *this; @@ -135,7 +142,7 @@ class UdpSocket */ UdpSocket & set_mtu(size_t bytes) { - if (state_ > State::INITIALIZED) throw SocketError("Buffer size has to be set before binding"); + if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); buffer_size_ = bytes; return *this; @@ -149,10 +156,10 @@ class UdpSocket */ UdpSocket & join_multicast_group(const std::string & group_ip) { - if (state_ < State::INITIALIZED) throw SocketError("Socket has to be initialized first"); + if (state_ < State::INITIALIZED) throw UsageError("Socket has to be initialized first"); if (state_ >= State::BOUND) - throw SocketError("Multicast groups have to be joined before binding"); + throw UsageError("Multicast groups have to be joined before binding"); ip_mreq mreq; mreq.imr_multiaddr.s_addr = inet_addr(group_ip.c_str()); // Multicast group address @@ -172,9 +179,9 @@ class UdpSocket */ UdpSocket & bind() { - if (state_ < State::INITIALIZED) throw SocketError("Socket has to be initialized first"); + if (state_ < State::INITIALIZED) throw UsageError("Socket has to be initialized first"); - if (state_ >= State::BOUND) throw SocketError("Re-binding already bound socket"); + if (state_ >= State::BOUND) throw UsageError("Re-binding already bound socket"); sockaddr_in addr{}; addr.sin_family = AF_INET; @@ -195,9 +202,9 @@ class UdpSocket */ UdpSocket & subscribe(callback_t && callback) { - if (state_ < State::BOUND) throw SocketError("Socket has to be bound first"); + if (state_ < State::BOUND) throw UsageError("Socket has to be bound first"); - if (state_ > State::BOUND) throw SocketError("Cannot re-subscribe to socket"); + if (state_ > State::BOUND) throw UsageError("Cannot re-subscribe to socket"); callback_ = std::move(callback); launch_receiver(); From 55bed26224b0b171330f58793c16804bf874764e Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 14:34:32 +0900 Subject: [PATCH 08/37] feat(udp): allow setting receive buffer size Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index e842d77dd..f8ff68df3 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -148,6 +148,16 @@ class UdpSocket return *this; } + UdpSocket & set_socket_buffer_size(size_t bytes) + { + if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); + + auto buf_size = static_cast(bytes); + int result = setsockopt(sock_fd_, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)); + if (result < 0) throw SocketError(errno); + return *this; + } + /** * @brief Join an IP multicast group. Only one group can be joined by the socket. * From 1bb6bc67bf5b80e075ebb3c3c5b48c8c80838ade Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 15:13:52 +0900 Subject: [PATCH 09/37] chore(udp): use uint8_t because std::byte is annoying to refactor into everywhere Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index f8ff68df3..7943aded6 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -81,7 +81,7 @@ class UdpSocket static const int g_poll_timeout_ms = 10; public: - using callback_t = std::function &, uint64_t)>; + using callback_t = std::function &, uint64_t)>; /** * @brief Construct a UDP socket with timestamp measuring enabled. The minimal way to start @@ -167,7 +167,6 @@ class UdpSocket UdpSocket & join_multicast_group(const std::string & group_ip) { if (state_ < State::INITIALIZED) throw UsageError("Socket has to be initialized first"); - if (state_ >= State::BOUND) throw UsageError("Multicast groups have to be joined before binding"); @@ -190,7 +189,6 @@ class UdpSocket UdpSocket & bind() { if (state_ < State::INITIALIZED) throw UsageError("Socket has to be initialized first"); - if (state_ >= State::BOUND) throw UsageError("Re-binding already bound socket"); sockaddr_in addr{}; @@ -213,7 +211,6 @@ class UdpSocket UdpSocket & subscribe(callback_t && callback) { if (state_ < State::BOUND) throw UsageError("Socket has to be bound first"); - if (state_ > State::BOUND) throw UsageError("Cannot re-subscribe to socket"); callback_ = std::move(callback); @@ -236,7 +233,7 @@ class UdpSocket state_ = State::ACTIVE; receive_thread_ = std::thread([this]() { - std::vector buffer; + std::vector buffer; while (state_ == State::ACTIVE) { auto data_available = is_data_available(); if (!data_available.has_value()) throw SocketError(data_available.error()); @@ -294,7 +291,7 @@ class UdpSocket return std::strncmp(sender_->ip.c_str(), sender_name.data(), INET_ADDRSTRLEN) == 0; } - MsgBuffers make_msg_header(std::vector & receive_buffer) const + MsgBuffers make_msg_header(std::vector & receive_buffer) const { msghdr msg{}; iovec iov{}; From 9c6522c649975f337c0824eca7f8adff3dbe337b Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 15:28:25 +0900 Subject: [PATCH 10/37] fix(udp): update state correctly when `bind()` is called Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 7943aded6..305e94d33 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -198,6 +198,8 @@ class UdpSocket int result = ::bind(sock_fd_, (struct sockaddr *)&addr, sizeof(addr)); if (result == -1) throw SocketError(errno); + + state_ = State::BOUND; return *this; } From 65ef33d059b340c60e7e94a8401d0bf58964c952 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 17:09:43 +0900 Subject: [PATCH 11/37] feat(udp): monitor socket packet drops Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 76 ++++++++++++++----- 1 file changed, 56 insertions(+), 20 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 305e94d33..6015f3329 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -76,12 +76,37 @@ class UdpSocket sockaddr_in sender_addr; }; + class DropMonitor + { + uint32_t last_drop_counter_{0}; + + public: + uint32_t get_drops_since_last_receive(uint32_t current_drop_counter) + { + uint32_t last = last_drop_counter_; + last_drop_counter_ = current_drop_counter; + + bool counter_did_wrap = current_drop_counter < last; + if (counter_did_wrap) { + return (UINT32_MAX - last) + current_drop_counter; + } + + return current_drop_counter - last; + } + }; + enum class State { UNINITIALIZED, INITIALIZED, BOUND, ACTIVE }; static const int g_poll_timeout_ms = 10; public: - using callback_t = std::function &, uint64_t)>; + struct ReceiveMetadata + { + std::optional timestamp_ns; + uint64_t drops_since_last_receive{0}; + }; + + using callback_t = std::function &, const ReceiveMetadata &)>; /** * @brief Construct a UDP socket with timestamp measuring enabled. The minimal way to start @@ -93,11 +118,15 @@ class UdpSocket if (sock_fd == -1) throw SocketError(errno); int enable = 1; - int result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)) < 0; + int result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + if (result < 0) throw SocketError(errno); + + // Enable kernel-space receive time measurement + result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)); if (result < 0) throw SocketError(errno); - int reuse = 1; - result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + // Enable reporting on packets dropped due to full UDP receive buffer + result = setsockopt(sock_fd, SOL_SOCKET, SO_RXQ_OVFL, &enable, sizeof(enable)); if (result < 0) throw SocketError(errno); poll_fd_ = {sock_fd, POLLIN, 0}; @@ -248,33 +277,38 @@ class UdpSocket if (received < 0) throw SocketError(errno); if (!is_accepted_sender(msg_header.sender_addr)) continue; - auto timestamp_ns_opt = get_receive_timestamp(msg_header.msg); - if (!timestamp_ns_opt) continue; - - uint64_t timestamp_ns = timestamp_ns_opt.value(); + auto metadata = get_receive_metadata(msg_header.msg); buffer.resize(received); - callback_(buffer, timestamp_ns); + callback_(buffer, metadata); } }); } - std::optional get_receive_timestamp(msghdr & msg) + ReceiveMetadata get_receive_metadata(msghdr & msg) { - timeval const * tv = nullptr; + ReceiveMetadata result; for (cmsghdr * cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) { - tv = (struct timeval *)CMSG_DATA(cmsg); - break; + if (cmsg->cmsg_level != SOL_SOCKET) continue; + + switch (cmsg->cmsg_type) { + case SO_TIMESTAMP: { + timeval const * tv = (timeval const *)CMSG_DATA(cmsg); + uint64_t timestamp_ns = tv->tv_sec * 1'000'000'000 + tv->tv_usec * 1000; + result.timestamp_ns.emplace(timestamp_ns); + break; + } + case SO_RXQ_OVFL: { + uint32_t const * drops = (uint32_t const *)CMSG_DATA(cmsg); + result.drops_since_last_receive = drop_monitor_.get_drops_since_last_receive(*drops); + break; + } + default: + continue; } } - if (!tv) { - return {}; - } - - uint64_t timestamp_ns = tv->tv_sec * 1'000'000'000 + tv->tv_usec * 1000; - return timestamp_ns; + return result; } util::expected is_data_available() @@ -326,6 +360,8 @@ class UdpSocket std::optional sender_; std::thread receive_thread_; callback_t callback_; + + DropMonitor drop_monitor_; }; } // namespace nebula::drivers::connections From 84e5493071f60047120d46c9fc175efd9315f221 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 17:42:06 +0900 Subject: [PATCH 12/37] feat(udp): add explicit unsubscribe function to facilitate clean shutdown Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 6015f3329..adeed685e 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -249,10 +249,20 @@ class UdpSocket return *this; } - ~UdpSocket() + /** + * @brief Gracefully stops the active receiver thread (if any) but keeps the socket alive. The + * same socket can later be subscribed again. + */ + UdpSocket & unsubscribe() { if (state_ == State::ACTIVE) state_ = State::BOUND; if (receive_thread_.joinable()) receive_thread_.join(); + return *this; + } + + ~UdpSocket() + { + unsubscribe(); close(sock_fd_); } From 8e2f7cdbff3328bc7402a102e4d9a8ee1b3113ec Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 22 Nov 2024 11:01:29 +0900 Subject: [PATCH 13/37] chore(expected): add stdexcept include Signed-off-by: Max SCHMELLER --- nebula_common/include/nebula_common/util/expected.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/nebula_common/include/nebula_common/util/expected.hpp b/nebula_common/include/nebula_common/util/expected.hpp index 1d4333443..45ed0b590 100644 --- a/nebula_common/include/nebula_common/util/expected.hpp +++ b/nebula_common/include/nebula_common/util/expected.hpp @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include From 78f3641ba4af8f5bc6472db6b0c79e6118d0cc87 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 22 Nov 2024 11:05:46 +0900 Subject: [PATCH 14/37] feat(udp): report when messages have been truncated Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index adeed685e..aef16a64c 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -104,6 +104,7 @@ class UdpSocket { std::optional timestamp_ns; uint64_t drops_since_last_receive{0}; + bool truncated; }; using callback_t = std::function &, const ReceiveMetadata &)>; @@ -283,11 +284,13 @@ class UdpSocket buffer.resize(buffer_size_); auto msg_header = make_msg_header(buffer); - ssize_t received = recvmsg(sock_fd_, &msg_header.msg, 0); + ssize_t received = recvmsg(sock_fd_, &msg_header.msg, MSG_TRUNC); if (received < 0) throw SocketError(errno); if (!is_accepted_sender(msg_header.sender_addr)) continue; - auto metadata = get_receive_metadata(msg_header.msg); + ReceiveMetadata metadata; + get_receive_metadata(msg_header.msg, metadata); + metadata.truncated = static_cast(received) > buffer_size_; buffer.resize(received); callback_(buffer, metadata); @@ -295,30 +298,28 @@ class UdpSocket }); } - ReceiveMetadata get_receive_metadata(msghdr & msg) + void get_receive_metadata(msghdr & msg, ReceiveMetadata & inout_metadata) { - ReceiveMetadata result; for (cmsghdr * cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET) continue; switch (cmsg->cmsg_type) { case SO_TIMESTAMP: { - timeval const * tv = (timeval const *)CMSG_DATA(cmsg); + auto tv = (timeval const *)CMSG_DATA(cmsg); uint64_t timestamp_ns = tv->tv_sec * 1'000'000'000 + tv->tv_usec * 1000; - result.timestamp_ns.emplace(timestamp_ns); + inout_metadata.timestamp_ns.emplace(timestamp_ns); break; } case SO_RXQ_OVFL: { - uint32_t const * drops = (uint32_t const *)CMSG_DATA(cmsg); - result.drops_since_last_receive = drop_monitor_.get_drops_since_last_receive(*drops); + auto drops = (uint32_t const *)CMSG_DATA(cmsg); + inout_metadata.drops_since_last_receive = + drop_monitor_.get_drops_since_last_receive(*drops); break; } default: continue; } } - - return result; } util::expected is_data_available() From dabdb0978a370adb75a54cb71621fc9fecd6b510 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 22 Nov 2024 11:06:06 +0900 Subject: [PATCH 15/37] chore(udp): relax some usage requirements Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index aef16a64c..0261e0699 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -158,7 +158,7 @@ class UdpSocket */ UdpSocket & limit_to_sender(const std::string & sender_ip, uint16_t sender_port) { - if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); + if (state_ >= State::ACTIVE) throw UsageError("Sender has to be set before subscribing"); sender_.emplace(Endpoint{sender_ip, sender_port}); return *this; @@ -172,7 +172,7 @@ class UdpSocket */ UdpSocket & set_mtu(size_t bytes) { - if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); + if (state_ >= State::ACTIVE) throw UsageError("MTU size has to be set before subscribing"); buffer_size_ = bytes; return *this; @@ -181,6 +181,8 @@ class UdpSocket UdpSocket & set_socket_buffer_size(size_t bytes) { if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); + if (bytes > static_cast(INT32_MAX)) + throw UsageError("The maximum value supported (0x7FFFFFF) has been exceeded"); auto buf_size = static_cast(bytes); int result = setsockopt(sock_fd_, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)); From df403959950e33dd13498fdd56265c57d0b34136 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 22 Nov 2024 11:07:09 +0900 Subject: [PATCH 16/37] test(udp): add most of the unit tests for udp socket Signed-off-by: Max SCHMELLER --- nebula_hw_interfaces/CMakeLists.txt | 21 +- nebula_hw_interfaces/test/common/test_udp.cpp | 184 ++++++++++++++++++ .../test/common/test_udp/utils.hpp | 78 ++++++++ 3 files changed, 277 insertions(+), 6 deletions(-) create mode 100644 nebula_hw_interfaces/test/common/test_udp.cpp create mode 100644 nebula_hw_interfaces/test/common/test_udp/utils.hpp diff --git a/nebula_hw_interfaces/CMakeLists.txt b/nebula_hw_interfaces/CMakeLists.txt index 2577ea9a5..f5dc6569b 100644 --- a/nebula_hw_interfaces/CMakeLists.txt +++ b/nebula_hw_interfaces/CMakeLists.txt @@ -2,13 +2,13 @@ cmake_minimum_required(VERSION 3.14) project(nebula_hw_interfaces) # Default to C++17 -if (NOT CMAKE_CXX_STANDARD) +if(NOT CMAKE_CXX_STANDARD) set(CMAKE_CXX_STANDARD 17) -endif () +endif() -if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic -Wunused-function) -endif () +endif() find_package(ament_cmake_auto REQUIRED) find_package(boost_tcp_driver) @@ -53,7 +53,6 @@ target_link_libraries(nebula_hw_interfaces_velodyne PUBLIC ${boost_tcp_driver_LIBRARIES} ${boost_udp_driver_LIBRARIES} ${velodyne_msgs_TARGETS} - ) target_include_directories(nebula_hw_interfaces_velodyne PUBLIC ${boost_udp_driver_INCLUDE_DIRS} @@ -68,7 +67,6 @@ target_link_libraries(nebula_hw_interfaces_robosense PUBLIC ${boost_tcp_driver_LIBRARIES} ${boost_udp_driver_LIBRARIES} ${robosense_msgs_TARGETS} - ) target_include_directories(nebula_hw_interfaces_robosense PUBLIC ${boost_udp_driver_INCLUDE_DIRS} @@ -100,6 +98,17 @@ install(DIRECTORY include/ DESTINATION include/${PROJECT_NAME}) if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() + + find_package(ament_cmake_gtest REQUIRED) + + ament_add_gtest(test_udp + test/common/test_udp.cpp + ) + + target_include_directories(test_udp PUBLIC + ${nebula_common_INCLUDE_DIRS} + include + test) endif() ament_export_include_directories("include/${PROJECT_NAME}") diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp new file mode 100644 index 000000000..d2fd61aa6 --- /dev/null +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -0,0 +1,184 @@ +// Copyright 2024 TIER IV, Inc. + +#include "common/test_udp/utils.hpp" +#include "nebula_common/util/expected.hpp" +#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace nebula::drivers::connections +{ + +using std::chrono_literals::operator""ms; + +static const char localhost_ip[] = "127.0.0.1"; +static const char broadcast_ip[] = "255.255.255.255"; +static const char any_ip[] = "0.0.0.0"; +static const char multicast_group[] = "230.1.2.3"; + +static const char sender_ip[] = "192.168.201"; +static const uint16_t sender_port = 7373; +static const uint16_t host_port = 6262; + +static const std::chrono::duration send_receive_timeout = 100ms; + +UdpSocket::callback_t empty_cb() +{ + return [](const auto &, const auto &) {}; +} + +util::expected read_sys_param(const std::string & param_fqn) +{ + std::string path = "/proc/sys/" + param_fqn; + std::replace(path.begin(), path.end(), '.', '/'); + std::ifstream ifs{path}; + if (!ifs) return "could not read " + param_fqn; + + size_t param{}; + if (!(ifs >> param)) return param_fqn + " has unrecognized format"; + return param; +} + +TEST(test_udp, test_basic_lifecycle) +{ + ASSERT_NO_THROW( + UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()).unsubscribe()); +} + +TEST(test_udp, test_special_addresses_bind) +{ + ASSERT_NO_THROW(UdpSocket().init(broadcast_ip, host_port).bind()); + ASSERT_NO_THROW(UdpSocket().init(any_ip, host_port).bind()); +} + +TEST(test_udp, test_wildcard_multicast_join_error) +{ + ASSERT_THROW( + UdpSocket().init(broadcast_ip, host_port).join_multicast_group(multicast_group).bind(), + SocketError); +} + +TEST(test_udp, test_buffer_resize) +{ + auto rmem_max_maybe = read_sys_param("net.core.rmem_max"); + if (!rmem_max_maybe.has_value()) GTEST_SKIP() << rmem_max_maybe.error(); + size_t rmem_max = rmem_max_maybe.value(); + + // Setting buffer sizes up to and including rmem_max shall succeed + ASSERT_NO_THROW( + UdpSocket().init(localhost_ip, host_port).set_socket_buffer_size(rmem_max).bind()); + + // Linux only supports sizes up to INT32_MAX + ASSERT_THROW( + UdpSocket() + .init(localhost_ip, host_port) + .set_socket_buffer_size(static_cast(INT32_MAX) + 1) + .bind(), + UsageError); +} + +TEST(test_udp, test_correct_usage_is_enforced) +{ + // These functions require other functions (e.g. `init()`) to be called beforehand + ASSERT_THROW(UdpSocket().bind(), UsageError); + ASSERT_THROW(UdpSocket().join_multicast_group(multicast_group), UsageError); + ASSERT_THROW(UdpSocket().subscribe(empty_cb()), UsageError); + + // The following functions can be called in any order, any number of times + ASSERT_NO_THROW(UdpSocket().limit_to_sender(sender_ip, sender_port)); + ASSERT_NO_THROW(UdpSocket().init(localhost_ip, host_port)); + ASSERT_NO_THROW(UdpSocket() + .limit_to_sender(sender_ip, sender_port) + .init(localhost_ip, host_port) + .limit_to_sender(sender_ip, sender_port) + .init(localhost_ip, host_port) + .bind()); + + // Sockets cannot be re-bound + ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).bind().bind(), UsageError); + + ASSERT_THROW( + UdpSocket().init(localhost_ip, host_port).bind().init(localhost_ip, host_port), UsageError); + ASSERT_NO_THROW( + UdpSocket().init(localhost_ip, host_port).bind().limit_to_sender(sender_ip, sender_port)); + + // Only bound sockets can be subscribed + ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).subscribe(empty_cb()), UsageError); + ASSERT_NO_THROW(UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb())); + + // Only one callback can exist + ASSERT_THROW( + UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()).subscribe(empty_cb()), + UsageError); + + // But un- and re-subscribing shall be supported + ASSERT_NO_THROW(UdpSocket() + .init(localhost_ip, host_port) + .bind() + .subscribe(empty_cb()) + .unsubscribe() + .subscribe(empty_cb())); + + // Unsubscribing on a non-subscribed socket shall also be supported + ASSERT_NO_THROW(UdpSocket().unsubscribe()); +} + +TEST(test_udp, test_receiving) +{ + const std::vector payload{1, 2, 3}; + UdpSocket sock{}; + sock.init(localhost_ip, host_port).bind(); + + auto err_no_opt = udp_send(localhost_ip, host_port, payload); + if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); + + auto result_opt = receive_once(sock, send_receive_timeout); + + ASSERT_TRUE(result_opt.has_value()); + auto const & [recv_payload, metadata] = result_opt.value(); + ASSERT_EQ(recv_payload, payload); + ASSERT_FALSE(metadata.truncated); + ASSERT_EQ(metadata.drops_since_last_receive, 0); + + // TODO(mojomex): currently cannot test timestamping on loopback interface (no timestamp produced) +} + +TEST(test_udp, test_receiving_oversized) +{ + const size_t mtu = 1500; + std::vector payload; + payload.resize(mtu + 1); + UdpSocket sock{}; + sock.init(localhost_ip, host_port).set_mtu(mtu).bind(); + + auto err_no_opt = udp_send(localhost_ip, host_port, payload); + if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); + + auto result_opt = receive_once(sock, send_receive_timeout); + + ASSERT_TRUE(result_opt.has_value()); + auto const & [recv_payload, metadata] = result_opt.value(); + ASSERT_TRUE(std::equal(recv_payload.begin(), recv_payload.end(), payload.begin())); + ASSERT_TRUE(metadata.truncated); + ASSERT_EQ(metadata.drops_since_last_receive, 0); +} + +} // namespace nebula::drivers::connections + +int main(int argc, char * argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}; \ No newline at end of file diff --git a/nebula_hw_interfaces/test/common/test_udp/utils.hpp b/nebula_hw_interfaces/test/common/test_udp/utils.hpp new file mode 100644 index 000000000..dfb0fbb9a --- /dev/null +++ b/nebula_hw_interfaces/test/common/test_udp/utils.hpp @@ -0,0 +1,78 @@ +// Copyright 2024 TIER IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +std::optional udp_send( + const char * to_ip, uint16_t to_port, const std::vector & bytes) +{ + int sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + if (sock_fd < 0) return errno; + + int enable = 1; + int result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + if (result < 0) return errno; + + sockaddr_in receiver_addr{}; + memset(&receiver_addr, 0, sizeof(receiver_addr)); + receiver_addr.sin_family = AF_INET; + receiver_addr.sin_port = htons(to_port); + receiver_addr.sin_addr.s_addr = inet_addr(to_ip); + + result = sendto( + sock_fd, bytes.data(), bytes.size(), 0, reinterpret_cast(&receiver_addr), + sizeof(receiver_addr)); + if (result < 0) return errno; + result = close(sock_fd); + if (result < 0) return errno; + return {}; +} + +template +std::optional< + std::pair, nebula::drivers::connections::UdpSocket::ReceiveMetadata>> +receive_once( + nebula::drivers::connections::UdpSocket & sock, std::chrono::duration<_T, _R> timeout) +{ + std::condition_variable cv_received_result; + std::mutex mtx_result; + std::optional< + std::pair, nebula::drivers::connections::UdpSocket::ReceiveMetadata>> + result; + + sock.subscribe([&](const auto & data, const auto & metadata) { + std::lock_guard lock(mtx_result); + result.emplace(data, metadata); + cv_received_result.notify_one(); + }); + + std::unique_lock lock(mtx_result); + cv_received_result.wait_for(lock, timeout, [&result]() { return result.has_value(); }); + sock.unsubscribe(); + return result; +} \ No newline at end of file From 23311cb6910ec4ee3e2991fb5d69cc68c3df85d4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 22 Nov 2024 02:07:40 +0000 Subject: [PATCH 17/37] ci(pre-commit): autofix --- nebula_hw_interfaces/test/common/test_udp.cpp | 2 +- nebula_hw_interfaces/test/common/test_udp/utils.hpp | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp index d2fd61aa6..eb5f68ccb 100644 --- a/nebula_hw_interfaces/test/common/test_udp.cpp +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -181,4 +181,4 @@ int main(int argc, char * argv[]) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -}; \ No newline at end of file +}; diff --git a/nebula_hw_interfaces/test/common/test_udp/utils.hpp b/nebula_hw_interfaces/test/common/test_udp/utils.hpp index dfb0fbb9a..6b9492190 100644 --- a/nebula_hw_interfaces/test/common/test_udp/utils.hpp +++ b/nebula_hw_interfaces/test/common/test_udp/utils.hpp @@ -53,11 +53,10 @@ std::optional udp_send( return {}; } -template +template std::optional< std::pair, nebula::drivers::connections::UdpSocket::ReceiveMetadata>> -receive_once( - nebula::drivers::connections::UdpSocket & sock, std::chrono::duration<_T, _R> timeout) +receive_once(nebula::drivers::connections::UdpSocket & sock, std::chrono::duration<_T, _R> timeout) { std::condition_variable cv_received_result; std::mutex mtx_result; @@ -75,4 +74,4 @@ receive_once( cv_received_result.wait_for(lock, timeout, [&result]() { return result.has_value(); }); sock.unsubscribe(); return result; -} \ No newline at end of file +} From 95eb54f1e1449b695a1b0eaa407280bee9b8818e Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 22 Nov 2024 11:09:08 +0900 Subject: [PATCH 18/37] chore(cspell): add OVFL to dictionary Signed-off-by: Max SCHMELLER --- .cspell.json | 1 + 1 file changed, 1 insertion(+) diff --git a/.cspell.json b/.cspell.json index 4c14cb9f3..f305225f6 100644 --- a/.cspell.json +++ b/.cspell.json @@ -32,6 +32,7 @@ "nproc", "nsec", "ntoa", + "OVFL", "pandar", "PANDAR", "PANDARAT", From 205c5dc009e35c9f72cd052ccabda2db26dfcf22 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 22 Nov 2024 14:06:30 +0900 Subject: [PATCH 19/37] fix(udp): return correctly truncated buffer when oversized packet is received Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 11 +++++++---- nebula_hw_interfaces/test/common/test_udp.cpp | 16 +++++++++++++++- .../test/common/test_udp/utils.hpp | 1 + 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 0261e0699..23cb4b928 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -286,15 +287,17 @@ class UdpSocket buffer.resize(buffer_size_); auto msg_header = make_msg_header(buffer); - ssize_t received = recvmsg(sock_fd_, &msg_header.msg, MSG_TRUNC); - if (received < 0) throw SocketError(errno); + ssize_t recv_result = recvmsg(sock_fd_, &msg_header.msg, MSG_TRUNC); + if (recv_result < 0) throw SocketError(errno); + size_t untruncated_packet_length = recv_result; + if (!is_accepted_sender(msg_header.sender_addr)) continue; ReceiveMetadata metadata; get_receive_metadata(msg_header.msg, metadata); - metadata.truncated = static_cast(received) > buffer_size_; + metadata.truncated = untruncated_packet_length > buffer_size_; - buffer.resize(received); + buffer.resize(std::min(buffer_size_, untruncated_packet_length)); callback_(buffer, metadata); } }); diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp index eb5f68ccb..674077243 100644 --- a/nebula_hw_interfaces/test/common/test_udp.cpp +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -159,7 +159,7 @@ TEST(test_udp, test_receiving_oversized) { const size_t mtu = 1500; std::vector payload; - payload.resize(mtu + 1); + payload.resize(mtu + 1, 0x42); UdpSocket sock{}; sock.init(localhost_ip, host_port).set_mtu(mtu).bind(); @@ -170,11 +170,25 @@ TEST(test_udp, test_receiving_oversized) ASSERT_TRUE(result_opt.has_value()); auto const & [recv_payload, metadata] = result_opt.value(); + ASSERT_EQ(recv_payload.size(), mtu); ASSERT_TRUE(std::equal(recv_payload.begin(), recv_payload.end(), payload.begin())); ASSERT_TRUE(metadata.truncated); ASSERT_EQ(metadata.drops_since_last_receive, 0); } +TEST(test_udp, test_filtering_sender) +{ + std::vector payload{1, 2, 3}; + UdpSocket sock{}; + sock.init(localhost_ip, host_port).bind().limit_to_sender(sender_ip, sender_port); + + auto err_no_opt = udp_send(localhost_ip, host_port, payload); + if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); + + auto result_opt = receive_once(sock, send_receive_timeout); + ASSERT_FALSE(result_opt.has_value()); +} + } // namespace nebula::drivers::connections int main(int argc, char * argv[]) diff --git a/nebula_hw_interfaces/test/common/test_udp/utils.hpp b/nebula_hw_interfaces/test/common/test_udp/utils.hpp index 6b9492190..dc9a08c0e 100644 --- a/nebula_hw_interfaces/test/common/test_udp/utils.hpp +++ b/nebula_hw_interfaces/test/common/test_udp/utils.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include std::optional udp_send( From 9ea3a90d5d89ec839e8786ec94b23d339b5ef832 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 14:34:33 +0900 Subject: [PATCH 20/37] chore(udp): uniform initialization for buffer_size_ Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 23cb4b928..a733ddd05 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -370,7 +370,7 @@ class UdpSocket int sock_fd_; pollfd poll_fd_; - size_t buffer_size_ = 1500; + size_t buffer_size_{1500}; Endpoint host_; std::optional multicast_ip_; std::optional sender_; From 674b9607a8d5c92ebd00ae51b9d15de5c202dc78 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 15:07:59 +0900 Subject: [PATCH 21/37] chore(udp): disallow re-initializing socket Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index a733ddd05..c19c02017 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -14,6 +14,12 @@ #pragma once +#include +#ifndef _GNU_SOURCE +// See `man strerror_r` +#define _GNU_SOURCE +#endif + #include #include @@ -34,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -43,10 +50,17 @@ namespace nebula::drivers::connections class SocketError : public std::exception { + static constexpr size_t gnu_max_strerror_length = 1024; + public: - explicit SocketError(int err_no) : what_(strerror(err_no)) {} + explicit SocketError(int err_no) + { + std::array msg_buf; + std::string_view msg = strerror_r(err_no, msg_buf.data(), msg_buf.size()); + what_ = std::string{msg}; + } - explicit SocketError(const char * msg) : what_(msg) {} + explicit SocketError(const std::string_view & msg) : what_(msg) {} const char * what() const noexcept override { return what_.c_str(); } @@ -144,7 +158,7 @@ class UdpSocket */ UdpSocket & init(const std::string & host_ip, uint16_t host_port) { - if (state_ > State::INITIALIZED) throw UsageError("Socket must be initialized before binding"); + if (state_ >= State::INITIALIZED) throw UsageError("Socket must be initialized before binding"); host_ = {host_ip, host_port}; state_ = State::INITIALIZED; From 193bf6ec8aad384948ed112d1ebec9c62158210f Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 15:17:31 +0900 Subject: [PATCH 22/37] chore(udp): make error value checking consistent (== -1) Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index c19c02017..ebeab6e29 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -135,15 +135,15 @@ class UdpSocket int enable = 1; int result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); - if (result < 0) throw SocketError(errno); + if (result == -1) throw SocketError(errno); // Enable kernel-space receive time measurement result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)); - if (result < 0) throw SocketError(errno); + if (result == -1) throw SocketError(errno); // Enable reporting on packets dropped due to full UDP receive buffer result = setsockopt(sock_fd, SOL_SOCKET, SO_RXQ_OVFL, &enable, sizeof(enable)); - if (result < 0) throw SocketError(errno); + if (result == -1) throw SocketError(errno); poll_fd_ = {sock_fd, POLLIN, 0}; sock_fd_ = sock_fd; @@ -201,7 +201,7 @@ class UdpSocket auto buf_size = static_cast(bytes); int result = setsockopt(sock_fd_, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)); - if (result < 0) throw SocketError(errno); + if (result == -1) throw SocketError(errno); return *this; } @@ -222,7 +222,7 @@ class UdpSocket mreq.imr_interface.s_addr = inet_addr(host_.ip.c_str()); int result = setsockopt(sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); - if (result < 0) throw SocketError(errno); + if (result == -1) throw SocketError(errno); multicast_ip_.emplace(group_ip); return *this; @@ -344,7 +344,7 @@ class UdpSocket util::expected is_data_available() { int status = poll(&poll_fd_, 1, g_poll_timeout_ms); - if (status < 0) return errno; + if (status == -1) return errno; return (poll_fd_.revents & POLLIN) && (status > 0); } From 044208bbb5e3c9522c8f0796fcaef27fd570a319 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 15:17:53 +0900 Subject: [PATCH 23/37] chore(udp): add explanatory comment on handling of 0-length datagrams Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index ebeab6e29..79f96b84b 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -301,6 +301,8 @@ class UdpSocket buffer.resize(buffer_size_); auto msg_header = make_msg_header(buffer); + // As per `man recvmsg`, zero-length datagrams are permitted and valid. Since the socket is + // blocking, a recv_result of 0 means we received a valid 0-length datagram. ssize_t recv_result = recvmsg(sock_fd_, &msg_header.msg, MSG_TRUNC); if (recv_result < 0) throw SocketError(errno); size_t untruncated_packet_length = recv_result; From d60855f62b6674e9e2017f795ee23e045a01e04c Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 15:37:21 +0900 Subject: [PATCH 24/37] chore(udp): disallow binding to broadcast IP Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 79f96b84b..72ebd1c7b 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -113,6 +113,7 @@ class UdpSocket enum class State { UNINITIALIZED, INITIALIZED, BOUND, ACTIVE }; static const int g_poll_timeout_ms = 10; + static constexpr std::string_view broadcast_ip{"255.255.255.255"}; public: struct ReceiveMetadata @@ -159,6 +160,8 @@ class UdpSocket UdpSocket & init(const std::string & host_ip, uint16_t host_port) { if (state_ >= State::INITIALIZED) throw UsageError("Socket must be initialized before binding"); + if (host_ip == broadcast_ip) + throw UsageError("Do not bind to broadcast IP. Bind to 0.0.0.0 or a specific IP instead."); host_ = {host_ip, host_port}; state_ = State::INITIALIZED; From 237c2b054904329bac822d189f4fe5da4a9e02a5 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 15:37:30 +0900 Subject: [PATCH 25/37] chore(udp): bind to host IP instead of INADDR_ANY in non-multicast case Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 72ebd1c7b..f64f84a3b 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -244,7 +244,8 @@ class UdpSocket sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(host_.port); - addr.sin_addr.s_addr = multicast_ip_ ? inet_addr(multicast_ip_->c_str()) : INADDR_ANY; + const std::string & bind_ip = multicast_ip_ ? *multicast_ip_ : host_.ip; + addr.sin_addr.s_addr = inet_addr(bind_ip.c_str()); int result = ::bind(sock_fd_, (struct sockaddr *)&addr, sizeof(addr)); if (result == -1) throw SocketError(errno); From 11ed243f23518f7744694960c04b1ce5d0d6a5d2 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 16:01:53 +0900 Subject: [PATCH 26/37] feat(udp): make polling interval configurable Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index f64f84a3b..1493f0f26 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -112,7 +112,6 @@ class UdpSocket enum class State { UNINITIALIZED, INITIALIZED, BOUND, ACTIVE }; - static const int g_poll_timeout_ms = 10; static constexpr std::string_view broadcast_ip{"255.255.255.255"}; public: @@ -196,6 +195,23 @@ class UdpSocket return *this; } + /** + * @brief Set the interval at which the socket polls for new data. THis should be longer than the + * expected interval of packets arriving in order to not poll unnecessarily often, and should be + * shorter than the acceptable time delay for `unsubscribe()`. The `unsubscribe()` function blocks + * up to one full poll interval before returning. + * + * @param interval_ms The desired polling interval. See `man poll` for the meanings of 0 and + * negative values. + */ + UdpSocket & set_polling_interval(int32_t interval_ms) + { + if (state_ >= State::ACTIVE) throw UsageError("Poll timeout has to be set before subscribing"); + + polling_interval_ms_ = interval_ms; + return *this; + } + UdpSocket & set_socket_buffer_size(size_t bytes) { if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); @@ -349,7 +365,7 @@ class UdpSocket util::expected is_data_available() { - int status = poll(&poll_fd_, 1, g_poll_timeout_ms); + int status = poll(&poll_fd_, 1, polling_interval_ms_); if (status == -1) return errno; return (poll_fd_.revents & POLLIN) && (status > 0); } @@ -389,6 +405,7 @@ class UdpSocket int sock_fd_; pollfd poll_fd_; + int32_t polling_interval_ms_{10}; size_t buffer_size_{1500}; Endpoint host_; From ed904bd5df95613208388f0570f1f969054c0eaa Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 18:35:02 +0900 Subject: [PATCH 27/37] chore(udp): rename ReceiveMetadata to RxMetadata Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 8 ++++---- nebula_hw_interfaces/test/common/test_udp/utils.hpp | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 1493f0f26..fb7e0e0e2 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -115,14 +115,14 @@ class UdpSocket static constexpr std::string_view broadcast_ip{"255.255.255.255"}; public: - struct ReceiveMetadata + struct RxMetadata { std::optional timestamp_ns; uint64_t drops_since_last_receive{0}; bool truncated; }; - using callback_t = std::function &, const ReceiveMetadata &)>; + using callback_t = std::function &, const RxMetadata &)>; /** * @brief Construct a UDP socket with timestamp measuring enabled. The minimal way to start @@ -329,7 +329,7 @@ class UdpSocket if (!is_accepted_sender(msg_header.sender_addr)) continue; - ReceiveMetadata metadata; + RxMetadata metadata; get_receive_metadata(msg_header.msg, metadata); metadata.truncated = untruncated_packet_length > buffer_size_; @@ -339,7 +339,7 @@ class UdpSocket }); } - void get_receive_metadata(msghdr & msg, ReceiveMetadata & inout_metadata) + void get_receive_metadata(msghdr & msg, RxMetadata & inout_metadata) { for (cmsghdr * cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET) continue; diff --git a/nebula_hw_interfaces/test/common/test_udp/utils.hpp b/nebula_hw_interfaces/test/common/test_udp/utils.hpp index dc9a08c0e..a4d8878ce 100644 --- a/nebula_hw_interfaces/test/common/test_udp/utils.hpp +++ b/nebula_hw_interfaces/test/common/test_udp/utils.hpp @@ -55,14 +55,13 @@ std::optional udp_send( } template -std::optional< - std::pair, nebula::drivers::connections::UdpSocket::ReceiveMetadata>> +std::optional, nebula::drivers::connections::UdpSocket::RxMetadata>> receive_once(nebula::drivers::connections::UdpSocket & sock, std::chrono::duration<_T, _R> timeout) { std::condition_variable cv_received_result; std::mutex mtx_result; std::optional< - std::pair, nebula::drivers::connections::UdpSocket::ReceiveMetadata>> + std::pair, nebula::drivers::connections::UdpSocket::RxMetadata>> result; sock.subscribe([&](const auto & data, const auto & metadata) { From d831f613cdfe8bfacdec5a0ac3495ba4e71b087c Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 19:20:10 +0900 Subject: [PATCH 28/37] chore(udp): parse IP addresses with error checking Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index fb7e0e0e2..7242bac55 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -79,7 +79,7 @@ class UdpSocket private: struct Endpoint { - std::string ip; + in_addr ip; uint16_t port; }; @@ -159,10 +159,12 @@ class UdpSocket UdpSocket & init(const std::string & host_ip, uint16_t host_port) { if (state_ >= State::INITIALIZED) throw UsageError("Socket must be initialized before binding"); - if (host_ip == broadcast_ip) + + in_addr host_in_addr = parse_ip_or_throw(host_ip); + if (host_in_addr.s_addr == INADDR_BROADCAST) throw UsageError("Do not bind to broadcast IP. Bind to 0.0.0.0 or a specific IP instead."); - host_ = {host_ip, host_port}; + host_ = {host_in_addr, host_port}; state_ = State::INITIALIZED; return *this; } @@ -177,7 +179,7 @@ class UdpSocket { if (state_ >= State::ACTIVE) throw UsageError("Sender has to be set before subscribing"); - sender_.emplace(Endpoint{sender_ip, sender_port}); + sender_.emplace(Endpoint{parse_ip_or_throw(sender_ip), sender_port}); return *this; } @@ -236,14 +238,12 @@ class UdpSocket if (state_ >= State::BOUND) throw UsageError("Multicast groups have to be joined before binding"); - ip_mreq mreq; - mreq.imr_multiaddr.s_addr = inet_addr(group_ip.c_str()); // Multicast group address - mreq.imr_interface.s_addr = inet_addr(host_.ip.c_str()); + ip_mreq mreq{parse_ip_or_throw(group_ip), host_.ip}; int result = setsockopt(sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); if (result == -1) throw SocketError(errno); - multicast_ip_.emplace(group_ip); + multicast_ip_.emplace(mreq.imr_multiaddr); return *this; } @@ -260,8 +260,7 @@ class UdpSocket sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(host_.port); - const std::string & bind_ip = multicast_ip_ ? *multicast_ip_ : host_.ip; - addr.sin_addr.s_addr = inet_addr(bind_ip.c_str()); + addr.sin_addr = multicast_ip_ ? *multicast_ip_ : host_.ip; int result = ::bind(sock_fd_, (struct sockaddr *)&addr, sizeof(addr)); if (result == -1) throw SocketError(errno); @@ -373,13 +372,10 @@ class UdpSocket bool is_accepted_sender(const sockaddr_in & sender_addr) { if (!sender_) return true; - - std::array sender_name; - inet_ntop(AF_INET, &sender_addr.sin_addr, sender_name.data(), INET_ADDRSTRLEN); - return std::strncmp(sender_->ip.c_str(), sender_name.data(), INET_ADDRSTRLEN) == 0; + return sender_addr.sin_addr.s_addr == sender_->ip.s_addr; } - MsgBuffers make_msg_header(std::vector & receive_buffer) const + static MsgBuffers make_msg_header(std::vector & receive_buffer) { msghdr msg{}; iovec iov{}; @@ -401,6 +397,14 @@ class UdpSocket return MsgBuffers{msg, iov, control, sender_addr}; } + static in_addr parse_ip_or_throw(const std::string & ip) + { + in_addr parsed_addr; + bool valid = inet_aton(ip.c_str(), &parsed_addr); + if (!valid) throw UsageError("Invalid IP address given"); + return parsed_addr; + } + std::atomic state_{State::UNINITIALIZED}; int sock_fd_; @@ -409,7 +413,7 @@ class UdpSocket size_t buffer_size_{1500}; Endpoint host_; - std::optional multicast_ip_; + std::optional multicast_ip_; std::optional sender_; std::thread receive_thread_; callback_t callback_; From 3124948b0ad11a0befb826fa6bad7bee03871a56 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Fri, 13 Dec 2024 19:30:24 +0900 Subject: [PATCH 29/37] test(udp): update and fix tests after changes of recent commits Signed-off-by: Max SCHMELLER --- nebula_hw_interfaces/test/common/test_udp.cpp | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp index 674077243..6b9262220 100644 --- a/nebula_hw_interfaces/test/common/test_udp.cpp +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -59,14 +59,14 @@ TEST(test_udp, test_basic_lifecycle) TEST(test_udp, test_special_addresses_bind) { - ASSERT_NO_THROW(UdpSocket().init(broadcast_ip, host_port).bind()); + ASSERT_THROW(UdpSocket().init(broadcast_ip, host_port), UsageError); ASSERT_NO_THROW(UdpSocket().init(any_ip, host_port).bind()); } -TEST(test_udp, test_wildcard_multicast_join_error) +TEST(test_udp, test_joining_invalid_multicast_group) { ASSERT_THROW( - UdpSocket().init(broadcast_ip, host_port).join_multicast_group(multicast_group).bind(), + UdpSocket().init(localhost_ip, host_port).join_multicast_group(broadcast_ip).bind(), SocketError); } @@ -98,13 +98,11 @@ TEST(test_udp, test_correct_usage_is_enforced) // The following functions can be called in any order, any number of times ASSERT_NO_THROW(UdpSocket().limit_to_sender(sender_ip, sender_port)); - ASSERT_NO_THROW(UdpSocket().init(localhost_ip, host_port)); - ASSERT_NO_THROW(UdpSocket() - .limit_to_sender(sender_ip, sender_port) - .init(localhost_ip, host_port) - .limit_to_sender(sender_ip, sender_port) - .init(localhost_ip, host_port) - .bind()); + ASSERT_NO_THROW( + UdpSocket().limit_to_sender(sender_ip, sender_port).limit_to_sender(sender_ip, sender_port)); + + // Sockets cannot be re-initialized + ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).init(localhost_ip, host_port), UsageError); // Sockets cannot be re-bound ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).bind().bind(), UsageError); From af49746b6d01377f1634e18573a85492f825e9ad Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 16 Dec 2024 09:36:18 +0900 Subject: [PATCH 30/37] feat(expected): add shorthand `value_or_throw()` function Signed-off-by: Max SCHMELLER --- nebula_common/include/nebula_common/util/expected.hpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nebula_common/include/nebula_common/util/expected.hpp b/nebula_common/include/nebula_common/util/expected.hpp index 45ed0b590..a64818772 100644 --- a/nebula_common/include/nebula_common/util/expected.hpp +++ b/nebula_common/include/nebula_common/util/expected.hpp @@ -79,6 +79,12 @@ struct expected throw std::runtime_error(error_msg); } + /// @brief If the instance has a value, return the value, else throw the stored error instance. + T value_or_throw() { + if (has_value()) return value(); + throw error(); + } + /// @brief Retrieve the error, or throw `bad_expected_access` if a value is contained. /// @return The error of type `E` E error() From f5a34843ac1dd514f32d078b0696d10b82d25468 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 16 Dec 2024 09:40:42 +0900 Subject: [PATCH 31/37] feat(udp): refactor to typestate-based builder pattern to make misuse a compiler error Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 377 ++++++++++-------- nebula_hw_interfaces/test/common/test_udp.cpp | 71 ++-- 2 files changed, 227 insertions(+), 221 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 7242bac55..4c5df6932 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -14,7 +14,6 @@ #pragma once -#include #ifndef _GNU_SOURCE // See `man strerror_r` #define _GNU_SOURCE @@ -29,6 +28,7 @@ #include #include +#include #include #include #include @@ -43,6 +43,7 @@ #include #include #include +#include #include namespace nebula::drivers::connections @@ -76,198 +77,224 @@ class UsageError : public std::runtime_error class UdpSocket { -private: struct Endpoint { in_addr ip; uint16_t port; }; - struct MsgBuffers + class SockFd { - msghdr msg{}; - iovec iov{}; - std::array control; - sockaddr_in sender_addr; - }; - - class DropMonitor - { - uint32_t last_drop_counter_{0}; + int sock_fd_; public: - uint32_t get_drops_since_last_receive(uint32_t current_drop_counter) + SockFd() : sock_fd_{-1} {} + SockFd(int sock_fd) : sock_fd_{sock_fd} {} + SockFd(SockFd && other) noexcept : sock_fd_{other.sock_fd_} { other.sock_fd_ = -1; } + + SockFd(const SockFd &) = delete; + SockFd & operator=(const SockFd &) = delete; + SockFd & operator=(SockFd && other) { - uint32_t last = last_drop_counter_; - last_drop_counter_ = current_drop_counter; + std::swap(sock_fd_, other.sock_fd_); + return *this; + }; - bool counter_did_wrap = current_drop_counter < last; - if (counter_did_wrap) { - return (UINT32_MAX - last) + current_drop_counter; - } + ~SockFd() + { + if (sock_fd_ == -1) return; + close(sock_fd_); + } - return current_drop_counter - last; + [[nodiscard]] int get() const { return sock_fd_; } + + template + [[nodiscard]] util::expected setsockopt( + int level, int optname, const T & optval) + { + int result = ::setsockopt(sock_fd_, level, optname, &optval, sizeof(T)); + if (result == -1) return SocketError(errno); + return std::monostate{}; } }; - enum class State { UNINITIALIZED, INITIALIZED, BOUND, ACTIVE }; + struct SocketConfig + { + int32_t polling_interval_ms{10}; + + size_t buffer_size{1500}; + Endpoint host; + std::optional multicast_ip; + std::optional sender; + }; - static constexpr std::string_view broadcast_ip{"255.255.255.255"}; + UdpSocket(SockFd sock_fd, SocketConfig config) + : sock_fd_(std::move(sock_fd)), poll_fd_{sock_fd_.get(), POLLIN, 0}, config_{std::move(config)} + { + } public: - struct RxMetadata + class Builder { - std::optional timestamp_ns; - uint64_t drops_since_last_receive{0}; - bool truncated; - }; + public: + /** + * @brief Build a UDP socket with timestamp measuring enabled. The minimal way to start + * receiving on the socket is `UdpSocket::Builder(...).bind().subscribe(...);`. + * + * @param host_ip The address to bind to. + * @param host_port The port to bind to. + */ + Builder(const std::string & host_ip, uint16_t host_port) + { + in_addr host_in_addr = parse_ip_or_throw(host_ip); + if (host_in_addr.s_addr == INADDR_BROADCAST) + throw UsageError("Do not bind to broadcast IP. Bind to 0.0.0.0 or a specific IP instead."); - using callback_t = std::function &, const RxMetadata &)>; + config_.host = {host_in_addr, host_port}; - /** - * @brief Construct a UDP socket with timestamp measuring enabled. The minimal way to start - * receiving on the socket is `UdpSocket().init(...).bind().subscribe(...);`. - */ - UdpSocket() - { - int sock_fd = socket(AF_INET, SOCK_DGRAM, 0); - if (sock_fd == -1) throw SocketError(errno); + int sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + if (sock_fd == -1) throw SocketError(errno); + sock_fd_ = SockFd{sock_fd}; - int enable = 1; - int result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); - if (result == -1) throw SocketError(errno); + sock_fd_.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1).value_or_throw(); - // Enable kernel-space receive time measurement - result = setsockopt(sock_fd, SOL_SOCKET, SO_TIMESTAMP, &enable, sizeof(enable)); - if (result == -1) throw SocketError(errno); + // Enable kernel-space receive time measurement + sock_fd_.setsockopt(SOL_SOCKET, SO_TIMESTAMP, 1).value_or_throw(); - // Enable reporting on packets dropped due to full UDP receive buffer - result = setsockopt(sock_fd, SOL_SOCKET, SO_RXQ_OVFL, &enable, sizeof(enable)); - if (result == -1) throw SocketError(errno); + // Enable reporting on packets dropped due to full UDP receive buffer + sock_fd_.setsockopt(SOL_SOCKET, SO_RXQ_OVFL, 1).value_or_throw(); + } - poll_fd_ = {sock_fd, POLLIN, 0}; - sock_fd_ = sock_fd; - } + /** + * @brief Set the socket to drop all packets not coming from `sender_ip` and `sender_port`. + * + * @param sender_ip The only allowed sender IP. Cannot be a multicast or broadcast address. + * @param sender_port The only allowed sender port. + */ + Builder && limit_to_sender(const std::string & sender_ip, uint16_t sender_port) + { + config_.sender.emplace(Endpoint{parse_ip_or_throw(sender_ip), sender_port}); + return std::move(*this); + } - /** - * @brief Specify the host address and port for this socket to be bound to. To bind the socket, - * call the `bind()` function. - * - * @param host_ip The address to bind to. - * @param host_port The port to bind to. - */ - UdpSocket & init(const std::string & host_ip, uint16_t host_port) - { - if (state_ >= State::INITIALIZED) throw UsageError("Socket must be initialized before binding"); + /** + * @brief Set the MTU this socket supports. While this can be set arbitrarily, it is best set to + * the MTU of the network interface, or to the maximum expected packet length. + * + * @param bytes The MTU size. The default value is 1500. + */ + Builder && set_mtu(size_t bytes) + { + config_.buffer_size = bytes; + return std::move(*this); + } - in_addr host_in_addr = parse_ip_or_throw(host_ip); - if (host_in_addr.s_addr == INADDR_BROADCAST) - throw UsageError("Do not bind to broadcast IP. Bind to 0.0.0.0 or a specific IP instead."); + /** + * @brief Set the internal socket receive buffer size. See `SO_RCVBUF` in `man 7 socket` for + * more information. + * + * @param bytes The desired buffer size in bytes. + */ + Builder && set_socket_buffer_size(size_t bytes) + { + if (bytes > static_cast(INT32_MAX)) + throw UsageError("The maximum value supported (0x7FFFFFF) has been exceeded"); - host_ = {host_in_addr, host_port}; - state_ = State::INITIALIZED; - return *this; - } + auto buf_size = static_cast(bytes); + sock_fd_.setsockopt(SOL_SOCKET, SO_RCVBUF, buf_size).value_or_throw(); + return std::move(*this); + } - /** - * @brief Set the socket to drop all packets not coming from `sender_ip` and `sender_port`. - * - * @param sender_ip The only allowed sender IP. Cannot be a multicast or broadcast address. - * @param sender_port The only allowed sender port. - */ - UdpSocket & limit_to_sender(const std::string & sender_ip, uint16_t sender_port) - { - if (state_ >= State::ACTIVE) throw UsageError("Sender has to be set before subscribing"); + /** + * @brief Join an IP multicast group. Only one group can be joined by the socket. + * + * @param group_ip The multicast IP. It has to be in the multicast range `224.0.0.0/4` (between + * `224.0.0.0` and `239.255.255.255`). + */ + Builder && join_multicast_group(const std::string & group_ip) + { + ip_mreq mreq{parse_ip_or_throw(group_ip), config_.host.ip}; - sender_.emplace(Endpoint{parse_ip_or_throw(sender_ip), sender_port}); - return *this; - } + sock_fd_.setsockopt(IPPROTO_IP, IP_ADD_MEMBERSHIP, mreq).value_or_throw(); + config_.multicast_ip.emplace(mreq.imr_multiaddr); + return std::move(*this); + } - /** - * @brief Set the MTU this socket supports. While this can be set arbitrarily, it is best set to - * the MTU of the network interface, or to the maximum expected packet length. - * - * @param bytes The MTU size. The default value is 1500. - */ - UdpSocket & set_mtu(size_t bytes) - { - if (state_ >= State::ACTIVE) throw UsageError("MTU size has to be set before subscribing"); + /** + * @brief Set the interval at which the socket polls for new data. THis should be longer than + * the expected interval of packets arriving in order to not poll unnecessarily often, and + * should be shorter than the acceptable time delay for `unsubscribe()`. The `unsubscribe()` + * function blocks up to one full poll interval before returning. + * + * @param interval_ms The desired polling interval. See `man poll` for the meanings of 0 and + * negative values. + */ + Builder && set_polling_interval(int32_t interval_ms) + { + config_.polling_interval_ms = interval_ms; + return std::move(*this); + } - buffer_size_ = bytes; - return *this; - } + /** + * @brief Bind the socket to host IP and port given in `init()`. If `join_multicast_group()` was + * called before this function, the socket will be bound to `group_ip` instead. At least + * `init()` has to have been called before. + */ + UdpSocket bind() && + { + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(config_.host.port); + addr.sin_addr = config_.multicast_ip ? *config_.multicast_ip : config_.host.ip; - /** - * @brief Set the interval at which the socket polls for new data. THis should be longer than the - * expected interval of packets arriving in order to not poll unnecessarily often, and should be - * shorter than the acceptable time delay for `unsubscribe()`. The `unsubscribe()` function blocks - * up to one full poll interval before returning. - * - * @param interval_ms The desired polling interval. See `man poll` for the meanings of 0 and - * negative values. - */ - UdpSocket & set_polling_interval(int32_t interval_ms) - { - if (state_ >= State::ACTIVE) throw UsageError("Poll timeout has to be set before subscribing"); + int result = ::bind(sock_fd_.get(), (struct sockaddr *)&addr, sizeof(addr)); + if (result == -1) throw SocketError(errno); - polling_interval_ms_ = interval_ms; - return *this; - } + return UdpSocket{std::move(sock_fd_), config_}; + } - UdpSocket & set_socket_buffer_size(size_t bytes) - { - if (state_ > State::INITIALIZED) throw UsageError("Buffer size has to be set before binding"); - if (bytes > static_cast(INT32_MAX)) - throw UsageError("The maximum value supported (0x7FFFFFF) has been exceeded"); + private: + SockFd sock_fd_; + SocketConfig config_; + }; - auto buf_size = static_cast(bytes); - int result = setsockopt(sock_fd_, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)); - if (result == -1) throw SocketError(errno); - return *this; - } +private: + struct MsgBuffers + { + msghdr msg{}; + iovec iov{}; + std::array control; + sockaddr_in sender_addr; + }; - /** - * @brief Join an IP multicast group. Only one group can be joined by the socket. - * - * @param group_ip The multicast IP. It has to be in the multicast range `224.0.0.0/4` (between - * `224.0.0.0` and `239.255.255.255`). - */ - UdpSocket & join_multicast_group(const std::string & group_ip) + class DropMonitor { - if (state_ < State::INITIALIZED) throw UsageError("Socket has to be initialized first"); - if (state_ >= State::BOUND) - throw UsageError("Multicast groups have to be joined before binding"); + uint32_t last_drop_counter_{0}; - ip_mreq mreq{parse_ip_or_throw(group_ip), host_.ip}; + public: + uint32_t get_drops_since_last_receive(uint32_t current_drop_counter) + { + uint32_t last = last_drop_counter_; + last_drop_counter_ = current_drop_counter; - int result = setsockopt(sock_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); - if (result == -1) throw SocketError(errno); + bool counter_did_wrap = current_drop_counter < last; + if (counter_did_wrap) { + return (UINT32_MAX - last) + current_drop_counter; + } - multicast_ip_.emplace(mreq.imr_multiaddr); - return *this; - } + return current_drop_counter - last; + } + }; - /** - * @brief Bind the socket to host IP and port given in `init()`. If `join_multicast_group()` was - * called before this function, the socket will be bound to `group_ip` instead. At least `init()` - * has to have been called before. - */ - UdpSocket & bind() +public: + struct RxMetadata { - if (state_ < State::INITIALIZED) throw UsageError("Socket has to be initialized first"); - if (state_ >= State::BOUND) throw UsageError("Re-binding already bound socket"); - - sockaddr_in addr{}; - addr.sin_family = AF_INET; - addr.sin_port = htons(host_.port); - addr.sin_addr = multicast_ip_ ? *multicast_ip_ : host_.ip; - - int result = ::bind(sock_fd_, (struct sockaddr *)&addr, sizeof(addr)); - if (result == -1) throw SocketError(errno); + std::optional timestamp_ns; + uint64_t drops_since_last_receive{0}; + bool truncated; + }; - state_ = State::BOUND; - return *this; - } + using callback_t = std::function &, const RxMetadata &)>; /** * @brief Register a callback for processing received packets and start the receiver thread. The @@ -278,9 +305,7 @@ class UdpSocket */ UdpSocket & subscribe(callback_t && callback) { - if (state_ < State::BOUND) throw UsageError("Socket has to be bound first"); - if (state_ > State::BOUND) throw UsageError("Cannot re-subscribe to socket"); - + unsubscribe(); callback_ = std::move(callback); launch_receiver(); return *this; @@ -292,37 +317,39 @@ class UdpSocket */ UdpSocket & unsubscribe() { - if (state_ == State::ACTIVE) state_ = State::BOUND; - if (receive_thread_.joinable()) receive_thread_.join(); + running_ = false; + if (receive_thread_.joinable()) { + receive_thread_.join(); + } return *this; } - ~UdpSocket() - { - unsubscribe(); - close(sock_fd_); - } + UdpSocket(const UdpSocket &) = delete; + UdpSocket(UdpSocket &&) = delete; + UdpSocket & operator=(const UdpSocket &) = delete; + UdpSocket & operator=(UdpSocket &&) = delete; + + ~UdpSocket() { unsubscribe(); } private: void launch_receiver() { - assert(state_ == State::BOUND); assert(callback_); - state_ = State::ACTIVE; + running_ = true; receive_thread_ = std::thread([this]() { std::vector buffer; - while (state_ == State::ACTIVE) { + while (running_) { auto data_available = is_data_available(); if (!data_available.has_value()) throw SocketError(data_available.error()); if (!data_available.value()) continue; - buffer.resize(buffer_size_); + buffer.resize(config_.buffer_size); auto msg_header = make_msg_header(buffer); // As per `man recvmsg`, zero-length datagrams are permitted and valid. Since the socket is // blocking, a recv_result of 0 means we received a valid 0-length datagram. - ssize_t recv_result = recvmsg(sock_fd_, &msg_header.msg, MSG_TRUNC); + ssize_t recv_result = recvmsg(sock_fd_.get(), &msg_header.msg, MSG_TRUNC); if (recv_result < 0) throw SocketError(errno); size_t untruncated_packet_length = recv_result; @@ -330,9 +357,9 @@ class UdpSocket RxMetadata metadata; get_receive_metadata(msg_header.msg, metadata); - metadata.truncated = untruncated_packet_length > buffer_size_; + metadata.truncated = untruncated_packet_length > config_.buffer_size; - buffer.resize(std::min(buffer_size_, untruncated_packet_length)); + buffer.resize(std::min(config_.buffer_size, untruncated_packet_length)); callback_(buffer, metadata); } }); @@ -364,15 +391,15 @@ class UdpSocket util::expected is_data_available() { - int status = poll(&poll_fd_, 1, polling_interval_ms_); + int status = poll(&poll_fd_, 1, config_.polling_interval_ms); if (status == -1) return errno; return (poll_fd_.revents & POLLIN) && (status > 0); } bool is_accepted_sender(const sockaddr_in & sender_addr) { - if (!sender_) return true; - return sender_addr.sin_addr.s_addr == sender_->ip.s_addr; + if (!config_.sender) return true; + return sender_addr.sin_addr.s_addr == config_.sender->ip.s_addr; } static MsgBuffers make_msg_header(std::vector & receive_buffer) @@ -405,16 +432,12 @@ class UdpSocket return parsed_addr; } - std::atomic state_{State::UNINITIALIZED}; - - int sock_fd_; + SockFd sock_fd_; pollfd poll_fd_; - int32_t polling_interval_ms_{10}; - size_t buffer_size_{1500}; - Endpoint host_; - std::optional multicast_ip_; - std::optional sender_; + SocketConfig config_; + + std::atomic_bool running_{false}; std::thread receive_thread_; callback_t callback_; diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp index 6b9262220..415bc8017 100644 --- a/nebula_hw_interfaces/test/common/test_udp.cpp +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -54,19 +54,19 @@ util::expected read_sys_param(const std::string & param_f TEST(test_udp, test_basic_lifecycle) { ASSERT_NO_THROW( - UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()).unsubscribe()); + UdpSocket::Builder(localhost_ip, host_port).bind().subscribe(empty_cb()).unsubscribe()); } TEST(test_udp, test_special_addresses_bind) { - ASSERT_THROW(UdpSocket().init(broadcast_ip, host_port), UsageError); - ASSERT_NO_THROW(UdpSocket().init(any_ip, host_port).bind()); + ASSERT_THROW(UdpSocket::Builder(broadcast_ip, host_port), UsageError); + ASSERT_NO_THROW(UdpSocket::Builder(any_ip, host_port).bind()); } TEST(test_udp, test_joining_invalid_multicast_group) { ASSERT_THROW( - UdpSocket().init(localhost_ip, host_port).join_multicast_group(broadcast_ip).bind(), + UdpSocket::Builder(localhost_ip, host_port).join_multicast_group(broadcast_ip).bind(), SocketError); } @@ -78,12 +78,11 @@ TEST(test_udp, test_buffer_resize) // Setting buffer sizes up to and including rmem_max shall succeed ASSERT_NO_THROW( - UdpSocket().init(localhost_ip, host_port).set_socket_buffer_size(rmem_max).bind()); + UdpSocket::Builder(localhost_ip, host_port).set_socket_buffer_size(rmem_max).bind()); // Linux only supports sizes up to INT32_MAX ASSERT_THROW( - UdpSocket() - .init(localhost_ip, host_port) + UdpSocket::Builder(localhost_ip, host_port) .set_socket_buffer_size(static_cast(INT32_MAX) + 1) .bind(), UsageError); @@ -91,53 +90,38 @@ TEST(test_udp, test_buffer_resize) TEST(test_udp, test_correct_usage_is_enforced) { - // These functions require other functions (e.g. `init()`) to be called beforehand - ASSERT_THROW(UdpSocket().bind(), UsageError); - ASSERT_THROW(UdpSocket().join_multicast_group(multicast_group), UsageError); - ASSERT_THROW(UdpSocket().subscribe(empty_cb()), UsageError); - // The following functions can be called in any order, any number of times - ASSERT_NO_THROW(UdpSocket().limit_to_sender(sender_ip, sender_port)); - ASSERT_NO_THROW( - UdpSocket().limit_to_sender(sender_ip, sender_port).limit_to_sender(sender_ip, sender_port)); - - // Sockets cannot be re-initialized - ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).init(localhost_ip, host_port), UsageError); - - // Sockets cannot be re-bound - ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).bind().bind(), UsageError); - - ASSERT_THROW( - UdpSocket().init(localhost_ip, host_port).bind().init(localhost_ip, host_port), UsageError); + ASSERT_NO_THROW(UdpSocket::Builder(localhost_ip, host_port) + .set_polling_interval(20) + .set_socket_buffer_size(3000) + .join_multicast_group(multicast_group) + .set_mtu(1600) + .limit_to_sender(sender_ip, sender_port) + .set_polling_interval(20) + .set_socket_buffer_size(3000) + .set_mtu(1600) + .limit_to_sender(sender_ip, sender_port) + .bind()); + + // Pre-existing subscriptions shall be gracefully unsubscribed when a new subscription is created ASSERT_NO_THROW( - UdpSocket().init(localhost_ip, host_port).bind().limit_to_sender(sender_ip, sender_port)); - - // Only bound sockets can be subscribed - ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).subscribe(empty_cb()), UsageError); - ASSERT_NO_THROW(UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb())); - - // Only one callback can exist - ASSERT_THROW( - UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()).subscribe(empty_cb()), - UsageError); + UdpSocket::Builder(localhost_ip, host_port).bind().subscribe(empty_cb()).subscribe(empty_cb())); - // But un- and re-subscribing shall be supported - ASSERT_NO_THROW(UdpSocket() - .init(localhost_ip, host_port) + // Explicitly unsubscribing shall be supported + ASSERT_NO_THROW(UdpSocket::Builder(localhost_ip, host_port) .bind() .subscribe(empty_cb()) .unsubscribe() .subscribe(empty_cb())); // Unsubscribing on a non-subscribed socket shall also be supported - ASSERT_NO_THROW(UdpSocket().unsubscribe()); + ASSERT_NO_THROW(UdpSocket::Builder(localhost_ip, host_port).bind().unsubscribe()); } TEST(test_udp, test_receiving) { const std::vector payload{1, 2, 3}; - UdpSocket sock{}; - sock.init(localhost_ip, host_port).bind(); + auto sock = UdpSocket::Builder(localhost_ip, host_port).bind(); auto err_no_opt = udp_send(localhost_ip, host_port, payload); if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); @@ -158,8 +142,7 @@ TEST(test_udp, test_receiving_oversized) const size_t mtu = 1500; std::vector payload; payload.resize(mtu + 1, 0x42); - UdpSocket sock{}; - sock.init(localhost_ip, host_port).set_mtu(mtu).bind(); + auto sock = UdpSocket::Builder(localhost_ip, host_port).set_mtu(mtu).bind(); auto err_no_opt = udp_send(localhost_ip, host_port, payload); if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); @@ -177,8 +160,8 @@ TEST(test_udp, test_receiving_oversized) TEST(test_udp, test_filtering_sender) { std::vector payload{1, 2, 3}; - UdpSocket sock{}; - sock.init(localhost_ip, host_port).bind().limit_to_sender(sender_ip, sender_port); + auto sock = + UdpSocket::Builder(localhost_ip, host_port).limit_to_sender(sender_ip, sender_port).bind(); auto err_no_opt = udp_send(localhost_ip, host_port, payload); if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); From 4031b4227b999e2bb30cf6ec10bfd2c570941930 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 16 Dec 2024 00:41:14 +0000 Subject: [PATCH 32/37] ci(pre-commit): autofix --- nebula_common/include/nebula_common/util/expected.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nebula_common/include/nebula_common/util/expected.hpp b/nebula_common/include/nebula_common/util/expected.hpp index a64818772..cd6061716 100644 --- a/nebula_common/include/nebula_common/util/expected.hpp +++ b/nebula_common/include/nebula_common/util/expected.hpp @@ -80,7 +80,8 @@ struct expected } /// @brief If the instance has a value, return the value, else throw the stored error instance. - T value_or_throw() { + T value_or_throw() + { if (has_value()) return value(); throw error(); } From a9694e29b07cdc4d65f0ece7a30bfdbb3ed6de95 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 23 Dec 2024 13:50:00 +0900 Subject: [PATCH 33/37] chore(udp): replace `-1` with `uninitialized` in `SockFd` for clarity Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 4c5df6932..2c9eb2807 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -85,12 +85,13 @@ class UdpSocket class SockFd { + static const int uninitialized = -1; int sock_fd_; public: - SockFd() : sock_fd_{-1} {} + SockFd() : sock_fd_{uninitialized} {} SockFd(int sock_fd) : sock_fd_{sock_fd} {} - SockFd(SockFd && other) noexcept : sock_fd_{other.sock_fd_} { other.sock_fd_ = -1; } + SockFd(SockFd && other) noexcept : sock_fd_{other.sock_fd_} { other.sock_fd_ = uninitialized; } SockFd(const SockFd &) = delete; SockFd & operator=(const SockFd &) = delete; @@ -102,7 +103,7 @@ class UdpSocket ~SockFd() { - if (sock_fd_ == -1) return; + if (sock_fd_ == uninitialized) return; close(sock_fd_); } From d9629452af17d9f93a49a4c45e991a854e680d1b Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 23 Dec 2024 14:30:21 +0900 Subject: [PATCH 34/37] chore(udp): mark SockFd constructor explicit Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 2c9eb2807..dd9f34618 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -90,7 +90,7 @@ class UdpSocket public: SockFd() : sock_fd_{uninitialized} {} - SockFd(int sock_fd) : sock_fd_{sock_fd} {} + explicit SockFd(int sock_fd) : sock_fd_{sock_fd} {} SockFd(SockFd && other) noexcept : sock_fd_{other.sock_fd_} { other.sock_fd_ = uninitialized; } SockFd(const SockFd &) = delete; From 95fb8ea9ff7e967a25c295d9de8ceeea54f1a45a Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 23 Dec 2024 14:19:44 +0900 Subject: [PATCH 35/37] fix(udp): enforce that at most one multicast group is joined Signed-off-by: Max SCHMELLER --- .../nebula_hw_interfaces_common/connections/udp.hpp | 2 ++ nebula_hw_interfaces/test/common/test_udp.cpp | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index dd9f34618..e9296d19d 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -214,6 +214,8 @@ class UdpSocket */ Builder && join_multicast_group(const std::string & group_ip) { + if (config_.multicast_ip) + throw UsageError("Only one multicast group can be joined by this socket"); ip_mreq mreq{parse_ip_or_throw(group_ip), config_.host.ip}; sock_fd_.setsockopt(IPPROTO_IP, IP_ADD_MEMBERSHIP, mreq).value_or_throw(); diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp index 415bc8017..ea0aa29c3 100644 --- a/nebula_hw_interfaces/test/common/test_udp.cpp +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -27,6 +27,7 @@ static const char localhost_ip[] = "127.0.0.1"; static const char broadcast_ip[] = "255.255.255.255"; static const char any_ip[] = "0.0.0.0"; static const char multicast_group[] = "230.1.2.3"; +static const char multicast_group2[] = "230.4.5.6"; static const char sender_ip[] = "192.168.201"; static const uint16_t sender_port = 7373; @@ -94,7 +95,6 @@ TEST(test_udp, test_correct_usage_is_enforced) ASSERT_NO_THROW(UdpSocket::Builder(localhost_ip, host_port) .set_polling_interval(20) .set_socket_buffer_size(3000) - .join_multicast_group(multicast_group) .set_mtu(1600) .limit_to_sender(sender_ip, sender_port) .set_polling_interval(20) @@ -103,6 +103,13 @@ TEST(test_udp, test_correct_usage_is_enforced) .limit_to_sender(sender_ip, sender_port) .bind()); + // Only one multicast group can be joined + ASSERT_THROW( + UdpSocket::Builder(localhost_ip, host_port) + .join_multicast_group(multicast_group) + .join_multicast_group(multicast_group2), + UsageError); + // Pre-existing subscriptions shall be gracefully unsubscribed when a new subscription is created ASSERT_NO_THROW( UdpSocket::Builder(localhost_ip, host_port).bind().subscribe(empty_cb()).subscribe(empty_cb())); From 3d68921080135f9413959d5ecf316a6be780675d Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 23 Dec 2024 14:35:47 +0900 Subject: [PATCH 36/37] chore(udp): reorder UdpSocket class to reduce number of needed visibility modifiers Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 56 +++++++++---------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index e9296d19d..359b9cbb7 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -129,6 +129,33 @@ class UdpSocket std::optional sender; }; + struct MsgBuffers + { + msghdr msg{}; + iovec iov{}; + std::array control; + sockaddr_in sender_addr; + }; + + class DropMonitor + { + uint32_t last_drop_counter_{0}; + + public: + uint32_t get_drops_since_last_receive(uint32_t current_drop_counter) + { + uint32_t last = last_drop_counter_; + last_drop_counter_ = current_drop_counter; + + bool counter_did_wrap = current_drop_counter < last; + if (counter_did_wrap) { + return (UINT32_MAX - last) + current_drop_counter; + } + + return current_drop_counter - last; + } + }; + UdpSocket(SockFd sock_fd, SocketConfig config) : sock_fd_(std::move(sock_fd)), poll_fd_{sock_fd_.get(), POLLIN, 0}, config_{std::move(config)} { @@ -261,35 +288,6 @@ class UdpSocket SocketConfig config_; }; -private: - struct MsgBuffers - { - msghdr msg{}; - iovec iov{}; - std::array control; - sockaddr_in sender_addr; - }; - - class DropMonitor - { - uint32_t last_drop_counter_{0}; - - public: - uint32_t get_drops_since_last_receive(uint32_t current_drop_counter) - { - uint32_t last = last_drop_counter_; - last_drop_counter_ = current_drop_counter; - - bool counter_did_wrap = current_drop_counter < last; - if (counter_did_wrap) { - return (UINT32_MAX - last) + current_drop_counter; - } - - return current_drop_counter - last; - } - }; - -public: struct RxMetadata { std::optional timestamp_ns; From 0c5e3c0a320d457e5d4bee1f1a23b844c3bf3bf6 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 23 Dec 2024 15:35:39 +0900 Subject: [PATCH 37/37] fix(udp): make UDP socket object move-constructible to enable things like optional.emplace() etc. Signed-off-by: Max SCHMELLER --- .../connections/udp.hpp | 12 +++++++++- nebula_hw_interfaces/test/common/test_udp.cpp | 24 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp index 359b9cbb7..77f91eccc 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp @@ -312,6 +312,8 @@ class UdpSocket return *this; } + bool is_subscribed() { return running_; } + /** * @brief Gracefully stops the active receiver thread (if any) but keeps the socket alive. The * same socket can later be subscribed again. @@ -326,7 +328,15 @@ class UdpSocket } UdpSocket(const UdpSocket &) = delete; - UdpSocket(UdpSocket &&) = delete; + UdpSocket(UdpSocket && other) + : sock_fd_((other.unsubscribe(), std::move(other.sock_fd_))), + poll_fd_(std::move(other.poll_fd_)), + config_(std::move(other.config_)), + drop_monitor_(std::move(other.drop_monitor_)) + { + if (other.callback_) subscribe(std::move(other.callback_)); + }; + UdpSocket & operator=(const UdpSocket &) = delete; UdpSocket & operator=(UdpSocket &&) = delete; diff --git a/nebula_hw_interfaces/test/common/test_udp.cpp b/nebula_hw_interfaces/test/common/test_udp.cpp index ea0aa29c3..1c800878e 100644 --- a/nebula_hw_interfaces/test/common/test_udp.cpp +++ b/nebula_hw_interfaces/test/common/test_udp.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include namespace nebula::drivers::connections @@ -177,6 +178,29 @@ TEST(test_udp, test_filtering_sender) ASSERT_FALSE(result_opt.has_value()); } +TEST(test_udp, test_moveable) +{ + std::vector payload{1, 2, 3}; + + size_t n_received = 0; + + auto sock = UdpSocket::Builder(localhost_ip, host_port).bind(); + sock.subscribe([&n_received](const auto &, const auto &) { n_received++; }); + + auto err_no_opt = udp_send(localhost_ip, host_port, payload); + if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); + + // The subscription moves to the new socket object + UdpSocket sock2{std::move(sock)}; + ASSERT_TRUE(sock2.is_subscribed()); + + err_no_opt = udp_send(localhost_ip, host_port, payload); + if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value()); + + std::this_thread::sleep_for(100ms); + ASSERT_EQ(n_received, 2); +} + } // namespace nebula::drivers::connections int main(int argc, char * argv[])