Skip to content

Commit

Permalink
Remove duckdb cap (#336)
Browse files — browse the repository at this point in the history
  • Loading branch information
goodwanghan authored Jul 10, 2022
1 parent 52bd892 commit 2506847
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 35 deletions.
96 changes: 81 additions & 15 deletions fugue_duckdb/_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from datetime import datetime, date
import pyarrow as pa
from datetime import date, datetime
from typing import Any, Dict, Iterable, Tuple
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
from uuid import uuid4
import pandas as pd

import numpy as np
import pandas as pd
import pyarrow as pa
from duckdb import __version__ as _DUCKDB_VERSION
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP

_LEGACY_DUCKDB = _DUCKDB_VERSION < "0.3.3"

_DUCK_TYPES_TO_PA: Dict[str, pa.DataType] = {
"BIGINT": pa.int64(),
Expand Down Expand Up @@ -65,12 +69,66 @@ def get_temp_df_name() -> str:


def to_duck_type(tp: pa.DataType) -> str:
    """Convert a pyarrow data type to a DuckDB type string.

    Dispatches to the legacy converter when the installed DuckDB
    predates 0.3.3 (older type-string syntax), otherwise to the
    modern converter.
    """
    if not _LEGACY_DUCKDB:
        return _to_duck_type(tp)
    return _to_duck_type_legacy(tp)  # pragma: no cover


def to_pa_type(duck_type: str) -> pa.DataType:
    """Convert a DuckDB type string to a pyarrow data type.

    Dispatches to the legacy parser when the installed DuckDB
    predates 0.3.3 (older type-string syntax), otherwise to the
    modern parser.
    """
    if not _LEGACY_DUCKDB:
        return _to_pa_type(duck_type)
    return _to_pa_type_legacy(duck_type)  # pragma: no cover


def _to_duck_type(tp: pa.DataType) -> str:
    """Convert a pyarrow data type to a DuckDB (>=0.3.3 syntax) type string.

    Handles decimals, lists, and structs recursively; all other types are
    looked up in the module-level ``_PA_TYPES_TO_DUCK`` mapping.

    :raises ValueError: if ``tp`` has no DuckDB equivalent
    """
    try:
        if pa.types.is_decimal(tp):
            return f"DECIMAL({tp.precision}, {tp.scale})"
        if pa.types.is_list(tp):
            # DuckDB spells element-type lists with a trailing [] suffix
            return _to_duck_type(tp.value_type) + "[]"
        if pa.types.is_struct(tp):
            fields = ",".join(
                field.name + " " + _to_duck_type(field.type) for field in tp
            )
            return f"STRUCT({fields})"
        return _PA_TYPES_TO_DUCK[tp]
    except Exception:  # pragma: no cover
        # any failure (including a KeyError on the lookup) means unsupported
        raise ValueError(f"can't convert {tp} to DuckDB data type")


def _to_pa_type(duck_type: str) -> pa.DataType:
    """Parse a DuckDB (>=0.3.3 syntax) type string into a pyarrow data type.

    Supports ``T[]`` lists, ``STRUCT(name T,...)``, ``DECIMAL(p, s)``,
    ``HUGEINT`` (mapped down to int64), and the scalar names in the
    module-level ``_DUCK_TYPES_TO_PA`` mapping.

    :raises ValueError: if ``duck_type`` cannot be parsed
    """
    try:
        # list types: recurse on the element type before the [] suffix
        if duck_type.endswith("[]"):
            return pa.list_(_to_pa_type(duck_type[:-2]))
        paren = duck_type.find("(")
        if paren > 0:
            name = duck_type[:paren]
            args = duck_type[paren + 1 : -1]
            if name == "STRUCT":
                return pa.struct(
                    [
                        pa.field(k, _to_pa_type(v.strip()))
                        for k, v in _split_comma(args)
                    ]
                )
            if name == "DECIMAL":
                precision, scale = args.split(",", 1)
                return pa.decimal128(int(precision), int(scale))
            raise Exception
        if duck_type == "HUGEINT":
            # pyarrow has no 128-bit integer; approximate with int64
            return pa.int64()
        if duck_type in _DUCK_TYPES_TO_PA:
            return _DUCK_TYPES_TO_PA[duck_type]
        raise Exception
    except Exception:
        raise ValueError(f"{duck_type} is not supported")


def _to_duck_type_legacy(tp: pa.DataType) -> str: # pragma: no cover
# TODO: remove in the future
try:
if pa.types.is_struct(tp):
inner = ",".join(f.name + ": " + to_duck_type(f.type) for f in tp)
inner = ",".join(f.name + ": " + _to_duck_type_legacy(f.type) for f in tp)
return f"STRUCT<{inner}>"
if pa.types.is_list(tp):
inner = to_duck_type(tp.value_type)
inner = _to_duck_type_legacy(tp.value_type)
return f"LIST<{inner}>"
if pa.types.is_decimal(tp):
return f"DECIMAL({tp.precision}, {tp.scale})"
Expand All @@ -79,18 +137,24 @@ def to_duck_type(tp: pa.DataType) -> str:
raise ValueError(f"can't convert {tp} to DuckDB data type")


def to_pa_type(duck_type: str) -> pa.DataType:
def _to_pa_type_legacy(duck_type: str) -> pa.DataType: # pragma: no cover
# TODO: remove in the future
try:
p = duck_type.find("<")
if p > 0:
tp = duck_type[:p]
if tp == "LIST":
itp = to_pa_type(duck_type[p + 1 : -1])
itp = _to_pa_type_legacy(duck_type[p + 1 : -1])
return pa.list_(itp)
if tp == "STRUCT":
fields = [
pa.field(k, to_pa_type(v.strip()))
for k, v in _split_comma(duck_type[p + 1 : -1])
pa.field(k, _to_pa_type_legacy(v.strip()))
for k, v in _split_comma(
duck_type[p + 1 : -1],
split_char=":",
left_char="<",
right_char=">",
)
]
return pa.struct(fields)
raise Exception
Expand All @@ -109,17 +173,19 @@ def to_pa_type(duck_type: str) -> pa.DataType:
raise ValueError(f"{duck_type} is not supported")


def _split_comma(expr: str) -> Iterable[Tuple[str, str]]:
def _split_comma(
expr: str, split_char=" ", left_char="(", right_char=")"
) -> Iterable[Tuple[str, str]]:
lv = 0
start = 0
for i in range(len(expr)):
if expr[i] == "<":
if expr[i] == left_char:
lv += 1
elif expr[i] == ">":
elif expr[i] == right_char:
lv -= 1
elif lv == 0 and expr[i] == ",":
x = expr[start:i].strip().split(":", 1)
x = expr[start:i].strip().split(split_char, 1)
yield x[0], x[1]
start = i + 1
x = expr[start : len(expr)].strip().split(":", 1)
x = expr[start : len(expr)].strip().split(split_char, 1)
yield x[0], x[1]
7 changes: 4 additions & 3 deletions fugue_duckdb/dask.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any, Optional, Union, Callable
from typing import Any, Callable, Optional, Union

import duckdb
import pyarrow as pa
from duckdb import DuckDBPyConnection
from fugue import DataFrame, LocalDataFrame, PartitionCursor, PartitionSpec
from fugue.collections.partition import EMPTY_PARTITION_SPEC
Expand All @@ -9,7 +11,6 @@
import dask.dataframe as dd
from fugue_duckdb.dataframe import DuckDataFrame
from fugue_duckdb.execution_engine import DuckExecutionEngine
import pyarrow as pa


class DuckDaskExecutionEngine(DuckExecutionEngine):
Expand Down Expand Up @@ -37,7 +38,7 @@ def to_df(self, df: Any, schema: Any = None, metadata: Any = None) -> DuckDataFr
)
else:
return DuckDataFrame(
self.connection.from_arrow_table(ddf.as_arrow()),
duckdb.arrow(ddf.as_arrow(), connection=self.connection),
metadata=dict(ddf.metadata),
)
return super().to_df(df, schema, metadata)
Expand Down
19 changes: 11 additions & 8 deletions fugue_duckdb/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ def _duck_select(self, dfs: DataFrames, statement: str) -> DataFrame:
for k, v in dfs.items():
tdf: Any = self.execution_engine.to_df(v)
if k not in self._cache or self._cache[k] != id(tdf.native):
# tdf.native.create_view(k, replace=True)
kk = k + get_temp_df_name()
tdf.native.query( # TODO: a hack to avoid DuckDB stability issue
kk, f"CREATE OR REPLACE TEMP VIEW {k} AS SELECT * FROM {kk}"
)
tdf.native.create_view(k, replace=True)
# TODO: remove the following hack, if it is stable
# kk = k + get_temp_df_name()
# tdf.native.query(
# kk, f"CREATE OR REPLACE TEMP VIEW {k} AS SELECT * FROM {kk}"
# )
self._cache[k] = id(tdf.native)
result = self.execution_engine.connection.query(statement) # type: ignore
return DuckDataFrame(result)
Expand All @@ -67,7 +68,7 @@ def _other_select(self, dfs: DataFrames, statement: str) -> DataFrame:
conn = duckdb.connect()
try:
for k, v in dfs.items():
conn.from_arrow_table(v.as_arrow()).create_view(k)
duckdb.arrow(v.as_arrow(), connection=conn).create_view(k)
return ArrowDataFrame(conn.execute(statement).arrow())
finally:
conn.close()
Expand Down Expand Up @@ -147,12 +148,14 @@ def to_df(self, df: Any, schema: Any = None, metadata: Any = None) -> DuckDataFr
)
else:
rdf = DuckDataFrame(
self.connection.from_arrow_table(df.as_arrow()),
duckdb.arrow(df.as_arrow(), connection=self.connection),
metadata=dict(df.metadata),
)
return rdf
tdf = ArrowDataFrame(df, schema)
return DuckDataFrame(self.connection.from_arrow_table(tdf.native), metadata)
return DuckDataFrame(
duckdb.arrow(tdf.native, connection=self.connection), metadata
)

def repartition(
self, df: DataFrame, partition_spec: PartitionSpec
Expand Down
10 changes: 5 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def get_version() -> str:
"jupyterlab",
"ipython>=7.10.0",
"dash",
"duckdb>=0.3.1,<0.3.3",
"duckdb>=0.3.2",
"pyarrow>=5.0.0",
"ibis-framework>=2; python_version >= '3.7'",
],
Expand All @@ -77,10 +77,10 @@ def get_version() -> str:
package_data={"fugue_notebook": ["nbextension/*"]},
entry_points={
"fugue.plugins": [
"ibis = fugue_ibis:register",
"duckdb = fugue_duckdb:register",
"spark = fugue_spark:register",
"dask = fugue_dask:register",
"ibis = fugue_ibis:register[ibis]",
"duckdb = fugue_duckdb:register[duckdb]",
"spark = fugue_spark:register[spark]",
"dask = fugue_dask:register[dask]",
]
},
)
2 changes: 1 addition & 1 deletion tests/fugue_duckdb/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def df(
self, data: Any = None, schema: Any = None, metadata: Any = None
) -> DuckDataFrame:
df = ArrowDataFrame(data, schema, metadata)
return DuckDataFrame(self._con.from_arrow_table(df.native), metadata=metadata)
return DuckDataFrame(duckdb.arrow(df.native, self._con), metadata=metadata)

def test_as_array_special_values(self):
for func in [
Expand Down
20 changes: 17 additions & 3 deletions tests/fugue_duckdb/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ def test_type_conversion():
con = duckdb.connect()

def assert_(tp):
    """Round-trip check: DuckDB's reported type for an arrow column of
    pyarrow type ``tp`` must parse back to ``tp``, and ``to_duck_type``
    followed by ``to_pa_type`` must be the identity.

    NOTE(review): the source span interleaved the pre-change
    ``con.from_arrow_table(...)`` lines with the new ``duckdb.arrow(...)``
    call; only the new form is kept here.
    """
    dt = duckdb.arrow(pa.Table.from_pydict(dict(a=pa.nulls(2, tp))), con).types[0]
    assert to_pa_type(dt) == tp
    dt = to_duck_type(tp)
    assert to_pa_type(dt) == tp
Expand All @@ -69,6 +67,22 @@ def assert_many(*tps):
assert_(
pa.struct([pa.field("x", pa.int64()), pa.field("yy", pa.list_(pa.int64()))])
)
assert_(
pa.struct(
[
pa.field("x", pa.int64()),
pa.field("yy", pa.struct([pa.field("x", pa.int64())])),
]
)
)
assert_(
pa.struct(
[
pa.field("x", pa.int64()),
pa.field("yy", pa.list_(pa.struct([pa.field("x", pa.int64())]))),
]
)
)

raises(ValueError, lambda: to_pa_type(""))
raises(ValueError, lambda: to_pa_type("XX"))
Expand Down

0 comments on commit 2506847

Please sign in to comment.