
Commit adf3884

Update Fugue to be Pandas 2 compatible, remove Spark2 support, remove avro support (#463)

* Update Fugue to be Pandas 2 compatible

* update

* update

* update

* remove avro

* Remove Spark2 support

* update

* fix windows test

* fix tests

* update
goodwanghan authored Apr 19, 2023
1 parent 02151cd commit adf3884
Showing 32 changed files with 86 additions and 462 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test_core.yml
@@ -35,8 +35,12 @@ jobs:
- name: Fix setuptools_scm
run: pip install "setuptools_scm<7"
- name: Save time
if: matrix.python-version == 3.7
run: pip install "pandas<1.3.0"
- name: Install dependencies
run: make devenv
- name: Install pandas 2
if: matrix.python-version == '3.10'
run: pip install "pandas>=2"
- name: Test
run: make testcore
50 changes: 0 additions & 50 deletions .github/workflows/test_legacy_pyspark.yml

This file was deleted.

2 changes: 2 additions & 0 deletions .github/workflows/test_win.yml
@@ -33,5 +33,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install -r requirements.txt
- name: Install pyarrow
run: pip install pyarrow==7.0.0
- name: Test
run: python -m pytest --reruns 2 --only-rerun 'Overflow in cast' tests/fugue tests/fugue_dask tests/fugue_ibis tests/fugue_duckdb
1 change: 0 additions & 1 deletion Makefile
@@ -32,7 +32,6 @@ devenv:
pip3 install -r requirements.txt
pre-commit install
pre-commit install-hooks
bash scripts/add_avro_jar.sh
pip freeze

devenvlegacy:
4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,5 +1,9 @@
# Release Notes

## 0.8.4

- [455](https://github.com/fugue-project/fugue/issues/455) Make Fugue pandas 2 compatible

## 0.8.3

- [449](https://github.com/fugue-project/fugue/issues/449) Add coarse partitioning concept
93 changes: 2 additions & 91 deletions fugue/_utils/io.py
@@ -5,13 +5,13 @@

import fs as pfs
import pandas as pd
from fs.errors import FileExpected
from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw

from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame


class FileParser(object):
def __init__(self, path: str, format_hint: Optional[str] = None):
@@ -271,111 +271,22 @@ def _load_json(
return pdf[schema.names], schema


def _save_avro(df: LocalDataFrame, p: FileParser, **kwargs: Any):
"""Save pandas dataframe as avro.
If providing your own schema, the usage of schema argument is preferred
:param schema: Avro Schema determines dtypes saved
"""
import pandavro as pdx

kw = ParamDict(kwargs)

# pandavro defaults
schema = None
append = False
times_as_micros = True

if "schema" in kw:
schema = kw["schema"]
del kw["schema"]

if "append" in kw:
append = kw["append"] # default is overwrite (False) instead of append (True)
del kw["append"]

if "times_as_micros" in kw:
times_as_micros = kw["times_as_micros"]
del kw["times_as_micros"]

pdf = df.as_pandas()
pdx.to_avro(
p.uri, pdf, schema=schema, append=append, times_as_micros=times_as_micros, **kw
)


def _load_avro(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[pd.DataFrame, Any]:
path = p.uri
try:
pdf = _load_single_avro(path, **kwargs)
except (IsADirectoryError, PermissionError, FileExpected):
fs = FileSystem()
pdf = pd.concat(
[
_load_single_avro(
pfs.path.combine(path, pfs.path.basename(x.path)), **kwargs
)
for x in fs.opendir(path).glob("*.avro")
]
)

if columns is None:
return pdf, None
if isinstance(columns, list): # column names
return pdf[columns], None

schema = Schema(columns)

# Return created DataFrame
return pdf[schema.names], schema


def _load_single_avro(path: str, **kwargs: Any) -> pd.DataFrame:
from fastavro import reader

kw = ParamDict(kwargs)
process_record = None
if "process_record" in kw:
process_record = kw["process_record"]
del kw["process_record"]

fs = FileSystem()
with fs.openbin(path) as fp:
# Configure Avro reader
avro_reader = reader(fp)
# Load records in memory
if process_record:
records = [process_record(r) for r in avro_reader]

else:
records = list(avro_reader)

# Populate pandas.DataFrame with records
return pd.DataFrame.from_records(records)


_FORMAT_MAP: Dict[str, str] = {
".csv": "csv",
".csv.gz": "csv",
".parquet": "parquet",
".json": "json",
".json.gz": "json",
".avro": "avro",
".avro.gz": "avro",
}

_FORMAT_LOAD: Dict[str, Callable[..., Tuple[pd.DataFrame, Any]]] = {
"csv": _load_csv,
"parquet": _load_parquet,
"json": _load_json,
"avro": _load_avro,
}

_FORMAT_SAVE: Dict[str, Callable] = {
"csv": _save_csv,
"parquet": _save_parquet,
"json": _save_json,
"avro": _save_avro,
}
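
For context, the registries above follow a simple dispatch pattern: a file extension maps to a format name, and the format name maps to a load or save callable, so dropping Avro support amounts to deleting _load_avro/_save_avro and their registry entries. The sketch below only illustrates that pattern under stated assumptions; it is not Fugue's actual code, and all demo_* names are hypothetical.

# Minimal sketch of the extension -> format -> handler dispatch; illustrative only.
from typing import Any, Callable, Dict

import pandas as pd

DEMO_FORMAT_MAP: Dict[str, str] = {".csv": "csv", ".csv.gz": "csv", ".parquet": "parquet"}

def demo_load_csv(path: str, **kwargs: Any) -> pd.DataFrame:
    return pd.read_csv(path, **kwargs)

def demo_load_parquet(path: str, **kwargs: Any) -> pd.DataFrame:
    return pd.read_parquet(path, **kwargs)

DEMO_FORMAT_LOAD: Dict[str, Callable[..., pd.DataFrame]] = {
    "csv": demo_load_csv,
    "parquet": demo_load_parquet,
}

def demo_load_df(path: str, **kwargs: Any) -> pd.DataFrame:
    # match the file suffix against the registry, then dispatch to the handler
    matches = [ext for ext in DEMO_FORMAT_MAP if path.endswith(ext)]
    if not matches:
        raise NotImplementedError(f"unsupported file: {path}")
    fmt = DEMO_FORMAT_MAP[max(matches, key=len)]
    return DEMO_FORMAT_LOAD[fmt](path, **kwargs)
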
2 changes: 1 addition & 1 deletion fugue/dataframe/arrow_dataframe.py
@@ -141,7 +141,7 @@ def count(self) -> int:
return self.native.shape[0]

def as_pandas(self) -> pd.DataFrame:
return self.native.to_pandas()
return self.native.to_pandas(use_threads=False, date_as_object=False)

def head(
self, n: int, columns: Optional[List[str]] = None
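
The change above passes date_as_object=False so that Arrow date columns come back as pandas datetime64 values instead of Python datetime.date objects, and it disables multithreaded conversion via use_threads=False. A minimal illustration of the flag (not part of this commit):

# Illustrative only: how pyarrow's date_as_object flag changes the resulting pandas dtype.
import datetime

import pyarrow as pa

tbl = pa.table({"d": pa.array([datetime.date(2023, 4, 19)], type=pa.date32())})
print(tbl.to_pandas()["d"].dtype)                      # object (datetime.date values)
print(tbl.to_pandas(date_as_object=False)["d"].dtype)  # datetime64 (unit depends on versions)
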
2 changes: 1 addition & 1 deletion fugue/dataframe/utils.py
@@ -100,7 +100,7 @@ def _df_eq(
d1 = d1.reset_index(drop=True)
d2 = d2.reset_index(drop=True)
pd.testing.assert_frame_equal(
d1, d2, check_less_precise=digits, check_dtype=False
d1, d2, rtol=0, atol=10 ** (-digits), check_dtype=False, check_exact=False
)
return True
except AssertionError:
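
The check_less_precise argument used here was deprecated and later removed from pandas' assert_frame_equal; the replacement expresses the same precision as an absolute tolerance of 10 ** (-digits). A small illustration (not from the commit), assuming digits=6:

# Illustrative only: values within 1e-6 of each other still compare equal.
import pandas as pd

d1 = pd.DataFrame({"x": [1.0000001]})
d2 = pd.DataFrame({"x": [1.0000004]})

pd.testing.assert_frame_equal(
    d1, d2, rtol=0, atol=10 ** (-6), check_dtype=False, check_exact=False
)  # passes: the difference (3e-7) is below atol=1e-6
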
13 changes: 0 additions & 13 deletions fugue_dask/_io.py
@@ -3,8 +3,6 @@
import fs as pfs
from dask import dataframe as dd
from fugue._utils.io import FileParser, _get_single_files
from fugue._utils.io import _load_avro as _pd_load_avro
from fugue._utils.io import _save_avro
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.collections.schema import Schema
@@ -153,25 +151,14 @@ def _load_json(
return pdf[schema.names], schema


def _load_avro(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[dd.DataFrame, Any]:
# TODO: change this hacky implementation!
pdf, schema = _pd_load_avro(p, columns, **kwargs)

return dd.from_pandas(pdf, npartitions=4), schema


_FORMAT_LOAD: Dict[str, Callable[..., Tuple[dd.DataFrame, Any]]] = {
"csv": _load_csv,
"parquet": _load_parquet,
"json": _load_json,
"avro": _load_avro,
}

_FORMAT_SAVE: Dict[str, Callable] = {
"csv": _save_csv,
"parquet": _save_parquet,
"json": _save_json,
"avro": _save_avro,
}
6 changes: 2 additions & 4 deletions fugue_ray/_utils/io.py
@@ -5,7 +5,7 @@
import pyarrow as pa
import ray.data as rd
from fugue import ExecutionEngine
from fugue._utils.io import FileParser, load_df, save_df
from fugue._utils.io import FileParser, save_df
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import DataFrame
from fugue_ray.dataframe import RayDataFrame
@@ -49,8 +49,6 @@ def load_df(
len(fmts) == 1, NotImplementedError("can't support multiple formats")
)
fmt = fmts[0]
if fmt == "avro": # TODO: remove avro support
return load_df(uri, format_hint=format_hint, columns=columns, **kwargs)
files = [f.uri for f in fp]
return self._loads[fmt](files, columns, **kwargs)

@@ -75,7 +73,7 @@ def save_df(
except Exception: # pragma: no cover
pass
p = FileParser(uri, format_hint)
if not force_single and p.file_format != "avro":
if not force_single:
df = self._prepartition(df, partition_spec=partition_spec)

self._saves[p.file_format](df=df, uri=p.uri, **kwargs)
3 changes: 0 additions & 3 deletions fugue_spark/_constants.py
@@ -1,8 +1,5 @@
from typing import Dict, Any
import pyspark

FUGUE_SPARK_CONF_USE_PANDAS_UDF = "fugue.spark.use_pandas_udf"

FUGUE_SPARK_DEFAULT_CONF: Dict[str, Any] = {FUGUE_SPARK_CONF_USE_PANDAS_UDF: True}

_IS_SPARK_2 = pyspark.__version__ < "3"
28 changes: 19 additions & 9 deletions fugue_spark/_utils/convert.py
@@ -1,18 +1,12 @@
from typing import Any, Iterable, List, Tuple

import cloudpickle
import pandas as pd
import pyarrow as pa
import pyspark.sql as ps
import pyspark.sql.types as pt

try: # pyspark < 3
from pyspark.sql.types import from_arrow_type, to_arrow_type # type: ignore

# https://issues.apache.org/jira/browse/SPARK-29041
pt._acceptable_types[pt.BinaryType] = (bytearray, bytes) # type: ignore # pragma: no cover # noqa: E501 # pylint: disable=line-too-long
except ImportError: # pyspark >=3
from pyspark.sql.pandas.types import from_arrow_type, to_arrow_type

from pyarrow.types import is_list, is_struct, is_timestamp
from pyspark.sql.pandas.types import from_arrow_type, to_arrow_type
from triad.collections import Schema
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
@@ -113,6 +107,22 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
yield r


def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
if pd.__version__ < "2" or not any(
isinstance(x.dataType, (pt.TimestampType, pt.TimestampNTZType))
for x in df.schema.fields
):
return df.toPandas()

def serialize(dfs): # pragma: no cover
for df in dfs:
data = cloudpickle.dumps(df)
yield pd.DataFrame([[data]], columns=["data"])

sdf = df.mapInPandas(serialize, schema="data binary")
return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())


# TODO: the following function always set nullable to true,
# but should we use field.nullable?
def _to_arrow_type(dt: pt.DataType) -> pa.DataType:
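
The new to_pandas helper above falls back to plain toPandas() when pandas is older than 2 or the schema has no timestamp columns; otherwise it converts each partition to pandas on the executors via mapInPandas, pickles each partition with cloudpickle into a single binary column, and reassembles the frame on the driver, which appears to sidestep timestamp conversion issues that toPandas() can hit under pandas 2. A possible usage sketch (not part of this commit; the local SparkSession and query are illustrative):

# Illustrative usage of the helper added above.
from pyspark.sql import SparkSession

from fugue_spark._utils.convert import to_pandas

spark = SparkSession.builder.master("local[*]").getOrCreate()
sdf = spark.sql("SELECT TIMESTAMP '2023-04-19 00:00:00' AS ts, 1 AS x")
pdf = to_pandas(sdf)  # plain toPandas() when safe; cloudpickle round-trip otherwise
print(pdf.dtypes)
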
13 changes: 0 additions & 13 deletions fugue_spark/_utils/io.py
@@ -21,7 +21,6 @@ def __init__(self, spark_session: SparkSession, fs: FileSystem):
"csv": self._load_csv,
"parquet": self._load_parquet,
"json": self._load_json,
"avro": self._load_avro,
}

def load_df(
@@ -136,15 +135,3 @@ def _load_json(self, p: List[str], columns: Any = None, **kwargs: Any) -> DataFr
return SparkDataFrame(reader.load(p))[columns]
schema = Schema(columns)
return SparkDataFrame(reader.load(p)[schema.names], schema)

def _load_avro(self, p: List[str], columns: Any = None, **kwargs: Any) -> DataFrame:
reader = self._session.read.format(
"avro"
) # avro is an external data source that has built-in support since spark 2.4
reader.options(**kwargs)
if columns is None:
return SparkDataFrame(reader.load(p))
if isinstance(columns, list): # column names
return SparkDataFrame(reader.load(p))[columns]
schema = Schema(columns)
return SparkDataFrame(reader.load(p)[schema.names], schema)
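
With the Spark Avro loader removed, users who still need Avro can call Spark's external "avro" data source directly, which is what the deleted _load_avro wrapped. A minimal sketch (not part of this commit; assumes the spark-avro package is on the classpath, and the path is hypothetical):

# Illustrative workaround only: read Avro with Spark's external data source.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sdf = spark.read.format("avro").load("/tmp/data.avro")  # hypothetical path
sdf.show()
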