Skip to content

Commit

Permalink
lci pp: fix messages larger than INT_MAX
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Nov 10, 2024
1 parent 342d373 commit 25d15b6
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 37 deletions.
18 changes: 8 additions & 10 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
if (tchunk_size <= max_header_size - current_header_size)
{
current_header_size += tchunk_size;
}
Expand Down Expand Up @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
set<pos_numbytes_tchunk>(static_cast<value_type>(tchunk_size));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <= max_header_size - current_header_size)
{
data_[pos_piggy_back_flag_tchunk] = 1;
std::memcpy(&data_[current_header_size],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand All @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_.resize(num_zero_copy_chunks);
for (int j = 0; j < num_zero_copy_chunks; ++j)
{
std::size_t chunk_size =
buffer.transmission_chunks_[j].second;
size_t chunk_size = buffer.transmission_chunks_[j].second;
HPX_ASSERT(iovec.lbuffers[i].length == chunk_size);
buffer.chunks_[j] = serialization::create_pointer_chunk(
iovec.lbuffers[i].address, chunk_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
rcvd_chunks,
locked
};
LCI_comp_t unified_recv(void* address, int length);
LCI_comp_t unified_recv(void* address, size_t length);
return_t receive_transmission_chunks();
return_t receive_data();
return_t receive_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
locked,
};
return_t send_header();
return_t unified_followup_send(void* address, int length);
return_t unified_followup_send(void* address, size_t length);
return_t send_transmission_chunks();
return_t send_data();
return_t send_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci {
std::vector<
typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
iovec.lbuffers[i].address = tchunks.data();
iovec.lbuffers[i].length = tchunks_length;
if (config_t::reg_mem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand Down Expand Up @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci {
}

LCI_comp_t receiver_connection_sendrecv::unified_recv(
void* address, int length)
void* address, size_t length)
{
LCI_comp_t completion =
device_p->completion_manager_p->recv_followup->alloc_completion();
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t mbuffer;
mbuffer.address = address;
Expand Down Expand Up @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_tchunks)
{
auto& tchunks = buffer.transmission_chunks_;
int tchunk_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunk_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length);
state.store(next_state, std::memory_order_release);
Expand All @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_data)
{
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(
buffer.data_.data(), static_cast<int>(buffer.data_.size()));
LCI_comp_t completion =
unified_recv(buffer.data_.data(), buffer.data_.size());
state.store(next_state, std::memory_order_release);
return {false, completion};
}
Expand Down Expand Up @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci {
HPX_UNUSED(chunk_size);

state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand All @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_[idx] =
serialization::create_pointer_chunk(chunk.data(), chunk.size());
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci {
}

sender_connection_sendrecv::return_t
sender_connection_sendrecv::unified_followup_send(void* address, int length)
sender_connection_sendrecv::unified_followup_send(
void* address, size_t length)
{
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t buffer;
buffer.address = address;
Expand Down Expand Up @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci {

std::vector<typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_size = (int) tchunks.size() *
size_t tchunks_size = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
auto ret = unified_followup_send(tchunks.data(), tchunks_size);
Expand Down Expand Up @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci {
{
state.store(
connection_state::locked, std::memory_order_relaxed);
auto ret =
unified_followup_send(const_cast<void*>(chunk.data_.cpos_),
static_cast<int>(chunk.size_));
auto ret = unified_followup_send(
const_cast<void*>(chunk.data_.cpos_), chunk.size_);
if (ret.status == return_status_t::done)
{
++send_chunks_idx;
Expand Down
41 changes: 40 additions & 1 deletion libs/full/parcelset/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,44 @@
# Copyright (c) 2020-2021 The STE||AR-Group
# Copyright (c) 2024 The STE||AR-Group
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

# Copyright (c) 2024 Hartmut Kaiser
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

if(HPX_WITH_NETWORKING)
set(tests ${tests} very_big_parcel)
set(very_big_parcel_PARAMETERS LOCALITIES 2)
endif()

foreach(test ${tests})
set(sources ${test}.cpp)

source_group("Source Files" FILES ${sources})

# add example executable
add_hpx_executable(
${test}_test INTERNAL_FLAGS
SOURCES ${sources} ${${test}_FLAGS}
EXCLUDE_FROM_ALL
HPX_PREFIX ${HPX_BUILD_PREFIX}
FOLDER "Tests/Regressions/Modules/Full/Parcelset"
)

add_hpx_regression_test("modules.parcelset" ${test} ${${test}_PARAMETERS})

endforeach()

if(HPX_WITH_NETWORKING)
# very_big_parcel with one additional configurations
add_hpx_regression_test(
"modules.parcelset" very_big_parcel_int_max_plus_1
EXECUTABLE very_big_parcel
PSEUDO_DEPS_NAME very_big_parcel ${very_big_parcel_PARAMETERS}
--nbytes-add=1
)
endif()
162 changes: 162 additions & 0 deletions libs/full/parcelset/tests/regressions/very_big_parcel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) 2024 Jiakun Yan
// Copyright (c) 2024 Marco Diers
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#if !defined(HPX_COMPUTE_DEVICE_CODE)
#include <hpx/hpx_init.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/async.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/include/runtime.hpp>
#include <hpx/include/serialization.hpp>
#include <hpx/include/util.hpp>
#include <hpx/modules/testing.hpp>

#include <algorithm>
#include <cstddef>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
const std::size_t nbytes_default = std::numeric_limits<int>::max();
const std::size_t nbytes_add_default = 0;

struct config_t
{
size_t nbytes;
size_t nbytes_add;
} config;
///////////////////////////////////////////////////////////////////////////////
class Data
{
public:
Data() = default;
Data(std::size_t size)
: _data(size, 'a')
{
}
auto size() const
{
return _data.size();
}

char& operator[](size_t idx)
{
return _data[idx];
}

char operator[](size_t idx) const
{
return _data[idx];
}

template <typename Archive>
friend auto serialize(Archive& archive, Data& object, unsigned int version)
{
archive& object._data;
return;
}

private:
std::vector<char> _data{};
};

class Component : public hpx::components::component_base<Component>
{
public:
Component() = default;

auto call(Data data) -> void
{
std::cout << "Data size: " << data.size() << '\n';
bool flag = true;
size_t idx = 0;
for (; idx < data.size(); ++idx)
{
if (data[idx] != 'a')
{
flag = false;
break;
}
}
if (!flag)
std::cout << "Data[" << idx << "] = " << data[idx]
<< " instead of a\n";
else
std::cout << "data is correct\n";
HPX_TEST_EQ(flag, true);
return;
}

HPX_DEFINE_COMPONENT_ACTION(Component, call);
};

HPX_REGISTER_COMPONENT(hpx::components::component<Component>, Component);
HPX_REGISTER_ACTION(Component::call_action);

class ComponentClient
: public hpx::components::client_base<ComponentClient, Component>
{
using BaseType = hpx::components::client_base<ComponentClient, Component>;

public:
template <typename... Arguments>
ComponentClient(Arguments... arguments)
: BaseType(std::move(arguments)...)
{
}

template <typename... Arguments>
auto call(Arguments... arguments)
{
return hpx::async<Component::call_action>(
this->get_id(), std::move(arguments)...);
}
};

int hpx_main(hpx::program_options::variables_map& b_arg)
{
config.nbytes = b_arg["nbytes"].as<std::size_t>();
config.nbytes_add = b_arg["nbytes-add"].as<std::size_t>();

std::vector<ComponentClient> clients;
auto localities(hpx::find_all_localities());
std::transform(std::begin(localities), std::end(localities),
std::back_inserter(clients),
[](auto& loc) { return hpx::new_<ComponentClient>(loc); });

Data data(config.nbytes + config.nbytes_add);
std::vector<decltype(clients.front().call(data))> calls;
for (auto& client : clients)
{
calls.emplace_back(client.call(data));
}
hpx::wait_all(calls);

return hpx::finalize();
}

///////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
namespace po = hpx::program_options;
po::options_description description("HPX big parcel test");

description.add_options()("nbytes",
po::value<std::size_t>()->default_value(nbytes_default),
"number of bytes to send")("nbytes-add",
po::value<std::size_t>()->default_value(nbytes_add_default),
"number of additional bytes to send");

hpx::init_params init_args;
init_args.desc_cmdline = description;

// Initialize and run HPX
HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0);

return hpx::util::report_errors();
}
#endif

0 comments on commit 25d15b6

Please sign in to comment.