Skip to content

Commit

Permalink
Deprecate Python 3.6, enable map type support, add IbisDataFrame (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan authored Sep 28, 2022
1 parent e9a7062 commit f7c5e63
Show file tree
Hide file tree
Showing 39 changed files with 1,010 additions and 385 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, "3.10"]
python-version: [3.7, "3.10"]

steps:
- uses: actions/checkout@v2
Expand Down
6 changes: 6 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release Notes

## 0.7.3

- [362](https://github.com/fugue-project/fugue/issues/362) Remove Python 3.6 Support
- [363](https://github.com/fugue-project/fugue/issues/363) Create IbisDataFrame and IbisExecutionEngine
- [364](https://github.com/fugue-project/fugue/issues/364) Enable Map type support

## 0.7.2

- [348](https://github.com/fugue-project/fugue/issues/348) Make create data error more informative
Expand Down
13 changes: 8 additions & 5 deletions fugue/execution/native_execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import inspect
import logging
import os
from typing import Any, Callable, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import pandas as pd
from fugue._utils.interfaceless import (
Expand Down Expand Up @@ -266,13 +266,16 @@ def dropna(
self,
df: DataFrame,
how: str = "any",
thresh: int = None,
thresh: Optional[int] = None,
subset: List[str] = None,
metadata: Any = None,
) -> DataFrame:
d = df.as_pandas().dropna(
axis=0, how=how, thresh=thresh, subset=subset, inplace=False
)
kwargs: Dict[str, Any] = dict(axis=0, subset=subset, inplace=False)
if thresh is None:
kwargs["how"] = how
else:
kwargs["thresh"] = thresh
d = df.as_pandas().dropna(**kwargs)
return PandasDataFrame(d.reset_index(drop=True), df.schema, metadata)

def fillna(
Expand Down
4 changes: 2 additions & 2 deletions fugue_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import dask.dataframe as pd
import pandas
import pyarrow as pa
from fugue.dataframe import DataFrame, LocalDataFrame, PandasDataFrame
from fugue.dataframe import ArrowDataFrame, DataFrame, LocalDataFrame, PandasDataFrame
from fugue.dataframe.dataframe import _input_schema
from fugue.exceptions import FugueDataFrameOperationError
from triad.collections.schema import Schema
Expand Down Expand Up @@ -183,7 +183,7 @@ def as_array(
df: DataFrame = self
if columns is not None:
df = df[columns]
return PandasDataFrame(df.as_pandas(), schema=df.schema).as_array(
return ArrowDataFrame(df.as_pandas(), schema=df.schema).as_array(
type_safe=type_safe
)

Expand Down
4 changes: 2 additions & 2 deletions fugue_dask/ibis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import dask.dataframe as dd
import ibis
import ibis.expr.types as ir
from fugue import DataFrame, DataFrames, ExecutionEngine
from fugue_ibis import IbisTable
from fugue_ibis._utils import to_ibis_schema, to_schema
from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from ibis.backends.dask import Backend
Expand All @@ -24,7 +24,7 @@ def __init__(self, execution_engine: ExecutionEngine) -> None:
super().__init__(execution_engine)

def select(
self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr]
self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable]
) -> DataFrame:
pdfs = {
k: self.execution_engine.to_df(v).native # type: ignore
Expand Down
35 changes: 29 additions & 6 deletions fugue_duckdb/_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import date, datetime
from typing import Any, Dict, Iterable, Tuple
from typing import Any, Dict, Iterable, Optional, Tuple
from uuid import uuid4

import numpy as np
Expand Down Expand Up @@ -85,6 +85,10 @@ def _to_duck_type(tp: pa.DataType) -> str:
if pa.types.is_struct(tp):
inner = ",".join(f.name + " " + _to_duck_type(f.type) for f in tp)
return f"STRUCT({inner})"
if pa.types.is_map(tp):
k = _to_duck_type(tp.key_type)
v = _to_duck_type(tp.item_type)
return f"MAP({k},{v})"
if pa.types.is_list(tp):
inner = _to_duck_type(tp.value_type)
return f"{inner}[]"
Expand All @@ -108,6 +112,12 @@ def _to_pa_type(duck_type: str) -> pa.DataType:
for k, v in _split_comma(duck_type[p + 1 : -1])
]
return pa.struct(fields)
if tp == "MAP":
fields = [
_to_pa_type(t.strip())
for t, _ in _split_comma(duck_type[p + 1 : -1], split_char=None)
]
return pa.map_(fields[0], fields[1])
if tp != "DECIMAL":
raise Exception
pair = duck_type[p + 1 : -1].split(",", 1)
Expand All @@ -130,6 +140,10 @@ def _to_duck_type_legacy(tp: pa.DataType) -> str: # pragma: no cover
if pa.types.is_list(tp):
inner = _to_duck_type_legacy(tp.value_type)
return f"LIST<{inner}>"
if pa.types.is_map(tp):
k = _to_duck_type_legacy(tp.key_type)
v = _to_duck_type_legacy(tp.item_type)
return f"LIST<{k},{v}>"
if pa.types.is_decimal(tp):
return f"DECIMAL({tp.precision}, {tp.scale})"
return _PA_TYPES_TO_DUCK[tp]
Expand Down Expand Up @@ -174,7 +188,10 @@ def _to_pa_type_legacy(duck_type: str) -> pa.DataType: # pragma: no cover


def _split_comma(
expr: str, split_char=" ", left_char="(", right_char=")"
expr: str,
split_char: Optional[str] = " ",
left_char: str = "(",
right_char: str = ")",
) -> Iterable[Tuple[str, str]]:
lv = 0
start = 0
Expand All @@ -184,8 +201,14 @@ def _split_comma(
elif expr[i] == right_char:
lv -= 1
elif lv == 0 and expr[i] == ",":
x = expr[start:i].strip().split(split_char, 1)
yield x[0], x[1]
if split_char is None:
yield expr[start:i].strip(), ""
else:
x = expr[start:i].strip().split(split_char, 1)
yield x[0], x[1]
start = i + 1
x = expr[start : len(expr)].strip().split(split_char, 1)
yield x[0], x[1]
if split_char is None:
yield expr[start : len(expr)].strip(), ""
else:
x = expr[start : len(expr)].strip().split(split_char, 1)
yield x[0], x[1]
20 changes: 17 additions & 3 deletions fugue_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,31 @@ def as_array(
) -> List[Any]:
if columns is not None:
return self[columns].as_array(type_safe=type_safe)
return [list(x) for x in self._rel.fetchall()]
return self._fetchall(self._rel)

def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
if columns is not None:
yield from self[columns].as_array_iterable(type_safe=type_safe)
else:
yield from [list(x) for x in self._rel.fetchall()]
yield from self._fetchall(self._rel)

def head(self, n: int, columns: Optional[List[str]] = None) -> List[Any]:
if columns is not None:
return self[columns].head(n)
return [list(x) for x in self._rel.limit(n).fetchall()]
return self._fetchall(self._rel.limit(n))

def _fetchall(self, rel: DuckDBPyRelation) -> List[List[Any]]:
    """Fetch every row of ``rel`` as a list of lists, normalizing MAP columns.

    DuckDB hands MAP values back as a dict of two parallel arrays under the
    ``"key"`` and ``"value"`` entries; for each MAP-typed column (per
    ``self.schema``) that dict is converted into a list of ``(key, value)``
    tuples so rows come out as plain Python data.
    """
    map_columns = [
        idx for idx, tp in enumerate(self.schema.types) if pa.types.is_map(tp)
    ]
    if not map_columns:
        # Fast path: no MAP columns, just listify each fetched tuple.
        return [list(row) for row in rel.fetchall()]

    def convert(row: Any) -> List[Any]:
        values = list(row)
        for idx in map_columns:
            mapping = row[idx]
            values[idx] = list(zip(mapping["key"], mapping["value"]))
        return values

    return [convert(row) for row in rel.fetchall()]
8 changes: 4 additions & 4 deletions fugue_duckdb/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@ def intersect(
) -> DataFrame:
if distinct:
t1, t2 = get_temp_df_name(), get_temp_df_name()
sql = f"SELECT * FROM {t1} INTERSECT SELECT * FROM {t2}"
sql = f"SELECT * FROM {t1} INTERSECT DISTINCT SELECT * FROM {t2}"
return self._sql(sql, {t1: df1, t2: df2}, metadata=metadata)
return DuckDataFrame(
self._to_duck_df(df1).native.intersect(self._to_duck_df(df2).native),
metadata=metadata,
raise NotImplementedError(
"DuckDB doesn't have consistent behavior on INTERSECT ALL,"
" so Fugue doesn't support it"
)

def distinct(
Expand Down
4 changes: 2 additions & 2 deletions fugue_duckdb/ibis_engine.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Any, Callable, Optional

import ibis
import ibis.expr.types as ir
from fugue import DataFrame, DataFrames, ExecutionEngine
from fugue_ibis import IbisTable
from fugue_ibis._utils import to_ibis_schema
from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from ibis.backends.pandas import Backend
Expand All @@ -12,7 +12,7 @@

class DuckDBIbisEngine(IbisEngine):
def select(
self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], ir.TableExpr]
self, dfs: DataFrames, ibis_func: Callable[[ibis.BaseBackend], IbisTable]
) -> DataFrame:
be = _BackendWrapper().connect({})
be.set_schemas(dfs)
Expand Down
12 changes: 9 additions & 3 deletions fugue_ibis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# flake8: noqa
from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from fugue_ibis.execution.pandas_backend import _to_pandas_ibis_engine
from fugue_ibis.extensions import as_fugue, as_ibis, run_ibis
from triad import run_at_def

from ._compat import IbisTable
from .dataframe import IbisDataFrame
from .execution.ibis_engine import IbisEngine, register_ibis_engine
from .execution.pandas_backend import _to_pandas_ibis_engine
from .execution_engine import IbisExecutionEngine
from .extensions import as_fugue, as_ibis, run_ibis


@run_at_def
def register():
    """Register the pandas-backed ibis engine when this module is imported.

    Runs automatically at import time via ``run_at_def``; the first argument
    (``1``) is presumably a priority/ordering value for engine resolution —
    confirm against ``register_ibis_engine``.
    """
    register_ibis_engine(1, _to_pandas_ibis_engine)
7 changes: 7 additions & 0 deletions fugue_ibis/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# flake8: noqa
# pylint: disable-all
"""Compatibility shim exposing a single ``IbisTable`` name.

Newer ibis releases provide ``ibis.expr.types.Table`` while older releases
call the same class ``TableExpr``; import whichever exists so the rest of
the package can reference ``IbisTable`` uniformly.
"""

try:
    from ibis.expr.types import Table as IbisTable
except ImportError:  # older ibis versions only expose TableExpr
    # Narrowed from `except Exception`: only a missing name should trigger
    # the fallback; any other import-time failure ought to propagate.
    from ibis.expr.types import TableExpr as IbisTable
11 changes: 9 additions & 2 deletions fugue_ibis/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ def _ibis_to_pa_type(tp: dt.DataType) -> pa.DataType:
if isinstance(tp, dt.Struct):
fields = [pa.field(n, _ibis_to_pa_type(t)) for n, t in zip(tp.names, tp.types)]
return pa.struct(fields)
if isinstance(tp, dt.Map):
return pa.map_(_ibis_to_pa_type(tp.key_type), _ibis_to_pa_type(tp.value_type))
raise NotImplementedError(tp) # pragma: no cover


Expand All @@ -84,10 +86,15 @@ def _pa_to_ibis_type(tp: pa.DataType) -> dt.DataType:
return _PYARROW_TO_IBIS[tp]
if pa.types.is_list(tp):
ttp = _pa_to_ibis_type(tp.value_type)
return dt.Array(ttp)
return dt.Array(value_type=ttp)
if pa.types.is_struct(tp):
fields = [(f.name, _pa_to_ibis_type(f.type)) for f in tp]
return dt.Struct([x[0] for x in fields], [x[1] for x in fields])
return dt.Struct.from_tuples(fields)
if pa.types.is_map(tp):
return dt.Map(
key_type=_pa_to_ibis_type(tp.key_type),
value_type=_pa_to_ibis_type(tp.item_type),
)
raise NotImplementedError(tp) # pragma: no cover


Expand Down
Loading

0 comments on commit f7c5e63

Please sign in to comment.