Skip to content

Commit

Permalink
Merge branch 'rolling' into wip/merge-rolling
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Nov 7, 2024
2 parents 787376c + fbdd17a commit ec7776e
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 13 deletions.
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,8 @@ void GraphCache::set_querying_subscriber_callback(
std::unordered_map<std::size_t, QueryingSubscriberCallback>
>::iterator cb_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[sub_keyexpr] = std::move(
std::unordered_map<std::size_t, QueryingSubscriberCallback>{});
querying_subs_cbs_[sub_keyexpr] =
std::unordered_map<std::size_t, QueryingSubscriberCallback>{};
cb_it = querying_subs_cbs_.find(sub_keyexpr);
}
cb_it->second.insert(std::make_pair(sub_keyxpr_hash, std::move(cb)));
Expand Down
11 changes: 8 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <condition_variable>
#include <cstring>
#include <chrono>
#include <memory>
#include <mutex>
#include <utility>

#include "liveliness_utils.hpp"
#include "logging_macros.hpp"
#include "rmw_data_types.hpp"

Expand Down Expand Up @@ -184,6 +182,13 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
z_drop(z_move(err_str));
return;
}

std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

client_data->add_new_reply(std::make_unique<ZenohReply>(reply, received_timestamp));
// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}

void client_data_drop(void * data)
Expand Down
14 changes: 13 additions & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ ZenohQuery::~ZenohQuery() {z_drop(z_move(query_));}
const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);}

///=============================================================================
ZenohReply::ZenohReply(z_owned_reply_t reply) {reply_ = reply;}
ZenohReply::ZenohReply(
const z_owned_reply_t * reply,
std::chrono::nanoseconds::rep received_timestamp)
{
reply_ = *reply;
received_timestamp_ = received_timestamp;
}

///=============================================================================
ZenohReply::~ZenohReply() {z_drop(z_move(reply_));}
Expand All @@ -57,4 +63,10 @@ std::optional<const z_loaned_sample_t *> ZenohReply::get_sample() const

return std::nullopt;
}

///=============================================================================
std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
{
return received_timestamp_;
}
} // namespace rmw_zenoh_cpp
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <zenoh.h>

#include <chrono>
#include <functional>
#include <optional>

#include "rmw/types.h"
Expand All @@ -34,14 +36,17 @@ create_map_and_set_sequence_num(
class ZenohReply final
{
public:
ZenohReply(z_owned_reply_t reply);
ZenohReply(const z_owned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp);

~ZenohReply();

std::optional<const z_loaned_sample_t *> get_sample() const;

std::chrono::nanoseconds::rep get_received_timestamp() const;

private:
z_owned_reply_t reply_;
std::chrono::nanoseconds::rep received_timestamp_;
};

// A class to store the queries made by clients.
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
extern "C"
{
//==============================================================================
/// Initialize the middleware with the given options, and yielding an context.
/// Initialize the middleware with the given options, and yield a context.
rmw_ret_t
rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
{
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_init_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ rmw_init_options_copy(const rmw_init_options_t * src, rmw_init_options_t * dst)
return ret;
}
auto free_discovery_options = rcpputils::make_scope_exit(
[&tmp, allocator]() {
[&tmp]() {
rmw_ret_t tmp_ret = rmw_discovery_options_fini(&tmp.discovery_options);
static_cast<void>(tmp_ret);
});
Expand Down
19 changes: 15 additions & 4 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,12 @@ rmw_create_client(
allocator->deallocate(client_data, allocator->state);
});

RMW_TRY_PLACEMENT_NEW(client_data, client_data, return nullptr, rmw_zenoh_cpp::rmw_client_data_t);
RMW_TRY_PLACEMENT_NEW(
client_data,
client_data,
return nullptr,
rmw_zenoh_cpp::rmw_client_data_t,
);
auto destruct_client_data = rcpputils::make_scope_exit(
[client_data]() {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
Expand Down Expand Up @@ -1869,9 +1874,15 @@ rmw_take_response(
return RMW_RET_ERROR;
}

auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
request_header->received_timestamp = now_ns.count();
if (!rmw_zenoh_cpp::get_gid_from_attachment(
&sample->attachment,
request_header->request_id.writer_guid))
{
RMW_SET_ERROR_MSG("Could not get client gid from attachment");
return RMW_RET_ERROR;
}

request_header->received_timestamp = latest_reply->get_received_timestamp();

z_drop(z_move(payload));
*taken = true;
Expand Down

0 comments on commit ec7776e

Please sign in to comment.