From f7c5e63e86ed7daccf089ff6fcc9e3c830dd73f1 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Tue, 27 Sep 2022 22:21:40 -0700 Subject: [PATCH] Deprecate Python 3.6, enable map type support, add IbisDataFrame (#359) --- .github/workflows/test_core.yml | 2 +- RELEASE.md | 6 + fugue/execution/native_execution_engine.py | 13 +- fugue_dask/dataframe.py | 4 +- fugue_dask/ibis_engine.py | 4 +- fugue_duckdb/_utils.py | 35 ++- fugue_duckdb/dataframe.py | 20 +- fugue_duckdb/execution_engine.py | 8 +- fugue_duckdb/ibis_engine.py | 4 +- fugue_ibis/__init__.py | 12 +- fugue_ibis/_compat.py | 7 + fugue_ibis/_utils.py | 11 +- fugue_ibis/dataframe.py | 143 ++++++++++ fugue_ibis/execution/ibis_engine.py | 5 +- fugue_ibis/execution/pandas_backend.py | 14 +- fugue_ibis/execution_engine.py | 293 ++++++++++++++++++++ fugue_ibis/extensions.py | 11 +- fugue_spark/_utils/convert.py | 2 + fugue_spark/execution_engine.py | 21 +- fugue_spark/ibis_engine.py | 4 +- fugue_sql/_visitors.py | 4 + fugue_test/dataframe_suite.py | 17 +- fugue_version/__init__.py | 2 +- requirements.txt | 2 + setup.py | 30 +- tests/fugue_dask/test_dataframe.py | 6 +- tests/fugue_duckdb/test_dask.py | 17 +- tests/fugue_duckdb/test_execution_engine.py | 16 +- tests/fugue_duckdb/test_utils.py | 16 ++ tests/fugue_ibis/mock/__init__.py | 0 tests/fugue_ibis/mock/dataframe.py | 18 ++ tests/fugue_ibis/mock/execution_engine.py | 159 +++++++++++ tests/fugue_ibis/test_dataframe.py | 57 ++++ tests/fugue_ibis/test_execution_engine.py | 45 +++ tests/fugue_ibis/test_utils.py | 3 + tests/fugue_spark/test_dataframe.py | 11 +- tests/fugue_spark/utils/test_convert.py | 75 +++-- tests/fugue_sql/_test_syntax.py | 265 ------------------ tests/fugue_sql/test_visitors.py | 33 ++- 39 files changed, 1010 insertions(+), 385 deletions(-) create mode 100644 fugue_ibis/_compat.py create mode 100644 fugue_ibis/dataframe.py create mode 100644 fugue_ibis/execution_engine.py create mode 100644 tests/fugue_ibis/mock/__init__.py create mode 100644 tests/fugue_ibis/mock/dataframe.py create mode 100644 tests/fugue_ibis/mock/execution_engine.py create mode 100644 tests/fugue_ibis/test_dataframe.py create mode 100644 tests/fugue_ibis/test_execution_engine.py delete mode 100644 tests/fugue_sql/_test_syntax.py diff --git a/.github/workflows/test_core.yml b/.github/workflows/test_core.yml index 40f4116a..1e95c625 100644 --- a/.github/workflows/test_core.yml +++ b/.github/workflows/test_core.yml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.6, 3.7, "3.10"] + python-version: [3.7, "3.10"] steps: - uses: actions/checkout@v2 diff --git a/RELEASE.md b/RELEASE.md index bcc155d8..059284cf 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,5 +1,11 @@ # Release Notes +## 0.7.3 + +- [362](https://github.com/fugue-project/fugue/issues/362) Remove Python 3.6 Support +- [363](https://github.com/fugue-project/fugue/issues/363) Create IbisDataFrame and IbisExecutionEngine +- [364](https://github.com/fugue-project/fugue/issues/364) Enable Map type support + ## 0.7.2 - [348](https://github.com/fugue-project/fugue/issues/348) Make create data error more informative diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index 02a43897..d8c7445f 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -1,7 +1,7 @@ import inspect import logging import os -from typing import Any, Callable, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union 
import pandas as pd from fugue._utils.interfaceless import ( @@ -266,13 +266,16 @@ def dropna( self, df: DataFrame, how: str = "any", - thresh: int = None, + thresh: Optional[int] = None, subset: List[str] = None, metadata: Any = None, ) -> DataFrame: - d = df.as_pandas().dropna( - axis=0, how=how, thresh=thresh, subset=subset, inplace=False - ) + kwargs: Dict[str, Any] = dict(axis=0, subset=subset, inplace=False) + if thresh is None: + kwargs["how"] = how + else: + kwargs["thresh"] = thresh + d = df.as_pandas().dropna(**kwargs) return PandasDataFrame(d.reset_index(drop=True), df.schema, metadata) def fillna( diff --git a/fugue_dask/dataframe.py b/fugue_dask/dataframe.py index bd4c5e6a..6665d526 100644 --- a/fugue_dask/dataframe.py +++ b/fugue_dask/dataframe.py @@ -3,7 +3,7 @@ import dask.dataframe as pd import pandas import pyarrow as pa -from fugue.dataframe import DataFrame, LocalDataFrame, PandasDataFrame +from fugue.dataframe import ArrowDataFrame, DataFrame, LocalDataFrame, PandasDataFrame from fugue.dataframe.dataframe import _input_schema from fugue.exceptions import FugueDataFrameOperationError from triad.collections.schema import Schema @@ -183,7 +183,7 @@ def as_array( df: DataFrame = self if columns is not None: df = df[columns] - return PandasDataFrame(df.as_pandas(), schema=df.schema).as_array( + return ArrowDataFrame(df.as_pandas(), schema=df.schema).as_array( type_safe=type_safe ) diff --git a/fugue_dask/ibis_engine.py b/fugue_dask/ibis_engine.py index e1929bab..52970484 100644 --- a/fugue_dask/ibis_engine.py +++ b/fugue_dask/ibis_engine.py @@ -2,8 +2,8 @@ import dask.dataframe as dd import ibis -import ibis.expr.types as ir from fugue import DataFrame, DataFrames, ExecutionEngine +from fugue_ibis import IbisTable from fugue_ibis._utils import to_ibis_schema, to_schema from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine from ibis.backends.dask import Backend @@ -24,7 +24,7 @@ def __init__(self, execution_engine: ExecutionEngine) -> None: super().__init__(execution_engine) def select( - self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr] + self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable] ) -> DataFrame: pdfs = { k: self.execution_engine.to_df(v).native # type: ignore diff --git a/fugue_duckdb/_utils.py b/fugue_duckdb/_utils.py index 7f6d3172..88fedb9c 100644 --- a/fugue_duckdb/_utils.py +++ b/fugue_duckdb/_utils.py @@ -1,5 +1,5 @@ from datetime import date, datetime -from typing import Any, Dict, Iterable, Tuple +from typing import Any, Dict, Iterable, Optional, Tuple from uuid import uuid4 import numpy as np @@ -85,6 +85,10 @@ def _to_duck_type(tp: pa.DataType) -> str: if pa.types.is_struct(tp): inner = ",".join(f.name + " " + _to_duck_type(f.type) for f in tp) return f"STRUCT({inner})" + if pa.types.is_map(tp): + k = _to_duck_type(tp.key_type) + v = _to_duck_type(tp.item_type) + return f"MAP({k},{v})" if pa.types.is_list(tp): inner = _to_duck_type(tp.value_type) return f"{inner}[]" @@ -108,6 +112,12 @@ def _to_pa_type(duck_type: str) -> pa.DataType: for k, v in _split_comma(duck_type[p + 1 : -1]) ] return pa.struct(fields) + if tp == "MAP": + fields = [ + _to_pa_type(t.strip()) + for t, _ in _split_comma(duck_type[p + 1 : -1], split_char=None) + ] + return pa.map_(fields[0], fields[1]) if tp != "DECIMAL": raise Exception pair = duck_type[p + 1 : -1].split(",", 1) @@ -130,6 +140,10 @@ def _to_duck_type_legacy(tp: pa.DataType) -> str: # pragma: no cover if pa.types.is_list(tp): inner = 
_to_duck_type_legacy(tp.value_type) return f"LIST<{inner}>" + if pa.types.is_map(tp): + k = _to_duck_type_legacy(tp.key_type) + v = _to_duck_type_legacy(tp.item_type) + return f"LIST<{k},{v}>" if pa.types.is_decimal(tp): return f"DECIMAL({tp.precision}, {tp.scale})" return _PA_TYPES_TO_DUCK[tp] @@ -174,7 +188,10 @@ def _to_pa_type_legacy(duck_type: str) -> pa.DataType: # pragma: no cover def _split_comma( - expr: str, split_char=" ", left_char="(", right_char=")" + expr: str, + split_char: Optional[str] = " ", + left_char: str = "(", + right_char: str = ")", ) -> Iterable[Tuple[str, str]]: lv = 0 start = 0 @@ -184,8 +201,14 @@ def _split_comma( elif expr[i] == right_char: lv -= 1 elif lv == 0 and expr[i] == ",": - x = expr[start:i].strip().split(split_char, 1) - yield x[0], x[1] + if split_char is None: + yield expr[start:i].strip(), "" + else: + x = expr[start:i].strip().split(split_char, 1) + yield x[0], x[1] start = i + 1 - x = expr[start : len(expr)].strip().split(split_char, 1) - yield x[0], x[1] + if split_char is None: + yield expr[start : len(expr)].strip(), "" + else: + x = expr[start : len(expr)].strip().split(split_char, 1) + yield x[0], x[1] diff --git a/fugue_duckdb/dataframe.py b/fugue_duckdb/dataframe.py index 3de289af..7414aeb2 100644 --- a/fugue_duckdb/dataframe.py +++ b/fugue_duckdb/dataframe.py @@ -90,7 +90,7 @@ def as_array( ) -> List[Any]: if columns is not None: return self[columns].as_array(type_safe=type_safe) - return [list(x) for x in self._rel.fetchall()] + return self._fetchall(self._rel) def as_array_iterable( self, columns: Optional[List[str]] = None, type_safe: bool = False @@ -98,9 +98,23 @@ def as_array_iterable( if columns is not None: yield from self[columns].as_array_iterable(type_safe=type_safe) else: - yield from [list(x) for x in self._rel.fetchall()] + yield from self._fetchall(self._rel) def head(self, n: int, columns: Optional[List[str]] = None) -> List[Any]: if columns is not None: return self[columns].head(n) - return [list(x) for x in self._rel.limit(n).fetchall()] + return self._fetchall(self._rel.limit(n)) + + def _fetchall(self, rel: DuckDBPyRelation) -> List[List[Any]]: + map_pos = [i for i, t in enumerate(self.schema.types) if pa.types.is_map(t)] + if len(map_pos) == 0: + return [list(x) for x in rel.fetchall()] + else: + + def to_list(row: Any) -> List[Any]: + res = list(row) + for p in map_pos: + res[p] = list(zip(row[p]["key"], row[p]["value"])) + return res + + return [to_list(x) for x in rel.fetchall()] diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py index 3a11e294..a539900a 100644 --- a/fugue_duckdb/execution_engine.py +++ b/fugue_duckdb/execution_engine.py @@ -280,11 +280,11 @@ def intersect( ) -> DataFrame: if distinct: t1, t2 = get_temp_df_name(), get_temp_df_name() - sql = f"SELECT * FROM {t1} INTERSECT SELECT * FROM {t2}" + sql = f"SELECT * FROM {t1} INTERSECT DISTINCT SELECT * FROM {t2}" return self._sql(sql, {t1: df1, t2: df2}, metadata=metadata) - return DuckDataFrame( - self._to_duck_df(df1).native.intersect(self._to_duck_df(df2).native), - metadata=metadata, + raise NotImplementedError( + "DuckDB doesn't have consist behavior on INTERSECT ALL," + " so Fugue doesn't support it" ) def distinct( diff --git a/fugue_duckdb/ibis_engine.py b/fugue_duckdb/ibis_engine.py index 8ac1b971..2de268b0 100644 --- a/fugue_duckdb/ibis_engine.py +++ b/fugue_duckdb/ibis_engine.py @@ -1,8 +1,8 @@ from typing import Any, Callable, Optional import ibis -import ibis.expr.types as ir from fugue import DataFrame, 
DataFrames, ExecutionEngine +from fugue_ibis import IbisTable from fugue_ibis._utils import to_ibis_schema from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine from ibis.backends.pandas import Backend @@ -12,7 +12,7 @@ class DuckDBIbisEngine(IbisEngine): def select( - self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr] + self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable] ) -> DataFrame: be = _BackendWrapper().connect({}) be.set_schemas(dfs) diff --git a/fugue_ibis/__init__.py b/fugue_ibis/__init__.py index 0f0c0143..22d8ea89 100644 --- a/fugue_ibis/__init__.py +++ b/fugue_ibis/__init__.py @@ -1,8 +1,14 @@ # flake8: noqa -from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine -from fugue_ibis.execution.pandas_backend import _to_pandas_ibis_engine -from fugue_ibis.extensions import as_fugue, as_ibis, run_ibis +from triad import run_at_def +from ._compat import IbisTable +from .dataframe import IbisDataFrame +from .execution.ibis_engine import IbisEngine, register_ibis_engine +from .execution.pandas_backend import _to_pandas_ibis_engine +from .execution_engine import IbisExecutionEngine +from .extensions import as_fugue, as_ibis, run_ibis + +@run_at_def def register(): register_ibis_engine(1, _to_pandas_ibis_engine) diff --git a/fugue_ibis/_compat.py b/fugue_ibis/_compat.py new file mode 100644 index 00000000..249bc2f4 --- /dev/null +++ b/fugue_ibis/_compat.py @@ -0,0 +1,7 @@ +# flake8: noqa +# pylint: disable-all + +try: + from ibis.expr.types import Table as IbisTable +except Exception: + from ibis.expr.types import TableExpr as IbisTable diff --git a/fugue_ibis/_utils.py b/fugue_ibis/_utils.py index d9c15f28..c4e3e8b7 100644 --- a/fugue_ibis/_utils.py +++ b/fugue_ibis/_utils.py @@ -76,6 +76,8 @@ def _ibis_to_pa_type(tp: dt.DataType) -> pa.DataType: if isinstance(tp, dt.Struct): fields = [pa.field(n, _ibis_to_pa_type(t)) for n, t in zip(tp.names, tp.types)] return pa.struct(fields) + if isinstance(tp, dt.Map): + return pa.map_(_ibis_to_pa_type(tp.key_type), _ibis_to_pa_type(tp.value_type)) raise NotImplementedError(tp) # pragma: no cover @@ -84,10 +86,15 @@ def _pa_to_ibis_type(tp: pa.DataType) -> dt.DataType: return _PYARROW_TO_IBIS[tp] if pa.types.is_list(tp): ttp = _pa_to_ibis_type(tp.value_type) - return dt.Array(ttp) + return dt.Array(value_type=ttp) if pa.types.is_struct(tp): fields = [(f.name, _pa_to_ibis_type(f.type)) for f in tp] - return dt.Struct([x[0] for x in fields], [x[1] for x in fields]) + return dt.Struct.from_tuples(fields) + if pa.types.is_map(tp): + return dt.Map( + key_type=_pa_to_ibis_type(tp.key_type), + value_type=_pa_to_ibis_type(tp.item_type), + ) raise NotImplementedError(tp) # pragma: no cover diff --git a/fugue_ibis/dataframe.py b/fugue_ibis/dataframe.py new file mode 100644 index 00000000..c92ccab6 --- /dev/null +++ b/fugue_ibis/dataframe.py @@ -0,0 +1,143 @@ +from typing import Any, Dict, Iterable, List, Optional + +import pandas as pd +import pyarrow as pa +from fugue import DataFrame, IterableDataFrame, LocalDataFrame +from fugue.dataframe.dataframe import _input_schema +from fugue.exceptions import FugueDataFrameEmptyError, FugueDataFrameOperationError +from triad import Schema + +from ._compat import IbisTable +from ._utils import _pa_to_ibis_type, to_schema + + +class IbisDataFrame(DataFrame): + """DataFrame that wraps Ibis ``Table``. 
+ + :param rel: ``DuckDBPyRelation`` object + :param metadata: dict-like object with string keys, default ``None`` + """ + + def __init__(self, table: IbisTable, schema: Any = None, metadata: Any = None): + self._table = table + _schema = to_schema(table.schema()) + if schema is not None: + _to_schema = _input_schema(schema).assert_not_empty() + if _to_schema != schema: + table = self._alter_table_columns(table, _schema, _to_schema) + _schema = _to_schema + self._table = table + super().__init__(schema=_schema, metadata=metadata) + + @property + def native(self) -> IbisTable: + """Ibis Table object""" + return self._table + + def _to_local_df(self, table: IbisTable, metadata: Any = None) -> LocalDataFrame: + raise NotImplementedError # pragma: no cover + + def _to_iterable_df( + self, table: IbisTable, metadata: Any = None + ) -> IterableDataFrame: + raise NotImplementedError # pragma: no cover + + def _to_new_df(self, table: IbisTable, metadata: Any = None) -> DataFrame: + raise NotImplementedError # pragma: no cover + + def _compute_scalar(self, table: IbisTable) -> Any: + return table.execute() + + @property + def is_local(self) -> bool: + return False + + @property + def is_bounded(self) -> bool: + return True + + @property + def empty(self) -> bool: + return self._to_local_df(self._table.limit(1)).count() == 0 + + @property + def num_partitions(self) -> int: + return 1 # pragma: no cover + + def peek_array(self) -> Any: + res = self._to_local_df(self._table.head(1)).as_array() + if len(res) == 0: + raise FugueDataFrameEmptyError() + return res[0] + + def count(self) -> int: + return self._compute_scalar(self._table.count()) + + def _drop_cols(self, cols: List[str]) -> DataFrame: + schema = self.schema.exclude(cols) + return self._to_new_df(self._table[schema.names]) + + def _select_cols(self, keys: List[Any]) -> DataFrame: + schema = self.schema.extract(keys) + return self._to_new_df(self._table[schema.names]) + + def rename(self, columns: Dict[str, str]) -> DataFrame: + try: + schema = self.schema.rename(columns) + except Exception as e: + raise FugueDataFrameOperationError from e + cols: List[Any] = [] + for a, b in zip(self.schema.names, schema.names): + if a == b: + cols.append(self._table[a]) + else: + cols.append(self._table[a].name(b)) + return self._to_new_df(self._table.projection(cols)) + + def alter_columns(self, columns: Any) -> DataFrame: + new_schema = self._get_altered_schema(columns) + if new_schema == self.schema: + return self + return self._to_new_df( + self._alter_table_columns(self._table, self.schema, new_schema) + ) + + def as_arrow(self, type_safe: bool = False) -> pa.Table: + return self._to_local_df(self._table).as_arrow(type_safe=type_safe) + + def as_pandas(self) -> pd.DataFrame: + return self._to_local_df(self._table).as_pandas() + + def as_local(self) -> LocalDataFrame: + return self._to_local_df(self._table, self.metadata) + + def as_array( + self, columns: Optional[List[str]] = None, type_safe: bool = False + ) -> List[Any]: + if columns is not None: + return self[columns].as_array(type_safe=type_safe) + return self._to_local_df(self._table).as_array(type_safe=type_safe) + + def as_array_iterable( + self, columns: Optional[List[str]] = None, type_safe: bool = False + ) -> Iterable[Any]: + if columns is not None: + yield from self[columns].as_array_iterable(type_safe=type_safe) + else: + yield from self._to_iterable_df(self._table).as_array_iterable( + type_safe=type_safe + ) + + def head(self, n: int, columns: Optional[List[str]] = None) -> List[Any]: + 
if columns is not None: + return self[columns].head(n) + return self._to_local_df(self._table.head(n)).as_array(type_safe=True) + + def _alter_table_columns( + self, table: IbisTable, schema: Schema, new_schema: Schema + ): + fields: Dict[str, Any] = {} + for f1, f2 in zip(schema.fields, new_schema.fields): + if f1.type != f2.type: + fields[f1.name] = self._table[f1.name].cast(_pa_to_ibis_type(f2.type)) + return table.mutate(**fields) diff --git a/fugue_ibis/execution/ibis_engine.py b/fugue_ibis/execution/ibis_engine.py index 7d655d79..3108001a 100644 --- a/fugue_ibis/execution/ibis_engine.py +++ b/fugue_ibis/execution/ibis_engine.py @@ -2,9 +2,10 @@ from typing import Any, Callable, List, Optional, Tuple import ibis -import ibis.expr.types as ir from fugue import DataFrame, DataFrames, ExecutionEngine +from .._compat import IbisTable + _ENGINE_FUNC: List[ Tuple[int, int, Callable[[ExecutionEngine, Any], Optional["IbisEngine"]]] ] = [] @@ -47,7 +48,7 @@ def execution_engine(self) -> ExecutionEngine: @abstractmethod def select( - self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr] + self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable] ) -> DataFrame: # pragma: no cover """Execute the ibis select expression. diff --git a/fugue_ibis/execution/pandas_backend.py b/fugue_ibis/execution/pandas_backend.py index 59c628ae..e93b64a5 100644 --- a/fugue_ibis/execution/pandas_backend.py +++ b/fugue_ibis/execution/pandas_backend.py @@ -1,25 +1,25 @@ from typing import Any, Callable, Optional import ibis -import ibis.expr.types as ir +import pandas as pd from fugue import ( DataFrame, DataFrames, ExecutionEngine, - PandasDataFrame, NativeExecutionEngine, + PandasDataFrame, ) -from triad.utils.assertion import assert_or_throw -import pandas as pd - +from fugue_ibis._utils import to_ibis_schema, to_schema from fugue_ibis.execution.ibis_engine import IbisEngine -from fugue_ibis._utils import to_schema, to_ibis_schema from ibis.backends.pandas import Backend +from triad.utils.assertion import assert_or_throw + +from .._compat import IbisTable class PandasIbisEngine(IbisEngine): def select( - self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr] + self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable] ) -> DataFrame: # pragma: no cover pdfs = {k: v.as_pandas() for k, v in dfs.items()} be = _BackendWrapper().connect(pdfs) diff --git a/fugue_ibis/execution_engine.py b/fugue_ibis/execution_engine.py new file mode 100644 index 00000000..1ae0c921 --- /dev/null +++ b/fugue_ibis/execution_engine.py @@ -0,0 +1,293 @@ +from typing import Any, List, Optional, Dict + +import ibis +from fugue.collections.partition import ( + EMPTY_PARTITION_SPEC, + PartitionSpec, + parse_presort_exp, +) +from fugue.dataframe import DataFrame, DataFrames +from fugue.dataframe.utils import get_join_schemas +from fugue.execution.execution_engine import ( + _DEFAULT_JOIN_KEYS, + ExecutionEngine, + SQLEngine, +) +from ibis import BaseBackend +from triad.utils.assertion import assert_or_throw + +from .dataframe import IbisDataFrame +import itertools + +_JOIN_RIGHT_SUFFIX = "_ibis_y__" +_GEN_TABLE_NAMES = (f"_fugue_temp_table_{i:d}" for i in itertools.count()) + + +class IbisSQLEngine(SQLEngine): + """Ibis SQL backend base implementation. 
+ + :param execution_engine: the execution engine this sql engine will run on + """ + + def __init__(self, execution_engine: ExecutionEngine) -> None: + assert_or_throw( + isinstance(execution_engine, IbisExecutionEngine) + and isinstance( + execution_engine.backend, ibis.backends.base.sql.BaseSQLBackend + ) + ) + super().__init__(execution_engine) + self._ibis_engine: IbisExecutionEngine = execution_engine # type: ignore + + def select(self, dfs: DataFrames, statement: str) -> DataFrame: + for k, v in dfs.items(): + self._ibis_engine._to_ibis_dataframe(v).native.alias(k) + tb = self._ibis_engine.backend.sql(statement) + return self._ibis_engine._to_ibis_dataframe(tb) + + +class IbisExecutionEngine(ExecutionEngine): + """The base execution engine using Ibis. + Please read |ExecutionEngineTutorial| to understand this important Fugue concept + + :param conf: |ParamsLikeObject|, read |FugueConfig| to learn Fugue specific options + """ + + @property + def default_sql_engine(self) -> SQLEngine: + return IbisSQLEngine(self) + + @property + def backend(self) -> BaseBackend: # pragma: no cover + raise NotImplementedError + + def encode_column_name(self, name: str) -> str: # pragma: no cover + raise NotImplementedError + + def get_temp_table_name(self) -> str: + return next(_GEN_TABLE_NAMES) + + def _to_ibis_dataframe( + self, df: Any, schema: Any = None, metadata: Any = None + ) -> IbisDataFrame: # pragma: no cover + raise NotImplementedError + + def to_df(self, df: Any, schema: Any = None, metadata: Any = None) -> DataFrame: + return self._to_ibis_dataframe(df, schema=schema, metadata=metadata) + + def join( + self, + df1: DataFrame, + df2: DataFrame, + how: str, + on: List[str] = _DEFAULT_JOIN_KEYS, + metadata: Any = None, + ) -> DataFrame: + _df1 = self._to_ibis_dataframe(df1) + _df2 = self._to_ibis_dataframe(df2) + key_schema, end_schema = get_join_schemas(_df1, _df2, how=how, on=on) + on_fields = [_df1.native[k] == _df2.native[k] for k in key_schema] + if how.lower() == "cross": + tb = _df1.native.cross_join(_df2.native, suffixes=("", _JOIN_RIGHT_SUFFIX)) + elif how.lower() == "right_outer": + tb = _df2.native.left_join( + _df1.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX) + ) + elif how.lower() == "left_outer": + tb = _df1.native.left_join( + _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX) + ) + elif how.lower() == "full_outer": + tb = _df1.native.outer_join( + _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX) + ) + cols: List[Any] = [] + for k in end_schema.names: + if k not in key_schema: + cols.append(k) + else: + cols.append( + ibis.coalesce(tb[k], tb[k + _JOIN_RIGHT_SUFFIX]).name(k) + ) + tb = tb[cols] + elif how.lower() in ["semi", "left_semi"]: + tb = _df1.native.semi_join( + _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX) + ) + elif how.lower() in ["anti", "left_anti"]: + tb = _df1.native.anti_join( + _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX) + ) + else: + tb = _df1.native.inner_join( + _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX) + ) + return self._to_ibis_dataframe(tb[end_schema.names], metadata=metadata) + + def union( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + _df1 = self._to_ibis_dataframe(df1) + _df2 = self._to_ibis_dataframe(df2) + tb = _df1.native.union(_df2.native, distinct=distinct) + return self._to_ibis_dataframe(tb, _df1.schema, 
metadata=metadata) + + def subtract( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + _df1 = self._to_ibis_dataframe(df1) + _df2 = self._to_ibis_dataframe(df2) + tb = _df1.native.difference(_df2.native, distinct=distinct) + return self._to_ibis_dataframe(tb, _df1.schema, metadata=metadata) + + def intersect( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + _df1 = self._to_ibis_dataframe(df1) + _df2 = self._to_ibis_dataframe(df2) + tb = _df1.native.intersect(_df2.native, distinct=distinct) + return self._to_ibis_dataframe(tb, _df1.schema, metadata=metadata) + + def distinct( + self, + df: DataFrame, + metadata: Any = None, + ) -> DataFrame: + _df = self._to_ibis_dataframe(df) + tb = _df.native.distinct() + return self._to_ibis_dataframe(tb, _df.schema, metadata=metadata) + + def dropna( + self, + df: DataFrame, + how: str = "any", + thresh: int = None, + subset: Optional[List[str]] = None, + metadata: Any = None, + ) -> DataFrame: + schema = df.schema + if subset is not None: + schema = schema.extract(subset) + _df = self._to_ibis_dataframe(df) + if thresh is None: + tb = _df.native.dropna(subset=subset, how=how) + return self._to_ibis_dataframe(tb, _df.schema, metadata=metadata) + assert_or_throw( + how == "any", ValueError("when thresh is set, how must be 'any'") + ) + sm = None + for col in schema.names: + expr = _df.native[col].case().when(ibis.null(), 0).else_(1).end() + if sm is None: + sm = expr + else: + sm = sm + expr + tb = _df.native.filter(sm >= ibis.literal(thresh)) + return self._to_ibis_dataframe(tb, _df.schema, metadata=metadata) + + def fillna( + self, + df: DataFrame, + value: Any, + subset: List[str] = None, + metadata: Any = None, + ) -> DataFrame: + def _build_value_dict(names: List[str]) -> Dict[str, str]: + if not isinstance(value, dict): + return {n: value for n in names} + else: + return {n: value[n] for n in names} + + names = list(df.schema.names) + if isinstance(value, dict): + # subset should be ignored + names = list(value.keys()) + elif subset is not None: + names = list(df.schema.extract(subset).names) + vd = _build_value_dict(names) + assert_or_throw( + all(v is not None for v in vd.values()), + ValueError("fillna value can not be None or contain None"), + ) + tb = self._to_ibis_dataframe(df).native + cols = [ + ibis.coalesce(tb[f], ibis.literal(vd[f])).name(f) if f in names else tb[f] + for f in df.schema.names + ] + return self._to_ibis_dataframe(tb[cols], metadata=metadata) + + def take( + self, + df: DataFrame, + n: int, + presort: str, + na_position: str = "last", + partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + isinstance(n, int), + ValueError("n needs to be an integer"), + ) + + if presort is not None and presort != "": + _presort = parse_presort_exp(presort) + else: + _presort = partition_spec.presort + tbn = self.get_temp_table_name() + idf = self._to_ibis_dataframe(df) + + if len(_presort) == 0: + if len(partition_spec.partition_by) == 0: + return self._to_ibis_dataframe(idf.native.head(n), metadata=metadata) + pcols = ", ".join( + self.encode_column_name(x) for x in partition_spec.partition_by + ) + sql = ( + f"SELECT * FROM (" + f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols}) " + f"AS __fugue_take_param FROM {tbn}" + f") WHERE __fugue_take_param<={n}" + ) + tb = idf.native.alias(tbn).sql(sql) + return self._to_ibis_dataframe(tb[df.schema.names], 
metadata=metadata) + + sorts: List[str] = [] + for k, v in _presort.items(): + s = self.encode_column_name(k) + s += " ASC" if v else " DESC" + s += " NULLS FIRST" if na_position == "first" else " NULLS LAST" + sorts.append(s) + sort_expr = "ORDER BY " + ", ".join(sorts) + + if len(partition_spec.partition_by) == 0: + sql = f"SELECT * FROM {tbn} {sort_expr} LIMIT {n}" + tb = idf.native.alias(tbn).sql(sql) + return self._to_ibis_dataframe(tb[df.schema.names], metadata=metadata) + + pcols = ", ".join( + self.encode_column_name(x) for x in partition_spec.partition_by + ) + sql = ( + f"SELECT * FROM (" + f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols} {sort_expr}) " + f"AS __fugue_take_param FROM {tbn}" + f") WHERE __fugue_take_param<={n}" + ) + tb = idf.native.alias(tbn).sql(sql) + return self._to_ibis_dataframe(tb[df.schema.names], metadata=metadata) diff --git a/fugue_ibis/extensions.py b/fugue_ibis/extensions.py index 223dec9c..6268696c 100644 --- a/fugue_ibis/extensions.py +++ b/fugue_ibis/extensions.py @@ -1,7 +1,6 @@ from typing import Any, Callable, Dict import ibis -import ibis.expr.types as ir from fugue import DataFrame, DataFrames, Processor, WorkflowDataFrame from fugue.exceptions import FugueWorkflowCompileError from fugue.workflow.workflow import WorkflowDataFrames @@ -10,9 +9,11 @@ from fugue_ibis._utils import LazyIbisObject, _materialize from fugue_ibis.execution.ibis_engine import to_ibis_engine +from ._compat import IbisTable + def run_ibis( - ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr], + ibis_func: Callable[[ibis.BaseBackend], IbisTable], ibis_engine: Any = None, **dfs: WorkflowDataFrame, ) -> WorkflowDataFrame: @@ -51,7 +52,7 @@ def func(backend): @extension_method -def as_ibis(df: WorkflowDataFrame) -> ir.TableExpr: +def as_ibis(df: WorkflowDataFrame) -> IbisTable: """Convert the Fugue workflow dataframe to an ibis table for ibis operations. 
@@ -112,7 +113,7 @@ def as_ibis(df: WorkflowDataFrame) -> ir.TableExpr: @extension_method(class_type=LazyIbisObject) def as_fugue( - expr: ir.TableExpr, + expr: IbisTable, ibis_engine: Any = None, ) -> WorkflowDataFrame: """Convert a lazy ibis object to Fugue workflow dataframe @@ -174,7 +175,7 @@ def _func( be: ibis.BaseBackend, lazy_expr: LazyIbisObject, ctx: Dict[int, Any], - ) -> ir.TableExpr: + ) -> IbisTable: return _materialize( lazy_expr, {k: be.table(f"_{id(v)}") for k, v in ctx.items()} ) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 89c7a315..61b44af2 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -126,6 +126,8 @@ def _to_arrow_type(dt: pt.DataType) -> pa.DataType: return pa.struct(fields) if isinstance(dt, pt.ArrayType): return pa.list_(_to_arrow_type(dt.elementType)) + if isinstance(dt, pt.MapType): + return pa.map_(_to_arrow_type(dt.keyType), _to_arrow_type(dt.valueType)) return to_arrow_type(dt) diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index d8444c7c..cb8543d4 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -145,7 +145,7 @@ def fs(self) -> FileSystem: def default_sql_engine(self) -> SQLEngine: return SparkSQLEngine(self) - def to_df( + def to_df( # noqa: C901 self, df: Any, schema: Any = None, metadata: Any = None ) -> SparkDataFrame: """Convert a data structure to :class:`~fugue_spark.dataframe.SparkDataFrame` @@ -217,9 +217,22 @@ def to_df( schema is not None, FugueDataFrameInitError("schema can't be None") ) adf = ArrowDataFrame(df, to_schema(schema)) - sdf = self.spark_session.createDataFrame( - adf.as_array(), to_spark_schema(adf.schema) - ) + map_pos = [i for i, t in enumerate(adf.schema.types) if pa.types.is_map(t)] + if len(map_pos) == 0: + sdf = self.spark_session.createDataFrame( + adf.as_array(), to_spark_schema(adf.schema) + ) + else: + + def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]: + for row in rows: + for p in map_pos: + row[p] = dict(row[p]) + yield row + + sdf = self.spark_session.createDataFrame( + to_dict(adf.as_array_iterable()), to_spark_schema(adf.schema) + ) return SparkDataFrame(sdf, adf.schema, metadata) def repartition(self, df: DataFrame, partition_spec: PartitionSpec) -> DataFrame: diff --git a/fugue_spark/ibis_engine.py b/fugue_spark/ibis_engine.py index 1c1d6a08..166a8dd8 100644 --- a/fugue_spark/ibis_engine.py +++ b/fugue_spark/ibis_engine.py @@ -1,8 +1,8 @@ from typing import Any, Callable, Optional import ibis -import ibis.expr.types as ir from fugue import DataFrame, DataFrames, ExecutionEngine +from fugue_ibis import IbisTable from fugue_ibis._utils import to_schema from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine from pyspark.sql import DataFrame as PySparkDataFrame @@ -23,7 +23,7 @@ def __init__(self, execution_engine: ExecutionEngine) -> None: super().__init__(execution_engine) def select( - self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr] + self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable] ) -> DataFrame: for k, v in dfs.items(): self.execution_engine.register(v, k) # type: ignore diff --git a/fugue_sql/_visitors.py b/fugue_sql/_visitors.py index 976a0369..ec060b6c 100644 --- a/fugue_sql/_visitors.py +++ b/fugue_sql/_visitors.py @@ -168,6 +168,10 @@ def visitFugueSchemaStructType( fields = self.visit(ctx.fugueSchema()).fields return pa.struct(fields) + def visitFugueSchemaMapType(self, ctx: 
fp.FugueSchemaMapTypeContext) -> pa.DataType: + tps = self.collectChildren(ctx, fp.FugueSchemaTypeContext) + return pa.map_(tps[0], tps[1]) + def visitFuguePrepartition(self, ctx: fp.FuguePrepartitionContext) -> PartitionSpec: params = self.get_dict(ctx, "algo", "num", "by", "presort") return PartitionSpec(**params) diff --git a/fugue_test/dataframe_suite.py b/fugue_test/dataframe_suite.py index e91c58e7..3c1ee5b2 100644 --- a/fugue_test/dataframe_suite.py +++ b/fugue_test/dataframe_suite.py @@ -170,12 +170,25 @@ def test_as_dict_iterable(self): df = self.df([[pd.Timestamp("2020-01-01"), 1]], "a:datetime,b:int") assert [dict(a=datetime(2020, 1, 1), b=1)] == list(df.as_dict_iterable()) - def test_nested(self): + def test_list_type(self): data = [[[30, 40]]] df = self.df(data, "a:[int]") a = df.as_array(type_safe=True) assert data == a + def test_struct_type(self): + data = [[{"a": 1}], [{"a": 2}]] + df = self.df(data, "x:{a:int}") + a = df.as_array(type_safe=True) + assert data == a + + def test_map_type(self): + data = [[[("a", 1), ("b", 3)]], [[("b", 2)]]] + df = self.df(data, "x:") + a = df.as_array(type_safe=True) + assert data == a + + def test_deep_nested_types(self): data = [[dict(a="1", b=[3, 4], d=1.0)], [dict(b=[30, 40])]] df = self.df(data, "a:{a:str,b:[int]}") a = df.as_array(type_safe=True) @@ -186,7 +199,7 @@ def test_nested(self): a = df.as_array(type_safe=True) assert [[[dict(a=None, b=[30, 40])]]] == a - def test_binary(self): + def test_binary_type(self): data = [[b"\x01\x05"]] df = self.df(data, "a:bytes") a = df.as_array(type_safe=True) diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py index bc8c296f..4910b9ec 100644 --- a/fugue_version/__init__.py +++ b/fugue_version/__init__.py @@ -1 +1 @@ -__version__ = "0.7.2" +__version__ = "0.7.3" diff --git a/requirements.txt b/requirements.txt index c2e53f40..5cfa6e12 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,6 +23,8 @@ pandavro fastavro psutil +duckdb-engine>=0.6.4 + # publish to pypi wheel twine diff --git a/setup.py b/setup.py index 5293bf70..833b4408 100644 --- a/setup.py +++ b/setup.py @@ -31,22 +31,29 @@ def get_version() -> str: keywords="distributed spark dask sql dsl domain specific language", url="http://github.com/fugue-project/fugue", install_requires=[ - "triad>=0.6.6", + "triad>=0.6.8", "adagio>=0.2.4", "qpd>=0.3.1", - "fugue-sql-antlr>=0.1.0", + "fugue-sql-antlr>=0.1.1", "sqlalchemy", "pyarrow>=0.15.1", "pandas>=1.0.2", "jinja2", ], extras_require={ - "cpp_sql_parser": ["fugue-sql-antlr[cpp]>=0.1.0"], + "cpp_sql_parser": ["fugue-sql-antlr[cpp]>=0.1.1"], "spark": ["pyspark"], "dask": ["dask[distributed,dataframe]", "qpd[dask]>=0.3.1"], - "ray": ["ray>=2.0.0", "duckdb>=0.3.2", "pyarrow>=7.0.0"], - "duckdb": ["duckdb>=0.3.2", "pyarrow>=5.0.0", "numpy"], - "ibis": ["ibis-framework>=2.1.1"], + "ray": ["ray>=2.0.0", "duckdb>=0.5.0", "pyarrow>=7.0.0"], + "duckdb": [ + "duckdb>=0.5.0", + "pyarrow>=7.0.0", + "numpy", + ], + "ibis": [ + "ibis-framework>=2.1.1; python_version < '3.8'", + "ibis-framework>=3.2.0; python_version >= '3.8'", + ], "notebook": ["notebook", "jupyterlab", "ipython>=7.10.0"], "all": [ "fugue-sql-antlr[cpp]>=0.1.0", @@ -57,10 +64,10 @@ def get_version() -> str: "notebook", "jupyterlab", "ipython>=7.10.0", - "duckdb>=0.3.2", - "pyarrow>=5.0.0; python_version < '3.7'", - "pyarrow>=7.0.0; python_version >= '3.7'", - "ibis-framework>=2; python_version >= '3.7'", + "duckdb>=0.5.0", + "pyarrow>=7.0.0", + "ibis-framework>=2.1.1; python_version < '3.8'", + 
"ibis-framework>=3.2.0; python_version >= '3.8'", ], }, classifiers=[ @@ -70,14 +77,13 @@ def get_version() -> str: "Topic :: Software Development :: Libraries :: Python Modules", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3 :: Only", ], - python_requires=">=3.6", + python_requires=">=3.7", package_data={"fugue_notebook": ["nbextension/*"]}, entry_points={ "fugue.plugins": [ diff --git a/tests/fugue_dask/test_dataframe.py b/tests/fugue_dask/test_dataframe.py index c317ea34..f798688f 100644 --- a/tests/fugue_dask/test_dataframe.py +++ b/tests/fugue_dask/test_dataframe.py @@ -128,7 +128,7 @@ def test_as_array(): df = DaskDataFrame([[pandas.NaT, 1.1]], "a:datetime,b:int") df.native["a"] = pd.to_datetime(df.native["a"]) - assert isinstance(df.as_array()[0][0], datetime) + assert df.as_array()[0][0] is None assert isinstance(df.as_array()[0][1], int) df = DaskDataFrame([[1.0, 1.1]], "a:double,b:int") @@ -158,11 +158,11 @@ def test_nan_none(): df = ArrayDataFrame([["a", 1.1], [None, None]], "b:str,c:double") arr = DaskDataFrame(df.as_pandas(), df.schema).as_array()[1] assert arr[0] is None - assert math.isnan(arr[1]) + assert arr[1] is None arr = DaskDataFrame(df.as_array(), df.schema).as_array()[1] assert arr[0] is None - assert math.isnan(arr[1]) + assert arr[1] is None arr = DaskDataFrame(df.as_pandas()["b"], "b:str").as_array()[1] assert arr[0] is None diff --git a/tests/fugue_duckdb/test_dask.py b/tests/fugue_duckdb/test_dask.py index d9155dfa..339491b6 100644 --- a/tests/fugue_duckdb/test_dask.py +++ b/tests/fugue_duckdb/test_dask.py @@ -80,21 +80,6 @@ def test_broadcast(self): ddf = DaskDataFrame(df) assert isinstance(self.engine.broadcast(ddf), DaskDataFrame) - def test_intersect_all(self): - e = self.engine - a = e.to_df([[1, 2, 3], [4, None, 6], [4, None, 6]], "a:double,b:double,c:int") - b = e.to_df( - [[1, 2, 33], [4, None, 6], [4, None, 6], [4, None, 6]], - "a:double,b:double,c:int", - ) - c = e.intersect(a, b, distinct=False) - df_eq( - c, - [[4, None, 6], [4, None, 6]], - "a:double,b:double,c:int", - throw=True, - ) - class DuckDaskBuiltInTests(BuiltInTests.Tests): @classmethod @@ -110,7 +95,7 @@ def tearDownClass(cls): def make_engine(self): e = DuckDaskExecutionEngine( conf={"test": True, "fugue.duckdb.pragma.threads": 2, **_CONF}, - connection=self._con + connection=self._con, ) return e diff --git a/tests/fugue_duckdb/test_execution_engine.py b/tests/fugue_duckdb/test_execution_engine.py index f5578225..e4086760 100644 --- a/tests/fugue_duckdb/test_execution_engine.py +++ b/tests/fugue_duckdb/test_execution_engine.py @@ -36,13 +36,15 @@ def test_intersect_all(self): [[1, 2, 33], [4, None, 6], [4, None, 6], [4, None, 6]], "a:double,b:double,c:int", ) - c = e.intersect(a, b, distinct=False) - df_eq( - c, - [[4, None, 6], [4, None, 6]], - "a:double,b:double,c:int", - throw=True, - ) + raises(NotImplementedError, lambda: e.intersect(a, b, distinct=False)) + # DuckDB 0.5.0 stopped support INTERSECT ALL + # c = e.intersect(a, b, distinct=False) + # df_eq( + # c, + # [[4, None, 6], [4, None, 6]], + # "a:double,b:double,c:int", + # throw=True, + # ) class DuckBuiltInTests(BuiltInTests.Tests): diff --git a/tests/fugue_duckdb/test_utils.py b/tests/fugue_duckdb/test_utils.py index 26919339..1cc2deaa 
100644 --- a/tests/fugue_duckdb/test_utils.py +++ b/tests/fugue_duckdb/test_utils.py @@ -84,6 +84,22 @@ def assert_many(*tps): ) ) + assert_( + pa.map_( + pa.struct( + [ + pa.field("x", pa.int64()), + pa.field("yy", pa.struct([pa.field("x", pa.int64())])), + ] + ), + pa.struct( + [ + pa.field("yy", pa.list_(pa.struct([pa.field("x", pa.int64())]))), + ] + ), + ) + ) + raises(ValueError, lambda: to_pa_type("")) raises(ValueError, lambda: to_pa_type("XX")) raises(ValueError, lambda: to_pa_type("XX(1,2)")) diff --git a/tests/fugue_ibis/mock/__init__.py b/tests/fugue_ibis/mock/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fugue_ibis/mock/dataframe.py b/tests/fugue_ibis/mock/dataframe.py new file mode 100644 index 00000000..9d4d3705 --- /dev/null +++ b/tests/fugue_ibis/mock/dataframe.py @@ -0,0 +1,18 @@ +from typing import Any + +from fugue import ArrowDataFrame, DataFrame, LocalDataFrame +from fugue_ibis import IbisDataFrame, IbisTable +from fugue_ibis._utils import to_schema + + +class MockDuckDataFrame(IbisDataFrame): + def _to_new_df(self, table: IbisTable, metadata: Any = None) -> DataFrame: + return MockDuckDataFrame(table, metadata) + + def _to_local_df(self, table: IbisTable, metadata: Any = None) -> LocalDataFrame: + return ArrowDataFrame( + table.execute(), to_schema(table.schema()), metadata=metadata + ) + + def _to_iterable_df(self, table: IbisTable, metadata: Any = None) -> LocalDataFrame: + return self._to_local_df(table, metadata=metadata) diff --git a/tests/fugue_ibis/mock/execution_engine.py b/tests/fugue_ibis/mock/execution_engine.py new file mode 100644 index 00000000..ed02c636 --- /dev/null +++ b/tests/fugue_ibis/mock/execution_engine.py @@ -0,0 +1,159 @@ +import logging +from typing import Any, Callable, Iterable, List, Optional, Union + +import ibis +import pyarrow as pa +from fugue import ( + ArrowDataFrame, + DataFrame, + LocalDataFrame, + NativeExecutionEngine, + PartitionCursor, + PartitionSpec, +) +from fugue.collections.partition import EMPTY_PARTITION_SPEC +from fugue_ibis import IbisDataFrame, IbisExecutionEngine, IbisTable +from triad import FileSystem, assert_or_throw + +from .dataframe import MockDuckDataFrame + + +class MockDuckExecutionEngine(IbisExecutionEngine): + def __init__(self, conf: Any): + super().__init__(conf) + self._backend = ibis.duckdb.connect() + self._native_engine = NativeExecutionEngine(conf) + + @property + def backend(self) -> ibis.BaseBackend: + return self._backend + + def encode_column_name(self, name: str) -> str: + return '"' + name.replace('"', '""') + '"' + + def _to_ibis_dataframe( + self, df: Any, schema: Any = None, metadata: Any = None + ) -> IbisDataFrame: + if isinstance(df, MockDuckDataFrame): + return df + if isinstance(df, DataFrame): + return self._register_df( + df.as_arrow(), + schema=schema if schema is not None else df.schema, + metadata=metadata if metadata is not None else df.metadata, + ) + if isinstance(df, pa.Table): + return self._register_df(df, schema=schema, metadata=metadata) + if isinstance(df, IbisTable): + return MockDuckDataFrame(df, schema=schema, metadata=metadata) + if isinstance(df, Iterable): + adf = ArrowDataFrame(df, schema) + return self._register_df(adf.native, schema=schema, metadata=metadata) + raise NotImplementedError + + def __repr__(self) -> str: + return "MockDuckExecutionEngine" + + @property + def log(self) -> logging.Logger: + return self._native_engine.log + + @property + def fs(self) -> FileSystem: + return self._native_engine.fs + + def repartition( + 
self, df: DataFrame, partition_spec: PartitionSpec + ) -> DataFrame: # pragma: no cover + self.log.warning("%s doesn't respect repartition", self) + return df + + def map( + self, + df: DataFrame, + map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame], + output_schema: Any, + partition_spec: PartitionSpec, + metadata: Any = None, + on_init: Optional[Callable[[int, DataFrame], Any]] = None, + ) -> DataFrame: + return self._native_engine.map( + df=df, + map_func=map_func, + output_schema=output_schema, + partition_spec=partition_spec, + metadata=metadata, + on_init=on_init, + ) + + def broadcast(self, df: DataFrame) -> DataFrame: + return df + + def persist( + self, + df: DataFrame, + lazy: bool = False, + **kwargs: Any, + ) -> DataFrame: + if isinstance(df, MockDuckDataFrame): + return ArrowDataFrame(df.as_arrow(), metadata=df.metadata) + return self.to_df(df) + + def sample( + self, + df: DataFrame, + n: Optional[int] = None, + frac: Optional[float] = None, + replace: bool = False, + seed: Optional[int] = None, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + (n is None and frac is not None and frac >= 0.0) + or (frac is None and n is not None and n >= 0), + ValueError( + f"one and only one of n and frac should be non-negative, {n}, {frac}" + ), + ) + tn = self.get_temp_table_name() + if frac is not None: + sql = f"SELECT * FROM {tn} USING SAMPLE bernoulli({frac*100} PERCENT)" + else: + sql = f"SELECT * FROM {tn} USING SAMPLE reservoir({n} ROWS)" + if seed is not None: + sql += f" REPEATABLE ({seed})" + idf = self._to_ibis_dataframe(df) + return self._to_ibis_dataframe(idf.native.alias(tn).sql(sql), metadata=metadata) + + def load_df( + self, + path: Union[str, List[str]], + format_hint: Any = None, + columns: Any = None, + **kwargs: Any, + ) -> DataFrame: + return self._native_engine.load_df(path, format_hint, columns, **kwargs) + + def save_df( + self, + df: DataFrame, + path: str, + format_hint: Any = None, + mode: str = "overwrite", + partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC, + force_single: bool = False, + **kwargs: Any, + ) -> None: + return self._native_engine.save_df( + df, path, format_hint, mode, partition_spec, force_single, **kwargs + ) + + def _register_df( + self, + df: pa.Table, + name: Optional[str] = None, + schema: Any = None, + metadata: Any = None, + ) -> MockDuckDataFrame: + tb = self.backend.register(df, name) + return MockDuckDataFrame(tb, metadata=metadata) diff --git a/tests/fugue_ibis/test_dataframe.py b/tests/fugue_ibis/test_dataframe.py new file mode 100644 index 00000000..d37b0a2d --- /dev/null +++ b/tests/fugue_ibis/test_dataframe.py @@ -0,0 +1,57 @@ +import sys +from datetime import datetime +from typing import Any + +import ibis +import pandas as pd +import pytest +from fugue import ArrowDataFrame +from fugue_duckdb.dataframe import DuckDataFrame +from fugue_test.dataframe_suite import DataFrameTests + +from .mock.dataframe import MockDuckDataFrame + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="< 3.8") +class IbisDataFrameTests(DataFrameTests.Tests): + @classmethod + def setUpClass(cls): + cls._con = ibis.duckdb.connect() + + def df( + self, data: Any = None, schema: Any = None, metadata: Any = None + ) -> DuckDataFrame: + df = ArrowDataFrame(data, schema, metadata) + name = f"_{id(df.native)}" + self._con.con.execute("register", (name, df.native)) + return MockDuckDataFrame( + self._con.table(name), schema=schema, metadata=metadata + ) + + def test_is_local(self): + df = self.df([["x", 1]], 
"a:str,b:int") + assert not df.is_local + assert df.is_bounded + + def _test_as_arrow(self): + # empty + df = self.df([["a", 1]], "a:str,b:int") + assert [["a", 1]] == list(ArrowDataFrame(df.as_arrow()).as_array()) + + def test_map_type(self): + pass + + def test_as_arrow(self): + # empty + df = self.df([], "a:int,b:int") + assert [] == list(ArrowDataFrame(df.as_arrow()).as_dict_iterable()) + # pd.Nat + df = self.df([[pd.NaT, 1]], "a:datetime,b:int") + assert [dict(a=None, b=1)] == list( + ArrowDataFrame(df.as_arrow()).as_dict_iterable() + ) + # pandas timestamps + df = self.df([[pd.Timestamp("2020-01-01"), 1]], "a:datetime,b:int") + assert [dict(a=datetime(2020, 1, 1), b=1)] == list( + ArrowDataFrame(df.as_arrow()).as_dict_iterable() + ) diff --git a/tests/fugue_ibis/test_execution_engine.py b/tests/fugue_ibis/test_execution_engine.py new file mode 100644 index 00000000..b995fc9d --- /dev/null +++ b/tests/fugue_ibis/test_execution_engine.py @@ -0,0 +1,45 @@ +import sys + +import pytest +from fugue_test.builtin_suite import BuiltInTests +from fugue_test.execution_suite import ExecutionEngineTests + +from .mock.execution_engine import MockDuckExecutionEngine + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="< 3.8") +class IbisExecutionEngineTests(ExecutionEngineTests.Tests): + @classmethod + def setUpClass(cls): + cls._engine = cls.make_engine(cls) + + @classmethod + def tearDownClass(cls): + # cls._con.close() + pass + + def make_engine(self): + return MockDuckExecutionEngine({"test": True}) + + def test_select(self): + # it can't work properly with DuckDB (hugeint is not recognized) + pass + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="< 3.8") +class DuckBuiltInTests(BuiltInTests.Tests): + @classmethod + def setUpClass(cls): + cls._engine = cls.make_engine(cls) + + @classmethod + def tearDownClass(cls): + # cls._con.close() + pass + + def make_engine(self): + return MockDuckExecutionEngine({"test": True}) + + def test_df_select(self): + # it can't work properly with DuckDB (hugeint is not recognized) + pass diff --git a/tests/fugue_ibis/test_utils.py b/tests/fugue_ibis/test_utils.py index a1f4b9b4..06143afe 100644 --- a/tests/fugue_ibis/test_utils.py +++ b/tests/fugue_ibis/test_utils.py @@ -46,6 +46,9 @@ def test_schema(): a = Schema("a:[int],b:[{a:str}],c:{a:str},d:{a:[int]}") assert to_schema(to_ibis_schema(a)) == a + a = Schema("a:") + assert to_schema(to_ibis_schema(a)) == a + def test_materialize(): tdf1 = pd.DataFrame([[0, 1], [3, 4]], columns=["a", "b"]) diff --git a/tests/fugue_spark/test_dataframe.py b/tests/fugue_spark/test_dataframe.py index 700fc406..2edf552f 100644 --- a/tests/fugue_spark/test_dataframe.py +++ b/tests/fugue_spark/test_dataframe.py @@ -1,10 +1,7 @@ -import json -import math from datetime import datetime from typing import Any -import numpy as np -import pandas as pd +import pyspark from fugue.dataframe.array_dataframe import ArrayDataFrame from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import _df_eq as df_eq @@ -15,9 +12,9 @@ from triad.collections.schema import Schema, SchemaError from triad.exceptions import InvalidOperationError +from fugue_spark import SparkExecutionEngine from fugue_spark._utils.convert import to_schema, to_spark_schema from fugue_spark.dataframe import SparkDataFrame -from fugue_spark import SparkExecutionEngine class SparkDataFrameTests(DataFrameTests.Tests): @@ -32,6 +29,10 @@ def test_alter_columns_invalid(self): # TODO: Spark will silently cast invalid data to nulls 
without exceptions pass + def test_map_type(self): + if pyspark.__version__ >= "3": + return super().test_map_type() + def test_init(spark_session): sdf = spark_session.createDataFrame([["a", 1]]) diff --git a/tests/fugue_spark/utils/test_convert.py b/tests/fugue_spark/utils/test_convert.py index bd7159bd..e66bc2ed 100644 --- a/tests/fugue_spark/utils/test_convert.py +++ b/tests/fugue_spark/utils/test_convert.py @@ -1,5 +1,10 @@ -from fugue_spark._utils.convert import (to_cast_expression, to_schema, - to_select_expression, to_spark_schema) +import pyspark +from fugue_spark._utils.convert import ( + to_cast_expression, + to_schema, + to_select_expression, + to_spark_schema, +) from pytest import raises @@ -18,29 +23,62 @@ def test(expr): assert to_schema(df) == "a:int" assert to_schema(dict(a=str)) == "a:str" - from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType - schema = StructType([StructField("name", ArrayType(StructType([StructField("nest_name", StringType(), True), StructField("nest_value", IntegerType(), True)]), True), True)]) + from pyspark.sql.types import ( + ArrayType, + IntegerType, + MapType, + StringType, + StructField, + StructType, + ) + + schema = StructType( + [ + StructField( + "name", + ArrayType( + StructType( + [ + StructField("nest_name", StringType(), True), + StructField("nest_value", IntegerType(), True), + ] + ), + True, + ), + True, + ) + ] + ) df = spark_session.createDataFrame([[[("a", 1), ("b", 2)]]], schema) assert to_schema(df) == "name:[{nest_name:str,nest_value:int}]" assert to_spark_schema("name:[{nest_name:str,nest_value:int}]") == schema + if pyspark.__version__ >= "3": + schema = StructType( + [StructField("a", MapType(StringType(), IntegerType(), True), True)], + ) + df = spark_session.createDataFrame([[{"x": 1}], [{"y": 2}]], schema) + assert to_schema(df) == "a:" + assert to_spark_schema("a:") == schema + def test_to_cast_expression(): # length mismatch - raises(ValueError, lambda: to_cast_expression( - "a:int,b:int", "a:int", False)) - assert (False, ["a", "b"]) == to_cast_expression( - "a:int,b:int", "a:int,b:int", False) + raises(ValueError, lambda: to_cast_expression("a:int,b:int", "a:int", False)) assert (False, ["a", "b"]) == to_cast_expression( - "a:int,b:int", "a:int,b:int", True) + "a:int,b:int", "a:int,b:int", False + ) + assert (False, ["a", "b"]) == to_cast_expression("a:int,b:int", "a:int,b:int", True) assert (True, ["aa AS a", "b"]) == to_cast_expression( - "aa:int,b:int", "a:int,b:int", True) - raises(ValueError, lambda: to_cast_expression( - "aa:int,b:int", "a:int,b:int", False)) + "aa:int,b:int", "a:int,b:int", True + ) + raises(ValueError, lambda: to_cast_expression("aa:int,b:int", "a:int,b:int", False)) assert (True, ["CAST(a AS int) a", "b"]) == to_cast_expression( - "a:long,b:int", "a:int,b:int", True) + "a:long,b:int", "a:int,b:int", True + ) assert (True, ["CAST(aa AS int) a", "b"]) == to_cast_expression( - "aa:long,b:int", "a:int,b:int", True) + "aa:long,b:int", "a:int,b:int", True + ) def test_to_select_expression(): @@ -48,9 +86,12 @@ def test_to_select_expression(): raises(KeyError, lambda: to_select_expression("a:int,b:str", ["b", "x"])) assert to_select_expression("a:int,b:str", "b:str,a:int") == ["b", "a"] assert to_select_expression("a:int,b:str", "b:str,a:long") == [ - "b", "CAST(a AS bigint) a"] + "b", + "CAST(a AS bigint) a", + ] assert to_select_expression("a:int,b:double,c:float", "a:str,b:str,c:long") == [ "CAST(a AS string) a", "CAST(IF(isnan(b) OR b IS NULL, NULL, b) AS 
string) b", - "CAST(IF(isnan(c) OR c IS NULL, NULL, c) AS bigint) c"] - raises(KeyError, lambda: to_select_expression("a:int,b:str", "b:str,x:int")) + "CAST(IF(isnan(c) OR c IS NULL, NULL, c) AS bigint) c", + ] + raises(KeyError, lambda: to_select_expression("a:int,b:str", "b:str,x:int")) diff --git a/tests/fugue_sql/_test_syntax.py b/tests/fugue_sql/_test_syntax.py deleted file mode 100644 index cd613243..00000000 --- a/tests/fugue_sql/_test_syntax.py +++ /dev/null @@ -1,265 +0,0 @@ -from fugue_sql.exceptions import FugueSQLSyntaxError -from fugue_sql._parse import FugueSQL, _detect_case_issue -from pytest import raises -from tests.fugue_sql.utils import ( - bad_single_syntax, - bad_syntax, - good_single_syntax, - good_syntax, -) - - -def test_detect_case_issue(): - assert not _detect_case_issue("", 0.9) - assert not _detect_case_issue("--*&^^", 0.9) - assert _detect_case_issue("abc", 0.9) - assert _detect_case_issue("absdfAsdfsdc", 0.9) - assert not _detect_case_issue("ABC", 0.9) - assert _detect_case_issue("ABCa", 0.1) - - -def test_case_config_error(): - # with a hint of config issue - bad_syntax("a = select a where a==10", ignore_case=False, match=r".*ignore_case.*") - # without a hint of config issue - bad_syntax("a = SELECT a WHERr a==10", ignore_case=False, match=r"^no viable.*") - - -def test_assign_syntax(): - # simple assign not enabled - bad_single_syntax("a = select a where a==10", ignore_case=True, simple_assign=False) - # when simple assign enabled, comparison using = is still valid - good_single_syntax("a = select a where a=10", ignore_case=True, simple_assign=True) - good_single_syntax("a = select a where a==10", ignore_case=True, simple_assign=True) - # multiple expression test - good_syntax( - """ - a = select a where a==10 - b=select x""", - ignore_case=True, - simple_assign=True, - ) - - -def test_partition_syntax(): - good_single_syntax( - "a = transform", - ["", "hash", "even", "rand"], - " prepartition 100 using a", - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "a = transform prepartition 100 ", - ["by a,b,c"], - ["presort a asc, b desc"], - " using a", - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "a = transform prepartition by a", - ["presort a asc, b desc"], - " using a", - ignore_case=True, - simple_assign=True, - ) - - -def test_persist_broadcast_checkpoint_syntax(): - good_single_syntax( - "a = select a", - ["", "lazy"], - [ - "", - "PERSIst", - 'persist (level="a12")', - "weak checkpoint (level='a.b.c')", - "checkpoint", - "deterministic checkpoint 'x'", - "deterministic checkpoint 'x' prepartition by a single (level=1)", - ], - ["BROADCAst"], - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "a = select a", - ["", "checkpoint", "deterministic checkpoint 'x'"], - ["BROADCAst"], - ignore_case=True, - simple_assign=True, - ) - - -def test_select_syntax(): - # TODO: add a?? 
- bad_single_syntax("SELECT a FROM", ignore_case=False, ansi_sql=True) - good_single_syntax(["a:=", ""], "SELECT a", ["FROM sx"], ignore_case=False) - bad_single_syntax("select a", ["from sx"], ignore_case=False) - good_single_syntax( - "select a", ["from sx"], ["where a=10 and a==10"], ignore_case=True - ) - - # nested - good_single_syntax("SELECT a FROM (TRANSFORM USING x)", ["AS t"], ignore_case=False) - good_single_syntax( - """ - SELECT a FROM - (TRANSFORM USING x) AS x INNER JOIN (TRANSFORM USING x) AS y - ON x.a = b.a - """, - ignore_case=False, - ) - - # no from - good_syntax("select *", ignore_case=True, simple_assign=True) - good_syntax("select * where a=100", ignore_case=True, simple_assign=True) - - -def test_schema_syntax(): - good_syntax( - [ - "*", - "a:int", - "a:int,*", - "*,a:int", - "a:int,*,b:int", - "*-a,b", - "*~c", - "*+c:str,d:int", - "*,k:str+a:str,b:str-c~x", - ], - ignore_case=False, - rule="fugueWildSchema", - ) - - -def test_transform_syntax(): - # test data sources and nested - good_single_syntax( - "a = transform ", - ["", "a,b", "(transform using x)"], - "using x", - ignore_case=True, - simple_assign=True, - ) - # extensions - good_single_syntax( - "transform", - ["prepartition by x"], - " using ", - ["x", "x.Y.z"], - ignore_case=True, - simple_assign=True, - ) - # params - good_single_syntax( - "a= transform using x", - ["", 'params a:10,b="20"'], - ignore_case=True, - simple_assign=True, - ) - # params - good_single_syntax( - "a= transform using x", - ["params"], - [ - "{a:10,b:{x:10,y:true,z:false,w:null}}", - "(a=10,b=True)", - "(a:10,b:{x:10,y:true,z:false,w:null})", - ], - ignore_case=True, - simple_assign=True, - ) - # schemas - good_single_syntax( - "a= transform using x", - ["", "schema a : int,b:{x:int,y:[str]},c:[int]"], - ignore_case=True, - simple_assign=True, - ) - # schemas bad list - bad_single_syntax( - "a= transform using x schema a:int,c:[int,str]", - ignore_case=True, - simple_assign=True, - ) - - -def test_process_create_syntax(): - good_single_syntax( - "a = create using a.b.c(a=1,b=2) schema a:int", - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "process", - ["", "a,b", "(create using x)"], - ["prepartition by x"], - "using a.b.c(a=1,b=2,) schema a:int", - ignore_case=True, - simple_assign=True, - ) - - -def test_output_syntax(): - good_single_syntax( - "output", - ["", "a,b", "(create using x)"], - ["prepartition by x"], - "using a.b.c", - ["(a=1,b=2,)"], - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "print", - ["100 rows", "from", "a,b", "(create using x)"], - ["rowcount"], - ["title 'abc'"], - ignore_case=True, - simple_assign=True, - ) - - -def test_zip_syntax(): - good_single_syntax( - "a = zip a,b", - ["", "cross", "inner", "left outer", "right outer", "full outer"], - ["by x"], - ["presort y desc"], - ignore_case=True, - simple_assign=True, - ) - - -def test_load_save_syntax(): - good_single_syntax( - "load", - ["", "csv", "parquet", "json"], - '"x"', - ["(a=1)"], - ["", "columns a,b", "columns a:int,b:int"], - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "save", - ["a"], - ["to", "overwrite", "append"], - '"x"', - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "save to", - ["single"], - ["", "csv", "parquet", "json"], - '"x"', - ignore_case=True, - simple_assign=True, - ) - good_single_syntax( - "save to 'x'", ["(header=True)"], ignore_case=True, simple_assign=True - ) diff --git a/tests/fugue_sql/test_visitors.py 
index 79a0a408..1e1a69d2 100644
--- a/tests/fugue_sql/test_visitors.py
+++ b/tests/fugue_sql/test_visitors.py
@@ -11,9 +11,12 @@
 
 _PARSE_MODE = "auto"
 
+
 def test_json():
     def assert_eq(expr, expected):
-        sql = FugueSQLParser(expr, "fugueJsonValue", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr, "fugueJsonValue", ignore_case=True, parse_mode=_PARSE_MODE
+        )
         v = _VisitorBase(sql)
         obj = v.visit(sql.tree)
         if expected is None:
@@ -51,7 +54,9 @@ def assert_eq(expr, expected):
 
 def test_schema():
     def assert_eq(expr, expected=None):
-        sql = FugueSQLParser(expr, "fugueSchema", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr, "fugueSchema", ignore_case=True, parse_mode=_PARSE_MODE
+        )
         v = _VisitorBase(sql)
         obj = v.visit(sql.tree)
         if expected is None:
@@ -63,11 +68,14 @@ def assert_eq(expr, expected=None):
     assert_eq("a:int,B:str,c:[int]")
     assert_eq("a:int,b:str,C:{A:int,b:str}")
     assert_eq("a:int,\nb:str,C:[{A:int,b:str}]")
+    assert_eq("a:,b:,c:[]")
 
 
 def test_wild_schema():
     def assert_eq(expr, expected=None):
-        sql = FugueSQLParser(expr, "fugueWildSchema", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr, "fugueWildSchema", ignore_case=True, parse_mode=_PARSE_MODE
+        )
         v = _VisitorBase(sql)
         obj = v.visit(sql.tree)
         if expected is None:
@@ -91,7 +99,9 @@ def assert_eq(expr, expected=None):
 
 def test_pre_partition():
     def assert_eq(expr, expected):
-        sql = FugueSQLParser(expr, "fuguePrepartition", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr, "fuguePrepartition", ignore_case=True, parse_mode=_PARSE_MODE
+        )
         v = _VisitorBase(sql)
         obj = json.dumps(v.visit(sql.tree).jsondict)
         assert json.dumps(expected.jsondict) == obj
@@ -122,7 +132,9 @@ def assert_eq(expr, expected):
 
 def test_params():
     def assert_eq(expr, expected):
-        sql = FugueSQLParser(expr, "fugueParams", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr, "fugueParams", ignore_case=True, parse_mode=_PARSE_MODE
+        )
         v = _VisitorBase(sql)
         obj = v.visit(sql.tree)
         assert expected == obj
@@ -136,7 +148,12 @@ def assert_eq(expr, expected):
 
 def test_single_output_common_expr():
     def assert_eq(expr, using, params, schema):
-        sql = FugueSQLParser(expr, "fugueSingleOutputExtensionCommon", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr,
+            "fugueSingleOutputExtensionCommon",
+            ignore_case=True,
+            parse_mode=_PARSE_MODE,
+        )
         v = _VisitorBase(sql)
         obj = v.visit(sql.tree)
         assert using == obj["fugueUsing"]
@@ -163,7 +180,9 @@ def assert_eq(expr, using, params, schema):
 
 def test_assignment():
     def assert_eq(expr, varname, sign):
-        sql = FugueSQLParser(expr, "fugueAssignment", ignore_case=True, parse_mode=_PARSE_MODE)
+        sql = FugueSQLParser(
+            expr, "fugueAssignment", ignore_case=True, parse_mode=_PARSE_MODE
+        )
         v = _VisitorBase(sql)
         obj = v.visit(sql.tree)
         assert (varname, sign) == obj