From 6e89d32938e56d9ab4bb522bbd9b6da7e847bec1 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 4 Dec 2024 17:07:41 +0100 Subject: [PATCH 1/3] Refs #22055: Add regression tests Signed-off-by: cferreiragonz --- .../api/dds-pim/PubSubParticipant.hpp | 7 + .../common/BlackboxTestsTransportTCP.cpp | 132 ++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index 4fcf93ff8db..ee4223a998c 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -701,6 +701,13 @@ class PubSubParticipant return false; } + PubSubParticipant& initial_peers( + const eprosima::fastdds::rtps::LocatorList& initial_peers) + { + participant_qos_.wire_protocol().builtin.initialPeersList = initial_peers; + return *this; + } + PubSubParticipant& pub_property_policy( const eprosima::fastdds::rtps::PropertyPolicy property_policy) { diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 53c6aafa3b1..2b534dfc317 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -25,6 +25,7 @@ #include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp" #include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp" +#include "PubSubParticipant.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" #include "DatagramInjectionTransport.hpp" @@ -1385,6 +1386,137 @@ TEST_P(TransportTCP, TCP_initial_peers_connection) p3.block_for_all(); } +TEST_P(TransportTCP, tcp_unique_network_flows_init) +{ + // TCP Writer creation should fail as feature is not implemented for writers + { + PubSubWriter writer(TEST_TOPIC_NAME); + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + + auto transport_desc = std::make_shared(); + transport_desc->add_listener_port(global_port); + writer.disable_builtin_transport().add_user_transport_to_pparams(transport_desc); + + writer.entity_property_policy(properties).init(); + + EXPECT_FALSE(writer.isInitialized()); + } + + // Two readers on the same participant not requesting unique flows should give the same logical port and same physical port + { + PubSubParticipant participant(0, 2, 0, 0); + + participant.sub_topic_name(TEST_TOPIC_NAME); + + test_transport_->add_listener_port(global_port); + participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_TRUE(locators == locators2); + ASSERT_EQ(locators.size(), 1); + ASSERT_EQ(locators2.size(), 1); + auto locator1 = locators.begin(); + auto locator2 = locators2.begin(); + EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); + EXPECT_EQ(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2)); + } + + // Two TCP readers on the same participant requesting unique flows should give different logical ports but same physical port + { + PubSubParticipant participant(0, 2, 0, 0); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); + + test_transport_->add_listener_port(global_port); + participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_FALSE(locators == locators2); + ASSERT_EQ(locators.size(), 1); + ASSERT_EQ(locators2.size(), 1); + auto locator1 = locators.begin(); + auto locator2 = locators2.begin(); + EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); + EXPECT_NE(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2)); + } +} + +TEST_P(TransportTCP, tcp_unique_network_flows_communication) +{ + PubSubParticipant readers(0, 2, 0, 2); + PubSubWriter writer(TEST_TOPIC_NAME); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + readers.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + eprosima::fastdds::rtps::Locator_t initial_peer_locator; + if (use_ipv6) + { + initial_peer_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastdds::rtps::IPLocator::setIPv6(initial_peer_locator, "::1"); + } + else + { + initial_peer_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastdds::rtps::IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + } + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(initial_peer_locator, global_port); + eprosima::fastdds::rtps::LocatorList_t initial_peer_list; + initial_peer_list.push_back(initial_peer_locator); + + readers.sub_topic_name(TEST_TOPIC_NAME) + .sub_property_policy(properties) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .initial_peers(initial_peer_list); + + ASSERT_TRUE(readers.init_participant()); + ASSERT_TRUE(readers.init_subscriber(0)); + ASSERT_TRUE(readers.init_subscriber(1)); + + test_transport_->add_listener_port(global_port); + writer.disable_builtin_transport() + .add_user_transport_to_pparams(test_transport_) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .history_depth(100); + + writer.init(); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + readers.sub_wait_discovery(); + + // Send data + auto data = default_helloworld_data_generator(); + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block until readers have acknowledged all samples. + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From d046bdf70dd9e7662836ac747af43e724b494865 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Thu, 5 Dec 2024 07:56:08 +0100 Subject: [PATCH 2/3] Refs #22055: Fix unique flows for TCP Signed-off-by: cferreiragonz --- src/cpp/rtps/participant/RTPSParticipantImpl.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index f137468d6c4..56b202131e3 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1737,7 +1737,15 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( // Set port on unicast locators for (Locator_t& loc : attributes.unicastLocatorList) { - loc.port = port; + // Set logical port only TCP locators + if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind) + { + IPLocator::setLogicalPort(loc, port); + } + else + { + loc.port = port; + } } // Try creating receiver resources From 9d5f9bac3f49f68de3e4738ea058b3e5380fd032 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Mon, 9 Dec 2024 17:03:16 +0100 Subject: [PATCH 3/3] Refs #22055: Fix tests Signed-off-by: cferreiragonz --- .../common/BlackboxTestsTransportTCP.cpp | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 2b534dfc317..bcd0ca47d9a 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -1394,9 +1394,8 @@ TEST_P(TransportTCP, tcp_unique_network_flows_init) PropertyPolicy properties; properties.properties().emplace_back("fastdds.unique_network_flows", ""); - auto transport_desc = std::make_shared(); - transport_desc->add_listener_port(global_port); - writer.disable_builtin_transport().add_user_transport_to_pparams(transport_desc); + test_transport_->add_listener_port(global_port); + writer.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); writer.entity_property_policy(properties).init(); @@ -1409,7 +1408,6 @@ TEST_P(TransportTCP, tcp_unique_network_flows_init) participant.sub_topic_name(TEST_TOPIC_NAME); - test_transport_->add_listener_port(global_port); participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); ASSERT_TRUE(participant.init_participant()); @@ -1423,8 +1421,9 @@ TEST_P(TransportTCP, tcp_unique_network_flows_init) participant.get_native_reader(1).get_listening_locators(locators2); EXPECT_TRUE(locators == locators2); - ASSERT_EQ(locators.size(), 1); - ASSERT_EQ(locators2.size(), 1); + // LocatorList size depends on the number of interfaces. Different address but same port. + ASSERT_GT(locators.size(), 0); + ASSERT_GT(locators2.size(), 0); auto locator1 = locators.begin(); auto locator2 = locators2.begin(); EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); @@ -1439,7 +1438,6 @@ TEST_P(TransportTCP, tcp_unique_network_flows_init) properties.properties().emplace_back("fastdds.unique_network_flows", ""); participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); - test_transport_->add_listener_port(global_port); participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); ASSERT_TRUE(participant.init_participant()); @@ -1453,8 +1451,9 @@ TEST_P(TransportTCP, tcp_unique_network_flows_init) participant.get_native_reader(1).get_listening_locators(locators2); EXPECT_FALSE(locators == locators2); - ASSERT_EQ(locators.size(), 1); - ASSERT_EQ(locators2.size(), 1); + // LocatorList size depends on the number of interfaces. Different address but same port. + ASSERT_GT(locators.size(), 0); + ASSERT_GT(locators2.size(), 0); auto locator1 = locators.begin(); auto locator2 = locators2.begin(); EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2));