diff --git a/modin/core/io/text/text_file_dispatcher.py b/modin/core/io/text/text_file_dispatcher.py
index 6f7e68393b9..130fd484ca1 100644
--- a/modin/core/io/text/text_file_dispatcher.py
+++ b/modin/core/io/text/text_file_dispatcher.py
@@ -216,7 +216,7 @@ def partitioned_file(
         newline: bytes = None,
         header_size: int = 0,
         pre_reading: int = 0,
-        read_callback_kw: dict = None,
+        get_metadata_kw: dict = None,
     ):
         """
         Compute chunk sizes in bytes for every partition.
@@ -244,7 +244,7 @@ def partitioned_file(
             Number of rows, that occupied by header.
         pre_reading : int, default: 0
             Number of rows between header and skipped rows, that should be read.
-        read_callback_kw : dict, optional
+        get_metadata_kw : dict, optional
             Keyword arguments for `cls.read_callback` to compute metadata if needed.
             This option is not compatible with `pre_reading!=0`.

@@ -255,11 +255,11 @@ def partitioned_file(
                 int : partition start read byte
                 int : partition end read byte
         pandas.DataFrame or None
-            Dataframe from which metadata can be retrieved. Can be None if `read_callback_kw=None`.
+            Dataframe from which metadata can be retrieved. Can be None if `get_metadata_kw=None`.
         """
-        if read_callback_kw is not None and pre_reading != 0:
+        if get_metadata_kw is not None and pre_reading != 0:
             raise ValueError(
-                f"Incompatible combination of parameters: {read_callback_kw=}, {pre_reading=}"
+                f"Incompatible combination of parameters: {get_metadata_kw=}, {pre_reading=}"
             )
         read_rows_counter = 0
         outside_quotes = True
@@ -297,11 +297,11 @@ def partitioned_file(
             rows_skipper(skiprows)
         else:
             rows_skipper(skiprows)
-            if read_callback_kw:
+            if get_metadata_kw:
                 start = f.tell()
                 # For correct behavior, if we want to avoid double skipping rows,
                 # we need to get metadata after skipping.
-                pd_df_metadata = cls.read_callback(f, **read_callback_kw)
+                pd_df_metadata = cls.read_callback(f, **get_metadata_kw)
                 f.seek(start)
             rows_skipper(header_size)

@@ -1063,28 +1063,32 @@ def _read(cls, filepath_or_buffer, **kwargs):
             and (usecols is None or skiprows is None)
             and pre_reading == 0
         )
-        read_callback_kw = dict(kwargs, nrows=1, skipfooter=0, index_col=index_col)
+        get_metadata_kw = dict(kwargs, nrows=1, skipfooter=0, index_col=index_col)
+        if get_metadata_kw.get("engine", None) == "pyarrow":
+            # pyarrow engine doesn't support `nrows` option;
+            # https://github.com/pandas-dev/pandas/issues/38872 can be used to track pyarrow engine features
+            get_metadata_kw["engine"] = "c"
         if not can_compute_metadata_while_skipping_rows:
             pd_df_metadata = cls.read_callback(
                 filepath_or_buffer_md,
-                **read_callback_kw,
+                **get_metadata_kw,
             )
             column_names = pd_df_metadata.columns
             column_widths, num_splits = cls._define_metadata(
                 pd_df_metadata, column_names
             )
-            read_callback_kw = None
+            get_metadata_kw = None
         else:
-            read_callback_kw = dict(read_callback_kw, skiprows=None)
+            get_metadata_kw = dict(get_metadata_kw, skiprows=None)
             # `memory_map` doesn't work with file-like object so we can't use it here.
             # We can definitely skip it without violating the reading logic
             # since this parameter is intended to optimize reading.
             # For reading a couple of lines, this is not essential.
-            read_callback_kw.pop("memory_map", None)
+            get_metadata_kw.pop("memory_map", None)
             # These parameters are already used when opening file `f`,
             # they do not need to be used again.
-            read_callback_kw.pop("storage_options", None)
-            read_callback_kw.pop("compression", None)
+            get_metadata_kw.pop("storage_options", None)
+            get_metadata_kw.pop("compression", None)

         with OpenFile(
             filepath_or_buffer_md,
@@ -1110,7 +1114,7 @@ def _read(cls, filepath_or_buffer, **kwargs):
                 newline=newline,
                 header_size=header_size,
                 pre_reading=pre_reading,
-                read_callback_kw=read_callback_kw,
+                get_metadata_kw=get_metadata_kw,
             )
             if can_compute_metadata_while_skipping_rows:
                 pd_df_metadata = pd_df_metadata_temp
diff --git a/modin/tests/pandas/test_io.py b/modin/tests/pandas/test_io.py
index 3088bc1ab65..bade22ef8ec 100644
--- a/modin/tests/pandas/test_io.py
+++ b/modin/tests/pandas/test_io.py
@@ -653,7 +653,7 @@ def test_read_csv_encoding_976(self, pathlike):
     # Quoting, Compression parameters tests
     @pytest.mark.parametrize("compression", ["infer", "gzip", "bz2", "xz", "zip"])
     @pytest.mark.parametrize("encoding", [None, "latin8", "utf16"])
-    @pytest.mark.parametrize("engine", [None, "python", "c"])
+    @pytest.mark.parametrize("engine", [None, "python", "c", "pyarrow"])
     def test_read_csv_compression(self, make_csv_file, compression, encoding, engine):
         unique_filename = make_csv_file(encoding=encoding, compression=compression)
         expected_exception = None