diff --git a/.github/workflows/tests.examples.targets b/.github/workflows/tests.examples.targets
index 0316ed2f8685..92417a9c33a9 100644
--- a/.github/workflows/tests.examples.targets
+++ b/.github/workflows/tests.examples.targets
@@ -43,3 +43,4 @@ tests.examples.quickstart.partitioned_vector_spmd_foreach
 tests.examples.quickstart.sort_by_key_demo
 tests.examples.transpose.transpose_block_numa
 tests.examples.modules.collectives.distributed.tcp.channel_communicator
+tests.examples.modules.collectives.distributed.tcp.distributed_pi
diff --git a/libs/full/collectives/examples/CMakeLists.txt b/libs/full/collectives/examples/CMakeLists.txt
index 636f692d527f..1bc2db31e44a 100644
--- a/libs/full/collectives/examples/CMakeLists.txt
+++ b/libs/full/collectives/examples/CMakeLists.txt
@@ -17,11 +17,14 @@ else()
   return()
 endif()
 
-set(example_programs channel_communicator)
+set(example_programs channel_communicator distributed_pi)
 
 set(channel_communicator_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 2)
 set(channel_communicator_FLAGS DEPENDENCIES iostreams_component)
 
+set(distributed_pi_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 2)
+set(distributed_pi_FLAGS COMPILE_FLAGS -DHPX_HAVE_RUN_MAIN_EVERYWHERE)
+
 foreach(example_program ${example_programs})
 
   set(sources ${example_program}.cpp)
diff --git a/libs/full/collectives/examples/distributed_pi.cpp b/libs/full/collectives/examples/distributed_pi.cpp
new file mode 100644
index 000000000000..6e317d55e138
--- /dev/null
+++ b/libs/full/collectives/examples/distributed_pi.cpp
@@ -0,0 +1,58 @@
+// Copyright (c) 2025 Hartmut Kaiser
+//
+// SPDX-License-Identifier: BSL-1.0
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+// Returns the square of val.
+inline double sqr(double val)
+{
+    return val * val;
+}
+
+// Approximates pi as the left Riemann sum of 4 / (1 + x^2) over [0, 1) using
+// N sample points. Each locality sums one contiguous range of sample points,
+// the partial sums are then combined through hpx::collectives::reduce and the
+// value of pi is printed on locality 0. N can be given as the first command
+// line argument; it is parsed on locality 0 and handed to
+// hpx::collectives::broadcast.
+int main(int argc, char* argv[])
+{
+    std::size_t N = 1'000'000;
+    std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
+    std::uint32_t locality_id = hpx::get_locality_id();
+
+    // N is unsigned: parse with stoull (stol is limited to the range of long)
+    if (locality_id == 0 && argc > 1)
+        N = std::stoull(argv[1]);
+
+    hpx::collectives::broadcast(hpx::collectives::get_world_communicator(), N);
+
+    // The last locality also takes the remainder of N / num_localities;
+    // otherwise up to num_localities - 1 sample points would be skipped.
+    std::size_t const blocksize = N / num_localities;
+    std::size_t const begin = blocksize * locality_id;
+    std::size_t const end =
+        locality_id + 1 == num_localities ? N : begin + blocksize;
+    double h = 1.0 / N;
+
+    double pi = 0.0;
+    for (std::size_t i = begin; i != end; ++i)
+        pi += h * 4.0 / (1 + sqr(i * h));
+
+    hpx::collectives::reduce(
+        hpx::collectives::get_world_communicator(), pi, std::plus{});
+
+    if (locality_id == 0)
+        std::cout << "pi: " << pi << std::endl;
+
+    return 0;
+}
diff --git a/libs/full/collectives/include/hpx/collectives/broadcast.hpp b/libs/full/collectives/include/hpx/collectives/broadcast.hpp
index 388a7b9af1ef..3f1c0189b1f8 100644
--- a/libs/full/collectives/include/hpx/collectives/broadcast.hpp
+++ b/libs/full/collectives/include/hpx/collectives/broadcast.hpp
@@ -467,7 +467,7 @@ namespace hpx::collectives {
 
         fid.wait();    // make sure communicator was created
 
-        if (this_site == fid.get_info().second)
+        if (this_site == std::get<2>(fid.get_info_ex()))
         {
             broadcast_to(
                 hpx::launch::sync, HPX_MOVE(fid), value, this_site, generation);
diff --git a/libs/full/collectives/include/hpx/collectives/channel_communicator.hpp b/libs/full/collectives/include/hpx/collectives/channel_communicator.hpp
index ca011d3d17ef..88333d253b9d 100644
--- a/libs/full/collectives/include/hpx/collectives/channel_communicator.hpp
+++ b/libs/full/collectives/include/hpx/collectives/channel_communicator.hpp
@@ -115,9 +115,8 @@ namespace hpx {
namespace collectives {
 #include
 #include
 #include
-#include
 
-namespace hpx { namespace collectives {
+namespace hpx::collectives {
 
     // forward declarations
     class channel_communicator;
@@ -126,10 +125,18 @@ namespace hpx { namespace collectives {
     hpx::future get(
         channel_communicator, that_site_arg, tag_arg = tag_arg());
 
+    template
+    T get(hpx::launch::sync_policy, channel_communicator, that_site_arg,
+        tag_arg = tag_arg());
+
     template
     hpx::future set(
         channel_communicator, that_site_arg, T&&, tag_arg = tag_arg());
 
+    template
+    void set(hpx::launch::sync_policy, channel_communicator, that_site_arg, T&&,
+        tag_arg = tag_arg());
+
     class channel_communicator
     {
     private:
@@ -140,10 +147,18 @@ namespace hpx { namespace collectives {
         template
         friend hpx::future get(channel_communicator, that_site_arg, tag_arg);
 
+        template
+        friend T get(hpx::launch::sync_policy, channel_communicator,
+            that_site_arg, tag_arg);
+
         template
         friend hpx::future set(
             channel_communicator, that_site_arg, T&&, tag_arg);
 
+        template
+        friend void set(hpx::launch::sync_policy, channel_communicator,
+            that_site_arg, T&&, tag_arg);
+
     private:
         HPX_EXPORT channel_communicator(char const* basename,
             num_sites_arg num_sites, this_site_arg this_site,
@@ -163,6 +178,11 @@ namespace hpx { namespace collectives {
 
         HPX_EXPORT void free();
 
+        explicit operator bool() const noexcept    // true iff comm_ is non-null
+        {
+            return comm_.get() != nullptr;
+        }
+
     private:
         std::shared_ptr comm_;
     };
@@ -185,6 +205,14 @@ namespace hpx { namespace collectives {
         return comm.comm_->template get(site.argument_, tag.argument_);
     }
 
+    template
+    T get(hpx::launch::sync_policy, channel_communicator comm,
+        that_site_arg site, tag_arg tag)    // sync variant: returns the value
+    {
+        return comm.comm_->template get(site.argument_, tag.argument_).get();
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
     template
     hpx::future set(
         channel_communicator comm, that_site_arg site, T&& value, tag_arg tag)
@@ -192,7 +220,26 @@ namespace hpx { namespace collectives {
         return comm.comm_->set(
             site.argument_, HPX_FORWARD(T, value), tag.argument_);
     }
-}}    // namespace hpx::collectives
+
+    template
+    void set(hpx::launch::sync_policy, channel_communicator comm,
+        that_site_arg site, T&& value, tag_arg tag)
+    {
+        return comm.comm_
+            ->set(site.argument_, HPX_FORWARD(T, value), tag.argument_)
+            .get();    // wait for the future returned by the asynchronous set
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Predefined p2p communicator (refers to all localities)
+    HPX_EXPORT channel_communicator get_world_channel_communicator();
+
+    namespace detail {
+
+        HPX_EXPORT void create_world_channel_communicator();
+        HPX_EXPORT void reset_world_channel_communicator();
+    }    // namespace detail
+}    // namespace hpx::collectives
 
 #endif    // !HPX_COMPUTE_DEVICE_CODE
 #endif    // DOXYGEN
diff --git a/libs/full/collectives/include/hpx/collectives/create_communicator.hpp b/libs/full/collectives/include/hpx/collectives/create_communicator.hpp
index 7aa5837354db..c8985f84b1ae 100644
--- a/libs/full/collectives/include/hpx/collectives/create_communicator.hpp
+++ b/libs/full/collectives/include/hpx/collectives/create_communicator.hpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Hartmut Kaiser
+// Copyright (c) 2020-2025 Hartmut Kaiser
 //
 // SPDX-License-Identifier: BSL-1.0
 // Distributed under the Boost Software License, Version 1.0.
(See accompanying
@@ -113,6 +113,7 @@ namespace hpx { namespace collectives {
 
 #include
 #include
+#include
 #include
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -123,6 +124,7 @@ namespace hpx::collectives::detail {
     {
         num_sites_arg num_sites_;
         this_site_arg this_site_;
+        root_site_arg root_site_;    // stored by communicator::set_info()
     };
 }    // namespace hpx::collectives::detail
 
@@ -173,8 +175,13 @@ namespace hpx::collectives {
         {
         }
 
-        HPX_EXPORT void set_info(
-            num_sites_arg num_sites, this_site_arg this_site) noexcept;
+        HPX_EXPORT void set_info(num_sites_arg num_sites,
+            this_site_arg this_site,
+            root_site_arg root_site = root_site_arg()) noexcept;
+
+        [[nodiscard]] HPX_EXPORT
+            std::tuple
+            get_info_ex() const noexcept;    // (num_sites, this_site, root_site)
 
         [[nodiscard]] HPX_EXPORT std::pair
         get_info() const noexcept;
@@ -186,9 +193,26 @@ namespace hpx::collectives {
     };
 
     ///////////////////////////////////////////////////////////////////////////
-    // Predefined global communicator
+    // Predefined global communicator (refers to all localities)
     HPX_EXPORT communicator get_world_communicator();
 
+    namespace detail {
+
+        HPX_EXPORT void create_global_communicator();
+        HPX_EXPORT void reset_global_communicator();
+    }    // namespace detail
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Predefined local communicator (refers to all threads on the calling
+    // locality)
+    HPX_EXPORT communicator get_local_communicator();
+
+    namespace detail {
+
+        HPX_EXPORT void create_local_communicator();
+        HPX_EXPORT void reset_local_communicator();
+    }    // namespace detail
+
     ///////////////////////////////////////////////////////////////////////////
     HPX_EXPORT communicator create_communicator(char const* basename,
         num_sites_arg num_sites = num_sites_arg(),
diff --git a/libs/full/collectives/include/hpx/collectives/detail/channel_communicator.hpp b/libs/full/collectives/include/hpx/collectives/detail/channel_communicator.hpp
index 9406d879dd8c..7355264a1d9a 100644
--- a/libs/full/collectives/include/hpx/collectives/detail/channel_communicator.hpp
+++ b/libs/full/collectives/include/hpx/collectives/detail/channel_communicator.hpp
@@ -19,7 +19,6 @@
 #include
 #include
 #include
-#include
 
 #include
 #include
@@ -27,7 +26,7 @@
 #include
 #include
 
-namespace hpx { namespace collectives { namespace detail {
+namespace hpx::collectives::detail {
 
     ///////////////////////////////////////////////////////////////////////////
     class channel_communicator_server
@@ -39,7 +38,6 @@ namespace hpx { namespace collectives { namespace detail {
 
     public:
         channel_communicator_server()    //-V730
-          : data_()
         {
             HPX_ASSERT(false);    // shouldn't ever be called
         }
@@ -57,8 +55,7 @@ namespace hpx { namespace collectives { namespace detail {
 
             {
                 std::unique_lock l(data_[which].mtx_);
-                util::ignore_while_checking il(&l);
-                HPX_UNUSED(il);
+                [[maybe_unused]] util::ignore_while_checking il(&l);
 
                 channel_type& c = data_[which].channels_[tag];
                 f = c.get();
@@ -84,8 +81,7 @@ namespace hpx { namespace collectives { namespace detail {
         void set(std::size_t which, T value, std::size_t tag)
         {
             std::unique_lock l(data_[which].mtx_);
-            util::ignore_while_checking il(&l);
-            HPX_UNUSED(il);
+            [[maybe_unused]] util::ignore_while_checking il(&l);
 
             data_[which].channels_[tag].set(unique_any_nonser(HPX_MOVE(value)));
         }
@@ -157,6 +153,6 @@ namespace hpx { namespace collectives { namespace detail {
         std::size_t this_site_;
         std::vector clients_;
     };
-}}}    // namespace hpx::collectives::detail
+}    // namespace hpx::collectives::detail
 
 #endif    // COMPUTE_HOST_CODE
diff --git a/libs/full/collectives/include/hpx/collectives/reduce.hpp b/libs/full/collectives/include/hpx/collectives/reduce.hpp
index edc5519ff660..a6893d6aa61a 100644
--- a/libs/full/collectives/include/hpx/collectives/reduce.hpp
+++ b/libs/full/collectives/include/hpx/collectives/reduce.hpp
@@ -524,7 +524,7 @@ namespace hpx::collectives {
 
         fid.wait();    // make sure communicator was created
 
-        if (this_site == fid.get_info().second)
+        if (this_site == std::get<2>(fid.get_info_ex()))
         {
             local_result = reduce_here(hpx::launch::sync, HPX_MOVE(fid),
                 HPX_FORWARD(T, local_result), HPX_FORWARD(F, op), this_site,
diff --git a/libs/full/collectives/src/channel_communicator.cpp b/libs/full/collectives/src/channel_communicator.cpp
index e9da22859c0b..354c42db3d3e 100644
--- a/libs/full/collectives/src/channel_communicator.cpp
+++ b/libs/full/collectives/src/channel_communicator.cpp
@@ -17,10 +17,13 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 
 #include
+#include
 #include
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -90,6 +93,48 @@ namespace hpx::collectives {
         return create_channel_communicator(basename, num_sites, this_site)
             .get();
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Predefined channel (p2p) communicator
+    namespace {
+
+        channel_communicator world_channel_communicator;
+        hpx::mutex world_channel_communicator_mtx;
+    }    // namespace
+
+    channel_communicator get_world_channel_communicator()
+    {
+        detail::create_world_channel_communicator();
+        return world_channel_communicator;
+    }
+
+    namespace detail {
+
+        void create_world_channel_communicator()
+        {
+            std::unique_lock l(world_channel_communicator_mtx);
+            [[maybe_unused]] util::ignore_while_checking il(&l);
+
+            if (!world_channel_communicator)    // create only once
+            {
+                auto const num_sites =
+                    num_sites_arg(agas::get_num_localities(hpx::launch::sync));
+                auto const this_site = this_site_arg(agas::get_locality_id());
+
+                world_channel_communicator =
+                    collectives::create_channel_communicator(hpx::launch::sync,
+                        "world_channel_communicator", num_sites, this_site);
+            }
+        }
+
+        void reset_world_channel_communicator()
+        {
+            if (world_channel_communicator)    // NOTE(review): mutex not taken here, unlike in create -- confirm
+            {
+                world_channel_communicator.free();
+            }
+        }
+    }    // namespace detail
 }    // namespace hpx::collectives
 
 #endif    // !HPX_COMPUTE_DEVICE_CODE
diff --git a/libs/full/collectives/src/create_communicator.cpp
b/libs/full/collectives/src/create_communicator.cpp
index 95874e424357..cfdf42180520 100644
--- a/libs/full/collectives/src/create_communicator.cpp
+++ b/libs/full/collectives/src/create_communicator.cpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Hartmut Kaiser
+// Copyright (c) 2020-2025 Hartmut Kaiser
 //
 // SPDX-License-Identifier: BSL-1.0
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -16,13 +16,17 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
+#include
 #include
 
 #include
 #include
+#include
 #include
 #include
 
@@ -68,14 +72,15 @@ namespace hpx::collectives {
     }    // namespace detail
 
     ///////////////////////////////////////////////////////////////////////////
-    void communicator::set_info(
-        num_sites_arg num_sites, this_site_arg this_site) noexcept
+    void communicator::set_info(num_sites_arg num_sites,
+        this_site_arg this_site, root_site_arg root_site) noexcept
     {
-        auto& [num_sites_, this_site_] =
+        auto& [num_sites_, this_site_, root_site_] =
             get_extra_data();
 
         num_sites_ = num_sites;
         this_site_ = this_site;
+        root_site_ = root_site;
     }
 
     std::pair communicator::get_info()
@@ -93,6 +98,22 @@ namespace hpx::collectives {
         return std::make_pair(num_sites_arg{}, this_site_arg{});
     }
 
+    std::tuple
+    communicator::get_info_ex() const noexcept
+    {
+        auto const* client_data =
+            try_get_extra_data();
+
+        if (client_data != nullptr)
+        {
+            return std::make_tuple(client_data->num_sites_,
+                client_data->this_site_, client_data->root_site_);
+        }
+
+        return std::make_tuple(
+            num_sites_arg{}, this_site_arg{}, root_site_arg());    // fallback: no extra data available
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     communicator create_communicator(char const* basename,
         num_sites_arg num_sites, this_site_arg this_site,
@@ -141,13 +162,17 @@ namespace hpx::collectives {
                             "operation was already registered: {}",
                             target.registered_name());
                     }
-                    target.set_info(num_sites, this_site);
+                    target.set_info(num_sites, this_site, root_site);
                     return target;
                 });
         }
 
         // find existing communicator
-        return hpx::find_from_basename(HPX_MOVE(name), root_site);
+        return hpx::find_from_basename(HPX_MOVE(name), root_site)
+            .then(hpx::launch::sync, [=](communicator&& c) {
+                c.set_info(num_sites, this_site, root_site);    // attach site info to the found communicator
+                return HPX_MOVE(c);
+            });
     }
 
     ///////////////////////////////////////////////////////////////////////////
@@ -193,31 +218,92 @@ namespace hpx::collectives {
                     c.registered_name());
             }
 
-            c.set_info(num_sites, this_site);
+            c.set_info(num_sites, this_site, root_site);
             return c;
         }
 
         // find existing communicator
-        return hpx::find_from_basename(HPX_MOVE(name), root_site);
+        return hpx::find_from_basename(HPX_MOVE(name), root_site)
+            .then(hpx::launch::sync, [=](communicator&& c) {
+                c.set_info(num_sites, this_site, root_site);    // attach site info to the found communicator
+                return HPX_MOVE(c);
+            });
     }
 
     ///////////////////////////////////////////////////////////////////////////
     // Predefined global communicator
     namespace {
+
         communicator world_communicator;
-        hpx::mutex world_communicator_mtx;
+        communicator local_communicator;
+
+        hpx::mutex local_communicator_mtx;
     }    // namespace
 
     communicator get_world_communicator()
     {
+        HPX_ASSERT(world_communicator);    // set by detail::create_global_communicator()
+        return world_communicator;
+    }
+
+    namespace detail {
+
+        void create_global_communicator()
         {
-            std::lock_guard l(world_communicator_mtx);
-            if (!world_communicator)
-                world_communicator =
-                    create_communicator("hpx::collectives::world_communicator");
+            HPX_ASSERT(!world_communicator);
+
+            auto const num_sites =
+                num_sites_arg(agas::get_num_localities(hpx::launch::sync));
+            auto const this_site = this_site_arg(agas::get_locality_id());
+
+            world_communicator = create_communicator("/0/world_communicator",
+                num_sites, this_site, generation_arg(), root_site_arg(0));
+            world_communicator.set_info(num_sites, this_site, root_site_arg(0));
         }
-        return world_communicator;
+
+        void reset_global_communicator()
+        {
+            HPX_ASSERT(world_communicator);
+            world_communicator.detach();
+        }
+    }    // namespace detail
+
+    communicator get_local_communicator()
+    {
+        detail::create_local_communicator();
+        return local_communicator;
     }
+
+    namespace detail {
+
+        void create_local_communicator()
+        {
+            std::unique_lock l(local_communicator_mtx);
+            [[maybe_unused]] util::ignore_while_checking il(&l);
+
+            if (!local_communicator)    // create only once
+            {
+                auto const num_sites =
+                    num_sites_arg(hpx::get_num_worker_threads());
+                auto const this_site =
+                    this_site_arg(hpx::get_worker_thread_num());    // NOTE(review): evaluated only by the first caller -- confirm this is intended
+
+                local_communicator = collectives::create_local_communicator(
+                    "local_communicator", num_sites, this_site,
+                    generation_arg(), root_site_arg(0));
+                local_communicator.set_info(
+                    num_sites, this_site, root_site_arg(0));
+            }
+        }
+
+        void reset_local_communicator()
+        {
+            if (local_communicator)    // nothing to do if it was never created
+            {
+                local_communicator.detach();
+            }
+        }
+    }    // namespace detail
 }    // namespace hpx::collectives
 
 #endif    // !HPX_COMPUTE_DEVICE_CODE
diff --git a/libs/full/collectives/src/detail/channel_communicator_server.cpp b/libs/full/collectives/src/detail/channel_communicator_server.cpp
index 90b5385d9cef..cc66bed958fe 100644
--- a/libs/full/collectives/src/detail/channel_communicator_server.cpp
+++ b/libs/full/collectives/src/detail/channel_communicator_server.cpp
@@ -25,7 +25,7 @@ using channel_communicator_component = hpx::components::component<
 HPX_REGISTER_COMPONENT(channel_communicator_component)
 
 ///////////////////////////////////////////////////////////////////////////////
-namespace hpx { namespace collectives { namespace detail {
+namespace hpx::collectives::detail {
 
     ///////////////////////////////////////////////////////////////////////////
     channel_communicator::channel_communicator(char const* basename,
@@ -36,6 +36,6 @@ namespace hpx { namespace collectives { namespace detail {
         // replace reference to our own client (manages base-name registration)
         clients_[this_site] = HPX_MOVE(here);
     }
-}}}    // namespace hpx::collectives::detail
+}    // namespace hpx::collectives::detail
 
 #endif    // !HPX_COMPUTE_DEVICE_CODE
diff --git
a/libs/full/include/include/hpx/hpx.hpp b/libs/full/include/include/hpx/hpx.hpp
index 1aa815b444b3..18b5c2045cd3 100644
--- a/libs/full/include/include/hpx/hpx.hpp
+++ b/libs/full/include/include/hpx/hpx.hpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2023 Hartmut Kaiser
+// Copyright (c) 2007-2025 Hartmut Kaiser
 //
 // SPDX-License-Identifier: BSL-1.0
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
diff --git a/libs/full/init_runtime/src/pre_main.cpp b/libs/full/init_runtime/src/pre_main.cpp
index 492245c1076a..50a43d8e73b3 100644
--- a/libs/full/init_runtime/src/pre_main.cpp
+++ b/libs/full/init_runtime/src/pre_main.cpp
@@ -11,6 +11,8 @@
 
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -158,6 +160,9 @@ namespace hpx { namespace detail {
                    "barriers";
         }
 
+        // create predefined communicators
+        hpx::collectives::detail::create_global_communicator();
+
         // create our global barrier...
         hpx::distributed::barrier::get_global_barrier() =
             hpx::distributed::barrier::create_global_barrier();
@@ -239,6 +244,11 @@ namespace hpx { namespace detail {
 
     void post_main()
     {
+        // destroy predefined communicators
+        hpx::collectives::detail::reset_global_communicator();    // asserts that it was created
+        hpx::collectives::detail::reset_local_communicator();    // no-op if never created
+        hpx::collectives::detail::reset_world_channel_communicator();    // no-op if never created
+
         // simply destroy global barrier
         auto& b = hpx::distributed::barrier::get_global_barrier();
         b[0].detach();
diff --git a/wrap/src/hpx_main.cpp b/wrap/src/hpx_main.cpp
index bce43c781aa1..2126404d6891 100644
--- a/wrap/src/hpx_main.cpp
+++ b/wrap/src/hpx_main.cpp
@@ -4,7 +4,6 @@
 // Distributed under the Boost Software License, Version 1.0.
(See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -#include #include #include @@ -12,6 +11,8 @@ namespace hpx_startup { + void install_user_main_config(); + namespace { std::vector (*prev_user_main_config_function)(