From 604a27de079c31e770a519664e80fb91799ab7d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Wed, 11 Dec 2024 16:51:31 +0100 Subject: [PATCH 1/2] Fix unique network flows with TCP transports (#5461) * Refs #22055: Add regression tests Signed-off-by: cferreiragonz * Refs #22055: Fix unique flows for TCP Signed-off-by: cferreiragonz * Refs #22055: Fix tests Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz (cherry picked from commit 81cdb10a9076adb7262d450ccec3f47bf29f67da) # Conflicts: # test/blackbox/common/BlackboxTestsTransportTCP.cpp --- .../rtps/participant/RTPSParticipantImpl.cpp | 10 +- .../api/dds-pim/PubSubParticipant.hpp | 7 + .../common/BlackboxTestsTransportTCP.cpp | 223 ++++++++++++++++++ 3 files changed, 239 insertions(+), 1 deletion(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 6fc2bf66c36..5a56843007d 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1846,7 +1846,15 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( // Set port on unicast locators for (Locator_t& loc : attributes.unicastLocatorList) { - loc.port = port; + // Set logical port only TCP locators + if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind) + { + IPLocator::setLogicalPort(loc, port); + } + else + { + loc.port = port; + } } // Try creating receiver resources diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index bc4afb9800a..304b17b3b62 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -684,6 +684,13 @@ class PubSubParticipant return false; } + PubSubParticipant& initial_peers( + const eprosima::fastdds::rtps::LocatorList& initial_peers) + { + participant_qos_.wire_protocol().builtin.initialPeersList = initial_peers; + return *this; + } + PubSubParticipant& pub_property_policy( const eprosima::fastrtps::rtps::PropertyPolicy property_policy) { diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index d30aecf316f..22ea379033c 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -28,6 +28,14 @@ #include #include +<<<<<<< HEAD +======= +#include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp" +#include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp" +#include "PubSubParticipant.hpp" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" +>>>>>>> 81cdb10a9 (Fix unique network flows with TCP transports (#5461)) #include "DatagramInjectionTransport.hpp" using namespace eprosima::fastrtps; @@ -1253,6 +1261,221 @@ TEST_P(TransportTCP, large_message_large_data_send_receive) reader.block_for_all(); } +<<<<<<< HEAD +======= +// Test CreateInitialConnection for TCP +TEST_P(TransportTCP, TCP_initial_peers_connection) +{ + PubSubWriter p1(TEST_TOPIC_NAME); + PubSubReader p2(TEST_TOPIC_NAME); + PubSubReader p3(TEST_TOPIC_NAME); + + // Add TCP Transport with listening port + std::shared_ptr p1_transport; + std::shared_ptr p2_transport; + std::shared_ptr p3_transport; + if (use_ipv6) + { + // TCPv6TransportDescriptor + p1_transport = std::make_shared(); + p2_transport = std::make_shared(); + p3_transport = std::make_shared(); + } + else + { + // TCPv4TransportDescriptor + p1_transport = std::make_shared(); + p2_transport = std::make_shared(); + p3_transport = std::make_shared(); + } + p1_transport->add_listener_port(global_port); + p2_transport->add_listener_port(global_port + 1); + p3_transport->add_listener_port(global_port - 1); + + // Add initial peer to clients + Locator_t initialPeerLocator; + initialPeerLocator.port = global_port; + if (use_ipv6) + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + + // Setup participants + p1.disable_builtin_transport() + .add_user_transport_to_pparams(p1_transport); + + p2.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(p2_transport); + + p3.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(p3_transport); + + // Init participants + p1.init(); + p2.init(); + p3.init(); + ASSERT_TRUE(p1.isInitialized()); + ASSERT_TRUE(p2.isInitialized()); + ASSERT_TRUE(p3.isInitialized()); + + // Wait for discovery + p1.wait_discovery(2, std::chrono::seconds(0)); + p2.wait_discovery(std::chrono::seconds(0), 1); + p3.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + auto data = default_helloworld_data_generator(); + p2.startReception(data); + p3.startReception(data); + + p1.send(data); + EXPECT_TRUE(data.empty()); + + p2.block_for_all(); + p3.block_for_all(); +} + +TEST_P(TransportTCP, tcp_unique_network_flows_init) +{ + // TCP Writer creation should fail as feature is not implemented for writers + { + PubSubWriter writer(TEST_TOPIC_NAME); + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + + test_transport_->add_listener_port(global_port); + writer.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + writer.entity_property_policy(properties).init(); + + EXPECT_FALSE(writer.isInitialized()); + } + + // Two readers on the same participant not requesting unique flows should give the same logical port and same physical port + { + PubSubParticipant participant(0, 2, 0, 0); + + participant.sub_topic_name(TEST_TOPIC_NAME); + + participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_TRUE(locators == locators2); + // LocatorList size depends on the number of interfaces. Different address but same port. + ASSERT_GT(locators.size(), 0); + ASSERT_GT(locators2.size(), 0); + auto locator1 = locators.begin(); + auto locator2 = locators2.begin(); + EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); + EXPECT_EQ(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2)); + } + + // Two TCP readers on the same participant requesting unique flows should give different logical ports but same physical port + { + PubSubParticipant participant(0, 2, 0, 0); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); + + participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_FALSE(locators == locators2); + // LocatorList size depends on the number of interfaces. Different address but same port. + ASSERT_GT(locators.size(), 0); + ASSERT_GT(locators2.size(), 0); + auto locator1 = locators.begin(); + auto locator2 = locators2.begin(); + EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); + EXPECT_NE(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2)); + } +} + +TEST_P(TransportTCP, tcp_unique_network_flows_communication) +{ + PubSubParticipant readers(0, 2, 0, 2); + PubSubWriter writer(TEST_TOPIC_NAME); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + readers.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + eprosima::fastdds::rtps::Locator_t initial_peer_locator; + if (use_ipv6) + { + initial_peer_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastdds::rtps::IPLocator::setIPv6(initial_peer_locator, "::1"); + } + else + { + initial_peer_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastdds::rtps::IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + } + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(initial_peer_locator, global_port); + eprosima::fastdds::rtps::LocatorList_t initial_peer_list; + initial_peer_list.push_back(initial_peer_locator); + + readers.sub_topic_name(TEST_TOPIC_NAME) + .sub_property_policy(properties) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .initial_peers(initial_peer_list); + + ASSERT_TRUE(readers.init_participant()); + ASSERT_TRUE(readers.init_subscriber(0)); + ASSERT_TRUE(readers.init_subscriber(1)); + + test_transport_->add_listener_port(global_port); + writer.disable_builtin_transport() + .add_user_transport_to_pparams(test_transport_) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .history_depth(100); + + writer.init(); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + readers.sub_wait_discovery(); + + // Send data + auto data = default_helloworld_data_generator(); + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block until readers have acknowledged all samples. + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); +} + +>>>>>>> 81cdb10a9 (Fix unique network flows with TCP transports (#5461)) #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From ecf8c082ea52f037d3c0808b148937dd7919b802 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 22 Jan 2025 10:23:09 +0100 Subject: [PATCH 2/2] Solve conflicts Signed-off-by: cferreiragonz --- .../fastrtps_deprecated/PubSubParticipant.hpp | 7 ++ .../common/BlackboxTestsTransportTCP.cpp | 104 +----------------- 2 files changed, 13 insertions(+), 98 deletions(-) diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp index 3131dc4efd5..a23c1abe86f 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp @@ -430,6 +430,13 @@ class PubSubParticipant return *this; } + PubSubParticipant& initial_peers( + const eprosima::fastrtps::rtps::LocatorList_t& initial_peers) + { + participant_attr_.rtps.builtin.initialPeersList = initial_peers; + return *this; + } + PubSubParticipant& add_user_transport_to_pparams( std::shared_ptr userTransportDescriptor) { diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 22ea379033c..e33d8a2a078 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -16,6 +16,7 @@ #include "TCPReqRepHelloWorldRequester.hpp" #include "TCPReqRepHelloWorldReplier.hpp" +#include "PubSubParticipant.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" @@ -28,14 +29,6 @@ #include #include -<<<<<<< HEAD -======= -#include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp" -#include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp" -#include "PubSubParticipant.hpp" -#include "PubSubReader.hpp" -#include "PubSubWriter.hpp" ->>>>>>> 81cdb10a9 (Fix unique network flows with TCP transports (#5461)) #include "DatagramInjectionTransport.hpp" using namespace eprosima::fastrtps; @@ -1261,90 +1254,6 @@ TEST_P(TransportTCP, large_message_large_data_send_receive) reader.block_for_all(); } -<<<<<<< HEAD -======= -// Test CreateInitialConnection for TCP -TEST_P(TransportTCP, TCP_initial_peers_connection) -{ - PubSubWriter p1(TEST_TOPIC_NAME); - PubSubReader p2(TEST_TOPIC_NAME); - PubSubReader p3(TEST_TOPIC_NAME); - - // Add TCP Transport with listening port - std::shared_ptr p1_transport; - std::shared_ptr p2_transport; - std::shared_ptr p3_transport; - if (use_ipv6) - { - // TCPv6TransportDescriptor - p1_transport = std::make_shared(); - p2_transport = std::make_shared(); - p3_transport = std::make_shared(); - } - else - { - // TCPv4TransportDescriptor - p1_transport = std::make_shared(); - p2_transport = std::make_shared(); - p3_transport = std::make_shared(); - } - p1_transport->add_listener_port(global_port); - p2_transport->add_listener_port(global_port + 1); - p3_transport->add_listener_port(global_port - 1); - - // Add initial peer to clients - Locator_t initialPeerLocator; - initialPeerLocator.port = global_port; - if (use_ipv6) - { - initialPeerLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(initialPeerLocator, "::1"); - } - else - { - initialPeerLocator.kind = LOCATOR_KIND_TCPv4; - IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); - } - LocatorList_t initial_peer_list; - initial_peer_list.push_back(initialPeerLocator); - - // Setup participants - p1.disable_builtin_transport() - .add_user_transport_to_pparams(p1_transport); - - p2.disable_builtin_transport() - .initial_peers(initial_peer_list) - .add_user_transport_to_pparams(p2_transport); - - p3.disable_builtin_transport() - .initial_peers(initial_peer_list) - .add_user_transport_to_pparams(p3_transport); - - // Init participants - p1.init(); - p2.init(); - p3.init(); - ASSERT_TRUE(p1.isInitialized()); - ASSERT_TRUE(p2.isInitialized()); - ASSERT_TRUE(p3.isInitialized()); - - // Wait for discovery - p1.wait_discovery(2, std::chrono::seconds(0)); - p2.wait_discovery(std::chrono::seconds(0), 1); - p3.wait_discovery(std::chrono::seconds(0), 1); - - // Send and receive data - auto data = default_helloworld_data_generator(); - p2.startReception(data); - p3.startReception(data); - - p1.send(data); - EXPECT_TRUE(data.empty()); - - p2.block_for_all(); - p3.block_for_all(); -} - TEST_P(TransportTCP, tcp_unique_network_flows_init) { // TCP Writer creation should fail as feature is not implemented for writers @@ -1429,19 +1338,19 @@ TEST_P(TransportTCP, tcp_unique_network_flows_communication) properties.properties().emplace_back("fastdds.unique_network_flows", ""); readers.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); - eprosima::fastdds::rtps::Locator_t initial_peer_locator; + Locator_t initial_peer_locator; if (use_ipv6) { initial_peer_locator.kind = LOCATOR_KIND_TCPv6; - eprosima::fastdds::rtps::IPLocator::setIPv6(initial_peer_locator, "::1"); + IPLocator::setIPv6(initial_peer_locator, "::1"); } else { initial_peer_locator.kind = LOCATOR_KIND_TCPv4; - eprosima::fastdds::rtps::IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); } - eprosima::fastdds::rtps::IPLocator::setPhysicalPort(initial_peer_locator, global_port); - eprosima::fastdds::rtps::LocatorList_t initial_peer_list; + IPLocator::setPhysicalPort(initial_peer_locator, global_port); + LocatorList_t initial_peer_list; initial_peer_list.push_back(initial_peer_locator); readers.sub_topic_name(TEST_TOPIC_NAME) @@ -1475,7 +1384,6 @@ TEST_P(TransportTCP, tcp_unique_network_flows_communication) EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); } ->>>>>>> 81cdb10a9 (Fix unique network flows with TCP transports (#5461)) #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else