Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't copy contiguous bytes on reception (backport #343) #375

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
Expand All @@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
SubscriptionData::Message::Message(
std::vector<uint8_t> && p,
const zenoh::Bytes & p,
uint64_t recv_ts,
AttachmentData && attachment_)
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
}

Expand Down Expand Up @@ -225,7 +224,7 @@ bool SubscriptionData::init()

sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
sample.get_payload().as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down Expand Up @@ -303,13 +302,12 @@ bool SubscriptionData::init()
"Unable to obtain attachment")
return;
}
auto payload = sample.get_payload().clone();
auto attachment_value = attachment.value();

AttachmentData attachment_data(attachment_value);
sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
payload.as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down
5 changes: 2 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

#include <zenoh.hxx>

Expand All @@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
struct Message
{
explicit Message(
std::vector<uint8_t> && p,
const zenoh::Bytes & bytes,
uint64_t recv_ts,
AttachmentData && attachment);

~Message();

std::vector<uint8_t> payload;
Payload payload;
uint64_t recv_timestamp;
AttachmentData attachment;
};
Expand Down
51 changes: 51 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,55 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
{
return received_timestamp_;
}

Payload::Payload(const zenoh::Bytes & bytes)
{
// NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of
// buffers contains exactly one element, it is not necessary to concatenate the list of buffers.
// In this case, we store a clone of the bytes object to maintain a non-zero reference-count on
// the buffer. This ensures that the slice into said buffer stays valid until we drop our copy
// of the bytes object (at the very least). This case corresponds to the `Contiguous`
// alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local"
// communication.

zenoh::Bytes::SliceIterator slices = bytes.slice_iter();
std::optional<zenoh::Slice> slice = slices.next();
if (!slice.has_value()) {
bytes_ = nullptr;
} else {
if (!slices.next().has_value()) {
bytes_ = Contiguous {slice.value(), bytes.clone()};
} else {
bytes_ = bytes.as_vector();
}
}
}

const uint8_t * Payload::data() const
{
if (std::holds_alternative<Empty>(bytes_)) {
return nullptr;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).data();
} else {
return std::get<Contiguous>(bytes_).slice.data;
}
}

size_t Payload::size() const
{
if (std::holds_alternative<Empty>(bytes_)) {
return 0;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).size();
} else {
return std::get<Contiguous>(bytes_).slice.len;
}
}

bool Payload::empty() const
{
return std::holds_alternative<Empty>(bytes_);
}

} // namespace rmw_zenoh_cpp
29 changes: 29 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <chrono>
#include <functional>
#include <optional>
#include <utility>
#include <variant>
#include <vector>

#include "rmw/types.h"

Expand Down Expand Up @@ -65,6 +68,32 @@ class ZenohQuery final
zenoh::Query query_;
std::chrono::nanoseconds::rep received_timestamp_;
};

class Payload
{
public:
explicit Payload(const zenoh::Bytes & bytes);

~Payload() = default;

const uint8_t * data() const;

size_t size() const;

bool empty() const;

private:
struct Contiguous
{
zenoh::Slice slice;
zenoh::Bytes bytes;
};
using NonContiguous = std::vector<uint8_t>;
using Empty = std::nullptr_t;
// Is `std::vector<uint8_t>` in case of a non-contiguous payload
// and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
std::variant<NonContiguous, Contiguous, Empty> bytes_;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_UTILS_HPP_
Loading