Skip to content

Commit

Permalink
Single DDS publisher/subscriber pair per context
Browse files Browse the repository at this point in the history
There is no benefit to having a pair per node.

Signed-off-by: Erik Boasson <[email protected]>
  • Loading branch information
eboasson committed Apr 16, 2020
1 parent 52b7479 commit ce59290
Showing 1 changed file with 26 additions and 37 deletions.
63 changes: 26 additions & 37 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ struct rmw_context_impl_t
dds_entity_t rd_subscription;
dds_entity_t rd_publication;

/* DDS publisher, subscriber used for ROS2 publishers and subscriptions */
dds_entity_t dds_pub;
dds_entity_t dds_sub;

rmw_context_impl_t()
: common(), domain_id(UINT32_MAX), ppant(0)
{}
Expand All @@ -257,9 +261,6 @@ struct rmw_context_impl_t

struct CddsNode
{
/* DDS publisher, subscriber used for ROS2 publishers and subscriptions */
dds_entity_t pub;
dds_entity_t sub;
};

struct CddsPublisher : CddsEntity
Expand Down Expand Up @@ -914,6 +915,7 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
return RMW_RET_ERROR;
}

/* Create readers for DDS built-in topics for monitoring discovery */
if ((impl->rd_participant =
dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0)
{
Expand All @@ -936,18 +938,33 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
return RMW_RET_ERROR;
}

/* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers
to be created for RMW publishers/subscriptions. */
if ((impl->dds_pub = dds_create_publisher(impl->ppant, nullptr, nullptr)) < 0) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher");
return RMW_RET_ERROR;
}
if ((impl->dds_sub = dds_create_subscriber(impl->ppant, nullptr, nullptr)) < 0) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber");
return RMW_RET_ERROR;
}

rmw_qos_profile_t pubsub_qos = rmw_qos_profile_default;
pubsub_qos.avoid_ros_namespace_conventions = true;
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
pubsub_qos.depth = 1;
pubsub_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;

/* Create RMW publisher/subscription/guard condition used by rmw_dds_common
discovery */
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
std::unique_ptr<rmw_publisher_t, std::function<void(rmw_publisher_t *)>>
publisher(
create_publisher(
impl->ppant, impl->ppant,
impl->ppant, impl->dds_pub,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"ros_discovery_info",
&pubsub_qos,
Expand All @@ -970,7 +987,7 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
std::unique_ptr<rmw_subscription_t, std::function<void(rmw_subscription_t *)>>
subscription(
create_subscription(
impl->ppant, impl->ppant,
impl->ppant, impl->dds_sub,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"ros_discovery_info",
&pubsub_qos,
Expand Down Expand Up @@ -1092,27 +1109,9 @@ extern "C" rmw_node_t * rmw_create_node(
return nullptr;
}

/* Since ROS2 doesn't require anything fancy from DDS Subscribers or Publishers, create a single
pair & reuse that */
const dds_entity_t pp = context->impl->ppant;
dds_entity_t pub, sub;
if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher");
return nullptr;
}
if ((sub = dds_create_subscriber(pp, nullptr, nullptr)) < 0) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber");
dds_delete(pub);
return nullptr;
}

auto * node_impl = new CddsNode();
rmw_node_t * node_handle = nullptr;
RET_ALLOC_X(node_impl, goto fail_node_impl);
node_impl->pub = pub;
node_impl->sub = sub;

node_handle = rmw_node_allocate();
RET_ALLOC_X(node_handle, goto fail_node_handle);
Expand Down Expand Up @@ -1161,8 +1160,6 @@ extern "C" rmw_node_t * rmw_create_node(
fail_node_handle:
delete node_impl;
fail_node_impl:
dds_delete(pub);
dds_delete(sub);
return nullptr;
}

Expand All @@ -1187,8 +1184,6 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
common->pub, static_cast<void *>(&participant_msg), nullptr);
}

dds_delete(node_impl->pub);
dds_delete(node_impl->sub);
rmw_free(const_cast<char *>(node->name));
rmw_free(const_cast<char *>(node->namespace_));
rmw_node_free(node);
Expand Down Expand Up @@ -1788,10 +1783,8 @@ extern "C" rmw_publisher_t * rmw_create_publisher(
)
{
RET_WRONG_IMPLID_X(node, return nullptr);
auto node_impl = static_cast<CddsNode *>(node->data);
RET_NULL_X(node_impl, return nullptr);
rmw_publisher_t * pub = create_publisher(
node->context->impl->ppant, node_impl->pub,
node->context->impl->ppant, node->context->impl->dds_pub,
type_supports, topic_name, qos_policies,
publisher_options);
if (pub != nullptr) {
Expand Down Expand Up @@ -2060,10 +2053,8 @@ extern "C" rmw_subscription_t * rmw_create_subscription(
const rmw_subscription_options_t * subscription_options)
{
RET_WRONG_IMPLID_X(node, return nullptr);
auto node_impl = static_cast<CddsNode *>(node->data);
RET_NULL_X(node_impl, return nullptr);
rmw_subscription_t * sub = create_subscription(
node->context->impl->ppant, node_impl->sub,
node->context->impl->ppant, node->context->impl->dds_sub,
type_supports, topic_name, qos_policies,
subscription_options);
if (sub != nullptr) {
Expand Down Expand Up @@ -3050,8 +3041,6 @@ static rmw_ret_t rmw_init_cs(
RET_WRONG_IMPLID(node);
RET_NULL_OR_EMPTYSTR(service_name);
RET_NULL(qos_policies);
auto node_impl = static_cast<CddsNode *>(node->data);
RET_NULL(node_impl);
const rosidl_service_type_support_t * type_support = get_service_typesupport(type_supports);
RET_NULL(type_support);

Expand Down Expand Up @@ -3120,13 +3109,13 @@ static rmw_ret_t rmw_init_cs(
}
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1));
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
if ((pub->enth = dds_create_writer(node_impl->pub, pubtopic, qos, nullptr)) < 0) {
if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer;
}
get_entity_gid(pub->enth, pub->gid);
pub->sertopic = pub_stact;
if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) {
if ((sub->enth = dds_create_reader(node->context->impl->dds_sub, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader;
}
Expand Down

0 comments on commit ce59290

Please sign in to comment.