From 1c0d9a6ec51c37f67318dc15599c23c3cb28812d Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 14 May 2024 01:39:22 +0200 Subject: [PATCH] FEAT-#7254: Support right merge/join (#7226) Signed-off-by: Anatoly Myachev Co-authored-by: Iaroslav Igoshev --- .../dataframe/pandas/dataframe/dataframe.py | 5 +- .../pandas/partitioning/partition_manager.py | 12 +++- .../storage_formats/base/query_compiler.py | 6 +- modin/core/storage_formats/pandas/merge.py | 49 +++++++++++--- .../storage_formats/pandas/query_compiler.py | 35 ++++++---- .../hdk_on_native/dataframe/dataframe.py | 11 +++- .../implementations/hdk_on_native/expr.py | 6 +- .../tests/pandas/dataframe/test_join_sort.py | 65 +++++++++++++------ 8 files changed, 138 insertions(+), 51 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index a5327f3b484..c7384b21ea4 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3321,8 +3321,11 @@ def _extract_partitions(self): if self._partitions.size > 0: return self._partitions else: + dtypes = None + if self.has_materialized_dtypes: + dtypes = self.dtypes return self._partition_mgr_cls.create_partition_from_metadata( - index=self.index, columns=self.columns + index=self.index, columns=self.columns, dtypes=dtypes ) @lazy_metadata_decorator(apply_axis="both") diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index d7fa8640066..f3a761c4333 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -21,7 +21,7 @@ import warnings from abc import ABC from functools import wraps -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import numpy as np import pandas @@ -183,12 +183,18 @@ def preprocess_func(cls, map_func): # END Abstract Methods @classmethod - def create_partition_from_metadata(cls, **metadata): + def create_partition_from_metadata( + cls, dtypes: Optional[pandas.Series] = None, **metadata + ): """ Create NumPy array of partitions that holds an empty dataframe with given metadata. Parameters ---------- + dtypes : pandas.Series, optional + Column dtypes. + Upon creating a pandas DataFrame from `metadata` we call `astype` since + pandas doesn't allow to pass a list of dtypes directly in the constructor. **metadata : dict Metadata that has to be wrapped in a partition. @@ -198,6 +204,8 @@ def create_partition_from_metadata(cls, **metadata): A NumPy 2D array of a single partition which contains the data. """ metadata_dataframe = pandas.DataFrame(**metadata) + if dtypes is not None: + metadata_dataframe = metadata_dataframe.astype(dtypes) return np.array([[cls._partition_class.put(metadata_dataframe)]]) @classmethod diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 06b2ed17aea..b25bc9b1ea2 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -54,6 +54,8 @@ from . import doc_utils if TYPE_CHECKING: + from typing_extensions import Self + # TODO: should be ModinDataframe # https://github.com/modin-project/modin/issues/7244 from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe @@ -158,7 +160,7 @@ def __wrap_in_qc(self, obj): else: return obj - def default_to_pandas(self, pandas_op, *args, **kwargs): + def default_to_pandas(self, pandas_op, *args, **kwargs) -> Self: """ Do fallback to pandas for the passed function. @@ -4467,7 +4469,7 @@ def write_items(df, broadcasted_items): # END Abstract methods for QueryCompiler @cached_property - def __constructor__(self) -> type[BaseQueryCompiler]: + def __constructor__(self) -> type[Self]: """ Get query compiler constructor. diff --git a/modin/core/storage_formats/pandas/merge.py b/modin/core/storage_formats/pandas/merge.py index 956841ac228..37a9c325bd0 100644 --- a/modin/core/storage_formats/pandas/merge.py +++ b/modin/core/storage_formats/pandas/merge.py @@ -15,7 +15,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import pandas from pandas.core.dtypes.common import is_list_like @@ -103,7 +103,7 @@ def func(left, right): @classmethod def row_axis_merge( cls, left: PandasQueryCompiler, right: PandasQueryCompiler, kwargs: dict - ): + ) -> PandasQueryCompiler: """ Execute merge using row-axis implementation. @@ -126,10 +126,25 @@ def row_axis_merge( right_index = kwargs.get("right_index", False) sort = kwargs.get("sort", False) - if how in ["left", "inner"] and left_index is False and right_index is False: + if ( + ( + how in ["left", "inner"] + or (how == "right" and right._modin_frame._partitions.size != 0) + ) + and left_index is False + and right_index is False + ): kwargs["sort"] = False - def should_keep_index(left, right): + reverted = False + if how == "right": + left, right = right, left + reverted = True + + def should_keep_index( + left: PandasQueryCompiler, + right: PandasQueryCompiler, + ) -> bool: keep_index = False if left_on is not None and right_on is not None: keep_index = any( @@ -144,8 +159,14 @@ def should_keep_index(left, right): ) return keep_index - def map_func(left, right): # pragma: no cover - return pandas.merge(left, right, **kwargs) + def map_func( + left, right, kwargs=kwargs + ) -> pandas.DataFrame: # pragma: no cover + if reverted: + df = pandas.merge(right, left, **kwargs) + else: + df = pandas.merge(left, right, **kwargs) + return df # Want to ensure that these are python lists if left_on is not None and right_on is not None: @@ -156,7 +177,11 @@ def map_func(left, right): # pragma: no cover right_to_broadcast = right._modin_frame.combine() new_columns, new_dtypes = cls._compute_result_metadata( - left, right, on, left_on, right_on, kwargs.get("suffixes", ("_x", "_y")) + *((left, right) if not reverted else (right, left)), + on, + left_on, + right_on, + kwargs.get("suffixes", ("_x", "_y")), ) # We rebalance when the ratio of the number of existing partitions to @@ -226,7 +251,15 @@ def map_func(left, right): # pragma: no cover return left.default_to_pandas(pandas.DataFrame.merge, right, **kwargs) @classmethod - def _compute_result_metadata(cls, left, right, on, left_on, right_on, suffixes): + def _compute_result_metadata( + cls, + left: PandasQueryCompiler, + right: PandasQueryCompiler, + on, + left_on, + right_on, + suffixes, + ) -> tuple[Optional[pandas.Index], Optional[ModinDtypes]]: """ Compute columns and dtypes metadata for the result of merge if possible. diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 234cc58133b..6000fd1a1aa 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -526,33 +526,46 @@ def merge(self, right, **kwargs): get_logger().info(message) return MergeImpl.row_axis_merge(self, right, kwargs) - def join(self, right, **kwargs): + def join(self, right: PandasQueryCompiler, **kwargs) -> PandasQueryCompiler: on = kwargs.get("on", None) how = kwargs.get("how", "left") sort = kwargs.get("sort", False) + left = self - if how in ["left", "inner"]: - - def map_func(left, right, kwargs=kwargs): # pragma: no cover - return pandas.DataFrame.join(left, right, **kwargs) + if how in ["left", "inner"] or ( + how == "right" and right._modin_frame._partitions.size != 0 + ): + reverted = False + if how == "right": + left, right = right, left + reverted = True + + def map_func( + left, right, kwargs=kwargs + ) -> pandas.DataFrame: # pragma: no cover + if reverted: + df = pandas.DataFrame.join(right, left, **kwargs) + else: + df = pandas.DataFrame.join(left, right, **kwargs) + return df right_to_broadcast = right._modin_frame.combine() - new_self = self.__constructor__( - self._modin_frame.broadcast_apply_full_axis( + left = left.__constructor__( + left._modin_frame.broadcast_apply_full_axis( axis=1, func=map_func, # We're going to explicitly change the shape across the 1-axis, # so we want for partitioning to adapt as well keep_partitioning=False, num_splits=merge_partitioning( - self._modin_frame, right._modin_frame, axis=1 + left._modin_frame, right._modin_frame, axis=1 ), other=right_to_broadcast, ) ) - return new_self.sort_rows_by_column_values(on) if sort else new_self + return left.sort_rows_by_column_values(on) if sort else left else: - return self.default_to_pandas(pandas.DataFrame.join, right, **kwargs) + return left.default_to_pandas(pandas.DataFrame.join, right, **kwargs) # END Inter-Data operations @@ -588,7 +601,7 @@ def reindex(self, axis, labels, **kwargs): ) return self.__constructor__(new_modin_frame) - def reset_index(self, **kwargs): + def reset_index(self, **kwargs) -> PandasQueryCompiler: if self.lazy_execution: def _reset(df, *axis_lengths, partition_idx): # pragma: no cover diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py index c5b8379ffba..1c8d3a7134b 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py @@ -1454,9 +1454,14 @@ def _join_by_index(self, other_modin_frames, how, sort, ignore_index): condition=condition, ) - new_columns = Index.__new__( - Index, data=new_columns, dtype=new_columns_dtype - ) + # in the case of heterogeneous data, using the `dtype` parameter of the + # `Index` constructor can lead to the following error: + # `ValueError: string values cannot be losslessly cast to int64` + # that's why we explicitly call astype below + new_columns = Index(new_columns) + if new_columns.dtype != new_columns_dtype and new_columns_dtype is not None: + # ValueError: string values cannot be losslessly cast to int64 + new_columns = new_columns.astype(new_columns_dtype) lhs = lhs.__constructor__( dtypes=lhs._dtypes_for_exprs(exprs), columns=new_columns, diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/expr.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/expr.py index 0fe29e9c29f..484d4b42e1e 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/expr.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/expr.py @@ -1347,13 +1347,13 @@ def build_row_idx_filter_expr(row_idx, row_col): return row_col.eq(row_idx) if is_range_like(row_idx): - start = row_idx[0] - stop = row_idx[-1] + start = row_idx.start + stop = row_idx.stop step = row_idx.step if step < 0: start, stop = stop, start step = -step - exprs = [row_col.ge(start), row_col.le(stop)] + exprs = [row_col.ge(start), row_col.cmp("<", stop)] if step > 1: mod = OpExpr("MOD", [row_col, LiteralExpr(step)], _get_dtype(int)) exprs.append(mod.eq(0)) diff --git a/modin/tests/pandas/dataframe/test_join_sort.py b/modin/tests/pandas/dataframe/test_join_sort.py index 1ac478a5b3d..fc294a6b33e 100644 --- a/modin/tests/pandas/dataframe/test_join_sort.py +++ b/modin/tests/pandas/dataframe/test_join_sort.py @@ -80,20 +80,20 @@ def test_combine(data): "test_data, test_data2", [ ( - np.random.uniform(0, 100, size=(2**6, 2**6)), - np.random.uniform(0, 100, size=(2**7, 2**6)), + np.random.randint(0, 100, size=(64, 64)), + np.random.randint(0, 100, size=(128, 64)), ), ( - np.random.uniform(0, 100, size=(2**7, 2**6)), - np.random.uniform(0, 100, size=(2**6, 2**6)), + np.random.randint(0, 100, size=(128, 64)), + np.random.randint(0, 100, size=(64, 64)), ), ( - np.random.uniform(0, 100, size=(2**6, 2**6)), - np.random.uniform(0, 100, size=(2**6, 2**7)), + np.random.randint(0, 100, size=(64, 64)), + np.random.randint(0, 100, size=(64, 128)), ), ( - np.random.uniform(0, 100, size=(2**6, 2**7)), - np.random.uniform(0, 100, size=(2**6, 2**6)), + np.random.randint(0, 100, size=(64, 128)), + np.random.randint(0, 100, size=(64, 64)), ), ], ) @@ -122,8 +122,9 @@ def test_join(test_data, test_data2): hows = ["inner", "left", "right", "outer"] ons = ["col33", "col34"] sorts = [False, True] - for i in range(4): - for j in range(2): + assert len(ons) == len(sorts), "the loop below is designed for this condition" + for i in range(len(hows)): + for j in range(len(ons)): modin_result = modin_df.join( modin_df2, how=hows[i], @@ -140,7 +141,13 @@ def test_join(test_data, test_data2): lsuffix="_caller", rsuffix="_other", ) - df_equals(modin_result, pandas_result) + if sorts[j]: + # sorting in `join` is implemented through range partitioning technique + # therefore the order of the rows after it does not match the pandas, + # so additional sorting is needed in order to get the same result as for pandas + df_equals_and_sort(modin_result, pandas_result) + else: + df_equals(modin_result, pandas_result) frame_data = { "col1": [0, 1, 2, 3], @@ -174,6 +181,15 @@ def test_join(test_data, test_data2): df_equals(modin_join, pandas_join) +@pytest.mark.parametrize("how", ["left", "inner", "right"]) +def test_join_empty(how): + data = np.random.randint(0, 100, size=(64, 64)) + eval_general( + *create_test_dfs(data), + lambda df: df.join(df.iloc[:0], on=1, how=how, lsuffix="_caller"), + ) + + def test_join_cross_6786(): data = [[7, 8, 9], [10, 11, 12]] modin_df, pandas_df = create_test_dfs(data, columns=["x", "y", "z"]) @@ -269,19 +285,25 @@ def test_merge(test_data, test_data2): index=pandas.Index([i for i in range(1, test_data2.shape[0] + 1)], name="key"), ) - hows = ["left", "inner"] + hows = ["left", "inner", "right"] ons = ["col33", ["col33", "col34"]] sorts = [False, True] - for i in range(2): - for j in range(2): + assert len(ons) == len(sorts), "the loop below is designed for this condition" + for i in range(len(hows)): + for j in range(len(ons)): modin_result = modin_df.merge( modin_df2, how=hows[i], on=ons[j], sort=sorts[j] ) pandas_result = pandas_df.merge( pandas_df2, how=hows[i], on=ons[j], sort=sorts[j] ) + # sorting in `merge` is implemented through range partitioning technique + # therefore the order of the rows after it does not match the pandas, + # so additional sorting is needed in order to get the same result as for pandas sort_if_range_partitioning( - modin_result, pandas_result, force=StorageFormat.get() == "Hdk" + modin_result, + pandas_result, + force=StorageFormat.get() == "Hdk" or sorts[j], ) modin_result = modin_df.merge( @@ -299,7 +321,9 @@ def test_merge(test_data, test_data2): sort=sorts[j], ) sort_if_range_partitioning( - modin_result, pandas_result, force=StorageFormat.get() == "Hdk" + modin_result, + pandas_result, + force=StorageFormat.get() == "Hdk" or sorts[j], ) # Test for issue #1771 @@ -418,11 +442,10 @@ def test_merge(test_data, test_data2): modin_df.merge("Non-valid type") -def test_merge_empty(): - data = np.random.uniform(0, 100, size=(2**6, 2**6)) - pandas_df = pandas.DataFrame(data) - modin_df = pd.DataFrame(data) - eval_general(modin_df, pandas_df, lambda df: df.merge(df.iloc[:0])) +@pytest.mark.parametrize("how", ["left", "inner", "right"]) +def test_merge_empty(how): + data = np.random.randint(0, 100, size=(64, 64)) + eval_general(*create_test_dfs(data), lambda df: df.merge(df.iloc[:0], how=how)) def test_merge_with_mi_columns():