Skip to content

Commit

Permalink
Fix Ray 2.5.0 compatibility issue (#484)
Browse files Browse the repository at this point in the history
* Update release note

* update readme

* Fix ray 2.5 compatibility issue

* Fix ray 2.5 compatibility issue

* update

* update

* update

* update
  • Loading branch information
goodwanghan authored Jun 14, 2023
1 parent 891e193 commit 5f6d6f1
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 21 deletions.
55 changes: 55 additions & 0 deletions .github/workflows/test_ray.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This workflow installs the development environment and runs the Ray test
# suite twice: once against the lowest supported Ray release and once against
# the latest release available on PyPI.
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Ray Tests

on:
  push:
    branches: [ master ]
    paths-ignore:
      - 'docs/**'
      - '**.md'
  pull_request:
    branches: [ master ]
    paths-ignore:
      - 'docs/**'
      - '**.md'

# Cancel an in-flight run when a newer commit arrives on the same ref.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  test_ray_lower_bound:
    name: Ray 2.1.0
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      - name: Set up Python 3.9
        uses: actions/setup-python@v5
        with:
          # Quoted so YAML keeps it a string (an unquoted 3.10 would parse as 3.1).
          python-version: "3.9"
      - name: Install dependencies
        run: make devenv
      - name: Setup Ray
        # Pin Ray together with pyarrow/pandas versions for the lower-bound job;
        # the extras spec is quoted so the shell never treats [data] as a glob.
        run: pip install "ray[data]==2.1.0" pyarrow==6.0.1 pandas==1.5.3
      - name: Test
        run: make testray

  test_ray_latest:
    name: Ray Latest
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      - name: Set up Python 3.9
        uses: actions/setup-python@v5
        with:
          python-version: "3.9"
      - name: Install dependencies
        run: make devenv
      - name: Setup Ray
        # -U upgrades whatever Ray version `make devenv` installed to the latest.
        run: pip install -U "ray[data]"
      - name: Test
        run: make testray
Empty file added fugue/py.typed
Empty file.
32 changes: 27 additions & 5 deletions fugue_ray/_utils/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pandas as pd
import pyarrow as pa
import ray
import ray.data as rd
from triad import Schema

Expand All @@ -17,11 +18,32 @@ def get_dataset_format(df: rd.Dataset) -> Optional[str]:
df.fully_executed()
if df.count() == 0:
return None
if hasattr(df, "_dataset_format"): # pragma: no cover
return df._dataset_format() # ray<2.2
ctx = rd.context.DatasetContext.get_current()
ctx.use_streaming_executor = False
return df.dataset_format() # ray>=2.2
if ray.__version__ < "2.5.0": # pragma: no cover
if hasattr(df, "_dataset_format"): # pragma: no cover
return df._dataset_format() # ray<2.2
ctx = rd.context.DatasetContext.get_current()
ctx.use_streaming_executor = False
return df.dataset_format() # ray>=2.2
else:
schema = df.schema(fetch_if_missing=True)
if schema is None: # pragma: no cover
return None
if isinstance(schema.base_schema, pa.Schema):
return "arrow"
return "pandas"


def to_schema(schema: Any) -> Schema:  # pragma: no cover
    """Convert a pyarrow or Ray Data schema into a triad ``Schema``.

    :param schema: a ``pyarrow.Schema`` or, on ray>=2.5, a ``ray.data.Schema``
    :raises ValueError: if ``schema`` is of any other type
    :return: the equivalent triad ``Schema``
    """
    import re  # local import: only needed for the version check below

    if isinstance(schema, pa.Schema):
        return Schema(schema)
    # Compare (major, minor) numerically. Comparing the raw version strings is
    # lexicographic and would rank "2.10.0" below "2.5.0".
    ray_version = tuple(int(p) for p in re.findall(r"\d+", ray.__version__)[:2])
    # ``rd.Schema`` is only touched once the version check has passed.
    if ray_version >= (2, 5) and isinstance(schema, rd.Schema):
        base = getattr(schema, "base_schema", None)
        if isinstance(base, pa.Schema):
            # Arrow-backed dataset: reuse the underlying pyarrow schema.
            return Schema(base)
        # Otherwise rebuild the schema from (name, type) pairs.
        return Schema(list(zip(schema.names, schema.types)))
    raise ValueError(f"{schema} is not supported")


def build_empty(schema: Schema) -> rd.Dataset:
Expand Down
23 changes: 13 additions & 10 deletions fugue_ray/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)

from ._constants import _ZERO_COPY
from ._utils.dataframe import build_empty, get_dataset_format
from ._utils.dataframe import build_empty, get_dataset_format, to_schema


class RayDataFrame(DataFrame):
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__( # noqa: C901
rdf = rd.from_arrow_refs(df.to_arrow_refs())
elif fmt == "arrow":
rdf = df
else:
else: # pragma: no cover
raise NotImplementedError(
f"Ray Dataset in {fmt} format is not supported"
)
Expand Down Expand Up @@ -229,8 +229,8 @@ def _apply_schema(
if get_dataset_format(rdf) is None: # empty
schema = _input_schema(schema).assert_not_empty()
return build_empty(schema), schema
if schema is None or schema == rdf.schema(fetch_if_missing=True):
return rdf, rdf.schema(fetch_if_missing=True)
if schema is None or schema == to_schema(rdf.schema(fetch_if_missing=True)):
return rdf, to_schema(rdf.schema(fetch_if_missing=True))

def _alter(table: pa.Table) -> pa.Table: # pragma: no cover
return ArrowDataFrame(table).alter_columns(schema).native # type: ignore
Expand Down Expand Up @@ -263,12 +263,15 @@ def _rd_as_local(df: rd.Dataset) -> bool:

@get_column_names.candidate(lambda df: isinstance(df, rd.Dataset))
def _get_ray_dataframe_columns(df: rd.Dataset) -> List[Any]:
fmt = get_dataset_format(df)
if fmt == "pandas":
return list(df.schema(True).names)
elif fmt == "arrow":
return [f.name for f in df.schema(True)]
raise NotImplementedError(f"{fmt} is not supported") # pragma: no cover
if hasattr(df, "columns"): # higher version of ray
return df.columns(fetch_if_missing=True)
else: # pragma: no cover
fmt = get_dataset_format(df)
if fmt == "pandas":
return list(df.schema(True).names)
elif fmt == "arrow":
return df.schema(fetch_if_missing=True).names
raise NotImplementedError(f"{fmt} is not supported") # pragma: no cover


@rename.candidate(lambda df, *args, **kwargs: isinstance(df, rd.Dataset))
Expand Down
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.8.5"
__version__ = "0.8.6"
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ seaborn
pyspark[connect]
duckdb-engine>=0.6.4
sqlalchemy==2.0.10 # 2.0.11 has a bug
ray[data]>=2.5.0
# pyarrow==7.0.0

# publish to pypi
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_version() -> str:
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"qpd[dask]>=0.4.3",
],
"ray": ["ray[data]>=2.0.0", "duckdb>=0.5.0", "pyarrow>=6.0.1"],
"ray": ["ray[data]>=2.1.0", "duckdb>=0.5.0", "pyarrow>=6.0.1"],
"duckdb": [
"duckdb>=0.5.0",
"pyarrow>=6.0.1",
Expand All @@ -74,7 +74,7 @@ def get_version() -> str:
"pyspark>=3.1.1",
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"ray[data]>=2.0.0",
"ray[data]>=2.1.0",
"qpd[dask]>=0.4.3",
"notebook",
"jupyterlab",
Expand All @@ -100,7 +100,7 @@ def get_version() -> str:
"Programming Language :: Python :: 3 :: Only",
],
python_requires=">=3.7",
package_data={"fugue_notebook": ["nbextension/*"]},
package_data={"fugue": ["py.typed"], "fugue_notebook": ["nbextension/*"]},
entry_points={
"fugue.plugins": [
"ibis = fugue_ibis[ibis]",
Expand Down
2 changes: 0 additions & 2 deletions tests/fugue_ray/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ def test_ray_init(self):

raises(Exception, lambda: RayDataFrame(123))

raises(NotImplementedError, lambda: RayDataFrame(rd.from_items([1, 2])))

def test_ray_cast(self):
data = [["a", "1"], ["b", "2"]]
df = RayDataFrame(data, "a:str,b:str")
Expand Down

0 comments on commit 5f6d6f1

Please sign in to comment.