Skip to content

Commit

Permalink
Fugue plugin (#311)
Browse files Browse the repository at this point in the history
* plugin

* update
  • Loading branch information
goodwanghan authored Mar 24, 2022
1 parent 4419961 commit f2676e0
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 55 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- A [smarter](https://github.com/fugue-project/fugue/issues/304) way to determine default npartitions in Dask
- Support [even partitioning](https://github.com/fugue-project/fugue/issues/303) on Dask
- Add handling of [nested ArrayType](https://github.com/fugue-project/fugue/issues/308) on Spark
- Change to [plugin approach](https://github.com/fugue-project/fugue/issues/310) to avoid explicit import

## 0.6.5

Expand Down
2 changes: 2 additions & 0 deletions fugue/_utils/interfaceless.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def _parse_param( # noqa: C901
param: Optional[inspect.Parameter],
none_as_other: bool = True,
) -> "_FuncParam":
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

if annotation == type(None): # noqa: E721
return _NoneParam(param)
if annotation == inspect.Parameter.empty:
Expand Down
17 changes: 17 additions & 0 deletions fugue/_utils/register.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
try:
from importlib.metadata import entry_points # type:ignore
except ImportError: # pragma: no cover
from importlib_metadata import entry_points # type:ignore


def register_plugins():
for plugin in entry_points().get("fugue.plugins", []):
try:
register_func = plugin.load()
assert callable(register_func), f"{plugin.name} is not a callable"
register_func()
except ImportError: # pragma: no cover
pass


register_plugins()
31 changes: 18 additions & 13 deletions fugue/execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
SqliteEngine,
)

register_execution_engine(
"native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_execution_engine(
"pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore")
register_sql_engine(
"qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)
register_sql_engine(
"qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)

def register():
register_execution_engine(
"native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_execution_engine(
"pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore")
register_sql_engine(
"qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)
register_sql_engine(
"qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)


register()
4 changes: 4 additions & 0 deletions fugue/execution/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ def make_execution_engine(
# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))
"""
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

return _EXECUTION_ENGINE_FACTORY.make(engine, conf, **kwargs)


Expand Down Expand Up @@ -404,4 +406,6 @@ def make_sql_engine(
# SqliteEngine(engine)
make_sql_engine(SqliteEngine)
"""
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

return _EXECUTION_ENGINE_FACTORY.make_sql_engine(engine, execution_engine, **kwargs)
2 changes: 2 additions & 0 deletions fugue/extensions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def register(self, name: str, extension: Any, on_dup="overwrite") -> None:
raise ValueError(on_dup)

def get(self, obj: Any) -> Any:
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

if isinstance(obj, str) and obj in self._dict:
return self._dict[obj]
return obj
Expand Down
2 changes: 2 additions & 0 deletions fugue/workflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ def is_acceptable_raw_df(df: Any) -> bool:
:param df: input raw dataframe
:return: whether this dataframe is convertible
"""
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

return any(isinstance(df, t) for t in _VALID_RAW_DF_TYPES)
10 changes: 7 additions & 3 deletions fugue_ibis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# flake8: noqa
from fugue_ibis.execution import pandas_backend
from fugue_ibis.execution.ibis_engine import IbisEngine
from fugue_ibis.extensions import run_ibis, as_ibis, as_fugue
from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from fugue_ibis.execution.pandas_backend import _to_pandas_ibis_engine
from fugue_ibis.extensions import as_fugue, as_ibis, run_ibis


def register():
register_ibis_engine(1, _to_pandas_ibis_engine)
5 changes: 1 addition & 4 deletions fugue_ibis/execution/pandas_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from triad.utils.assertion import assert_or_throw
import pandas as pd

from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from fugue_ibis.execution.ibis_engine import IbisEngine
from fugue_ibis._utils import to_schema, to_ibis_schema
from ibis.backends.pandas import Backend

Expand Down Expand Up @@ -53,6 +53,3 @@ def table(self, name: str, schema: Any = None):
if schema is None and name in self._schemas
else schema,
)


register_ibis_engine(1, _to_pandas_ibis_engine)
35 changes: 1 addition & 34 deletions fugue_notebook/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

import fugue_sql
import pandas as pd
from fugue import (
ExecutionEngine,
NativeExecutionEngine,
make_execution_engine,
register_execution_engine,
)
from fugue import ExecutionEngine, make_execution_engine
from fugue.dataframe import YieldedDataFrame
from fugue.extensions._builtins.outputters import Show
from fugue_sql.exceptions import FugueSQLSyntaxError
Expand Down Expand Up @@ -38,33 +33,6 @@ def get_pretty_print(self) -> Callable:
"""Fugue dataframe pretty print handler"""
return _default_pretty_print

def register_execution_engines(self):
"""Register execution engines with names. This will also try to register
spark and dask engines if the dependent packages are available and they
are not registered"""
register_execution_engine(
"native",
lambda conf, **kwargs: NativeExecutionEngine(conf=conf),
on_dup="ignore",
)

try:
import pyspark # noqa: F401
import fugue_spark # noqa: F401
except ImportError:
pass

try:
import dask.dataframe # noqa: F401
import fugue_dask # noqa: F401
except ImportError:
pass

try:
import fugue_duckdb # noqa: F401
except ImportError:
pass


@magics_class
class _FugueSQLMagics(Magics):
Expand Down Expand Up @@ -151,5 +119,4 @@ def _setup_fugue_notebook(
fsql_ignore_case=fsql_ignore_case,
)
ipython.register_magics(magics)
s.register_execution_engines()
Show.set_hook(s.get_pretty_print())
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.6.5"
__version__ = "0.6.6"
9 changes: 9 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def get_version() -> str:
"sqlalchemy",
"pyarrow>=0.15.1",
"pandas>=1.0.2",
"importlib-metadata; python_version < '3.8'",
],
extras_require={
"sql": ["antlr4-python3-runtime", "jinja2"],
Expand Down Expand Up @@ -71,4 +72,12 @@ def get_version() -> str:
],
python_requires=">=3.6",
package_data={"fugue_notebook": ["nbextension/*"]},
entry_points={
"fugue.plugins": [
"ibis = fugue_ibis:register",
"duckdb = fugue_duckdb:register",
"spark = fugue_spark:register",
"dask = fugue_dask:register",
]
},
)
24 changes: 24 additions & 0 deletions tests/fugue_dask/test_importless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from fugue import FugueWorkflow
from fugue_sql import fsql


def test_importless():
for engine in ["dask"]:
dag = FugueWorkflow()
dag.df([[0]], "a:int").show()

dag.run(engine)

fsql(
"""
CREATE [[0],[1]] SCHEMA a:int
SELECT * WHERE a<1
PRINT
"""
).run(engine)

dag = FugueWorkflow()
idf = dag.df([[0], [1]], "a:int").as_ibis()
idf[idf.a < 1].as_fugue().show()

dag.run(engine)
30 changes: 30 additions & 0 deletions tests/fugue_duckdb/test_importless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from fugue import FugueWorkflow
from fugue_sql import fsql


def test_importless():
for engine in ["duck", "duckdb"]:
dag = FugueWorkflow()
dag.df([[0]], "a:int").show()

dag.run(engine)

fsql(
"""
CREATE [[0],[1]] SCHEMA a:int
SELECT * WHERE a<1
PRINT
"""
).run(engine)

dag = FugueWorkflow()
idf = dag.df([[0], [1]], "a:int").as_ibis()
idf[idf.a < 1].as_fugue().show()

dag.run(engine)

dag = FugueWorkflow()
tdf = dag.df([[0], [1]], "a:int")
dag.select("SELECT * FROM ", tdf, " WHERE a<1", sql_engine=engine)

dag.run()
10 changes: 10 additions & 0 deletions tests/fugue_ibis/test_importless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from fugue import FugueWorkflow


def test_importless():
for engine in [None]:
dag = FugueWorkflow()
idf = dag.df([[0], [1]], "a:int").as_ibis()
idf[idf.a < 1].as_fugue().show()

dag.run(engine)
26 changes: 26 additions & 0 deletions tests/fugue_spark/test_importless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from fugue import FugueWorkflow
from fugue_sql import fsql
from pyspark.sql import SparkSession


def test_importless():
spark = SparkSession.builder.getOrCreate()
for engine in [spark, "spark"]:
dag = FugueWorkflow()
dag.df([[0]], "a:int").show()

dag.run(engine)

fsql(
"""
CREATE [[0],[1]] SCHEMA a:int
SELECT * WHERE a<1
PRINT
"""
).run(engine)

dag = FugueWorkflow()
idf = dag.df([[0], [1]], "a:int").as_ibis()
idf[idf.a < 1].as_fugue().show()

dag.run(engine)

0 comments on commit f2676e0

Please sign in to comment.