From f81bbe6a2914456cff1273b7fd123994bf1f044c Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Tue, 14 May 2024 13:11:49 +0200 Subject: [PATCH] FEAT-#5394: Reduce amount of remote calls for TreeReduce and GroupByReduce operators (#7245) Signed-off-by: Kirill Suvorov --- .../dataframe/pandas/dataframe/dataframe.py | 48 ++----------- .../pandas/partitioning/partition_manager.py | 68 ++++++++++++++++++- .../storage_formats/pandas/test_internals.py | 3 +- 3 files changed, 76 insertions(+), 43 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index c7384b21ea4..b9e1c18a34a 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -32,7 +32,7 @@ from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype from pandas.core.indexes.api import Index, RangeIndex -from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions +from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index from modin.core.dataframe.pandas.dataframe.utils import ( @@ -2212,46 +2212,12 @@ def map( PandasDataframe A new dataframe. """ - if self.num_parts <= 1.5 * CpuCount.get(): - # block-wise map - map_fn = ( - self._partition_mgr_cls.lazy_map_partitions - if lazy - else self._partition_mgr_cls.map_partitions - ) - new_partitions = map_fn(self._partitions, func, func_args, func_kwargs) - else: - # axis-wise map - # we choose an axis for a combination of partitions - # whose size is closer to the number of CPUs - if abs(self._partitions.shape[0] - CpuCount.get()) < abs( - self._partitions.shape[1] - CpuCount.get() - ): - axis = 1 - else: - axis = 0 - - column_splits = CpuCount.get() // self._partitions.shape[1] - - if axis == 0 and column_splits > 1: - # splitting by parts of columnar partitions - new_partitions = ( - self._partition_mgr_cls.map_partitions_joined_by_column( - self._partitions, column_splits, func, func_args, func_kwargs - ) - ) - else: - # splitting by full axis partitions - new_partitions = self._partition_mgr_cls.map_axis_partitions( - axis, - self._partitions, - lambda df: func( - df, - *(func_args if func_args is not None else ()), - **(func_kwargs if func_kwargs is not None else {}), - ), - keep_partitioning=True, - ) + map_fn = ( + self._partition_mgr_cls.lazy_map_partitions + if lazy + else self._partition_mgr_cls.map_partitions + ) + new_partitions = map_fn(self._partitions, func, func_args, func_kwargs) if new_columns is not None and self.has_materialized_columns: assert len(new_columns) == len( diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index f3a761c4333..e813560e244 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -29,6 +29,7 @@ from modin.config import ( BenchmarkMode, + CpuCount, Engine, MinPartitionSize, NPartitions, @@ -603,7 +604,7 @@ def broadcast_axis_partitions( @classmethod @wait_computations_if_benchmark_mode - def map_partitions( + def base_map_partitions( cls, partitions, map_func, @@ -644,6 +645,71 @@ def map_partitions( ] ) + @classmethod + @wait_computations_if_benchmark_mode + def map_partitions( + cls, + partitions, + map_func, + func_args=None, + func_kwargs=None, + ): + """ + Apply `map_func` to `partitions` using different approaches to achieve the best performance. + + Parameters + ---------- + partitions : NumPy 2D array + Partitions housing the data of Modin Frame. + map_func : callable + Function to apply. + func_args : iterable, optional + Positional arguments for the 'map_func'. + func_kwargs : dict, optional + Keyword arguments for the 'map_func'. + + Returns + ------- + NumPy array + An array of partitions + """ + if np.prod(partitions.shape) <= 1.5 * CpuCount.get(): + # block-wise map + new_partitions = cls.base_map_partitions( + partitions, map_func, func_args, func_kwargs + ) + else: + # axis-wise map + # we choose an axis for a combination of partitions + # whose size is closer to the number of CPUs + if abs(partitions.shape[0] - CpuCount.get()) < abs( + partitions.shape[1] - CpuCount.get() + ): + axis = 1 + else: + axis = 0 + + column_splits = CpuCount.get() // partitions.shape[1] + + if axis == 0 and column_splits > 1: + # splitting by parts of columnar partitions + new_partitions = cls.map_partitions_joined_by_column( + partitions, column_splits, map_func, func_args, func_kwargs + ) + else: + # splitting by full axis partitions + new_partitions = cls.map_axis_partitions( + axis, + partitions, + lambda df: map_func( + df, + *(func_args if func_args is not None else ()), + **(func_kwargs if func_kwargs is not None else {}), + ), + keep_partitioning=True, + ) + return new_partitions + @classmethod @wait_computations_if_benchmark_mode def lazy_map_partitions( diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index d772da2abda..1e0e13ef002 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2657,6 +2657,7 @@ def test_map_approaches(partitioning_scheme, expected_map_approach): df = pandas.DataFrame(data) modin_df = construct_modin_df_by_scheme(df, partitioning_scheme(df)) + partitions = modin_df._query_compiler._modin_frame._partitions partition_mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls with mock.patch.object( @@ -2664,7 +2665,7 @@ def test_map_approaches(partitioning_scheme, expected_map_approach): expected_map_approach, wraps=getattr(partition_mgr_cls, expected_map_approach), ) as expected_method: - try_cast_to_pandas(modin_df.map(lambda x: x * 2)) + partition_mgr_cls.map_partitions(partitions, lambda x: x * 2) expected_method.assert_called()