From 0f341bc482d27e521586430ac30ea48932d1c616 Mon Sep 17 00:00:00 2001 From: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com> Date: Wed, 18 Dec 2024 09:30:03 +0100 Subject: [PATCH] Arithmetic overflow in fragment size calculations (#5464) * Tests arithmetic overflow in fragment size calculations Signed-off-by: Eugenio Collado * Refs #21814. Fix code in BaseWriter.cpp. Signed-off-by: Miguel Company * Fix corner case overhead==max_data_size Signed-off-by: Eugenio Collado * Refs #21814. Fix code in WriterHistory.cpp. Signed-off-by: Miguel Company * Fix corner case overhead==final_high_mark_for_frag Signed-off-by: Eugenio Collado * Uncrustify Signed-off-by: Eugenio Collado * Fix log error message Signed-off-by: Eugenio Collado * Fix test fragments not been dropped Signed-off-by: Eugenio Collado * Fix corner case RTPSParticipantImpl max_data_size < overhead Signed-off-by: Eugenio Collado * Test refactor for windows compilation Signed-off-by: Eugenio Collado * Fix blackbox test Signed-off-by: Eugenio Collado * Applied review suggestions Signed-off-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com> --------- Signed-off-by: Eugenio Collado Signed-off-by: Miguel Company Signed-off-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com> Co-authored-by: Miguel Company (cherry picked from commit bfc5a530bd523e01e7f9acaa4aeac62b6481cbfe) # Conflicts: # src/cpp/rtps/writer/BaseWriter.cpp # test/blackbox/common/DDSBlackboxTestsListeners.cpp # test/unittest/rtps/history/CMakeLists.txt --- src/cpp/rtps/history/WriterHistory.cpp | 13 +- .../rtps/participant/RTPSParticipantImpl.cpp | 14 +- src/cpp/rtps/writer/BaseWriter.cpp | 425 ++++++++++++++++++ .../common/DDSBlackboxTestsListeners.cpp | 5 + test/unittest/rtps/history/CMakeLists.txt | 25 +- .../rtps/history/WriterHistoryTests.cpp | 119 +++++ 6 files changed, 591 insertions(+), 10 deletions(-) create mode 100644 src/cpp/rtps/writer/BaseWriter.cpp create mode 100644 test/unittest/rtps/history/WriterHistoryTests.cpp diff --git a/src/cpp/rtps/history/WriterHistory.cpp b/src/cpp/rtps/history/WriterHistory.cpp index c007a7db5fd..69a083450be 100644 --- a/src/cpp/rtps/history/WriterHistory.cpp +++ b/src/cpp/rtps/history/WriterHistory.cpp @@ -320,9 +320,16 @@ void WriterHistory::set_fragments( // If inlineqos for related_sample_identity is required, then remove its size from the final fragment size. if (0 < inline_qos_size) { - final_high_mark_for_frag -= ( - fastdds::dds::ParameterSerializer::PARAMETER_SENTINEL_SIZE + - inline_qos_size); + uint32_t overhead = fastdds::dds::ParameterSerializer::PARAMETER_SENTINEL_SIZE + inline_qos_size; + constexpr uint32_t min_fragment_size = 4; + if (final_high_mark_for_frag < (overhead + min_fragment_size)) + { + final_high_mark_for_frag = min_fragment_size; + } + else + { + final_high_mark_for_frag -= overhead; + } } // If it is big data, fragment it. diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 6fc2bf66c36..e80cdb82c16 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2390,20 +2390,22 @@ uint32_t RTPSParticipantImpl::getMaxDataSize() uint32_t RTPSParticipantImpl::calculateMaxDataSize( uint32_t length) { - uint32_t maxDataSize = length; - + // RTPS header + uint32_t overhead = RTPSMESSAGE_HEADER_SIZE; #if HAVE_SECURITY // If there is rtps messsage protection, reduce max size for messages, // because extra data is added on encryption. if (security_attributes_.is_rtps_protected) { - maxDataSize -= m_security_manager.calculate_extra_size_for_rtps_message(); + overhead += m_security_manager.calculate_extra_size_for_rtps_message(); } #endif // if HAVE_SECURITY - // RTPS header - maxDataSize -= RTPSMESSAGE_HEADER_SIZE; - return maxDataSize; + if (length <= overhead) + { + return 0; + } + return length - overhead; } bool RTPSParticipantImpl::networkFactoryHasRegisteredTransports() const diff --git a/src/cpp/rtps/writer/BaseWriter.cpp b/src/cpp/rtps/writer/BaseWriter.cpp new file mode 100644 index 00000000000..386badbdba9 --- /dev/null +++ b/src/cpp/rtps/writer/BaseWriter.cpp @@ -0,0 +1,425 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file BaseWriter.cpp + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +BaseWriter::BaseWriter( + RTPSParticipantImpl* impl, + const GUID_t& guid, + const WriterAttributes& att, + FlowController* flow_controller, + WriterHistory* hist, + WriterListener* listen) + : RTPSWriter(impl, guid, att) + , flow_controller_(flow_controller) + , history_(hist) + , listener_(listen) + , is_async_(att.mode == SYNCHRONOUS_WRITER ? false : true) + , separate_sending_enabled_(att.separate_sending) + , liveliness_kind_(att.liveliness_kind) + , liveliness_lease_duration_(att.liveliness_lease_duration) + , liveliness_announcement_period_(att.liveliness_announcement_period) +{ + init(att); + + history_->mp_writer = this; + history_->mp_mutex = &mp_mutex; + + flow_controller_->register_writer(this); + + EPROSIMA_LOG_INFO(RTPS_WRITER, "RTPSWriter created"); +} + +BaseWriter* BaseWriter::downcast( + RTPSWriter* writer) +{ + assert(nullptr != dynamic_cast(writer)); + return static_cast(writer); +} + +BaseWriter* BaseWriter::downcast( + Endpoint* endpoint) +{ + assert(nullptr != dynamic_cast(endpoint)); + return static_cast(endpoint); +} + +BaseWriter::~BaseWriter() +{ + EPROSIMA_LOG_INFO(RTPS_WRITER, "RTPSWriter destructor"); + + // Deletion of the events has to be made in child destructor. + // Also at this point all CacheChange_t must have been released by the child destructor + + history_->mp_writer = nullptr; + history_->mp_mutex = nullptr; +} + +bool BaseWriter::matched_reader_add( + const SubscriptionBuiltinTopicData& rqos) +{ + const auto& alloc = mp_RTPSParticipant->get_attributes().allocation; + ReaderProxyData rdata( + alloc.locators.max_unicast_locators, + alloc.locators.max_multicast_locators, + alloc.data_limits); + + from_builtin_to_proxy(rqos, rdata); + return matched_reader_add_edp(rdata); +} + +WriterListener* BaseWriter::get_listener() const +{ + return listener_; +} + +bool BaseWriter::set_listener( + WriterListener* listener) +{ + listener_ = listener; + return true; +} + +bool BaseWriter::is_async() const +{ + return is_async_; +} + +#ifdef FASTDDS_STATISTICS + +bool BaseWriter::add_statistics_listener( + std::shared_ptr listener) +{ + return add_statistics_listener_impl(listener); +} + +bool BaseWriter::remove_statistics_listener( + std::shared_ptr listener) +{ + return remove_statistics_listener_impl(listener); +} + +void BaseWriter::set_enabled_statistics_writers_mask( + uint32_t enabled_writers) +{ + set_enabled_statistics_writers_mask_impl(enabled_writers); +} + +#endif // FASTDDS_STATISTICS + +uint32_t BaseWriter::get_max_allowed_payload_size() +{ + uint32_t flow_max = flow_controller_->get_max_payload(); + uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize(); + uint32_t max_size = flow_max > part_max ? part_max : flow_max; + if (max_output_message_size_ < max_size) + { + max_size = max_output_message_size_; + } + + max_size = calculate_max_payload_size(max_size); + return max_size &= ~3; +} + +uint32_t BaseWriter::calculate_max_payload_size( + uint32_t datagram_length) +{ + constexpr uint32_t info_dst_message_length = 16; + constexpr uint32_t info_ts_message_length = 12; + constexpr uint32_t data_frag_submessage_header_length = 36; + constexpr uint32_t heartbeat_message_length = 32; + + uint32_t max_data_size = mp_RTPSParticipant->calculateMaxDataSize(datagram_length); + uint32_t overhead = info_dst_message_length + + info_ts_message_length + + data_frag_submessage_header_length + + heartbeat_message_length; + +#if HAVE_SECURITY + if (getAttributes().security_attributes().is_submessage_protected) + { + overhead += mp_RTPSParticipant->security_manager().calculate_extra_size_for_rtps_submessage(m_guid); + } + + if (getAttributes().security_attributes().is_payload_protected) + { + overhead += mp_RTPSParticipant->security_manager().calculate_extra_size_for_encoded_payload(m_guid); + } +#endif // if HAVE_SECURITY + +#ifdef FASTDDS_STATISTICS + overhead += eprosima::fastdds::statistics::rtps::statistics_submessage_length; +#endif // FASTDDS_STATISTICS + + constexpr uint32_t min_fragment_size = 4; + if ((overhead + min_fragment_size) > max_data_size) + { + auto min_datagram_length = overhead + min_fragment_size + 1 + (datagram_length - max_data_size); + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Datagram length '" << datagram_length << "' is too small." << + "At least " << min_datagram_length << " bytes are needed to send a message. Fixing fragments to " << + min_fragment_size << " bytes."); + return min_fragment_size; + } + + max_data_size -= overhead; + return max_data_size; +} + +void BaseWriter::add_statistics_sent_submessage( + CacheChange_t* change, + size_t num_locators) +{ + static_cast(change); + static_cast(num_locators); + +#ifdef FASTDDS_STATISTICS + change->writer_info.num_sent_submessages += num_locators; + on_data_generated(num_locators); +#endif // ifdef FASTDDS_STATISTICS +} + +bool BaseWriter::send_nts( + const std::vector& buffers, + const uint32_t& total_bytes, + const LocatorSelectorSender& locator_selector, + std::chrono::steady_clock::time_point& max_blocking_time_point) const +{ + RTPSParticipantImpl* participant = get_participant_impl(); + + return locator_selector.locator_selector.selected_size() == 0 || + participant->sendSync(buffers, total_bytes, m_guid, locator_selector.locator_selector.begin(), + locator_selector.locator_selector.end(), max_blocking_time_point); +} + +const dds::LivelinessQosPolicyKind& BaseWriter::get_liveliness_kind() const +{ + return liveliness_kind_; +} + +const dds::Duration_t& BaseWriter::get_liveliness_lease_duration() const +{ + return liveliness_lease_duration_; +} + +const dds::Duration_t& BaseWriter::get_liveliness_announcement_period() const +{ + return liveliness_announcement_period_; +} + +void BaseWriter::liveliness_lost() +{ + std::unique_lock lock(mp_mutex); + + liveliness_lost_status_.total_count++; + liveliness_lost_status_.total_count_change++; + if (listener_ != nullptr) + { + listener_->on_liveliness_lost(this, liveliness_lost_status_); + } + liveliness_lost_status_.total_count_change = 0u; +} + +bool BaseWriter::is_datasharing_compatible() const +{ + return (m_att.data_sharing_configuration().kind() != dds::OFF); +} + +bool BaseWriter::is_datasharing_compatible_with( + const dds::DataSharingQosPolicy& qos) const +{ + if (!is_datasharing_compatible() || qos.kind() == fastdds::dds::OFF) + { + return false; + } + + for (auto id : qos.domain_ids()) + { + if (std::find(m_att.data_sharing_configuration().domain_ids().begin(), + m_att.data_sharing_configuration().domain_ids().end(), id) + != m_att.data_sharing_configuration().domain_ids().end()) + { + return true; + } + } + return false; +} + +SequenceNumber_t BaseWriter::get_seq_num_min() +{ + CacheChange_t* change; + if (history_->get_min_change(&change) && change != nullptr) + { + return change->sequenceNumber; + } + else + { + return c_SequenceNumber_Unknown; + } +} + +SequenceNumber_t BaseWriter::get_seq_num_max() +{ + CacheChange_t* change; + if (history_->get_max_change(&change) && change != nullptr) + { + return change->sequenceNumber; + } + else + { + return c_SequenceNumber_Unknown; + } +} + +void BaseWriter::add_guid( + LocatorSelectorSender& locator_selector, + const GUID_t& remote_guid) +{ + const GuidPrefix_t& prefix = remote_guid.guidPrefix; + locator_selector.all_remote_readers.push_back(remote_guid); + if (std::find(locator_selector.all_remote_participants.begin(), + locator_selector.all_remote_participants.end(), prefix) == + locator_selector.all_remote_participants.end()) + { + locator_selector.all_remote_participants.push_back(prefix); + } +} + +void BaseWriter::compute_selected_guids( + LocatorSelectorSender& locator_selector) +{ + locator_selector.all_remote_readers.clear(); + locator_selector.all_remote_participants.clear(); + + for (LocatorSelectorEntry* entry : locator_selector.locator_selector.transport_starts()) + { + if (entry->enabled) + { + add_guid(locator_selector, entry->remote_guid); + } + } +} + +void BaseWriter::update_cached_info_nts( + LocatorSelectorSender& locator_selector) +{ + locator_selector.locator_selector.reset(true); + mp_RTPSParticipant->network_factory().select_locators(locator_selector.locator_selector); +} + +void BaseWriter::init( + const WriterAttributes& att) +{ + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } + } + + fixed_payload_size_ = 0; + if (history_->m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE) + { + fixed_payload_size_ = history_->m_att.payloadMaxSize; + } + + if (att.endpoint.data_sharing_configuration().kind() != dds::OFF) + { + std::shared_ptr pool = std::dynamic_pointer_cast(history_->get_payload_pool()); + if (!pool || !pool->init_shared_memory(this, att.endpoint.data_sharing_configuration().shm_directory())) + { + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Could not initialize DataSharing writer pool"); + } + } +} + +void BaseWriter::local_actions_on_writer_removed() +{ + // First, unregister changes from FlowController. This action must be protected. + { + std::lock_guard guard(mp_mutex); + for (auto it = history_->changesBegin(); it != history_->changesEnd(); ++it) + { + flow_controller_->remove_change(*it, std::chrono::steady_clock::now() + std::chrono::hours(24)); + } + + for (auto it = history_->changesBegin(); it != history_->changesEnd(); ++it) + { + history_->release_change(*it); + } + + history_->m_changes.clear(); + } + flow_controller_->unregister_writer(this); +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 873537a8da3..528c412eb6b 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -2941,8 +2941,13 @@ TEST(DDSStatus, sample_rejected_waitset) .add_user_transport_to_pparams(testTransport) .disable_heartbeat_piggyback(true) .asynchronously(eprosima::fastdds::dds::PublishModeQosPolicyKind::ASYNCHRONOUS_PUBLISH_MODE) +<<<<<<< HEAD .add_throughput_controller_descriptor_to_pparams( // Be sure are sent in separate submessage each DATA. eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO, 100, 50) +======= + .add_flow_controller_descriptor_to_pparams( // Be sure are sent in separate submessage each DATA. + eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO, 300, 300) // Be sure the first message is processed before sending the second. +>>>>>>> bfc5a530 (Arithmetic overflow in fragment size calculations (#5464)) .init(); reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) diff --git a/test/unittest/rtps/history/CMakeLists.txt b/test/unittest/rtps/history/CMakeLists.txt index e9d2e48b0d7..46dfa65acc1 100644 --- a/test/unittest/rtps/history/CMakeLists.txt +++ b/test/unittest/rtps/history/CMakeLists.txt @@ -52,6 +52,8 @@ set(TOPICPAYLOADPOOLTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) +set(WRITERHISTORYTESTS_SOURCE WriterHistoryTests.cpp) + if(WIN32) add_definitions(-D_WIN32_WINNT=0x0601) endif() @@ -119,6 +121,7 @@ target_include_directories(TopicPayloadPoolTests PRIVATE target_link_libraries(TopicPayloadPoolTests GTest::gtest ${CMAKE_DL_LIBS}) +<<<<<<< HEAD add_gtest(TopicPayloadPoolTests SOURCES ${TOPICPAYLOADPOOLTESTS_SOURCE}) if(ANDROID) @@ -126,4 +129,24 @@ if(ANDROID) set_property(TARGET BasicPoolsTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") set_property(TARGET CacheChangePoolTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") set_property(TARGET CacheChangeTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") -endif() \ No newline at end of file +endif() +======= +gtest_discover_tests(TopicPayloadPoolTests) + + + +add_executable(WriterHistoryTests ${WRITERHISTORYTESTS_SOURCE}) +target_compile_definitions(WriterHistoryTests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_link_libraries(WriterHistoryTests + fastcdr + fastdds + foonathan_memory + GTest::gtest + ${CMAKE_DL_LIBS}) +gtest_discover_tests(WriterHistoryTests) +>>>>>>> bfc5a530 (Arithmetic overflow in fragment size calculations (#5464)) diff --git a/test/unittest/rtps/history/WriterHistoryTests.cpp b/test/unittest/rtps/history/WriterHistoryTests.cpp new file mode 100644 index 00000000000..3082120e7d4 --- /dev/null +++ b/test/unittest/rtps/history/WriterHistoryTests.cpp @@ -0,0 +1,119 @@ +// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + + +namespace eprosima { +namespace fastdds { +namespace rtps { + +using namespace testing; + +#define MAX_MESSAGE_SIZE 300 + +void cache_change_fragment( + uint32_t max_message_size, + uint32_t inline_qos_length, + bool expected_fragmentation) +{ + uint32_t domain_id = 0; + uint32_t initial_reserved_caches = 10; + std::string max_message_size_str = std::to_string(max_message_size); + + RTPSParticipantAttributes p_attr; + p_attr.properties.properties().emplace_back("fastdds.max_message_size", max_message_size_str); + RTPSParticipant* participant = RTPSDomain::createParticipant( + domain_id, true, p_attr); + + ASSERT_NE(participant, nullptr); + + HistoryAttributes h_attr; + h_attr.memoryPolicy = DYNAMIC_RESERVE_MEMORY_MODE; + h_attr.initialReservedCaches = initial_reserved_caches; + h_attr.payloadMaxSize = 250; + WriterHistory* history = new WriterHistory(h_attr); + + WriterAttributes w_attr; + RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, w_attr, history); + + ASSERT_NE(writer, nullptr); + + CacheChange_t* change = history->create_change(ALIVE); + if (expected_fragmentation) + { + change->serializedPayload.length = 3 * max_message_size; + } + else + { + change->serializedPayload.length = max_message_size / 3; + } + change->inline_qos.length = inline_qos_length; + history->add_change(change); + + auto result = change->getFragmentSize(); + std::cout << "Fragment size: " << result << std::endl; + if (expected_fragmentation) + { + ASSERT_NE(result, 0); + } + else + { + ASSERT_EQ(result, 0); + } +} + +/** + * This test checks the get_max_allowed_payload_size() method of the BaseWriter class. + * When setting the RTPS Participant Attribute property fastdds.max_message_size to a value lower than the + * message overhead, if the method does not overflow the fragment size will be set. + * If the max_message_size is big enough for the overhead, inline_qos and serializedPayload, + * then no fragmentation will occur. + */ +TEST(WriterHistoryTests, get_max_allowed_payload_size_overflow) +{ + cache_change_fragment(100, 0, true); + cache_change_fragment(MAX_MESSAGE_SIZE, 0, false); +} + +/** + * This test checks the fragment size calculation for a cache change depending on the inline qos length. + * The change.serializedPayload.length is set to 3 times the max_allowed_payload_size, so the fragment size should always be set. + * In case of an overflow in the attribute high_mark_for_frag_ the fragment size will not be set, which is an error. + */ +TEST(WriterHistoryTests, final_high_mark_for_frag_overflow) +{ + for (uint32_t inline_qos_length = 0; inline_qos_length < MAX_MESSAGE_SIZE; inline_qos_length += 40) + { + cache_change_fragment(MAX_MESSAGE_SIZE, inline_qos_length, true); + } +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}