diff --git a/.github/labeler.yml b/.github/labeler.yml index c589fda6099..368bf328b99 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -9,17 +9,6 @@ python: benchmarks: - 'benchmarks/**' -doc: - - 'docs/**' - - '**/*.md' - - 'datasets/**' - - 'notebooks/**' - - '**/*.txt' - - '**/*.rst' - - '**/*.ipynb' - - '**/*.pdf' - - '**/*.png' - datasets: - 'datasets/**' diff --git a/github/workflows/labeler.yml b/.github/workflows/labeler.yml similarity index 83% rename from github/workflows/labeler.yml rename to .github/workflows/labeler.yml index 23956a02fbd..31e78f82a62 100644 --- a/github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -6,6 +6,6 @@ jobs: triage: runs-on: ubuntu-latest steps: - - uses: actions/labeler@main + - uses: actions/labeler@v4 with: repo-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/CHANGELOG.md b/CHANGELOG.md index 33a5b2bc5e7..d165cd7efc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,84 @@ +# cuGraph 23.12.00 (6 Dec 2023) + +## 🚨 Breaking Changes + +- [BUG] Restore the original default order of CSR, which does not reverse edges in cuGraph-PyG ([#3980](https://github.com/rapidsai/cugraph/pull/3980)) [@alexbarghi-nv](https://github.com/alexbarghi-nv) +- `Resultset` and `Dataset` Refactors ([#3957](https://github.com/rapidsai/cugraph/pull/3957)) [@nv-rliu](https://github.com/nv-rliu) +- Moves more MG graph ETL to libcugraph and re-enables MG tests in CI ([#3941](https://github.com/rapidsai/cugraph/pull/3941)) [@jnke2016](https://github.com/jnke2016) + +## 🐛 Bug Fixes + +- Pin actions/labeler to v4 ([#4038](https://github.com/rapidsai/cugraph/pull/4038)) [@raydouglass](https://github.com/raydouglass) +- Find rmm before cuco ([#4011](https://github.com/rapidsai/cugraph/pull/4011)) [@vyasr](https://github.com/vyasr) +- Pin to minor versions of packages outside the cuGraph repository. ([#4004](https://github.com/rapidsai/cugraph/pull/4004)) [@bdice](https://github.com/bdice) +- Move MTMG_TEST to MG tests block ([#3993](https://github.com/rapidsai/cugraph/pull/3993)) [@naimnv](https://github.com/naimnv) +- Fix Leiden refinement phase ([#3990](https://github.com/rapidsai/cugraph/pull/3990)) [@naimnv](https://github.com/naimnv) +- [BUG] Fix Graph Construction From Pandas in cuGraph-PyG ([#3985](https://github.com/rapidsai/cugraph/pull/3985)) [@alexbarghi-nv](https://github.com/alexbarghi-nv) +- [BUG] Restore the original default order of CSR, which does not reverse edges in cuGraph-PyG ([#3980](https://github.com/rapidsai/cugraph/pull/3980)) [@alexbarghi-nv](https://github.com/alexbarghi-nv) +- Fix eigenvector testing and HITS testing discrepancies ([#3979](https://github.com/rapidsai/cugraph/pull/3979)) [@ChuckHastings](https://github.com/ChuckHastings) +- [BUG] Fix Incorrect Edge Index, Directory Selection in cuGraph-PyG Loader ([#3978](https://github.com/rapidsai/cugraph/pull/3978)) [@alexbarghi-nv](https://github.com/alexbarghi-nv) +- [BUG] Check if Dask has quit to avoid throwing an exception and triggering a segfault on ddp exit ([#3961](https://github.com/rapidsai/cugraph/pull/3961)) [@alexbarghi-nv](https://github.com/alexbarghi-nv) +- nx-cugraph: xfail test_louvain.py:test_threshold in Python 3.9 ([#3944](https://github.com/rapidsai/cugraph/pull/3944)) [@eriknw](https://github.com/eriknw) + +## 📖 Documentation + +- [DOC]: Fix invalid links and add materials to notebooks ([#4002](https://github.com/rapidsai/cugraph/pull/4002)) [@huiyuxie](https://github.com/huiyuxie) +- Update Broken Links in README.md ([#3924](https://github.com/rapidsai/cugraph/pull/3924)) [@nv-rliu](https://github.com/nv-rliu) + +## 🚀 New Features + +- Implement the transform_e primitive (to update property values for all edges) ([#3917](https://github.com/rapidsai/cugraph/pull/3917)) [@seunghwak](https://github.com/seunghwak) +- Update the neighbor intersection primitive to support edge masking. ([#3550](https://github.com/rapidsai/cugraph/pull/3550)) [@seunghwak](https://github.com/seunghwak) + +## 🛠️ Improvements + +- Correct defect found in DLFW testing ([#4021](https://github.com/rapidsai/cugraph/pull/4021)) [@ChuckHastings](https://github.com/ChuckHastings) +- `nx-cugraph` README update: adds missing `connected_components` algo to table ([#4019](https://github.com/rapidsai/cugraph/pull/4019)) [@rlratzel](https://github.com/rlratzel) +- Build concurrency for nightly and merge triggers ([#4009](https://github.com/rapidsai/cugraph/pull/4009)) [@bdice](https://github.com/bdice) +- Support `drop_last` Argument in cuGraph-PyG Loader ([#3995](https://github.com/rapidsai/cugraph/pull/3995)) [@alexbarghi-nv](https://github.com/alexbarghi-nv) +- Adds `update-version.sh` support for recently added files containing RAPIDS versions ([#3994](https://github.com/rapidsai/cugraph/pull/3994)) [@rlratzel](https://github.com/rlratzel) +- Use new `rapids-dask-dependency` metapackage for managing `dask` versions ([#3991](https://github.com/rapidsai/cugraph/pull/3991)) [@galipremsagar](https://github.com/galipremsagar) +- Fixes to nx-cugraph README: fixes typos, updates link to NX backend docs ([#3989](https://github.com/rapidsai/cugraph/pull/3989)) [@rlratzel](https://github.com/rlratzel) +- Address FIXMEs ([#3988](https://github.com/rapidsai/cugraph/pull/3988)) [@seunghwak](https://github.com/seunghwak) +- Updates README file to include nx-cugraph user documentation, adds nx-cugraph to main README ([#3984](https://github.com/rapidsai/cugraph/pull/3984)) [@rlratzel](https://github.com/rlratzel) +- Update C API graph creation function signatures ([#3982](https://github.com/rapidsai/cugraph/pull/3982)) [@ChuckHastings](https://github.com/ChuckHastings) +- [REVIEW]Optimize cugraph-DGL csc codepath ([#3977](https://github.com/rapidsai/cugraph/pull/3977)) [@VibhuJawa](https://github.com/VibhuJawa) +- nx-cugraph: add SSSP (unweighted) ([#3976](https://github.com/rapidsai/cugraph/pull/3976)) [@eriknw](https://github.com/eriknw) +- CuGraph compatibility fixes ([#3973](https://github.com/rapidsai/cugraph/pull/3973)) [@brandon-b-miller](https://github.com/brandon-b-miller) +- Skip certain `cugraph-pyg` tests when torch-sparse is not available ([#3970](https://github.com/rapidsai/cugraph/pull/3970)) [@tingyu66](https://github.com/tingyu66) +- nx-cugraph: add `eigenvector_centrality`, `katz_centrality`, `hits`, `pagerank` ([#3968](https://github.com/rapidsai/cugraph/pull/3968)) [@eriknw](https://github.com/eriknw) +- Cut peak memory footprint in graph creation ([#3966](https://github.com/rapidsai/cugraph/pull/3966)) [@seunghwak](https://github.com/seunghwak) +- nx-cugraph: add CC for undirected graphs to fix k-truss ([#3965](https://github.com/rapidsai/cugraph/pull/3965)) [@eriknw](https://github.com/eriknw) +- Skip certain `cugraph-pyg` tests when `torch_sparse` is not available ([#3962](https://github.com/rapidsai/cugraph/pull/3962)) [@tingyu66](https://github.com/tingyu66) +- `Resultset` and `Dataset` Refactors ([#3957](https://github.com/rapidsai/cugraph/pull/3957)) [@nv-rliu](https://github.com/nv-rliu) +- Download `xml` docs artifact through CloudFront endpoint ([#3955](https://github.com/rapidsai/cugraph/pull/3955)) [@AyodeAwe](https://github.com/AyodeAwe) +- Add many graph generators to nx-cugraph ([#3954](https://github.com/rapidsai/cugraph/pull/3954)) [@eriknw](https://github.com/eriknw) +- Unpin `dask` and `distributed` for `23.12` development ([#3953](https://github.com/rapidsai/cugraph/pull/3953)) [@galipremsagar](https://github.com/galipremsagar) +- Errors compiling for DLFW on CUDA 12.3 ([#3952](https://github.com/rapidsai/cugraph/pull/3952)) [@ChuckHastings](https://github.com/ChuckHastings) +- nx-cugraph: add k_truss and degree centralities ([#3945](https://github.com/rapidsai/cugraph/pull/3945)) [@eriknw](https://github.com/eriknw) +- nx-cugraph: handle seed argument in edge_betweenness_centrality ([#3943](https://github.com/rapidsai/cugraph/pull/3943)) [@eriknw](https://github.com/eriknw) +- Moves more MG graph ETL to libcugraph and re-enables MG tests in CI ([#3941](https://github.com/rapidsai/cugraph/pull/3941)) [@jnke2016](https://github.com/jnke2016) +- Temporarily disable mg testing ([#3940](https://github.com/rapidsai/cugraph/pull/3940)) [@jnke2016](https://github.com/jnke2016) +- adding C/C++ API docs ([#3938](https://github.com/rapidsai/cugraph/pull/3938)) [@BradReesWork](https://github.com/BradReesWork) +- Add multigraph support to nx-cugraph ([#3934](https://github.com/rapidsai/cugraph/pull/3934)) [@eriknw](https://github.com/eriknw) +- Setup Consistent Nightly Versions for Pip and Conda ([#3933](https://github.com/rapidsai/cugraph/pull/3933)) [@divyegala](https://github.com/divyegala) +- MTMG multi node ([#3932](https://github.com/rapidsai/cugraph/pull/3932)) [@ChuckHastings](https://github.com/ChuckHastings) +- Use branch-23.12 workflows. ([#3928](https://github.com/rapidsai/cugraph/pull/3928)) [@bdice](https://github.com/bdice) +- Fix an issue occurring in the cuGraph-DGL example for "mixed" mode. ([#3927](https://github.com/rapidsai/cugraph/pull/3927)) [@drivanov](https://github.com/drivanov) +- Updating Docs ([#3923](https://github.com/rapidsai/cugraph/pull/3923)) [@BradReesWork](https://github.com/BradReesWork) +- Forward-merge branch-23.10 to branch-23.12 ([#3919](https://github.com/rapidsai/cugraph/pull/3919)) [@nv-rliu](https://github.com/nv-rliu) +- new build all option ([#3916](https://github.com/rapidsai/cugraph/pull/3916)) [@BradReesWork](https://github.com/BradReesWork) +- Silence spurious compiler warnings ([#3913](https://github.com/rapidsai/cugraph/pull/3913)) [@seunghwak](https://github.com/seunghwak) +- Link wholegrah and cugraphops XML docs ([#3906](https://github.com/rapidsai/cugraph/pull/3906)) [@AyodeAwe](https://github.com/AyodeAwe) +- Updates to 23.12 ([#3905](https://github.com/rapidsai/cugraph/pull/3905)) [@raydouglass](https://github.com/raydouglass) +- Forward-merge branch-23.10 to branch-23.12 ([#3904](https://github.com/rapidsai/cugraph/pull/3904)) [@GPUtester](https://github.com/GPUtester) +- Build CUDA 12.0 ARM conda packages. ([#3903](https://github.com/rapidsai/cugraph/pull/3903)) [@bdice](https://github.com/bdice) +- Merge branch-23.10 into branch-23.12 ([#3898](https://github.com/rapidsai/cugraph/pull/3898)) [@rlratzel](https://github.com/rlratzel) +- Some MTMG code cleanup and small optimizations ([#3894](https://github.com/rapidsai/cugraph/pull/3894)) [@ChuckHastings](https://github.com/ChuckHastings) +- Enable parallel mode ([#3875](https://github.com/rapidsai/cugraph/pull/3875)) [@jnke2016](https://github.com/jnke2016) +- Adds benchmarks for `nx-cugraph` ([#3854](https://github.com/rapidsai/cugraph/pull/3854)) [@rlratzel](https://github.com/rlratzel) +- Add nx-cugraph notebook for showing accelerated networkX APIs ([#3830](https://github.com/rapidsai/cugraph/pull/3830)) [@betochimas](https://github.com/betochimas) + # cuGraph 23.10.00 (11 Oct 2023) ## 🚨 Breaking Changes diff --git a/cpp/include/cugraph/graph_view.hpp b/cpp/include/cugraph/graph_view.hpp index f30a8b7e2af..d79d4635c54 100644 --- a/cpp/include/cugraph/graph_view.hpp +++ b/cpp/include/cugraph/graph_view.hpp @@ -268,7 +268,11 @@ class graph_base_t { properties_(properties){}; vertex_t number_of_vertices() const { return number_of_vertices_; } - edge_t number_of_edges() const { return number_of_edges_; } + edge_t number_of_edges() const + { + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); + return number_of_edges_; + } template std::enable_if_t::value, bool> is_valid_vertex(vertex_type v) const @@ -285,6 +289,20 @@ class graph_base_t { bool is_symmetric() const { return properties_.is_symmetric; } bool is_multigraph() const { return properties_.is_multigraph; } + void attach_edge_mask(edge_property_view_t edge_mask_view) + { + edge_mask_view_ = edge_mask_view; + } + + void clear_edge_mask() { edge_mask_view_ = std::nullopt; } + + bool has_edge_mask() const { return edge_mask_view_.has_value(); } + + std::optional> edge_mask_view() const + { + return edge_mask_view_; + } + protected: raft::handle_t const* handle_ptr() const { return handle_ptr_; }; graph_properties_t graph_properties() const { return properties_; } @@ -296,6 +314,8 @@ class graph_base_t { edge_t number_of_edges_{0}; graph_properties_t properties_{}; + + std::optional> edge_mask_view_{std::nullopt}; }; } // namespace detail @@ -731,20 +751,6 @@ class graph_view_t edge_mask_view) - { - edge_mask_view_ = edge_mask_view; - } - - void clear_edge_mask() { edge_mask_view_ = std::nullopt; } - - bool has_edge_mask() const { return edge_mask_view_.has_value(); } - - std::optional> edge_mask_view() const - { - return edge_mask_view_; - } - private: std::vector edge_partition_offsets_{}; std::vector edge_partition_indices_{}; @@ -790,8 +796,6 @@ class graph_view_t>, std::optional /* dummy */> local_sorted_unique_edge_dst_vertex_partition_offsets_{std::nullopt}; - - std::optional> edge_mask_view_{std::nullopt}; }; // single-GPU version @@ -1012,28 +1016,12 @@ class graph_view_t edge_mask_view) - { - edge_mask_view_ = edge_mask_view; - } - - void clear_edge_mask() { edge_mask_view_ = std::nullopt; } - - bool has_edge_mask() const { return edge_mask_view_.has_value(); } - - std::optional> edge_mask_view() const - { - return edge_mask_view_; - } - private: edge_t const* offsets_{nullptr}; vertex_t const* indices_{nullptr}; // segment offsets based on vertex degree, relevant only if vertex IDs are renumbered std::optional> segment_offsets_{std::nullopt}; - - std::optional> edge_mask_view_{std::nullopt}; }; } // namespace cugraph diff --git a/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp b/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp index c4cacb401af..5fbe7bc9f01 100644 --- a/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp +++ b/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp @@ -57,10 +57,10 @@ class device_shared_wrapper_t { { std::lock_guard lock(lock_); - auto pos = objects_.find(handle.get_local_rank()); + auto pos = objects_.find(handle.get_rank()); CUGRAPH_EXPECTS(pos == objects_.end(), "Cannot overwrite wrapped object"); - objects_.insert(std::make_pair(handle.get_local_rank(), std::move(obj))); + objects_.insert(std::make_pair(handle.get_rank(), std::move(obj))); } /** @@ -79,7 +79,6 @@ class device_shared_wrapper_t { objects_.insert(std::make_pair(local_rank, std::move(obj))); } - public: /** * @brief Get reference to an object for a particular thread * @@ -90,7 +89,7 @@ class device_shared_wrapper_t { { std::lock_guard lock(lock_); - auto pos = objects_.find(handle.get_local_rank()); + auto pos = objects_.find(handle.get_rank()); CUGRAPH_EXPECTS(pos != objects_.end(), "Uninitialized wrapped object"); return pos->second; @@ -106,7 +105,7 @@ class device_shared_wrapper_t { { std::lock_guard lock(lock_); - auto pos = objects_.find(handle.get_local_rank()); + auto pos = objects_.find(handle.get_rank()); CUGRAPH_EXPECTS(pos != objects_.end(), "Uninitialized wrapped object"); diff --git a/cpp/include/cugraph/mtmg/graph_view.hpp b/cpp/include/cugraph/mtmg/graph_view.hpp index 94347e016ea..8e202ab4904 100644 --- a/cpp/include/cugraph/mtmg/graph_view.hpp +++ b/cpp/include/cugraph/mtmg/graph_view.hpp @@ -27,8 +27,27 @@ namespace mtmg { * @brief Graph view for each GPU */ template -using graph_view_t = detail::device_shared_wrapper_t< - cugraph::graph_view_t>; +class graph_view_t : public detail::device_shared_wrapper_t< + cugraph::graph_view_t> { + public: + /** + * @brief Get the vertex_partition_view for this graph + */ + vertex_partition_view_t get_vertex_partition_view( + cugraph::mtmg::handle_t const& handle) const + { + return this->get(handle).local_vertex_partition_view(); + } + + /** + * @brief Get the vertex_partition_view for this graph + */ + std::vector get_vertex_partition_range_lasts( + cugraph::mtmg::handle_t const& handle) const + { + return this->get(handle).vertex_partition_range_lasts(); + } +}; } // namespace mtmg } // namespace cugraph diff --git a/cpp/include/cugraph/mtmg/handle.hpp b/cpp/include/cugraph/mtmg/handle.hpp index 6223de1781d..0b02091a3cc 100644 --- a/cpp/include/cugraph/mtmg/handle.hpp +++ b/cpp/include/cugraph/mtmg/handle.hpp @@ -32,18 +32,19 @@ namespace mtmg { * */ class handle_t { + handle_t(handle_t const&) = delete; + handle_t operator=(handle_t const&) = delete; + public: /** * @brief Constructor * * @param raft_handle Raft handle for the resources * @param thread_rank Rank for this thread + * @param device_id Device id for the device this handle operates on */ - handle_t(raft::handle_t const& raft_handle, int thread_rank, size_t device_id) - : raft_handle_(raft_handle), - thread_rank_(thread_rank), - local_rank_(raft_handle.get_comms().get_rank()), // FIXME: update for multi-node - device_id_(device_id) + handle_t(raft::handle_t const& raft_handle, int thread_rank, rmm::cuda_device_id device_id) + : raft_handle_(raft_handle), thread_rank_(thread_rank), device_id_raii_(device_id) { } @@ -118,18 +119,10 @@ class handle_t { */ int get_rank() const { return raft_handle_.get_comms().get_rank(); } - /** - * @brief Get local gpu rank - * - * @return local gpu rank - */ - int get_local_rank() const { return local_rank_; } - private: raft::handle_t const& raft_handle_; int thread_rank_; - int local_rank_; - size_t device_id_; + rmm::cuda_set_device_raii device_id_raii_; }; } // namespace mtmg diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp index f819a5a0abe..f60063c4101 100644 --- a/cpp/include/cugraph/mtmg/instance_manager.hpp +++ b/cpp/include/cugraph/mtmg/instance_manager.hpp @@ -47,15 +47,10 @@ class instance_manager_t { ~instance_manager_t() { - int current_device{}; - RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); - for (size_t i = 0; i < nccl_comms_.size(); ++i) { - RAFT_CUDA_TRY(cudaSetDevice(device_ids_[i].value())); + rmm::cuda_set_device_raii local_set_device(device_ids_[i]); RAFT_NCCL_TRY(ncclCommDestroy(*nccl_comms_[i])); } - - RAFT_CUDA_TRY(cudaSetDevice(current_device)); } /** @@ -75,8 +70,7 @@ class instance_manager_t { int gpu_id = local_id % raft_handle_.size(); int thread_id = local_id / raft_handle_.size(); - RAFT_CUDA_TRY(cudaSetDevice(device_ids_[gpu_id].value())); - return handle_t(*raft_handle_[gpu_id], thread_id, static_cast(gpu_id)); + return handle_t(*raft_handle_[gpu_id], thread_id, device_ids_[gpu_id]); } /** diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index 127944cf7ba..bc312c9ae77 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -89,7 +89,7 @@ class resource_manager_t { local_rank_map_.insert(std::pair(global_rank, local_device_id)); - RAFT_CUDA_TRY(cudaSetDevice(local_device_id.value())); + rmm::cuda_set_device_raii local_set_device(local_device_id); // FIXME: There is a bug in the cuda_memory_resource that results in a Hang. // using the pool resource as a work-around. @@ -182,14 +182,12 @@ class resource_manager_t { --gpu_row_comm_size; } - int current_device{}; - RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); RAFT_NCCL_TRY(ncclGroupStart()); for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { int rank = local_ranks_to_include[i]; auto pos = local_rank_map_.find(rank); - RAFT_CUDA_TRY(cudaSetDevice(pos->second.value())); + rmm::cuda_set_device_raii local_set_device(pos->second); nccl_comms.push_back(std::make_unique()); handles.push_back( @@ -204,7 +202,6 @@ class resource_manager_t { handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank); } RAFT_NCCL_TRY(ncclGroupEnd()); - RAFT_CUDA_TRY(cudaSetDevice(current_device)); std::vector running_threads; @@ -217,9 +214,7 @@ class resource_manager_t { &device_ids, &nccl_comms, &handles]() { - int rank = local_ranks_to_include[idx]; - RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value())); - + rmm::cuda_set_device_raii local_set_device(device_ids[idx]); cugraph::partition_manager::init_subcomm(*handles[idx], gpu_row_comm_size); }); } diff --git a/cpp/include/cugraph/mtmg/vertex_result_view.hpp b/cpp/include/cugraph/mtmg/vertex_result_view.hpp index a349bb95333..42b80cea62f 100644 --- a/cpp/include/cugraph/mtmg/vertex_result_view.hpp +++ b/cpp/include/cugraph/mtmg/vertex_result_view.hpp @@ -39,11 +39,12 @@ class vertex_result_view_t : public detail::device_shared_device_span_t + template rmm::device_uvector gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + cugraph::vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); }; diff --git a/cpp/include/cugraph/utilities/misc_utils.cuh b/cpp/include/cugraph/utilities/misc_utils.cuh index a62e8ce85ec..28e2853727f 100644 --- a/cpp/include/cugraph/utilities/misc_utils.cuh +++ b/cpp/include/cugraph/utilities/misc_utils.cuh @@ -19,6 +19,7 @@ #include #include +#include #include #include #include diff --git a/cpp/src/mtmg/vertex_result.cu b/cpp/src/mtmg/vertex_result.cu index 97fcd291c87..5b1825656ff 100644 --- a/cpp/src/mtmg/vertex_result.cu +++ b/cpp/src/mtmg/vertex_result.cu @@ -27,15 +27,14 @@ namespace cugraph { namespace mtmg { template -template +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view) { - auto this_gpu_graph_view = graph_view.get(handle); - rmm::device_uvector local_vertices(vertices.size(), handle.get_stream()); rmm::device_uvector vertex_gpu_ids(vertices.size(), handle.get_stream()); rmm::device_uvector vertex_pos(vertices.size(), handle.get_stream()); @@ -47,11 +46,11 @@ rmm::device_uvector vertex_result_view_t::gather( cugraph::detail::sequence_fill( handle.get_stream(), vertex_pos.data(), vertex_pos.size(), size_t{0}); - rmm::device_uvector d_vertex_partition_range_lasts( - this_gpu_graph_view.vertex_partition_range_lasts().size(), handle.get_stream()); + rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), + handle.get_stream()); raft::update_device(d_vertex_partition_range_lasts.data(), - this_gpu_graph_view.vertex_partition_range_lasts().data(), - this_gpu_graph_view.vertex_partition_range_lasts().size(), + vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.size(), handle.get_stream()); if (renumber_map_view) { @@ -60,8 +59,8 @@ rmm::device_uvector vertex_result_view_t::gather( local_vertices.data(), local_vertices.size(), renumber_map_view->get(handle).data(), - this_gpu_graph_view.local_vertex_partition_range_first(), - this_gpu_graph_view.local_vertex_partition_range_last()); + vertex_partition_view.local_vertex_partition_range_first(), + vertex_partition_view.local_vertex_partition_range_last()); } auto const major_comm_size = @@ -89,8 +88,8 @@ rmm::device_uvector vertex_result_view_t::gather( auto& wrapped = this->get(handle); - auto vertex_partition = vertex_partition_device_view_t( - this_gpu_graph_view.local_vertex_partition_view()); + auto vertex_partition = + vertex_partition_device_view_t(vertex_partition_view); auto iter = thrust::make_transform_iterator(local_vertices.begin(), [vertex_partition] __device__(auto v) { @@ -130,37 +129,85 @@ rmm::device_uvector vertex_result_view_t::gather( template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); -template rmm::device_uvector vertex_result_view_t::gather( +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); -template rmm::device_uvector vertex_result_view_t::gather( +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); -template rmm::device_uvector vertex_result_view_t::gather( +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); } // namespace mtmg diff --git a/cpp/src/prims/count_if_e.cuh b/cpp/src/prims/count_if_e.cuh index f6e4bc9bead..9cff4f5eceb 100644 --- a/cpp/src/prims/count_if_e.cuh +++ b/cpp/src/prims/count_if_e.cuh @@ -74,8 +74,6 @@ typename GraphViewType::edge_type count_if_e(raft::handle_t const& handle, using vertex_t = typename GraphViewType::vertex_type; using edge_t = typename GraphViewType::edge_type; - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } diff --git a/cpp/src/prims/fill_edge_property.cuh b/cpp/src/prims/fill_edge_property.cuh index d446944b65b..e6875576044 100644 --- a/cpp/src/prims/fill_edge_property.cuh +++ b/cpp/src/prims/fill_edge_property.cuh @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -23,6 +24,7 @@ #include #include +#include #include @@ -38,21 +40,78 @@ void fill_edge_property(raft::handle_t const& handle, { static_assert(std::is_same_v); + using edge_t = typename GraphViewType::edge_type; + + auto edge_mask_view = graph_view.edge_mask_view(); + auto value_firsts = edge_property_output.value_firsts(); auto edge_counts = edge_property_output.edge_counts(); for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { + auto edge_partition_e_mask = + edge_mask_view + ? thrust::make_optional< + detail::edge_partition_edge_property_device_view_t>( + *edge_mask_view, i) + : thrust::nullopt; + if constexpr (cugraph::has_packed_bool_element< std::remove_reference_t, T>()) { static_assert(std::is_arithmetic_v, "unimplemented for thrust::tuple types."); auto packed_input = input ? packed_bool_full_mask() : packed_bool_empty_mask(); - thrust::fill_n(handle.get_thrust_policy(), - value_firsts[i], - packed_bool_size(static_cast(edge_counts[i])), - packed_input); + auto rem = edge_counts[i] % packed_bools_per_word(); + if (edge_partition_e_mask) { + auto input_first = + thrust::make_zip_iterator(value_firsts[i], (*edge_partition_e_mask).value_first()); + thrust::transform(handle.get_thrust_policy(), + input_first, + input_first + packed_bool_size(static_cast(edge_counts[i] - rem)), + value_firsts[i], + [packed_input] __device__(thrust::tuple pair) { + auto old_value = thrust::get<0>(pair); + auto mask = thrust::get<1>(pair); + return (old_value & ~mask) | (packed_input & mask); + }); + if (rem > 0) { + thrust::transform( + handle.get_thrust_policy(), + input_first + packed_bool_size(static_cast(edge_counts[i] - rem)), + input_first + packed_bool_size(static_cast(edge_counts[i])), + value_firsts[i] + packed_bool_size(static_cast(edge_counts[i] - rem)), + [packed_input, rem] __device__(thrust::tuple pair) { + auto old_value = thrust::get<0>(pair); + auto mask = thrust::get<1>(pair); + return ((old_value & ~mask) | (packed_input & mask)) & packed_bool_partial_mask(rem); + }); + } + } else { + thrust::fill_n(handle.get_thrust_policy(), + value_firsts[i], + packed_bool_size(static_cast(edge_counts[i] - rem)), + packed_input); + if (rem > 0) { + thrust::fill_n( + handle.get_thrust_policy(), + value_firsts[i] + packed_bool_size(static_cast(edge_counts[i] - rem)), + 1, + packed_input & packed_bool_partial_mask(rem)); + } + } } else { - thrust::fill_n( - handle.get_thrust_policy(), value_firsts[i], static_cast(edge_counts[i]), input); + if (edge_partition_e_mask) { + thrust::transform_if(handle.get_thrust_policy(), + thrust::make_constant_iterator(input), + thrust::make_constant_iterator(input) + edge_counts[i], + thrust::make_counting_iterator(edge_t{0}), + value_firsts[i], + thrust::identity{}, + [edge_partition_e_mask = *edge_partition_e_mask] __device__(edge_t i) { + return edge_partition_e_mask.get(i); + }); + } else { + thrust::fill_n( + handle.get_thrust_policy(), value_firsts[i], static_cast(edge_counts[i]), input); + } } } } @@ -79,8 +138,6 @@ void fill_edge_property(raft::handle_t const& handle, edge_property_t& edge_property_output, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } diff --git a/cpp/src/prims/transform_e.cuh b/cpp/src/prims/transform_e.cuh index edacdc8a970..c6623621d24 100644 --- a/cpp/src/prims/transform_e.cuh +++ b/cpp/src/prims/transform_e.cuh @@ -16,10 +16,12 @@ #pragma once #include +#include #include #include #include #include +#include #include #include @@ -44,6 +46,7 @@ template __global__ void transform_e_packed_bool( @@ -53,6 +56,7 @@ __global__ void transform_e_packed_bool( EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, + thrust::optional edge_partition_e_mask, EdgePartitionEdgeValueOutputWrapper edge_partition_e_value_output, EdgeOp e_op) { @@ -68,11 +72,14 @@ __global__ void transform_e_packed_bool( auto num_edges = edge_partition.number_of_edges(); while (idx < static_cast(packed_bool_size(num_edges))) { + auto edge_mask = packed_bool_full_mask(); + if (edge_partition_e_mask) { edge_mask = *((*edge_partition_e_mask).value_first() + idx); } + auto local_edge_idx = idx * static_cast(packed_bools_per_word()) + static_cast(lane_id); - uint32_t mask{0}; int predicate{0}; - if (local_edge_idx < num_edges) { + + if ((local_edge_idx < num_edges) && (edge_mask & packed_bool_mask(lane_id))) { auto major_idx = edge_partition.major_idx_from_local_edge_idx_nocheck(local_edge_idx); auto major = edge_partition.major_from_major_idx_nocheck(major_idx); auto major_offset = edge_partition.major_offset_from_major_nocheck(major); @@ -91,8 +98,15 @@ __global__ void transform_e_packed_bool( ? int{1} : int{0}; } - mask = __ballot_sync(uint32_t{0xffffffff}, predicate); - if (lane_id == 0) { *(edge_partition_e_value_output.value_first() + idx) = mask; } + uint32_t new_val = __ballot_sync(uint32_t{0xffffffff}, predicate); + if (lane_id == 0) { + if (edge_mask == packed_bool_full_mask()) { + *(edge_partition_e_value_output.value_first() + idx) = new_val; + } else { + auto old_val = *(edge_partition_e_value_output.value_first() + idx); + *(edge_partition_e_value_output.value_first() + idx) = (old_val & ~edge_mask) | new_val; + } + } idx += static_cast(gridDim.x * (blockDim.x / raft::warp_size())); } @@ -178,12 +192,18 @@ void transform_e(raft::handle_t const& handle, typename EdgeValueOutputWrapper::value_iterator, typename EdgeValueOutputWrapper::value_type>; - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); + auto edge_mask_view = graph_view.edge_mask_view(); for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { auto edge_partition = edge_partition_device_view_t( graph_view.local_edge_partition_view(i)); + auto edge_partition_e_mask = + edge_mask_view + ? thrust::make_optional< + detail::edge_partition_edge_property_device_view_t>( + *edge_mask_view, i) + : thrust::nullopt; edge_partition_src_input_device_view_t edge_partition_src_value_input{}; edge_partition_dst_input_device_view_t edge_partition_dst_value_input{}; @@ -214,35 +234,40 @@ void transform_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, edge_partition_e_value_output, e_op); } } else { - thrust::transform( + thrust::for_each( handle.get_thrust_policy(), thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(num_edges), - edge_partition_e_value_output.value_first(), [e_op, edge_partition, edge_partition_src_value_input, edge_partition_dst_value_input, - edge_partition_e_value_input] __device__(edge_t i) { - auto major_idx = edge_partition.major_idx_from_local_edge_idx_nocheck(i); - auto major = edge_partition.major_from_major_idx_nocheck(major_idx); - auto major_offset = edge_partition.major_offset_from_major_nocheck(major); - auto minor = *(edge_partition.indices() + i); - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - - auto src = GraphViewType::is_storage_transposed ? minor : major; - auto dst = GraphViewType::is_storage_transposed ? major : minor; - auto src_offset = GraphViewType::is_storage_transposed ? minor_offset : major_offset; - auto dst_offset = GraphViewType::is_storage_transposed ? major_offset : minor_offset; - return e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(i)); + edge_partition_e_value_input, + edge_partition_e_mask, + edge_partition_e_value_output] __device__(edge_t i) { + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(i)) { + auto major_idx = edge_partition.major_idx_from_local_edge_idx_nocheck(i); + auto major = edge_partition.major_from_major_idx_nocheck(major_idx); + auto major_offset = edge_partition.major_offset_from_major_nocheck(major); + auto minor = *(edge_partition.indices() + i); + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + + auto src = GraphViewType::is_storage_transposed ? minor : major; + auto dst = GraphViewType::is_storage_transposed ? major : minor; + auto src_offset = GraphViewType::is_storage_transposed ? minor_offset : major_offset; + auto dst_offset = GraphViewType::is_storage_transposed ? major_offset : minor_offset; + auto e_op_result = e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(i)); + edge_partition_e_value_output.set(i, e_op_result); + } }); } } @@ -336,14 +361,12 @@ void transform_e(raft::handle_t const& handle, typename EdgeValueOutputWrapper::value_iterator, typename EdgeValueOutputWrapper::value_type>; - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - auto major_first = GraphViewType::is_storage_transposed ? edge_list.dst_begin() : edge_list.src_begin(); auto minor_first = GraphViewType::is_storage_transposed ? edge_list.src_begin() : edge_list.dst_begin(); - auto edge_first = thrust::make_zip_iterator(thrust::make_tuple(major_first, minor_first)); + auto edge_first = thrust::make_zip_iterator(major_first, minor_first); if (do_expensive_check) { CUGRAPH_EXPECTS( @@ -382,10 +405,18 @@ void transform_e(raft::handle_t const& handle, edge_partition_offsets.back() = edge_list.size(); } + auto edge_mask_view = graph_view.edge_mask_view(); + for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { auto edge_partition = edge_partition_device_view_t( graph_view.local_edge_partition_view(i)); + auto edge_partition_e_mask = + edge_mask_view + ? thrust::make_optional< + detail::edge_partition_edge_property_device_view_t>( + *edge_mask_view, i) + : thrust::nullopt; if (do_expensive_check) { CUGRAPH_EXPECTS( @@ -393,7 +424,8 @@ void transform_e(raft::handle_t const& handle, handle.get_thrust_policy(), edge_first + edge_partition_offsets[i], edge_first + edge_partition_offsets[i + 1], - [edge_partition] __device__(thrust::tuple edge) { + [edge_partition, + edge_partition_e_mask] __device__(thrust::tuple edge) { auto major = thrust::get<0>(edge); auto minor = thrust::get<1>(edge); vertex_t major_idx{}; @@ -416,8 +448,19 @@ void transform_e(raft::handle_t const& handle, edge_t edge_offset{}; edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_idx); - auto it = thrust::lower_bound(thrust::seq, indices, indices + local_degree, minor); - return *it != minor; + auto lower_it = + thrust::lower_bound(thrust::seq, indices, indices + local_degree, minor); + if (*lower_it != minor) { return true; } + if (edge_partition_e_mask) { + auto upper_it = + thrust::upper_bound(thrust::seq, lower_it, indices + local_degree, minor); + if (detail::count_set_bits((*edge_partition_e_mask).value_first(), + edge_offset + thrust::distance(indices, lower_it), + thrust::distance(lower_it, upper_it)) == 0) { + return true; + } + } + return false; }) == 0, "Invalid input arguments: edge_list contains edges that do not exist in the input graph."); } @@ -446,6 +489,7 @@ void transform_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, edge_partition_e_value_output] __device__(thrust::tuple edge) { auto major = thrust::get<0>(edge); auto minor = thrust::get<1>(edge); @@ -469,7 +513,7 @@ void transform_e(raft::handle_t const& handle, edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_idx); auto lower_it = thrust::lower_bound(thrust::seq, indices, indices + local_degree, minor); - auto upper_it = thrust::upper_bound(thrust::seq, indices, indices + local_degree, minor); + auto upper_it = thrust::upper_bound(thrust::seq, lower_it, indices + local_degree, minor); auto src = GraphViewType::is_storage_transposed ? minor : major; auto dst = GraphViewType::is_storage_transposed ? major : minor; @@ -478,14 +522,17 @@ void transform_e(raft::handle_t const& handle, for (auto it = lower_it; it != upper_it; ++it) { assert(*it == minor); - auto e_op_result = - e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + thrust::distance(indices, it))); - edge_partition_e_value_output.set(edge_offset + thrust::distance(indices, it), - e_op_result); + if (!edge_partition_e_mask || + ((*edge_partition_e_mask).get(edge_offset + thrust::distance(indices, it)))) { + auto e_op_result = + e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + thrust::distance(indices, it))); + edge_partition_e_value_output.set(edge_offset + thrust::distance(indices, it), + e_op_result); + } } }); } diff --git a/cpp/src/prims/transform_reduce_e.cuh b/cpp/src/prims/transform_reduce_e.cuh index 9c23f3fca18..483ab64dcd9 100644 --- a/cpp/src/prims/transform_reduce_e.cuh +++ b/cpp/src/prims/transform_reduce_e.cuh @@ -56,6 +56,7 @@ template __global__ void transform_reduce_e_hypersparse( @@ -65,6 +66,7 @@ __global__ void transform_reduce_e_hypersparse( EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, + thrust::optional edge_partition_e_mask, ResultIterator result_iter /* size 1 */, EdgeOp e_op) { @@ -101,24 +103,31 @@ __global__ void transform_reduce_e_hypersparse( &edge_partition_src_value_input, &edge_partition_dst_value_input, &edge_partition_e_value_input, + &edge_partition_e_mask, &e_op, major, indices, edge_offset] __device__(auto i) { - auto major_offset = edge_partition.major_offset_from_major_nocheck(major); - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed ? minor : major; - auto dst = GraphViewType::is_storage_transposed ? major : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - return e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto major_offset = edge_partition.major_offset_from_major_nocheck(major); + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed ? minor : major; + auto dst = GraphViewType::is_storage_transposed ? major : minor; + auto src_offset = GraphViewType::is_storage_transposed + ? minor_offset + : static_cast(major_offset); + auto dst_offset = GraphViewType::is_storage_transposed + ? static_cast(major_offset) + : minor_offset; + return e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + } else { + return e_op_result_t{}; + } }, e_op_result_t{}, edge_property_add); @@ -135,6 +144,7 @@ template __global__ void transform_reduce_e_low_degree( @@ -146,6 +156,7 @@ __global__ void transform_reduce_e_low_degree( EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, + thrust::optional edge_partition_e_mask, ResultIterator result_iter /* size 1 */, EdgeOp e_op) { @@ -177,27 +188,34 @@ __global__ void transform_reduce_e_low_degree( &edge_partition_src_value_input, &edge_partition_dst_value_input, &edge_partition_e_value_input, + &edge_partition_e_mask, &e_op, major_offset, indices, edge_offset] __device__(auto i) { - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed - ? minor - : edge_partition.major_from_major_offset_nocheck(major_offset); - auto dst = GraphViewType::is_storage_transposed - ? edge_partition.major_from_major_offset_nocheck(major_offset) - : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - return e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed + ? minor + : edge_partition.major_from_major_offset_nocheck(major_offset); + auto dst = GraphViewType::is_storage_transposed + ? edge_partition.major_from_major_offset_nocheck(major_offset) + : minor; + auto src_offset = GraphViewType::is_storage_transposed + ? minor_offset + : static_cast(major_offset); + auto dst_offset = GraphViewType::is_storage_transposed + ? static_cast(major_offset) + : minor_offset; + return e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + } else { + return e_op_result_t{}; + } }, e_op_result_t{}, edge_property_add); @@ -214,6 +232,7 @@ template __global__ void transform_reduce_e_mid_degree( @@ -225,6 +244,7 @@ __global__ void transform_reduce_e_mid_degree( EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, + thrust::optional edge_partition_e_mask, ResultIterator result_iter /* size 1 */, EdgeOp e_op) { @@ -250,24 +270,26 @@ __global__ void transform_reduce_e_mid_degree( edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_offset); for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed - ? minor - : edge_partition.major_from_major_offset_nocheck(major_offset); - auto dst = GraphViewType::is_storage_transposed - ? edge_partition.major_from_major_offset_nocheck(major_offset) - : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - auto e_op_result = e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); - e_op_result_sum = edge_property_add(e_op_result_sum, e_op_result); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed + ? minor + : edge_partition.major_from_major_offset_nocheck(major_offset); + auto dst = GraphViewType::is_storage_transposed + ? edge_partition.major_from_major_offset_nocheck(major_offset) + : minor; + auto src_offset = + GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); + auto dst_offset = + GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; + auto e_op_result = e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + e_op_result_sum = edge_property_add(e_op_result_sum, e_op_result); + } } idx += gridDim.x * (blockDim.x / raft::warp_size()); } @@ -280,6 +302,7 @@ template __global__ void transform_reduce_e_high_degree( @@ -291,6 +314,7 @@ __global__ void transform_reduce_e_high_degree( EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, + thrust::optional edge_partition_e_mask, ResultIterator result_iter /* size 1 */, EdgeOp e_op) { @@ -313,24 +337,26 @@ __global__ void transform_reduce_e_high_degree( edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_offset); for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed - ? minor - : edge_partition.major_from_major_offset_nocheck(major_offset); - auto dst = GraphViewType::is_storage_transposed - ? edge_partition.major_from_major_offset_nocheck(major_offset) - : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - auto e_op_result = e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); - e_op_result_sum = edge_property_add(e_op_result_sum, e_op_result); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed + ? minor + : edge_partition.major_from_major_offset_nocheck(major_offset); + auto dst = GraphViewType::is_storage_transposed + ? edge_partition.major_from_major_offset_nocheck(major_offset) + : minor; + auto src_offset = + GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); + auto dst_offset = + GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; + auto e_op_result = e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + e_op_result_sum = edge_property_add(e_op_result_sum, e_op_result); + } } idx += gridDim.x; } @@ -417,8 +443,6 @@ T transform_reduce_e(raft::handle_t const& handle, typename EdgeValueInputWrapper::value_iterator, typename EdgeValueInputWrapper::value_type>>; - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } @@ -431,10 +455,18 @@ T transform_reduce_e(raft::handle_t const& handle, get_dataframe_buffer_begin(result_buffer) + 1, T{}); + auto edge_mask_view = graph_view.edge_mask_view(); + for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { auto edge_partition = edge_partition_device_view_t( graph_view.local_edge_partition_view(i)); + auto edge_partition_e_mask = + edge_mask_view + ? thrust::make_optional< + detail::edge_partition_edge_property_device_view_t>( + *edge_mask_view, i) + : thrust::nullopt; edge_partition_src_input_device_view_t edge_partition_src_value_input{}; edge_partition_dst_input_device_view_t edge_partition_dst_value_input{}; @@ -467,6 +499,7 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); } @@ -482,6 +515,7 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); } @@ -497,6 +531,7 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); } @@ -510,6 +545,7 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); } @@ -527,6 +563,7 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); } @@ -601,8 +638,6 @@ auto transform_reduce_e(raft::handle_t const& handle, edge_op_result_type::type; static_assert(!std::is_same_v); - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } diff --git a/cpp/src/prims/update_edge_src_dst_property.cuh b/cpp/src/prims/update_edge_src_dst_property.cuh index 2d72a075ca5..b8621e122c6 100644 --- a/cpp/src/prims/update_edge_src_dst_property.cuh +++ b/cpp/src/prims/update_edge_src_dst_property.cuh @@ -866,8 +866,6 @@ void update_edge_src_property( edge_src_property_output, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } @@ -917,8 +915,6 @@ void update_edge_src_property( edge_src_property_output, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { auto num_invalids = thrust::count_if( handle.get_thrust_policy(), @@ -985,8 +981,6 @@ void update_edge_dst_property( edge_dst_property_output, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } @@ -1036,8 +1030,6 @@ void update_edge_dst_property( edge_dst_property_output, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { auto num_invalids = thrust::count_if( handle.get_thrust_policy(), diff --git a/cpp/src/structure/detail/structure_utils.cuh b/cpp/src/structure/detail/structure_utils.cuh index c49b62e4543..f0f729bce18 100644 --- a/cpp/src/structure/detail/structure_utils.cuh +++ b/cpp/src/structure/detail/structure_utils.cuh @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -524,34 +525,21 @@ std::tuple> mark_entries(raft::handle_t co return word; }); - size_t bit_count = thrust::transform_reduce( - handle.get_thrust_policy(), - marked_entries.begin(), - marked_entries.end(), - [] __device__(auto word) { return __popc(word); }, - size_t{0}, - thrust::plus()); + size_t bit_count = detail::count_set_bits(handle, marked_entries.begin(), num_entries); return std::make_tuple(bit_count, std::move(marked_entries)); } template -rmm::device_uvector remove_flagged_elements(raft::handle_t const& handle, - rmm::device_uvector&& vector, - raft::device_span remove_flags, - size_t remove_count) +rmm::device_uvector keep_flagged_elements(raft::handle_t const& handle, + rmm::device_uvector&& vector, + raft::device_span keep_flags, + size_t keep_count) { - rmm::device_uvector result(vector.size() - remove_count, handle.get_stream()); - - thrust::copy_if( - handle.get_thrust_policy(), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(vector.size()), - thrust::make_transform_output_iterator(result.begin(), - indirection_t{vector.data()}), - [remove_flags] __device__(size_t i) { - return !(remove_flags[cugraph::packed_bool_offset(i)] & cugraph::packed_bool_mask(i)); - }); + rmm::device_uvector result(keep_count, handle.get_stream()); + + detail::copy_if_mask_set( + handle, vector.begin(), vector.end(), keep_flags.begin(), result.begin()); return result; } diff --git a/cpp/src/structure/graph_view_impl.cuh b/cpp/src/structure/graph_view_impl.cuh index 64a8a3212b3..37a553dcdbd 100644 --- a/cpp/src/structure/graph_view_impl.cuh +++ b/cpp/src/structure/graph_view_impl.cuh @@ -548,7 +548,7 @@ graph_view_tpartition_, this->edge_partition_segment_offsets_); } else { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); return compute_minor_degrees(handle, *this); } } @@ -566,7 +566,7 @@ graph_view_tlocal_vertex_partition_range_size()); } else { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); return compute_minor_degrees(handle, *this); } } @@ -577,7 +577,7 @@ graph_view_thas_edge_mask()), "unimplemented."); return compute_minor_degrees(handle, *this); } else { return compute_major_degrees(handle, @@ -598,7 +598,7 @@ graph_view_thas_edge_mask()), "unimplemented."); return compute_minor_degrees(handle, *this); } else { return compute_major_degrees( @@ -614,7 +614,7 @@ template >:: compute_max_in_degree(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); auto in_degrees = compute_in_degrees(handle); auto it = thrust::max_element(handle.get_thrust_policy(), in_degrees.begin(), in_degrees.end()); @@ -632,7 +632,7 @@ template >:: compute_max_in_degree(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); auto in_degrees = compute_in_degrees(handle); auto it = thrust::max_element(handle.get_thrust_policy(), in_degrees.begin(), in_degrees.end()); @@ -646,7 +646,7 @@ template >:: compute_max_out_degree(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); auto out_degrees = compute_out_degrees(handle); auto it = thrust::max_element(handle.get_thrust_policy(), out_degrees.begin(), out_degrees.end()); @@ -664,7 +664,7 @@ template >:: compute_max_out_degree(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); auto out_degrees = compute_out_degrees(handle); auto it = thrust::max_element(handle.get_thrust_policy(), out_degrees.begin(), out_degrees.end()); @@ -678,7 +678,7 @@ template >:: count_self_loops(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); return count_if_e( handle, @@ -693,7 +693,7 @@ template >:: count_self_loops(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); return count_if_e( handle, @@ -708,7 +708,7 @@ template >:: count_multi_edges(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); if (!this->is_multigraph()) { return edge_t{0}; } @@ -728,7 +728,7 @@ template >:: count_multi_edges(raft::handle_t const& handle) const { - CUGRAPH_EXPECTS(!has_edge_mask(), "unimplemented."); + CUGRAPH_EXPECTS(!(this->has_edge_mask()), "unimplemented."); if (!this->is_multigraph()) { return edge_t{0}; } diff --git a/cpp/src/structure/remove_multi_edges_impl.cuh b/cpp/src/structure/remove_multi_edges_impl.cuh index ab6b1fba8eb..fdd3059f874 100644 --- a/cpp/src/structure/remove_multi_edges_impl.cuh +++ b/cpp/src/structure/remove_multi_edges_impl.cuh @@ -254,50 +254,47 @@ remove_multi_edges(raft::handle_t const& handle, } } - auto [multi_edge_count, multi_edges_to_delete] = - detail::mark_entries(handle, - edgelist_srcs.size(), - [d_edgelist_srcs = edgelist_srcs.data(), - d_edgelist_dsts = edgelist_dsts.data()] __device__(auto idx) { - return (idx > 0) && (d_edgelist_srcs[idx - 1] == d_edgelist_srcs[idx]) && - (d_edgelist_dsts[idx - 1] == d_edgelist_dsts[idx]); - }); - - if (multi_edge_count > 0) { - edgelist_srcs = detail::remove_flagged_elements( + auto [keep_count, keep_flags] = detail::mark_entries( + handle, + edgelist_srcs.size(), + [d_edgelist_srcs = edgelist_srcs.data(), + d_edgelist_dsts = edgelist_dsts.data()] __device__(auto idx) { + return !((idx > 0) && (d_edgelist_srcs[idx - 1] == d_edgelist_srcs[idx]) && + (d_edgelist_dsts[idx - 1] == d_edgelist_dsts[idx])); + }); + + if (keep_count < edgelist_srcs.size()) { + edgelist_srcs = detail::keep_flagged_elements( handle, std::move(edgelist_srcs), - raft::device_span{multi_edges_to_delete.data(), multi_edges_to_delete.size()}, - multi_edge_count); - edgelist_dsts = detail::remove_flagged_elements( + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); + edgelist_dsts = detail::keep_flagged_elements( handle, std::move(edgelist_dsts), - raft::device_span{multi_edges_to_delete.data(), multi_edges_to_delete.size()}, - multi_edge_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); if (edgelist_weights) - edgelist_weights = detail::remove_flagged_elements( + edgelist_weights = detail::keep_flagged_elements( handle, std::move(*edgelist_weights), - raft::device_span{multi_edges_to_delete.data(), - multi_edges_to_delete.size()}, - multi_edge_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); if (edgelist_edge_ids) - edgelist_edge_ids = detail::remove_flagged_elements( + edgelist_edge_ids = detail::keep_flagged_elements( handle, std::move(*edgelist_edge_ids), - raft::device_span{multi_edges_to_delete.data(), - multi_edges_to_delete.size()}, - multi_edge_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); if (edgelist_edge_types) - edgelist_edge_types = detail::remove_flagged_elements( + edgelist_edge_types = detail::keep_flagged_elements( handle, std::move(*edgelist_edge_types), - raft::device_span{multi_edges_to_delete.data(), - multi_edges_to_delete.size()}, - multi_edge_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); } return std::make_tuple(std::move(edgelist_srcs), diff --git a/cpp/src/structure/remove_self_loops_impl.cuh b/cpp/src/structure/remove_self_loops_impl.cuh index 161ffeae28e..dafe26cd1c5 100644 --- a/cpp/src/structure/remove_self_loops_impl.cuh +++ b/cpp/src/structure/remove_self_loops_impl.cuh @@ -44,44 +44,44 @@ remove_self_loops(raft::handle_t const& handle, std::optional>&& edgelist_edge_ids, std::optional>&& edgelist_edge_types) { - auto [self_loop_count, self_loops_to_delete] = + auto [keep_count, keep_flags] = detail::mark_entries(handle, edgelist_srcs.size(), [d_srcs = edgelist_srcs.data(), d_dsts = edgelist_dsts.data()] __device__( - size_t i) { return d_srcs[i] == d_dsts[i]; }); + size_t i) { return d_srcs[i] != d_dsts[i]; }); - if (self_loop_count > 0) { - edgelist_srcs = detail::remove_flagged_elements( + if (keep_count < edgelist_srcs.size()) { + edgelist_srcs = detail::keep_flagged_elements( handle, std::move(edgelist_srcs), - raft::device_span{self_loops_to_delete.data(), self_loops_to_delete.size()}, - self_loop_count); - edgelist_dsts = detail::remove_flagged_elements( + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); + edgelist_dsts = detail::keep_flagged_elements( handle, std::move(edgelist_dsts), - raft::device_span{self_loops_to_delete.data(), self_loops_to_delete.size()}, - self_loop_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); if (edgelist_weights) - edgelist_weights = detail::remove_flagged_elements( + edgelist_weights = detail::keep_flagged_elements( handle, std::move(*edgelist_weights), - raft::device_span{self_loops_to_delete.data(), self_loops_to_delete.size()}, - self_loop_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); if (edgelist_edge_ids) - edgelist_edge_ids = detail::remove_flagged_elements( + edgelist_edge_ids = detail::keep_flagged_elements( handle, std::move(*edgelist_edge_ids), - raft::device_span{self_loops_to_delete.data(), self_loops_to_delete.size()}, - self_loop_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); if (edgelist_edge_types) - edgelist_edge_types = detail::remove_flagged_elements( + edgelist_edge_types = detail::keep_flagged_elements( handle, std::move(*edgelist_edge_types), - raft::device_span{self_loops_to_delete.data(), self_loops_to_delete.size()}, - self_loop_count); + raft::device_span{keep_flags.data(), keep_flags.size()}, + keep_count); } return std::make_tuple(std::move(edgelist_srcs), diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 6530a25d178..d9c88bc179e 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -738,9 +738,16 @@ if (BUILD_CUGRAPH_MTMG_TESTS) # - MTMG tests ------------------------------------------------------------------------- ConfigureTest(MTMG_TEST mtmg/threaded_test.cu) target_link_libraries(MTMG_TEST - PRIVATE - UCP::UCP - ) + PRIVATE + UCP::UCP + ) + + ConfigureTest(MTMG_LOUVAIN_TEST mtmg/threaded_test_louvain.cu) + target_link_libraries(MTMG_LOUVAIN_TEST + PRIVATE + cugraphmgtestutil + UCP::UCP + ) if(BUILD_CUGRAPH_MG_TESTS) ############################################################################################### diff --git a/cpp/tests/community/triangle_count_test.cpp b/cpp/tests/community/triangle_count_test.cpp index 836bab59457..592924c3c47 100644 --- a/cpp/tests/community/triangle_count_test.cpp +++ b/cpp/tests/community/triangle_count_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -232,7 +232,7 @@ class Tests_TriangleCount for (size_t i = 0; i < h_cugraph_vertices.size(); ++i) { auto v = h_cugraph_vertices[i]; auto count = h_cugraph_triangle_counts[i]; - ASSERT_TRUE(count == h_reference_triangle_counts[v]) + ASSERT_EQ(count, h_reference_triangle_counts[v]) << "Triangle count values do not match with the reference values."; } } diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index e5a7de07781..17aed4fdecf 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -311,7 +311,8 @@ class Tests_Multithreaded auto d_my_pageranks = pageranks_view.gather( thread_handle, raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, - graph_view, + graph_view.get_vertex_partition_range_lasts(thread_handle), + graph_view.get_vertex_partition_view(thread_handle), renumber_map_view); std::vector my_pageranks(d_my_pageranks.size()); diff --git a/cpp/tests/mtmg/threaded_test.cu b/cpp/tests/mtmg/threaded_test.cu index bc4d8cfef6a..a5df0199cac 100644 --- a/cpp/tests/mtmg/threaded_test.cu +++ b/cpp/tests/mtmg/threaded_test.cu @@ -155,10 +155,25 @@ class Tests_Multithreaded input_usecase.template construct_edgelist( handle, multithreaded_usecase.test_weighted, false, false); + rmm::device_uvector d_unique_vertices(2 * d_src_v.size(), handle.get_stream()); + thrust::copy( + handle.get_thrust_policy(), d_src_v.begin(), d_src_v.end(), d_unique_vertices.begin()); + thrust::copy(handle.get_thrust_policy(), + d_dst_v.begin(), + d_dst_v.end(), + d_unique_vertices.begin() + d_src_v.size()); + thrust::sort(handle.get_thrust_policy(), d_unique_vertices.begin(), d_unique_vertices.end()); + + d_unique_vertices.resize(thrust::distance(d_unique_vertices.begin(), + thrust::unique(handle.get_thrust_policy(), + d_unique_vertices.begin(), + d_unique_vertices.end())), + handle.get_stream()); + auto h_src_v = cugraph::test::to_host(handle, d_src_v); auto h_dst_v = cugraph::test::to_host(handle, d_dst_v); auto h_weights_v = cugraph::test::to_host(handle, d_weights_v); - auto unique_vertices = cugraph::test::to_host(handle, d_vertices_v); + auto unique_vertices = cugraph::test::to_host(handle, d_unique_vertices); // Load edgelist from different threads. We'll use more threads than GPUs here for (int i = 0; i < num_threads; ++i) { @@ -293,13 +308,13 @@ class Tests_Multithreaded num_threads]() { auto thread_handle = instance_manager->get_handle(); - auto number_of_vertices = unique_vertices->size(); + auto number_of_vertices = unique_vertices.size(); std::vector my_vertex_list; my_vertex_list.reserve((number_of_vertices + num_threads - 1) / num_threads); for (size_t j = i; j < number_of_vertices; j += num_threads) { - my_vertex_list.push_back((*unique_vertices)[j]); + my_vertex_list.push_back(unique_vertices[j]); } rmm::device_uvector d_my_vertex_list(my_vertex_list.size(), @@ -312,7 +327,8 @@ class Tests_Multithreaded auto d_my_pageranks = pageranks_view.gather( thread_handle, raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, - graph_view, + graph_view.get_vertex_partition_range_lasts(thread_handle), + graph_view.get_vertex_partition_view(thread_handle), renumber_map_view); std::vector my_pageranks(d_my_pageranks.size()); diff --git a/cpp/tests/mtmg/threaded_test_louvain.cu b/cpp/tests/mtmg/threaded_test_louvain.cu new file mode 100644 index 00000000000..c1395037646 --- /dev/null +++ b/cpp/tests/mtmg/threaded_test_louvain.cu @@ -0,0 +1,511 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include + +#include + +#include +#include + +struct Multithreaded_Usecase { + bool test_weighted{false}; + bool check_correctness{true}; +}; + +template +class Tests_Multithreaded + : public ::testing::TestWithParam> { + public: + Tests_Multithreaded() {} + + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + + std::vector get_gpu_list() + { + int num_gpus_per_node{1}; + RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_per_node)); + + std::vector gpu_list(num_gpus_per_node); + std::iota(gpu_list.begin(), gpu_list.end(), 0); + + return gpu_list; + } + + template + void run_current_test( + std::tuple const& param, + std::vector gpu_list) + { + using edge_type_t = int32_t; + + constexpr bool renumber = true; + constexpr bool do_expensive_check = false; + + auto [multithreaded_usecase, input_usecase] = param; + + raft::handle_t handle{}; + + size_t max_level{1}; // Louvain is non-deterministic in MG if max_leve > 1 + weight_t threshold{1e-6}; + weight_t resolution{1}; + + size_t device_buffer_size{64 * 1024 * 1024}; + size_t thread_buffer_size{4 * 1024 * 1024}; + + int num_gpus = gpu_list.size(); + int num_threads = num_gpus * 4; + + cugraph::mtmg::resource_manager_t resource_manager; + + std::for_each(gpu_list.begin(), gpu_list.end(), [&resource_manager](int gpu_id) { + resource_manager.register_local_gpu(gpu_id, rmm::cuda_device_id{gpu_id}); + }); + + ncclUniqueId instance_manager_id; + ncclGetUniqueId(&instance_manager_id); + + auto instance_manager = resource_manager.create_instance_manager( + resource_manager.registered_ranks(), instance_manager_id); + + cugraph::mtmg::edgelist_t edgelist; + cugraph::mtmg::graph_t graph; + cugraph::mtmg::graph_view_t graph_view; + cugraph::mtmg::vertex_result_t louvain_clusters; + std::optional> renumber_map = + std::make_optional>(); + + auto edge_weights = multithreaded_usecase.test_weighted + ? std::make_optional, + weight_t>>() + : std::nullopt; + + // + // Simulate graph creation by spawning threads to walk through the + // local COO and add edges + // + std::vector running_threads; + + // Initialize shared edgelist object, one per GPU + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &edgelist, + device_buffer_size, + use_weight = true, + use_edge_id = false, + use_edge_type = false]() { + auto thread_handle = instance_manager->get_handle(); + + edgelist.set(thread_handle, device_buffer_size, use_weight, use_edge_id, use_edge_type); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + // Load SG edge list + auto [d_src_v, d_dst_v, d_weights_v, d_vertices_v, is_symmetric] = + input_usecase.template construct_edgelist( + handle, multithreaded_usecase.test_weighted, false, false); + + rmm::device_uvector d_unique_vertices(2 * d_src_v.size(), handle.get_stream()); + thrust::copy( + handle.get_thrust_policy(), d_src_v.begin(), d_src_v.end(), d_unique_vertices.begin()); + thrust::copy(handle.get_thrust_policy(), + d_dst_v.begin(), + d_dst_v.end(), + d_unique_vertices.begin() + d_src_v.size()); + thrust::sort(handle.get_thrust_policy(), d_unique_vertices.begin(), d_unique_vertices.end()); + + d_unique_vertices.resize(thrust::distance(d_unique_vertices.begin(), + thrust::unique(handle.get_thrust_policy(), + d_unique_vertices.begin(), + d_unique_vertices.end())), + handle.get_stream()); + + auto h_src_v = cugraph::test::to_host(handle, d_src_v); + auto h_dst_v = cugraph::test::to_host(handle, d_dst_v); + auto h_weights_v = cugraph::test::to_host(handle, d_weights_v); + auto unique_vertices = cugraph::test::to_host(handle, d_unique_vertices); + + // Load edgelist from different threads. We'll use more threads than GPUs here + for (int i = 0; i < num_threads; ++i) { + running_threads.emplace_back([&instance_manager, + thread_buffer_size, + &edgelist, + &h_src_v, + &h_dst_v, + &h_weights_v, + i, + num_threads]() { + auto thread_handle = instance_manager->get_handle(); + cugraph::mtmg::per_thread_edgelist_t + per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); + + for (size_t j = i; j < h_src_v.size(); j += num_threads) { + per_thread_edgelist.append( + thread_handle, + h_src_v[j], + h_dst_v[j], + h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, + std::nullopt, + std::nullopt); + } + + per_thread_edgelist.flush(thread_handle); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &graph, + &edge_weights, + &edgelist, + &renumber_map, + is_symmetric = is_symmetric, + renumber, + do_expensive_check]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_thread_rank() > 0) return; + + std::optional, + edge_t>> + edge_ids{std::nullopt}; + std::optional, + int32_t>> + edge_types{std::nullopt}; + + edgelist.finalize_buffer(thread_handle); + edgelist.consolidate_and_shuffle(thread_handle, false); + + cugraph::mtmg:: + create_graph_from_edgelist( + thread_handle, + edgelist, + cugraph::graph_properties_t{is_symmetric, true}, + renumber, + graph, + edge_weights, + edge_ids, + edge_types, + renumber_map, + do_expensive_check); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + graph_view = graph.view(); + auto renumber_map_view = renumber_map ? std::make_optional(renumber_map->view()) : std::nullopt; + + weight_t modularity{0}; + + for (int i = 0; i < num_threads; ++i) { + running_threads.emplace_back([&instance_manager, + &graph_view, + &edge_weights, + &louvain_clusters, + &modularity, + &renumber_map, + max_level, + threshold, + resolution]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_thread_rank() > 0) return; + + rmm::device_uvector local_louvain_clusters( + graph_view.get(thread_handle).local_vertex_partition_range_size(), + thread_handle.get_stream()); + + std::tie(std::ignore, modularity) = cugraph::louvain( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) : std::nullopt, + local_louvain_clusters.data(), + max_level, + threshold, + resolution); + + louvain_clusters.set(thread_handle, std::move(local_louvain_clusters)); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + std::vector, std::vector>> computed_clusters_v; + std::mutex computed_clusters_lock{}; + + auto louvain_clusters_view = louvain_clusters.view(); + std::vector h_renumber_map; + + // Load computed_clusters_v from different threads. + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &graph_view, + &renumber_map_view, + &louvain_clusters_view, + &computed_clusters_lock, + &computed_clusters_v, + &h_src_v, + &h_dst_v, + &h_weights_v, + &h_renumber_map, + &unique_vertices, + i, + num_threads]() { + auto thread_handle = instance_manager->get_handle(); + + auto number_of_vertices = unique_vertices.size(); + + std::vector my_vertex_list; + my_vertex_list.reserve((number_of_vertices + num_threads - 1) / num_threads); + + for (size_t j = i; j < number_of_vertices; j += num_threads) { + my_vertex_list.push_back(unique_vertices[j]); + } + + rmm::device_uvector d_my_vertex_list(my_vertex_list.size(), + thread_handle.raft_handle().get_stream()); + raft::update_device(d_my_vertex_list.data(), + my_vertex_list.data(), + my_vertex_list.size(), + thread_handle.raft_handle().get_stream()); + + auto d_my_clusters = louvain_clusters_view.gather( + thread_handle, + raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, + graph_view.get_vertex_partition_range_lasts(thread_handle), + graph_view.get_vertex_partition_view(thread_handle), + renumber_map_view); + + std::vector my_clusters(d_my_clusters.size()); + raft::update_host(my_clusters.data(), + d_my_clusters.data(), + d_my_clusters.size(), + thread_handle.raft_handle().get_stream()); + + { + std::lock_guard lock(computed_clusters_lock); + computed_clusters_v.push_back( + std::make_tuple(std::move(my_vertex_list), std::move(my_clusters))); + } + + h_renumber_map = cugraph::test::to_host( + thread_handle.raft_handle(), + cugraph::test::device_allgatherv(thread_handle.raft_handle(), + renumber_map_view->get(thread_handle))); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + if (multithreaded_usecase.check_correctness) { + // Want to compare the results in computed_clusters_v with SG results + cugraph::graph_t sg_graph(handle); + std::optional< + cugraph::edge_property_t, weight_t>> + sg_edge_weights{std::nullopt}; + + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back( + [&instance_manager, &graph_view, &edge_weights, &sg_graph, &sg_edge_weights]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_rank() == 0) { + std::tie(sg_graph, sg_edge_weights, std::ignore) = + cugraph::test::mg_graph_to_sg_graph( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) + : std::nullopt, + std::optional>{std::nullopt}, + false); // create an SG graph with MG graph vertex IDs + } else { + cugraph::test::mg_graph_to_sg_graph( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) + : std::nullopt, + std::optional>{std::nullopt}, + false); // create an SG graph with MG graph vertex IDs + } + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + rmm::device_uvector sg_clusters(sg_graph.number_of_vertices(), handle.get_stream()); + weight_t modularity; + + std::tie(std::ignore, modularity) = cugraph::louvain( + handle, + sg_graph.view(), + sg_edge_weights ? std::make_optional(sg_edge_weights->view()) : std::nullopt, + sg_clusters.data(), + max_level, + threshold, + resolution); + + auto h_sg_clusters = cugraph::test::to_host(handle, sg_clusters); + std::map h_cluster_map; + std::map h_cluster_reverse_map; + + std::for_each( + computed_clusters_v.begin(), + computed_clusters_v.end(), + [&h_sg_clusters, &h_cluster_map, &h_renumber_map, &h_cluster_reverse_map](auto t1) { + std::for_each( + thrust::make_zip_iterator(std::get<0>(t1).begin(), std::get<1>(t1).begin()), + thrust::make_zip_iterator(std::get<0>(t1).end(), std::get<1>(t1).end()), + [&h_sg_clusters, &h_cluster_map, &h_renumber_map, &h_cluster_reverse_map](auto t2) { + vertex_t v = thrust::get<0>(t2); + vertex_t c = thrust::get<1>(t2); + + auto pos = std::find(h_renumber_map.begin(), h_renumber_map.end(), v); + auto offset = std::distance(h_renumber_map.begin(), pos); + + auto cluster_pos = h_cluster_map.find(c); + if (cluster_pos == h_cluster_map.end()) { + auto reverse_pos = h_cluster_reverse_map.find(h_sg_clusters[offset]); + + ASSERT_TRUE(reverse_pos != h_cluster_map.end()) << "two different cluster mappings"; + + h_cluster_map.insert(std::make_pair(c, h_sg_clusters[offset])); + h_cluster_reverse_map.insert(std::make_pair(h_sg_clusters[offset], c)); + } else { + ASSERT_EQ(cluster_pos->second, h_sg_clusters[offset]) + << "vertex " << v << ", offset = " << offset + << ", SG cluster = " << h_sg_clusters[offset] << ", mtmg cluster = " << c + << ", mapped value = " << cluster_pos->second; + } + }); + }); + } + } +}; + +using Tests_Multithreaded_File = Tests_Multithreaded; +using Tests_Multithreaded_Rmat = Tests_Multithreaded; + +// FIXME: add tests for type combinations +TEST_P(Tests_Multithreaded_File, CheckInt32Int32FloatFloat) +{ + run_current_test( + override_File_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); +} + +TEST_P(Tests_Multithreaded_Rmat, CheckInt32Int32FloatFloat) +{ + run_current_test( + override_Rmat_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); +} + +INSTANTIATE_TEST_SUITE_P(file_test, + Tests_Multithreaded_File, + ::testing::Combine( + // enable correctness checks + ::testing::Values(Multithreaded_Usecase{true, true}), + ::testing::Values(cugraph::test::File_Usecase("karate.csv"), + cugraph::test::File_Usecase("dolphins.csv")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_small_test, + Tests_Multithreaded_Rmat, + ::testing::Combine( + // enable correctness checks + ::testing::Values(Multithreaded_Usecase{true, true}), + //::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + ::testing::Values(cugraph::test::Rmat_Usecase(5, 8, 0.57, 0.19, 0.19, 0, false, false)))); + +INSTANTIATE_TEST_SUITE_P( + file_benchmark_test, /* note that the test filename can be overridden in benchmarking (with + --gtest_filter to select only the file_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one File_Usecase that differ only in filename + (to avoid running same benchmarks more than once) */ + Tests_Multithreaded_File, + ::testing::Combine( + // disable correctness checks + ::testing::Values(Multithreaded_Usecase{true, false}), + ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with + --gtest_filter to select only the rmat_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one Rmat_Usecase that differ only in scale or edge + factor (to avoid running same benchmarks more than once) */ + Tests_Multithreaded_Rmat, + ::testing::Combine( + // disable correctness checks for large graphs + ::testing::Values(Multithreaded_Usecase{true, false}), + ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + +CUGRAPH_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/prims/mg_count_if_e.cu b/cpp/tests/prims/mg_count_if_e.cu index 449aa728d87..03bf8ae0ae5 100644 --- a/cpp/tests/prims/mg_count_if_e.cu +++ b/cpp/tests/prims/mg_count_if_e.cu @@ -53,8 +53,9 @@ #include struct Prims_Usecase { - bool check_correctness{true}; bool test_weighted{false}; + bool edge_masking{false}; + bool check_correctness{true}; }; template @@ -102,6 +103,13 @@ class Tests_MGCountIfE auto mg_graph_view = mg_graph.view(); + std::optional> edge_mask{std::nullopt}; + if (prims_usecase.edge_masking) { + edge_mask = + cugraph::test::generate::edge_property(*handle_, mg_graph_view, 2); + mg_graph_view.attach_edge_mask((*edge_mask).view()); + } + // 2. run MG count_if_e const int hash_bin_count = 5; @@ -148,19 +156,19 @@ class Tests_MGCountIfE (*mg_renumber_map).size()), false); - auto sg_graph_view = sg_graph.view(); + if (handle_->get_comms().get_rank() == 0) { + auto sg_graph_view = sg_graph.view(); - auto sg_vertex_prop = cugraph::test::generate::vertex_property( - *handle_, - thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()), - thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_last()), - hash_bin_count); - auto sg_src_prop = cugraph::test::generate::src_property( - *handle_, sg_graph_view, sg_vertex_prop); - auto sg_dst_prop = cugraph::test::generate::dst_property( - *handle_, sg_graph_view, sg_vertex_prop); + auto sg_vertex_prop = cugraph::test::generate::vertex_property( + *handle_, + thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()), + thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_last()), + hash_bin_count); + auto sg_src_prop = cugraph::test::generate::src_property( + *handle_, sg_graph_view, sg_vertex_prop); + auto sg_dst_prop = cugraph::test::generate::dst_property( + *handle_, sg_graph_view, sg_vertex_prop); - if (handle_->get_comms().get_rank() == 0) { auto expected_result = count_if_e( *handle_, sg_graph_view, @@ -312,7 +320,10 @@ INSTANTIATE_TEST_SUITE_P( file_test, Tests_MGCountIfE_File, ::testing::Combine( - ::testing::Values(Prims_Usecase{true}), + ::testing::Values(Prims_Usecase{false, false, true}, + Prims_Usecase{false, true, true}, + Prims_Usecase{true, false, true}, + Prims_Usecase{true, true, true}), ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), @@ -320,7 +331,10 @@ INSTANTIATE_TEST_SUITE_P( INSTANTIATE_TEST_SUITE_P(rmat_small_test, Tests_MGCountIfE_Rmat, - ::testing::Combine(::testing::Values(Prims_Usecase{true}), + ::testing::Combine(::testing::Values(Prims_Usecase{false, false, true}, + Prims_Usecase{false, true, true}, + Prims_Usecase{true, false, true}, + Prims_Usecase{true, true, true}), ::testing::Values(cugraph::test::Rmat_Usecase( 10, 16, 0.57, 0.19, 0.19, 0, false, false)))); @@ -332,7 +346,10 @@ INSTANTIATE_TEST_SUITE_P( factor (to avoid running same benchmarks more than once) */ Tests_MGCountIfE_Rmat, ::testing::Combine( - ::testing::Values(Prims_Usecase{false}), + ::testing::Values(Prims_Usecase{false, false, false}, + Prims_Usecase{false, true, false}, + Prims_Usecase{true, false, false}, + Prims_Usecase{true, true, false}), ::testing::Values(cugraph::test::Rmat_Usecase(20, 32, 0.57, 0.19, 0.19, 0, false, false)))); CUGRAPH_MG_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/prims/mg_per_v_pair_transform_dst_nbr_intersection.cu b/cpp/tests/prims/mg_per_v_pair_transform_dst_nbr_intersection.cu index a3edb1f6372..ac73c446d89 100644 --- a/cpp/tests/prims/mg_per_v_pair_transform_dst_nbr_intersection.cu +++ b/cpp/tests/prims/mg_per_v_pair_transform_dst_nbr_intersection.cu @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "property_generator.cuh" + #include #include #include @@ -116,29 +118,8 @@ class Tests_MGPerVPairTransformDstNbrIntersection std::optional> edge_mask{std::nullopt}; if (prims_usecase.edge_masking) { - cugraph::edge_src_property_t edge_src_renumber_map( - *handle_, mg_graph_view); - cugraph::edge_dst_property_t edge_dst_renumber_map( - *handle_, mg_graph_view); - cugraph::update_edge_src_property( - *handle_, mg_graph_view, (*mg_renumber_map).begin(), edge_src_renumber_map); - cugraph::update_edge_dst_property( - *handle_, mg_graph_view, (*mg_renumber_map).begin(), edge_dst_renumber_map); - - edge_mask = cugraph::edge_property_t(*handle_, mg_graph_view); - - cugraph::transform_e( - *handle_, - mg_graph_view, - edge_src_renumber_map.view(), - edge_dst_renumber_map.view(), - cugraph::edge_dummy_property_t{}.view(), - [] __device__(auto src, auto dst, auto src_property, auto dst_property, thrust::nullopt_t) { - return ((src_property % 2 == 0) && (dst_property % 2 == 0)) - ? false - : true; // mask out the edges with even unrenumbered src & dst vertex IDs - }, - (*edge_mask).mutable_view()); + edge_mask = + cugraph::test::generate::edge_property(*handle_, mg_graph_view, 2); mg_graph_view.attach_edge_mask((*edge_mask).view()); } @@ -257,42 +238,6 @@ class Tests_MGPerVPairTransformDstNbrIntersection if (handle_->get_comms().get_rank() == 0) { auto sg_graph_view = sg_graph.view(); - if (prims_usecase.edge_masking) { - rmm::device_uvector srcs(0, handle_->get_stream()); - rmm::device_uvector dsts(0, handle_->get_stream()); - std::tie(srcs, dsts, std::ignore, std::ignore) = - cugraph::decompress_to_edgelist( - *handle_, sg_graph_view, std::nullopt, std::nullopt, std::nullopt); - auto edge_first = thrust::make_zip_iterator(srcs.begin(), dsts.begin()); - srcs.resize(thrust::distance(edge_first, - thrust::remove_if(handle_->get_thrust_policy(), - edge_first, - edge_first + srcs.size(), - [] __device__(auto pair) { - return (thrust::get<0>(pair) % 2 == 0) && - (thrust::get<1>(pair) % 2 == 0); - })), - handle_->get_stream()); - dsts.resize(srcs.size(), handle_->get_stream()); - rmm::device_uvector vertices(sg_graph_view.number_of_vertices(), - handle_->get_stream()); - thrust::sequence( - handle_->get_thrust_policy(), vertices.begin(), vertices.end(), vertex_t{0}); - std::tie(sg_graph, std::ignore, std::ignore, std::ignore, std::ignore) = cugraph:: - create_graph_from_edgelist( - *handle_, - std::move(vertices), - std::move(srcs), - std::move(dsts), - std::nullopt, - std::nullopt, - std::nullopt, - cugraph::graph_properties_t{sg_graph_view.is_symmetric(), - sg_graph_view.is_multigraph()}, - false); - sg_graph_view = sg_graph.view(); - } - auto sg_result_buffer = cugraph::allocate_dataframe_buffer>( cugraph::size_dataframe_buffer(mg_aggregate_vertex_pair_buffer), handle_->get_stream()); auto sg_out_degrees = sg_graph_view.compute_out_degrees(*handle_); diff --git a/cpp/tests/prims/mg_per_v_random_select_transform_outgoing_e.cu b/cpp/tests/prims/mg_per_v_random_select_transform_outgoing_e.cu index eb6a8fd5cb6..2b9e9aafa3f 100644 --- a/cpp/tests/prims/mg_per_v_random_select_transform_outgoing_e.cu +++ b/cpp/tests/prims/mg_per_v_random_select_transform_outgoing_e.cu @@ -324,8 +324,9 @@ class Tests_MGPerVRandomSelectTransformOutgoingE with_replacement = prims_usecase.with_replacement, invalid_value = invalid_value ? thrust::make_optional(*invalid_value) : thrust::nullopt, - property_transform = cugraph::test::detail::property_transform{ - hash_bin_count}] __device__(size_t i) { + property_transform = + cugraph::test::detail::vertex_property_transform{ + hash_bin_count}] __device__(size_t i) { auto v = *(frontier_vertex_first + i); // check sample_offsets diff --git a/cpp/tests/prims/mg_transform_e.cu b/cpp/tests/prims/mg_transform_e.cu index 24deaad810a..e9be80f1f7d 100644 --- a/cpp/tests/prims/mg_transform_e.cu +++ b/cpp/tests/prims/mg_transform_e.cu @@ -52,6 +52,7 @@ struct Prims_Usecase { bool use_edgelist{false}; + bool edge_masking{false}; bool check_correctness{true}; }; @@ -100,6 +101,13 @@ class Tests_MGTransformE auto mg_graph_view = mg_graph.view(); + std::optional> edge_mask{std::nullopt}; + if (prims_usecase.edge_masking) { + edge_mask = + cugraph::test::generate::edge_property(*handle_, mg_graph_view, 2); + mg_graph_view.attach_edge_mask((*edge_mask).view()); + } + // 2. run MG transform_e const int hash_bin_count = 5; @@ -439,7 +447,10 @@ INSTANTIATE_TEST_SUITE_P( file_test, Tests_MGTransformE_File, ::testing::Combine( - ::testing::Values(Prims_Usecase{false, true}, Prims_Usecase{true, true}), + ::testing::Values(Prims_Usecase{false, false, true}, + Prims_Usecase{false, true, true}, + Prims_Usecase{true, false, true}, + Prims_Usecase{true, true, true}), ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), @@ -447,8 +458,10 @@ INSTANTIATE_TEST_SUITE_P( INSTANTIATE_TEST_SUITE_P(rmat_small_test, Tests_MGTransformE_Rmat, - ::testing::Combine(::testing::Values(Prims_Usecase{false, true}, - Prims_Usecase{true, true}), + ::testing::Combine(::testing::Values(Prims_Usecase{false, false, true}, + Prims_Usecase{false, true, true}, + Prims_Usecase{true, false, true}, + Prims_Usecase{true, true, true}), ::testing::Values(cugraph::test::Rmat_Usecase( 10, 16, 0.57, 0.19, 0.19, 0, false, false)))); @@ -460,7 +473,10 @@ INSTANTIATE_TEST_SUITE_P( factor (to avoid running same benchmarks more than once) */ Tests_MGTransformE_Rmat, ::testing::Combine( - ::testing::Values(Prims_Usecase{false, false}, Prims_Usecase{true, false}), + ::testing::Values(Prims_Usecase{false, false, false}, + Prims_Usecase{false, true, false}, + Prims_Usecase{true, false, false}, + Prims_Usecase{true, true, false}), ::testing::Values(cugraph::test::Rmat_Usecase(20, 32, 0.57, 0.19, 0.19, 0, false, false)))); CUGRAPH_MG_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/prims/mg_transform_reduce_e.cu b/cpp/tests/prims/mg_transform_reduce_e.cu index 79aa3da54df..c4ae11ab7c9 100644 --- a/cpp/tests/prims/mg_transform_reduce_e.cu +++ b/cpp/tests/prims/mg_transform_reduce_e.cu @@ -91,8 +91,9 @@ struct result_compare> { }; struct Prims_Usecase { - bool check_correctness{true}; bool test_weighted{false}; + bool edge_masking{false}; + bool check_correctness{true}; }; template @@ -141,6 +142,13 @@ class Tests_MGTransformReduceE auto mg_graph_view = mg_graph.view(); + std::optional> edge_mask{std::nullopt}; + if (prims_usecase.edge_masking) { + edge_mask = + cugraph::test::generate::edge_property(*handle_, mg_graph_view, 2); + mg_graph_view.attach_edge_mask((*edge_mask).view()); + } + // 2. run MG transform reduce const int hash_bin_count = 5; @@ -365,7 +373,10 @@ INSTANTIATE_TEST_SUITE_P( file_test, Tests_MGTransformReduceE_File, ::testing::Combine( - ::testing::Values(Prims_Usecase{true}), + ::testing::Values(Prims_Usecase{false, false, true}, + Prims_Usecase{false, true, true}, + Prims_Usecase{true, false, true}, + Prims_Usecase{true, true, true}), ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), @@ -373,7 +384,10 @@ INSTANTIATE_TEST_SUITE_P( INSTANTIATE_TEST_SUITE_P(rmat_small_test, Tests_MGTransformReduceE_Rmat, - ::testing::Combine(::testing::Values(Prims_Usecase{true}), + ::testing::Combine(::testing::Values(Prims_Usecase{false, false, true}, + Prims_Usecase{false, true, true}, + Prims_Usecase{true, false, true}, + Prims_Usecase{true, true, true}), ::testing::Values(cugraph::test::Rmat_Usecase( 10, 16, 0.57, 0.19, 0.19, 0, false, false)))); @@ -385,7 +399,10 @@ INSTANTIATE_TEST_SUITE_P( factor (to avoid running same benchmarks more than once) */ Tests_MGTransformReduceE_Rmat, ::testing::Combine( - ::testing::Values(Prims_Usecase{false}), + ::testing::Values(Prims_Usecase{false, false, false}, + Prims_Usecase{false, true, false}, + Prims_Usecase{true, false, false}, + Prims_Usecase{true, true, false}), ::testing::Values(cugraph::test::Rmat_Usecase(20, 32, 0.57, 0.19, 0.19, 0, false, false)))); CUGRAPH_MG_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/prims/property_generator.cuh b/cpp/tests/prims/property_generator.cuh index e7264cd276f..680455eda79 100644 --- a/cpp/tests/prims/property_generator.cuh +++ b/cpp/tests/prims/property_generator.cuh @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include @@ -61,7 +62,7 @@ __host__ __device__ auto make_property_value(T val) } template -struct property_transform { +struct vertex_property_transform { int32_t mod{}; constexpr __device__ property_t operator()(vertex_t v) const @@ -73,6 +74,20 @@ struct property_transform { } }; +template +struct edge_property_transform { + int32_t mod{}; + + constexpr __device__ property_t operator()( + vertex_t src, vertex_t dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) const + { + static_assert(cugraph::is_thrust_tuple_of_arithmetic::value || + std::is_arithmetic_v); + cuco::detail::MurmurHash3_32 hash_func{}; + return make_property_value(hash_func(src + dst) % mod); + } +}; + } // namespace detail template @@ -96,7 +111,7 @@ struct generate { labels.begin(), labels.end(), cugraph::get_dataframe_buffer_begin(data), - detail::property_transform{hash_bin_count}); + detail::vertex_property_transform{hash_bin_count}); return data; } @@ -111,7 +126,7 @@ struct generate { begin, end, cugraph::get_dataframe_buffer_begin(data), - detail::property_transform{hash_bin_count}); + detail::vertex_property_transform{hash_bin_count}); return data; } @@ -138,6 +153,22 @@ struct generate { handle, graph_view, cugraph::get_dataframe_buffer_begin(property), output_property); return output_property; } + + template + static auto edge_property(raft::handle_t const& handle, + graph_view_type const& graph_view, + int32_t hash_bin_count) + { + auto output_property = cugraph::edge_property_t(handle, graph_view); + cugraph::transform_e(handle, + graph_view, + cugraph::edge_src_dummy_property_t{}.view(), + cugraph::edge_dst_dummy_property_t{}.view(), + cugraph::edge_dummy_property_t{}.view(), + detail::edge_property_transform{hash_bin_count}, + output_property.mutable_view()); + return output_property; + } }; } // namespace test diff --git a/cpp/tests/structure/mg_select_random_vertices_test.cpp b/cpp/tests/structure/mg_select_random_vertices_test.cpp index 79c50301922..e49e1ebcb99 100644 --- a/cpp/tests/structure/mg_select_random_vertices_test.cpp +++ b/cpp/tests/structure/mg_select_random_vertices_test.cpp @@ -90,7 +90,7 @@ class Tests_MGSelectRandomVertices std::iota( h_given_set.begin(), h_given_set.end(), mg_graph_view.local_vertex_partition_range_first()); std::shuffle(h_given_set.begin(), h_given_set.end(), std::mt19937{std::random_device{}()}); - h_given_set.resize(std::rand() % mg_graph_view.local_vertex_partition_range_size() + 1); + h_given_set.resize(std::rand() % (mg_graph_view.local_vertex_partition_range_size() + 1)); // Compute size of the distributed vertex set int num_of_elements_in_given_set = static_cast(h_given_set.size()); @@ -105,7 +105,7 @@ class Tests_MGSelectRandomVertices size_t select_count = num_of_elements_in_given_set > select_random_vertices_usecase.select_count ? select_random_vertices_usecase.select_count - : std::rand() % num_of_elements_in_given_set + 1; + : std::rand() % (num_of_elements_in_given_set + 1); for (int idx = 0; idx < with_replacement_flags.size(); idx++) { bool with_replacement = with_replacement_flags[idx]; diff --git a/cpp/tests/utilities/test_graphs.hpp b/cpp/tests/utilities/test_graphs.hpp index 16c9d3ed145..8cc87b26f1d 100644 --- a/cpp/tests/utilities/test_graphs.hpp +++ b/cpp/tests/utilities/test_graphs.hpp @@ -621,9 +621,25 @@ construct_graph(raft::handle_t const& handle, CUGRAPH_EXPECTS(d_src_v.size() <= static_cast(std::numeric_limits::max()), "Invalid template parameter: edge_t overflow."); - if (drop_self_loops) { remove_self_loops(handle, d_src_v, d_dst_v, d_weights_v); } + if (drop_self_loops) { + std::tie(d_src_v, d_dst_v, d_weights_v, std::ignore, std::ignore) = + cugraph::remove_self_loops(handle, + std::move(d_src_v), + std::move(d_dst_v), + std::move(d_weights_v), + std::nullopt, + std::nullopt); + } - if (drop_multi_edges) { sort_and_remove_multi_edges(handle, d_src_v, d_dst_v, d_weights_v); } + if (drop_multi_edges) { + std::tie(d_src_v, d_dst_v, d_weights_v, std::ignore, std::ignore) = + cugraph::remove_multi_edges(handle, + std::move(d_src_v), + std::move(d_dst_v), + std::move(d_weights_v), + std::nullopt, + std::nullopt); + } graph_t graph(handle); std::optional< diff --git a/cpp/tests/utilities/thrust_wrapper.cu b/cpp/tests/utilities/thrust_wrapper.cu index cb7e6f1bd66..2daf250b4a2 100644 --- a/cpp/tests/utilities/thrust_wrapper.cu +++ b/cpp/tests/utilities/thrust_wrapper.cu @@ -206,131 +206,5 @@ template void populate_vertex_ids(raft::handle_t const& handle, rmm::device_uvector& d_vertices_v, int64_t vertex_id_offset); -template -void remove_self_loops(raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */) -{ - if (d_weight_v) { - auto edge_first = thrust::make_zip_iterator( - thrust::make_tuple(d_src_v.begin(), d_dst_v.begin(), (*d_weight_v).begin())); - d_src_v.resize( - thrust::distance(edge_first, - thrust::remove_if( - handle.get_thrust_policy(), - edge_first, - edge_first + d_src_v.size(), - [] __device__(auto e) { return thrust::get<0>(e) == thrust::get<1>(e); })), - handle.get_stream()); - d_dst_v.resize(d_src_v.size(), handle.get_stream()); - (*d_weight_v).resize(d_src_v.size(), handle.get_stream()); - } else { - auto edge_first = - thrust::make_zip_iterator(thrust::make_tuple(d_src_v.begin(), d_dst_v.begin())); - d_src_v.resize( - thrust::distance(edge_first, - thrust::remove_if( - handle.get_thrust_policy(), - edge_first, - edge_first + d_src_v.size(), - [] __device__(auto e) { return thrust::get<0>(e) == thrust::get<1>(e); })), - handle.get_stream()); - d_dst_v.resize(d_src_v.size(), handle.get_stream()); - } - - d_src_v.shrink_to_fit(handle.get_stream()); - d_dst_v.shrink_to_fit(handle.get_stream()); - if (d_weight_v) { (*d_weight_v).shrink_to_fit(handle.get_stream()); } -} - -template void remove_self_loops( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template void remove_self_loops( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template void remove_self_loops( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template void remove_self_loops( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template -void sort_and_remove_multi_edges( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */) -{ - if (d_weight_v) { - auto edge_first = thrust::make_zip_iterator( - thrust::make_tuple(d_src_v.begin(), d_dst_v.begin(), (*d_weight_v).begin())); - thrust::sort(handle.get_thrust_policy(), edge_first, edge_first + d_src_v.size()); - d_src_v.resize( - thrust::distance(edge_first, - thrust::unique(handle.get_thrust_policy(), - edge_first, - edge_first + d_src_v.size(), - [] __device__(auto lhs, auto rhs) { - return (thrust::get<0>(lhs) == thrust::get<0>(rhs)) && - (thrust::get<1>(lhs) == thrust::get<1>(rhs)); - })), - handle.get_stream()); - d_dst_v.resize(d_src_v.size(), handle.get_stream()); - (*d_weight_v).resize(d_src_v.size(), handle.get_stream()); - } else { - auto edge_first = - thrust::make_zip_iterator(thrust::make_tuple(d_src_v.begin(), d_dst_v.begin())); - thrust::sort(handle.get_thrust_policy(), edge_first, edge_first + d_src_v.size()); - d_src_v.resize( - thrust::distance( - edge_first, - thrust::unique(handle.get_thrust_policy(), edge_first, edge_first + d_src_v.size())), - handle.get_stream()); - d_dst_v.resize(d_src_v.size(), handle.get_stream()); - } - - d_src_v.shrink_to_fit(handle.get_stream()); - d_dst_v.shrink_to_fit(handle.get_stream()); - if (d_weight_v) { (*d_weight_v).shrink_to_fit(handle.get_stream()); } -} - -template void sort_and_remove_multi_edges( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template void sort_and_remove_multi_edges( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template void sort_and_remove_multi_edges( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template void sort_and_remove_multi_edges( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - } // namespace test } // namespace cugraph diff --git a/cpp/tests/utilities/thrust_wrapper.hpp b/cpp/tests/utilities/thrust_wrapper.hpp index eead4dc268f..fb82d781198 100644 --- a/cpp/tests/utilities/thrust_wrapper.hpp +++ b/cpp/tests/utilities/thrust_wrapper.hpp @@ -46,18 +46,5 @@ void populate_vertex_ids(raft::handle_t const& handle, rmm::device_uvector& d_vertices_v /* [INOUT] */, vertex_t vertex_id_offset); -template -void remove_self_loops(raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - -template -void sort_and_remove_multi_edges( - raft::handle_t const& handle, - rmm::device_uvector& d_src_v /* [INOUT] */, - rmm::device_uvector& d_dst_v /* [INOUT] */, - std::optional>& d_weight_v /* [INOUT] */); - } // namespace test } // namespace cugraph diff --git a/python/cugraph/cugraph/dask/common/mg_utils.py b/python/cugraph/cugraph/dask/common/mg_utils.py index 6acda48c9da..b04f293dc0e 100644 --- a/python/cugraph/cugraph/dask/common/mg_utils.py +++ b/python/cugraph/cugraph/dask/common/mg_utils.py @@ -12,7 +12,7 @@ # limitations under the License. import os - +import gc import numba.cuda @@ -68,3 +68,8 @@ def get_visible_devices(): else: visible_devices = _visible_devices.strip().split(",") return visible_devices + + +def run_gc_on_dask_cluster(client): + gc.collect() + client.run(gc.collect) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index f666900b226..319435575cc 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -37,8 +37,8 @@ from cugraph.dask.common.part_utils import ( get_persisted_df_worker_map, persist_dask_df_equal_parts_per_worker, - _chunk_lst, ) +from cugraph.dask.common.mg_utils import run_gc_on_dask_cluster from cugraph.dask import get_n_workers import cugraph.dask.comms.comms as Comms @@ -171,7 +171,6 @@ def __from_edgelist( store_transposed=False, legacy_renum_only=False, ): - if not isinstance(input_ddf, dask_cudf.DataFrame): raise TypeError("input should be a dask_cudf dataFrame") @@ -275,7 +274,6 @@ def __from_edgelist( ) value_col = None else: - source_col, dest_col, value_col = symmetrize( input_ddf, source, @@ -350,9 +348,11 @@ def __from_edgelist( is_symmetric=not self.properties.directed, ) ddf = ddf.repartition(npartitions=len(workers) * 2) - ddf_keys = ddf.to_delayed() workers = _client.scheduler_info()["workers"].keys() - ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) + persisted_keys_d = persist_dask_df_equal_parts_per_worker( + ddf, _client, return_type="dict" + ) + del ddf delayed_tasks_d = { w: delayed(simpleDistributedGraphImpl._make_plc_graph)( @@ -367,19 +367,19 @@ def __from_edgelist( self.edge_id_type, self.edge_type_id_type, ) - for w, edata in zip(workers, ddf_keys_ls) + for w, edata in persisted_keys_d.items() } + del persisted_keys_d self._plc_graph = { w: _client.compute( delayed_task, workers=w, allow_other_workers=False, pure=False ) for w, delayed_task in delayed_tasks_d.items() } - wait(list(self._plc_graph.values())) - del ddf_keys del delayed_tasks_d - gc.collect() - _client.run(gc.collect) + run_gc_on_dask_cluster(_client) + wait(list(self._plc_graph.values())) + run_gc_on_dask_cluster(_client) @property def renumbered(self): @@ -945,7 +945,6 @@ def convert_to_cudf(cp_arrays: cp.ndarray) -> cudf.Series: def _call_plc_select_random_vertices( mg_graph_x, sID: bytes, random_state: int, num_vertices: int ) -> cudf.Series: - cp_arrays = pylibcugraph_select_random_vertices( graph=mg_graph_x, resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), @@ -961,7 +960,6 @@ def _mg_call_plc_select_random_vertices( random_state: int, num_vertices: int, ) -> dask_cudf.Series: - result = [ client.submit( _call_plc_select_random_vertices,