Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish GraphCache implementation for Pub/Sub #66

Merged
merged 12 commits into from
Nov 19, 2023
Prev Previous commit
Next Next commit
Fix types and type mangling.
When playing around with this I found that the types
weren't being properly filled in, and also weren't being
demangled.  Fix both of those things here, as well
as do a lot of cleanup.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Nov 17, 2023
commit f7711c7d3e9d967d71ff3cd7e17919f550d3a378
146 changes: 75 additions & 71 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
#include <utility>
#include <vector>

#include "rcpputils/find_and_replace.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rcutils/logging_macros.h"
@@ -38,6 +39,19 @@ std::string GenerateToken::liveliness(size_t domain_id)
}

///=============================================================================
/**
* Generate a liveliness token for the particular entity.
*
* The liveliness tokens are in the form:
*
* @ros2_lv/<domainid>/<entity>/<namespace>/<nodename>
*
* Where:
* <domainid> - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS.
* <entity> - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client.
* <namespace> - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons.
* <nodename> - The ROS node name for this entity.
*/
static std::string generate_base_token(
const std::string & entity,
size_t domain_id,
@@ -46,7 +60,7 @@ static std::string generate_base_token(
{
std::stringstream token_ss;
token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_;
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
// An empty namespace from rcl will contain "/"" but zenoh does not allow keys with "//".
// An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//".
// Hence we add an "_" to denote an empty namespace such that splitting the key
// will always result in 5 parts.
if (namespace_ == "/") {
@@ -79,7 +93,6 @@ std::string GenerateToken::publisher(
{
std::string token = generate_base_token("MP", domain_id, node_namespace, node_name);
token += topic + "/" + type + "/" + qos;
printf("GenerateToken::Publisher: %s\n", token.c_str());
return token;
}

@@ -94,7 +107,6 @@ std::string GenerateToken::subscription(
{
std::string token = generate_base_token("MS", domain_id, node_namespace, node_name);
token += topic + "/" + type + "/" + qos;
printf("GenerateToken::Subscription: %s\n", token.c_str());
return token;
}

@@ -213,23 +225,12 @@ std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string
}
}

// Get the entity, ie N, MP, MS, SS, SC.
// Get the entity, ie NN, MP, MS, SS, SC.
std::string & entity = parts[2];
if (entity != "NN" &&
entity != "MP" &&
entity != "MS" &&
entity != "SS" &&
entity != "SC")
{
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid entity [%s] in liveliness token", entity.c_str());
return std::nullopt;
}

GraphNode node;
// Nodes with empty namespaces will contain a "_".
node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3);
node.ns = parts[3] == "_" ? "/" : "/" + parts[3];
node.name = std::move(parts[4]);

if (entity != "NN") {
@@ -248,14 +249,21 @@ std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string
node.pubs.push_back(std::move(data));
} else if (entity == "MS") {
node.subs.push_back(std::move(data));
} else if (entity == "SS") {
// TODO(yadunund): Service server
} else if (entity == "SC") {
// TODO(yadunund): Service client
} else {
// TODO.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid entity [%s] in liveliness token", entity.c_str());
return std::nullopt;
}
}

return std::make_pair(std::move(entity), std::move(node));
}
} // namespace anonymous
} // namespace

///=============================================================================
GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count)
@@ -282,11 +290,10 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto ns_it = graph_.find(node->ns);
if (ns_it == graph_.end()) {
// New namespace.
std::string ns = node->ns;
std::unordered_map<std::string, GraphNodePtr> map = {
{node->name, node}
};
graph_.insert(std::make_pair(std::move(ns), std::move(map)));
graph_.insert(std::make_pair(std::move(node->ns), std::move(map)));
} else {
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second) {
@@ -371,9 +378,9 @@ void GraphCache::parse_put(const std::string & keyexpr)
node->name.c_str());
return;
} else if (entity == "SS") {
// Service
// TODO(yadunund): Service server
} else if (entity == "SC") {
// Client
// TODO(yadunud): Service Client
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -501,9 +508,9 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
}
} else if (entity == "SS") {
// Service
// TODO(yadunund): Service server
} else if (entity == "SC") {
// Client
// TODO(yadunund): Service client
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -539,10 +546,6 @@ rmw_ret_t GraphCache::get_node_names(
for (auto it = graph_.begin(); it != graph_.end(); ++it) {
nodes_number += it->second.size();
}
// TODO(Yadunund): Delete.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"nodes_number %ld", nodes_number);

rcutils_ret_t rcutils_ret =
rcutils_string_array_init(node_names, nodes_number, allocator);
@@ -630,6 +633,33 @@ rmw_ret_t GraphCache::get_node_names(
return RMW_RET_OK;
}

namespace
{
// Shamelessly copied from https://github.com/ros2/rmw_cyclonedds/blob/f7f67bdef82f59558366aa6ce94ef9af3c5ab569/rmw_cyclonedds_cpp/src/demangle.cpp#L67
std::string
_demangle_if_ros_type(const std::string & dds_type_string)
{
if (dds_type_string[dds_type_string.size() - 1] != '_') {
// not a ROS type
return dds_type_string;
}

std::string substring = "dds_::";
size_t substring_position = dds_type_string.find(substring);
if (substring_position == std::string::npos) {
// not a ROS type
return dds_type_string;
}

std::string type_namespace = dds_type_string.substr(0, substring_position);
type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/");
size_t start = substring_position + substring.size();
std::string type_name = dds_type_string.substr(start, dds_type_string.length() - 1 - start);
return type_namespace + type_name;
}

} // namespace

///=============================================================================
rmw_ret_t GraphCache::get_topic_names_and_types(
rcutils_allocator_t * allocator,
@@ -642,10 +672,6 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

std::lock_guard<std::mutex> lock(graph_mutex_);
const size_t topic_number = graph_topics_.size();
// TODO(Yadunund): Delete.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"topic_number %ld", topic_number);

rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator);
if (ret != RMW_RET_OK) {
@@ -659,66 +685,44 @@ rmw_ret_t GraphCache::get_topic_names_and_types(
}
});

// topic_names_and_types->names is an rcutils_string_array_t,
// while topic_names_and_types->types is an rcutils_string_array_t *

// rcutils_ret_t rcutils_ret =
// rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator);
// if (rcutils_ret != RCUTILS_RET_OK) {
// return RMW_RET_BAD_ALLOC;
// }
auto free_topic_names = rcpputils::make_scope_exit(
[names = &topic_names_and_types->names]() {
rcutils_ret_t ret = rcutils_string_array_fini(names);
if (ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string().str);
}
});

rcutils_ret_t rcutils_ret =
rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator);
if (rcutils_ret != RCUTILS_RET_OK) {
return RMW_RET_BAD_ALLOC;
}
auto free_topic_types = rcpputils::make_scope_exit(
[types = topic_names_and_types->types]() {
rcutils_ret_t ret = rcutils_string_array_fini(types);
if (ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string().str);
}
});

// Fill topic names and types.
std::size_t j = 0;
for (const auto & it : graph_topics_) {
// Split based on "?".
// TODO(Yadunund): Be more systematic about this.
// TODO(clalancette): Rather than doing the splitting here, should we store
// it in graph_topics_ already split?
std::vector<std::string> parts = split_keyexpr(it.first, '?');
if (parts.size() < 2) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid topic_key %s", it.first.c_str());
return RMW_RET_INVALID_ARGUMENT;
}
topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator);

topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator);
if (!topic_names_and_types->names.data[j]) {
return RMW_RET_BAD_ALLOC;
}
topic_names_and_types->types->data[j] = rcutils_strdup(
parts.at(1).c_str(), *allocator);
if (!topic_names_and_types->types->data[j]) {

// TODO(clalancette): This won't work if there are multiple types on the same topic
rcutils_ret_t rcutils_ret = rcutils_string_array_init(
&topic_names_and_types->types[j], 1, allocator);
if (RCUTILS_RET_OK != rcutils_ret) {
RMW_SET_ERROR_MSG(rcutils_get_error_string().str);
return RMW_RET_BAD_ALLOC;
}

topic_names_and_types->types[j].data[0] = rcutils_strdup(
_demangle_if_ros_type(parts[1]).c_str(), *allocator);
if (!topic_names_and_types->types[j].data[0]) {
return RMW_RET_BAD_ALLOC;
}

++j;
}

cleanup_names_and_types.cancel();
free_topic_names.cancel();
free_topic_types.cancel();

return RMW_RET_OK;
}
1 change: 0 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
@@ -79,7 +79,6 @@ class PublishToken
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{

struct TopicData
{
std::string topic;
57 changes: 0 additions & 57 deletions rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
@@ -41,62 +41,5 @@ rmw_get_topic_names_and_types(

return node->context->impl->graph_cache.get_topic_names_and_types(
allocator, no_demangle, topic_names_and_types);

// rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator);
// if (ret != RMW_RET_OK) {
// return ret;
// }
// auto cleanup_names_and_types = rcpputils::make_scope_exit(
// [topic_names_and_types] {
// rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
// if (fail_ret != RMW_RET_OK) {
// RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
// }
// });

// // topic_names_and_types->names is an rcutils_string_array_t,
// // while topic_names_and_types->types is an rcutils_string_array_t *

// topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator);
// if (topic_names_and_types->names.data[0] == nullptr) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic names");
// return RMW_RET_BAD_ALLOC;
// }
// auto free_names = rcpputils::make_scope_exit(
// [topic_names_and_types, allocator] {
// allocator->deallocate(topic_names_and_types->names.data[0], allocator->state);
// });

// rcutils_ret_t rcutils_ret = rcutils_string_array_init(
// topic_names_and_types->types, 1,
// allocator);
// if (rcutils_ret != RCUTILS_RET_OK) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic types");
// return RMW_RET_BAD_ALLOC;
// }
// auto fini_string_array = rcpputils::make_scope_exit(
// [topic_names_and_types] {
// rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types);
// if (fail_ret != RMW_RET_OK) {
// RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling");
// }
// });

// topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator);
// if (topic_names_and_types->types[0].data[0] == nullptr) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic data");
// return RMW_RET_BAD_ALLOC;
// }
// auto free_types = rcpputils::make_scope_exit(
// [topic_names_and_types, allocator] {
// allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state);
// });

// free_types.cancel();
// fini_string_array.cancel();
// free_names.cancel();
// cleanup_names_and_types.cancel();

return RMW_RET_OK;
}
} // extern "C"
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
@@ -248,7 +248,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// Setup liveliness subscriptions for discovery.
const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id);

// Query the router/liveliness participants to get graph information before this session was started.
// Query router/liveliness participants to get graph information before this session was started.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Sending Query '%s' to fetch discovery data...",
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
@@ -15,13 +15,13 @@
#include <fastcdr/FastBuffer.h>
#include <fastcdr/Cdr.h>

#include <zenoh.h>

#include <chrono>
#include <mutex>
#include <new>
#include <sstream>

#include <zenoh.h>

#include "detail/guard_condition.hpp"
#include "detail/graph_cache.hpp"
#include "detail/identifier.hpp"
@@ -247,7 +247,8 @@ rmw_destroy_node(rmw_node_t * node)
// return RMW_RET_ERROR;
// }

// Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset.
// Undeclare liveliness token for the node to advertise that the node has ridden
// off into the sunset.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
zc_liveliness_undeclare_token(z_move(node_data->token));

@@ -1313,7 +1314,6 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)

auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
if (sub_data != nullptr) {

// Publish to the graph that a subscription has ridden off into the sunset
zc_liveliness_undeclare_token(z_move(sub_data->token));

Loading