Change temp view name to uppercase, fix various build issues #536

Merged: 24 commits, Apr 26, 2024
5 changes: 3 additions & 2 deletions .github/workflows/test_all.yml
@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8, "3.10", "3.11"]
python-version: [3.8, "3.10"] # TODO: add back 3.11 when dask-sql is compatible

steps:
- uses: actions/checkout@v2
@@ -42,9 +42,10 @@ jobs:
run: make test
- name: "Upload coverage to Codecov"
if: matrix.python-version == '3.10'
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}

no_spark:
name: Tests
21 changes: 19 additions & 2 deletions .github/workflows/test_dask.yml
@@ -37,8 +37,8 @@ jobs:
- name: Test
run: make testdask

test_dask_latest:
name: Dask Latest
test_dask_sql_latest:
name: Dask with SQL Latest
runs-on: ubuntu-latest

steps:
@@ -49,7 +49,24 @@
python-version: "3.10"
- name: Install dependencies
run: make devenv
- name: Test
run: make testdask

test_dask_latest:
name: Dask without SQL Latest
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up Python 3.11
uses: actions/setup-python@v1
with:
python-version: "3.11"
- name: Install dependencies
run: make devenv
- name: Setup Dask
run: pip install -U dask[dataframe,distributed] pyarrow pandas
- name: Remove Dask SQL
run: pip uninstall -y dask-sql qpd fugue-sql-antlr sqlglot
- name: Test
run: make testdask
4 changes: 2 additions & 2 deletions .github/workflows/test_ray.yml
@@ -21,7 +21,7 @@ concurrency:

jobs:
test_ray_lower_bound:
name: Ray 2.4.0
name: Ray 2.5.0
runs-on: ubuntu-latest

steps:
@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Setup Ray
run: pip install ray[data]==2.4.0 pyarrow==6.0.1 pandas==1.5.3 'pydantic<2'
run: pip install ray[data]==2.5.0 pyarrow==7.0.0 "duckdb<0.9" pandas==1.5.3 'pydantic<2'
- name: Test
run: make testray

12 changes: 12 additions & 0 deletions RELEASE.md
@@ -1,5 +1,17 @@
# Release Notes

## 0.9.0

- [482](https://github.com/fugue-project/fugue/issues/482) Move Fugue SQL dependencies into the `[sql]` extra and make the related functions soft dependencies
- [504](https://github.com/fugue-project/fugue/issues/504) Create Fugue pytest fixtures and plugins
- [541](https://github.com/fugue-project/fugue/issues/541) Change table temp view names to uppercase
- [540](https://github.com/fugue-project/fugue/issues/540) Fix Ray 2.10+ compatibility issues
- [539](https://github.com/fugue-project/fugue/issues/539) Fix compatibility issues with Dask 2024.4+
- [534](https://github.com/fugue-project/fugue/issues/534) Remove ibis version cap
- [505](https://github.com/fugue-project/fugue/issues/505) Deprecate `as_ibis` in FugueWorkflow
- [387](https://github.com/fugue-project/fugue/issues/387) Improve test coverage on 3.10, add tests for 3.11
- [269](https://github.com/fugue-project/fugue/issues/269) Optimize taking 1 row without sorting on Spark and Dask

## 0.8.7

- [488](https://github.com/fugue-project/fugue/issues/488) Migrate from fs to fsspec
2 changes: 1 addition & 1 deletion fugue/collections/sql.py
@@ -15,7 +15,7 @@ class TempTableName:
"""Generating a temporary, random and globaly unique table name"""

def __init__(self):
self.key = "_" + str(uuid4())[:5]
self.key = "_" + str(uuid4())[:5].upper()

def __repr__(self) -> str:
return _TEMP_TABLE_EXPR_PREFIX + self.key + _TEMP_TABLE_EXPR_SUFFIX
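For context, a minimal sketch (not part of the diff) of the key this now produces; the rationale is an assumption here — many SQL backends fold unquoted identifiers to uppercase, so uppercase temp view names avoid case-mismatch issues in generated SQL:

```python
from uuid import uuid4

# Mirrors the change above: only the 5-character uuid slice is upper-cased,
# producing keys like "_3F2A9" instead of "_3f2a9".
key = "_" + str(uuid4())[:5].upper()
print(key)
```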
22 changes: 4 additions & 18 deletions fugue/dataframe/utils.py
@@ -21,22 +21,6 @@
rename_dataframe_column_names = rename


def _pa_type_eq(t1: pa.DataType, t2: pa.DataType) -> bool:
# should ignore the name difference of list
# e.g. list<item: string> == list<l: string>
if pa.types.is_list(t1) and pa.types.is_list(t2): # pragma: no cover
return _pa_type_eq(t1.value_type, t2.value_type)
return t1 == t2


def _schema_eq(s1: Schema, s2: Schema) -> bool:
if s1 == s2:
return True
return s1.names == s2.names and all(
_pa_type_eq(f1.type, f2.type) for f1, f2 in zip(s1.fields, s2.fields)
)


def _df_eq(
df: DataFrame,
data: Any,
@@ -46,6 +30,7 @@ def _df_eq(
check_schema: bool = True,
check_content: bool = True,
no_pandas: bool = False,
equal_type_groups: Optional[List[List[Any]]] = None,
throw=False,
) -> bool:
"""Compare if two dataframes are equal. Is for internal, unit test
@@ -66,6 +51,7 @@ def _df_eq(
:param no_pandas: if true, it will compare the string representations of the
dataframes, otherwise, it will convert both to pandas dataframe to compare,
defaults to False
:param equal_type_groups: groups of types to be treated as equal when comparing schemas, defaults to None.
:param throw: if to throw error if not equal, defaults to False
:return: if they equal
"""
@@ -78,8 +64,8 @@ def _df_eq(
assert (
df1.count() == df2.count()
), f"count mismatch {df1.count()}, {df2.count()}"
assert not check_schema or _schema_eq(
df.schema, df2.schema
assert not check_schema or df.schema.is_like(
df2.schema, equal_groups=equal_type_groups
), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}"
if not check_content:
return True
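A hedged usage sketch of the new `equal_type_groups` parameter; the exact form of the group members (plain pyarrow types below) is an assumption — the real contract is defined by triad's `Schema.is_like`:

```python
import pyarrow as pa

from fugue.dataframe import ArrayDataFrame
from fugue.dataframe.utils import _df_eq

df1 = ArrayDataFrame([[0], [1]], "a:int")
df2 = ArrayDataFrame([[0], [1]], "a:long")

# Grouping int32 and int64 asks the schema comparison to treat the two
# types as interchangeable; without the group the schemas would differ.
same = _df_eq(df1, df2, equal_type_groups=[[pa.int32(), pa.int64()]])
```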
12 changes: 11 additions & 1 deletion fugue/test/plugins.py
@@ -2,7 +2,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type

from fugue.dataframe.utils import _df_eq
from triad import assert_or_throw, run_once
from triad.utils.entry_points import load_entry_point

@@ -160,6 +160,7 @@ def test_spark(self):

backend: Any
tmp_path: Path
equal_type_groups: Any = None

__test__ = False
_test_context: Any = None
@@ -180,6 +181,15 @@ def engine(self) -> Any:
"""The engine object inside the ``FugueTestContext``"""
return self.context.engine

def get_equal_type_groups(self) -> Optional[List[List[Any]]]:
return None # pragma: no cover

def df_eq(self, *args: Any, **kwargs: Any) -> bool:
"""A wrapper of :func:`~fugue.dataframe.utils.df_eq`"""
if "equal_type_groups" not in kwargs:
kwargs["equal_type_groups"] = self.equal_type_groups
return _df_eq(*args, **kwargs)


def fugue_test_suite(backend: Any, mark_test: Optional[bool] = None) -> Any:
def deco(cls: Type["FugueTestSuite"]) -> Type["FugueTestSuite"]:
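A hedged sketch of how a suite could lean on the new wrapper; the backend name and the group contents are illustrative assumptions, not taken from this PR:

```python
import fugue.test as ft


@ft.fugue_test_suite("pandas", mark_test=True)
class MySuiteTests(ft.FugueTestSuite):
    # Injected into every df_eq call that does not pass equal_type_groups.
    equal_type_groups = [["int", "long"]]  # illustrative only

    def test_to_df(self):
        df = self.engine.to_df([[0]], "a:long")
        # Thin wrapper over fugue.dataframe.utils._df_eq with the suite's
        # equal_type_groups filled in automatically.
        assert self.df_eq(df, [[0]], "a:long")
```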
13 changes: 8 additions & 5 deletions fugue_dask/_io.py
@@ -6,7 +6,7 @@
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.io import join, makedirs, url_to_fs
from triad.utils.io import isfile, join, makedirs, url_to_fs

from fugue._utils.io import FileParser, _get_single_files
from fugue_dask.dataframe import DaskDataFrame
@@ -100,9 +100,11 @@ def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:


def _safe_load_csv(path: str, **kwargs: Any) -> dd.DataFrame:
if not isfile(path):
return dd.read_csv(join(path, "*.csv"), **kwargs)
try:
return dd.read_csv(path, **kwargs)
except (IsADirectoryError, PermissionError):
except (IsADirectoryError, PermissionError): # pragma: no cover
return dd.read_csv(join(path, "*.csv"), **kwargs)


@@ -148,11 +150,12 @@ def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:


def _safe_load_json(path: str, **kwargs: Any) -> dd.DataFrame:
if not isfile(path):
return dd.read_json(join(path, "*.json"), **kwargs)
try:
return dd.read_json(path, **kwargs)
except (IsADirectoryError, PermissionError):
x = dd.read_json(join(path, "*.json"), **kwargs)
return x
except (IsADirectoryError, PermissionError): # pragma: no cover
return dd.read_json(join(path, "*.json"), **kwargs)


def _load_json(
8 changes: 4 additions & 4 deletions fugue_dask/_utils.py
@@ -53,7 +53,7 @@ def hash_repartition(df: dd.DataFrame, num: int, cols: List[Any]) -> dd.DataFrame:
if num < 1:
return df
if num == 1:
return df.repartition(1)
return df.repartition(npartitions=1)
df = df.reset_index(drop=True).clear_divisions()
idf, ct = _add_hash_index(df, num, cols)
return _postprocess(idf, ct, num)
@@ -76,7 +76,7 @@ def even_repartition(df: dd.DataFrame, num: int, cols: List[Any]) -> dd.DataFrame:
the number of partitions will be the number of groups.
"""
if num == 1:
return df.repartition(1)
return df.repartition(npartitions=1)
if len(cols) == 0 and num <= 0:
return df
df = df.reset_index(drop=True).clear_divisions()
@@ -111,7 +111,7 @@ def rand_repartition(
if num < 1:
return df
if num == 1:
return df.repartition(1)
return df.repartition(npartitions=1)
df = df.reset_index(drop=True).clear_divisions()
if len(cols) == 0:
idf, ct = _add_random_index(df, num=num, seed=seed)
@@ -124,7 +124,7 @@ def _postprocess(idf: dd.DataFrame, ct: int, num: int) -> dd.DataFrame:
def _postprocess(idf: dd.DataFrame, ct: int, num: int) -> dd.DataFrame:
parts = min(ct, num)
if parts <= 1:
return idf.repartition(1)
return idf.repartition(npartitions=1)
divisions = list(np.arange(ct, step=math.ceil(ct / parts)))
divisions.append(ct - 1)
return idf.repartition(divisions=divisions, force=True)
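The repartition change is a compatibility fix; a small sketch under the assumption that recent Dask releases only accept the keyword form (which also works on older versions):

```python
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=4)

# Older code used the positional form ddf.repartition(1); the keyword
# form below is accepted by both old and new Dask versions.
single = ddf.repartition(npartitions=1)
assert single.npartitions == 1
```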
1 change: 1 addition & 0 deletions fugue_duckdb/_io.py
@@ -140,6 +140,7 @@ def _load_csv( # noqa: C901
else:
if header:
kw["ALL_VARCHAR"] = 1
kw["auto_detect"] = 1
if columns is None:
cols = "*"
elif isinstance(columns, list):
15 changes: 11 additions & 4 deletions fugue_ibis/execution_engine.py
@@ -23,8 +23,8 @@
from ._utils import to_ibis_schema
from .dataframe import IbisDataFrame

_JOIN_RIGHT_SUFFIX = "_ibis_y__"
_GEN_TABLE_NAMES = (f"_fugue_temp_table_{i:d}" for i in itertools.count())
_JOIN_RIGHT_SUFFIX = "_ibis_y__".upper()
_GEN_TABLE_NAMES = (f"_fugue_temp_table_{i:d}".upper() for i in itertools.count())


class IbisSQLEngine(SQLEngine):
@@ -224,7 +224,7 @@ def take(
_presort = parse_presort_exp(presort)
else:
_presort = partition_spec.presort
tbn = "_temp"
tbn = "_TEMP"
idf = self.to_df(df)

if len(_presort) == 0:
@@ -233,9 +233,10 @@
pcols = ", ".join(
self.encode_column_name(x) for x in partition_spec.partition_by
)
dummy_order_by = self._dummy_window_order_by()
sql = (
f"SELECT * FROM ("
f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols}) "
f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols} {dummy_order_by}) "
f"AS __fugue_take_param FROM {tbn}"
f") WHERE __fugue_take_param<={n}"
)
@@ -290,6 +291,12 @@ def save_table(
def load_table(self, table: str, **kwargs: Any) -> DataFrame:
return self.to_df(self.backend.table(table))

def _dummy_window_order_by(self) -> str:
"""Return a dummy window order by clause, this is required for
some SQL backends when there is no real order by clause in window
"""
return ""


class IbisMapEngine(MapEngine):
"""IbisExecutionEngine's MapEngine, it is a wrapper of the map engine
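A hedged illustration of the new hook: the base engine returns an empty string, and a dialect that rejects `ROW_NUMBER() OVER (PARTITION BY ...)` without an ORDER BY could override it. The subclass name and the exact clause below are assumptions, not from this diff:

```python
from fugue_ibis.execution_engine import IbisSQLEngine


class MyDialectSQLEngine(IbisSQLEngine):
    # (other required engine methods omitted for brevity)

    def _dummy_window_order_by(self) -> str:
        # Any cheap, deterministic expression the dialect accepts; take()
        # then renders roughly:
        #   SELECT *, ROW_NUMBER() OVER (PARTITION BY <cols> ORDER BY 1)
        #     AS __fugue_take_param FROM _TEMP
        return "ORDER BY 1"
```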
7 changes: 3 additions & 4 deletions fugue_ray/_constants.py
@@ -1,6 +1,7 @@
from typing import Any, Dict

import ray
from packaging import version

FUGUE_RAY_CONF_SHUFFLE_PARTITIONS = "fugue.ray.shuffle.partitions"
FUGUE_RAY_DEFAULT_PARTITIONS = "fugue.ray.default.partitions"
@@ -12,8 +13,6 @@
FUGUE_RAY_DEFAULT_PARTITIONS: 0,
FUGUE_RAY_ZERO_COPY: True,
}
RAY_VERSION = version.parse(ray.__version__)

if ray.__version__ >= "2.3":
_ZERO_COPY: Dict[str, Any] = {"zero_copy_batch": True}
else: # pragma: no cover
_ZERO_COPY = {}
_ZERO_COPY: Dict[str, Any] = {"zero_copy_batch": True}
31 changes: 10 additions & 21 deletions fugue_ray/_utils/dataframe.py
@@ -3,7 +3,6 @@

import pandas as pd
import pyarrow as pa
import ray
import ray.data as rd
from triad import Schema

@@ -31,31 +30,21 @@ def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
df = materialize(df)
if df.count() == 0:
return None, df
if ray.__version__ < "2.5.0": # pragma: no cover
if hasattr(df, "_dataset_format"): # pragma: no cover
return df._dataset_format(), df # ray<2.2
ctx = rd.context.DatasetContext.get_current()
ctx.use_streaming_executor = False
return df.dataset_format(), df # ray>=2.2
else:
schema = df.schema(fetch_if_missing=True)
if schema is None: # pragma: no cover
return None, df
if isinstance(schema.base_schema, pa.Schema):
return "arrow", df
return "pandas", df
schema = df.schema(fetch_if_missing=True)
if schema is None: # pragma: no cover
return None, df
if isinstance(schema.base_schema, pa.Schema):
return "arrow", df
return "pandas", df


def to_schema(schema: Any) -> Schema: # pragma: no cover
if isinstance(schema, pa.Schema):
return Schema(schema)
if ray.__version__ >= "2.5.0":
if isinstance(schema, rd.Schema):
if hasattr(schema, "base_schema") and isinstance(
schema.base_schema, pa.Schema
):
return Schema(schema.base_schema)
return Schema(list(zip(schema.names, schema.types)))
if isinstance(schema, rd.Schema):
if hasattr(schema, "base_schema") and isinstance(schema.base_schema, pa.Schema):
return Schema(schema.base_schema)
return Schema(list(zip(schema.names, schema.types)))
raise ValueError(f"{schema} is not supported")


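A hedged usage sketch of the simplified helpers, assuming Ray >= 2.5 where `Dataset.schema()` returns a `ray.data.Schema` whose `base_schema` reveals the underlying block format:

```python
import pandas as pd
import ray.data as rd

from fugue_ray._utils.dataframe import get_dataset_format, to_schema

ds = rd.from_pandas(pd.DataFrame({"a": [1, 2]}))

# If base_schema is a pyarrow.Schema the format resolves to "arrow",
# otherwise to "pandas"; which one from_pandas yields depends on the
# Ray version, so treat the printed value as illustrative.
fmt, ds = get_dataset_format(ds)
schema = to_schema(ds.schema(fetch_if_missing=True))
print(fmt, schema)
```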