From 53d9e7de23f53ccb964cd8ebeb8125fbb61e1671 Mon Sep 17 00:00:00 2001 From: Laurent Erreca Date: Sun, 27 Feb 2022 21:39:48 +0100 Subject: [PATCH 01/14] Work in progress to fix issue 285 reported here https://github.com/fugue-project/fugue/issues/285 --- fugue/execution/native_execution_engine.py | 6 ++---- fugue_dask/execution_engine.py | 1 + fugue_test/builtin_suite.py | 14 ++++++++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index 78f42dae..6cdcccbf 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -376,10 +376,8 @@ def save_df( force_single: bool = False, **kwargs: Any, ) -> None: - if not partition_spec.empty: - self.log.warning( # pragma: no cover - "partition_spec is not respected in %s.save_df", self - ) + if not force_single and not partition_spec.empty: + kwargs['partition_cols'] = partition_spec.partition_by self.fs.makedirs(os.path.dirname(path), recreate=True) df = self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index 7e87130a..ce130db1 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -459,6 +459,7 @@ def save_df( format_hint=format_hint, mode=mode, partition_spec=partition_spec, + force_single=force_single, **kwargs, ) else: diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index 7fa33148..e0808dea 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -1175,6 +1175,7 @@ def init_tmpdir(self, tmpdir): def test_io(self): path = os.path.join(self.tmpdir, "a") path2 = os.path.join(self.tmpdir, "b.test.csv") + path3 = os.path.join(self.tmpdir, "c.partition") with self.dag() as dag: b = dag.df([[6, 1], [2, 7]], "c:int,a:long") b.partition(num=3).save(path, fmt="parquet", single=True) @@ -1185,6 +1186,19 @@ def test_io(self): a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int")) a = dag.load(path2, header=True, columns="c:int,a:long") a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long")) + with self.dag() as dag: + b = dag.df([[6, 1], [2, 7]], "c:int,a:long") + b.partition(by='c').save(path3, fmt="parquet", single=False) + b.save(path2, header=True) + assert FileSystem().isdir(path3) + assert FileSystem().isdir(os.path.join(path3, 'c=6')) + assert FileSystem().isdir(os.path.join(path3, 'c=2')) + # TODO: in test below, once issue #288 is fixed, use dag.load instead of pd.read_parquet + pd.testing.assert_frame_equal( + pd.read_parquet(path3).sort_values('a').reset_index(drop=True), + pd.DataFrame({'c': pd.Categorical([6, 2]), 'a': [1, 7]}).reset_index(drop=True), + check_like=True + ) def test_save_and_use(self): path = os.path.join(self.tmpdir, "a") From af81b594409078723b70d2e3eb88c7a85eaa23de Mon Sep 17 00:00:00 2001 From: Laurent Erreca Date: Mon, 28 Feb 2022 10:18:35 +0100 Subject: [PATCH 02/14] Use option partition_on in Dask execution engine to write hive partitioned dataset --- fugue_dask/execution_engine.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index ce130db1..221a523e 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -464,9 +464,7 @@ def save_df( ) else: if not partition_spec.empty: - self.log.warning( # pragma: no cover - "partition_spec is not respected in %s.save_df", self - ) + 
kwargs['partition_on'] = partition_spec.partition_by self.fs.makedirs(os.path.dirname(path), recreate=True) df = self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) From 36c17b8a804aff586d10e8d84facc5d31d97d4ae Mon Sep 17 00:00:00 2001 From: WangCHX Date: Tue, 15 Mar 2022 00:13:20 +0800 Subject: [PATCH 03/14] Add handling for spark array type (#307) --- RELEASE.md | 1 + fugue_spark/_utils/convert.py | 2 ++ tests/fugue_spark/utils/test_convert.py | 8 +++++++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/RELEASE.md b/RELEASE.md index 52bb5b43..99e26e19 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -6,6 +6,7 @@ - Enable DaskExecutionEngine to transform dataframes with [nested](https://github.com/fugue-project/fugue/issues/299) columns - A [smarter](https://github.com/fugue-project/fugue/issues/304) way to determine default npartitions in Dask - Support [even partitioning](https://github.com/fugue-project/fugue/issues/303) on Dask +- Add handling of [nested ArrayType](https://github.com/fugue-project/fugue/issues/308) on Spark ## 0.6.5 diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 86125dc1..89c7a315 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -124,6 +124,8 @@ def _to_arrow_type(dt: pt.DataType) -> pa.DataType: for field in dt ] return pa.struct(fields) + if isinstance(dt, pt.ArrayType): + return pa.list_(_to_arrow_type(dt.elementType)) return to_arrow_type(dt) diff --git a/tests/fugue_spark/utils/test_convert.py b/tests/fugue_spark/utils/test_convert.py index d888d78e..bd7159bd 100644 --- a/tests/fugue_spark/utils/test_convert.py +++ b/tests/fugue_spark/utils/test_convert.py @@ -9,7 +9,7 @@ def test(expr): test("a:int,b:long,c:[int],d:datetime,e:date,f:decimal(3,4),g:str") test("a:{a:[int],b:[str]}") - # test("a:[{a:int}]") TODO: this is not supported by spark, should we support? + test("a:[{a:int}]") s = to_spark_schema(to_spark_schema("a:int")) assert to_spark_schema(s) is s @@ -18,6 +18,12 @@ def test(expr): assert to_schema(df) == "a:int" assert to_schema(dict(a=str)) == "a:str" + from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType + schema = StructType([StructField("name", ArrayType(StructType([StructField("nest_name", StringType(), True), StructField("nest_value", IntegerType(), True)]), True), True)]) + df = spark_session.createDataFrame([[[("a", 1), ("b", 2)]]], schema) + assert to_schema(df) == "name:[{nest_name:str,nest_value:int}]" + assert to_spark_schema("name:[{nest_name:str,nest_value:int}]") == schema + def test_to_cast_expression(): # length mismatch From 1b32865620fbae8f711a4a0c3bcb982407ee65e4 Mon Sep 17 00:00:00 2001 From: Kevin Kho Date: Sat, 19 Mar 2022 01:58:13 -0400 Subject: [PATCH 04/14] adding ecosystem to README --- README.md | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d3ed930e..00f05ef3 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,8 @@ * Big data practitioners finding **testing code** to be costly and slow * Data teams with big data projects that **struggle maintaining code** +For a more comprehensive overview of Fugue, read [this](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de) article. + ## Select Features * **Cross-framework code**: Write code once in native Python, SQL, or pandas then execute it on Dask or Spark with no rewrites. 
Logic and execution are decoupled through Fugue, enabling users to leverage the Spark and Dask engines without learning the specific framework syntax. @@ -46,13 +48,13 @@ Now, the `map_letter_to_food()` function is brought to the Spark execution engin ```python from fugue import transform -from fugue_spark import SparkExecutionEngine +import fugue_spark df = transform(input_df, map_letter_to_food, schema="*", params=dict(mapping=map_dict), - engine=SparkExecutionEngine + engine="spark" ) df.show() ``` @@ -185,22 +187,40 @@ docker run -p 8888:8888 fugueproject/tutorials:latest For the API docs, [click here](https://fugue.readthedocs.org) +## Ecosystem + +By being an abstraction layer, Fugue can be used with a lot of other open-source projects seamlessly. + +Fugue can use the following projects as backends: + +* [Spark](https://github.com/apache/spark) +* [Dask](https://github.com/dask/dask) +* [Duckdb](https://github.com/duckdb/duckdb) - in-process SQL OLAP database management +* [Ibis](https://github.com/ibis-project/ibis/) - pandas-like interface for SQL engines +* [blazing-sql](https://github.com/BlazingDB/blazingsql) - GPU accelerated SQL engine based on cuDF +* [dask-sql](https://github.com/dask-contrib/dask-sql) - SQL interface for Dask + +Fugue is available as a backend or can integrate with the following projects: + +* [PyCaret](https://github.com/pycaret/pycaret) - low code machine learning +* [Pandera](https://github.com/pandera-dev/pandera) - data validation + + ## Further Resources View some of our latest conferences presentations and content. For a more complete list, check the [Resources](https://fugue-tutorials.readthedocs.io/en/latest/tutorials/resources.html) page in the tutorials. ### Blogs -* [Fugue: Reducing Spark Developer Friction (James Le)](https://jameskle.com/writes/fugue) * [Introducing FugueSQL — SQL for Pandas, Spark, and Dask DataFrames (Towards Data Science by Khuyen Tran)](https://towardsdatascience.com/introducing-fuguesql-sql-for-pandas-spark-and-dask-dataframes-63d461a16b27) * [Interoperable Python and SQL in Jupyter Notebooks (Towards Data Science)](https://towardsdatascience.com/interoperable-python-and-sql-in-jupyter-notebooks-86245e711352) * [Using Pandera on Spark for Data Validation through Fugue (Towards Data Science)](https://towardsdatascience.com/using-pandera-on-spark-for-data-validation-through-fugue-72956f274793) ### Conferences -* [Large Scale Data Validation with Spark and Dask (PyCon US 2021)](https://www.youtube.com/watch?v=2AdvBgjO_3Q) -* [Dask SQL Query Engines (Dask Summit 2021)](https://www.youtube.com/watch?v=bQDN41Bc3bw) -* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon 2021)](https://www.youtube.com/watch?v=fDIRMiwc0aA) +* [Large Scale Data Validation with Spark and Dask (PyCon US)](https://www.youtube.com/watch?v=2AdvBgjO_3Q) +* [FugueSQL - The Enhanced SQL Interface for Pandas, Spark, and Dask DataFrames (PyData Global)](https://www.youtube.com/watch?v=OBpnGYjNBBI) +* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon)](https://www.youtube.com/watch?v=fDIRMiwc0aA) ## Community and Contributing From 9a12cb641c9805c6909593970ae5fffdac6ff673 Mon Sep 17 00:00:00 2001 From: Kevin Kho Date: Sat, 19 Mar 2022 01:58:13 -0400 Subject: [PATCH 05/14] adding ecosystem to README --- README.md | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d3ed930e..1228e185 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,8 @@ * Big data 
practitioners finding **testing code** to be costly and slow * Data teams with big data projects that **struggle maintaining code** +For a more comprehensive overview of Fugue, read [this](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de) article. + ## Select Features * **Cross-framework code**: Write code once in native Python, SQL, or pandas then execute it on Dask or Spark with no rewrites. Logic and execution are decoupled through Fugue, enabling users to leverage the Spark and Dask engines without learning the specific framework syntax. @@ -46,13 +48,13 @@ Now, the `map_letter_to_food()` function is brought to the Spark execution engin ```python from fugue import transform -from fugue_spark import SparkExecutionEngine +import fugue_spark df = transform(input_df, map_letter_to_food, schema="*", params=dict(mapping=map_dict), - engine=SparkExecutionEngine + engine="spark" ) df.show() ``` @@ -185,22 +187,41 @@ docker run -p 8888:8888 fugueproject/tutorials:latest For the API docs, [click here](https://fugue.readthedocs.org) +## Ecosystem + +By being an abstraction layer, Fugue can be used with a lot of other open-source projects seamlessly. + +Fugue can use the following projects as backends: + +* [Spark](https://github.com/apache/spark) +* [Dask](https://github.com/dask/dask) +* [Duckdb](https://github.com/duckdb/duckdb) - in-process SQL OLAP database management +* [Ibis](https://github.com/ibis-project/ibis/) - pandas-like interface for SQL engines +* [blazing-sql](https://github.com/BlazingDB/blazingsql) - GPU accelerated SQL engine based on cuDF +* [dask-sql](https://github.com/dask-contrib/dask-sql) - SQL interface for Dask + +Fugue is available as a backend or can integrate with the following projects: + +* [PyCaret](https://github.com/pycaret/pycaret) - low code machine learning +* [Pandera](https://github.com/pandera-dev/pandera) - data validation + + ## Further Resources View some of our latest conferences presentations and content. For a more complete list, check the [Resources](https://fugue-tutorials.readthedocs.io/en/latest/tutorials/resources.html) page in the tutorials. 
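The engine-string API shown in the updated README snippet above deserves a concrete illustration. A minimal sketch, assuming only that `fugue_dask` is installed; the `add_one` function and the sample frame are illustrative, not part of this patch series:

```python
import pandas as pd
from fugue import transform
import fugue_dask  # noqa: F401  makes the "dask" engine string resolvable

def add_one(df: pd.DataFrame) -> pd.DataFrame:
    # plain pandas logic; no Dask-specific code anywhere
    return df.assign(b=df["a"] + 1)

pdf = pd.DataFrame({"a": [1, 2, 3]})

# no engine given: runs locally on pandas
local = transform(pdf, add_one, schema="*,b:long")

# identical call, distributed: only the engine string changes
distributed = transform(pdf, add_one, schema="*,b:long", engine="dask")
```

Because the logic stays in plain pandas, the only line that differs between the local and distributed calls is the `engine` argument, which is exactly the decoupling the feature list above describes.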
 ### Blogs
 
-* [Fugue: Reducing Spark Developer Friction (James Le)](https://jameskle.com/writes/fugue)
+* [Introducing Fugue - Reducing PySpark Developer Friction](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de)
 * [Introducing FugueSQL — SQL for Pandas, Spark, and Dask DataFrames (Towards Data Science by Khuyen Tran)](https://towardsdatascience.com/introducing-fuguesql-sql-for-pandas-spark-and-dask-dataframes-63d461a16b27)
 * [Interoperable Python and SQL in Jupyter Notebooks (Towards Data Science)](https://towardsdatascience.com/interoperable-python-and-sql-in-jupyter-notebooks-86245e711352)
 * [Using Pandera on Spark for Data Validation through Fugue (Towards Data Science)](https://towardsdatascience.com/using-pandera-on-spark-for-data-validation-through-fugue-72956f274793)
 
 ### Conferences
 
-* [Large Scale Data Validation with Spark and Dask (PyCon US 2021)](https://www.youtube.com/watch?v=2AdvBgjO_3Q)
-* [Dask SQL Query Engines (Dask Summit 2021)](https://www.youtube.com/watch?v=bQDN41Bc3bw)
-* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon 2021)](https://www.youtube.com/watch?v=fDIRMiwc0aA)
+* [Large Scale Data Validation with Spark and Dask (PyCon US)](https://www.youtube.com/watch?v=2AdvBgjO_3Q)
+* [FugueSQL - The Enhanced SQL Interface for Pandas, Spark, and Dask DataFrames (PyData Global)](https://www.youtube.com/watch?v=OBpnGYjNBBI)
+* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon)](https://www.youtube.com/watch?v=fDIRMiwc0aA)
 
 ## Community and Contributing
 
From 42daae44e0d8b61fd6e89f868e05c218f8503e47 Mon Sep 17 00:00:00 2001
From: Kevin Kho
Date: Sat, 19 Mar 2022 02:15:09 -0400
Subject: [PATCH 06/14] merge conflict

---
 README.md | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/README.md b/README.md
index afb91c4f..db3c20d9 100644
--- a/README.md
+++ b/README.md
@@ -213,10 +213,7 @@ View some of our latest conferences presentations and content.
For a more comple ### Blogs -<<<<<<< HEAD * [Introducing Fugue - Reducing PySpark Developer Friction](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de) -======= ->>>>>>> 1b32865620fbae8f711a4a0c3bcb982407ee65e4 * [Introducing FugueSQL — SQL for Pandas, Spark, and Dask DataFrames (Towards Data Science by Khuyen Tran)](https://towardsdatascience.com/introducing-fuguesql-sql-for-pandas-spark-and-dask-dataframes-63d461a16b27) * [Interoperable Python and SQL in Jupyter Notebooks (Towards Data Science)](https://towardsdatascience.com/interoperable-python-and-sql-in-jupyter-notebooks-86245e711352) * [Using Pandera on Spark for Data Validation through Fugue (Towards Data Science)](https://towardsdatascience.com/using-pandera-on-spark-for-data-validation-through-fugue-72956f274793) From f2676e074ab5a843b624e884f53b62032a862aa5 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 24 Mar 2022 14:34:53 -0700 Subject: [PATCH 07/14] Fugue plugin (#311) * plugin * update --- RELEASE.md | 1 + fugue/_utils/interfaceless.py | 2 ++ fugue/_utils/register.py | 17 +++++++++++++ fugue/execution/__init__.py | 31 +++++++++++++---------- fugue/execution/factory.py | 4 +++ fugue/extensions/_utils.py | 2 ++ fugue/workflow/utils.py | 2 ++ fugue_ibis/__init__.py | 10 +++++--- fugue_ibis/execution/pandas_backend.py | 5 +--- fugue_notebook/env.py | 35 +------------------------- fugue_version/__init__.py | 2 +- setup.py | 9 +++++++ tests/fugue_dask/test_importless.py | 24 ++++++++++++++++++ tests/fugue_duckdb/test_importless.py | 30 ++++++++++++++++++++++ tests/fugue_ibis/test_importless.py | 10 ++++++++ tests/fugue_spark/test_importless.py | 26 +++++++++++++++++++ 16 files changed, 155 insertions(+), 55 deletions(-) create mode 100644 fugue/_utils/register.py create mode 100644 tests/fugue_dask/test_importless.py create mode 100644 tests/fugue_duckdb/test_importless.py create mode 100644 tests/fugue_ibis/test_importless.py create mode 100644 tests/fugue_spark/test_importless.py diff --git a/RELEASE.md b/RELEASE.md index 99e26e19..fe77c543 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -7,6 +7,7 @@ - A [smarter](https://github.com/fugue-project/fugue/issues/304) way to determine default npartitions in Dask - Support [even partitioning](https://github.com/fugue-project/fugue/issues/303) on Dask - Add handling of [nested ArrayType](https://github.com/fugue-project/fugue/issues/308) on Spark +- Change to [plugin approach](https://github.com/fugue-project/fugue/issues/310) to avoid explicit import ## 0.6.5 diff --git a/fugue/_utils/interfaceless.py b/fugue/_utils/interfaceless.py index 337d9d02..bb210fa1 100644 --- a/fugue/_utils/interfaceless.py +++ b/fugue/_utils/interfaceless.py @@ -263,6 +263,8 @@ def _parse_param( # noqa: C901 param: Optional[inspect.Parameter], none_as_other: bool = True, ) -> "_FuncParam": + import fugue._utils.register # pylint: disable=W0611 # noqa: F401 + if annotation == type(None): # noqa: E721 return _NoneParam(param) if annotation == inspect.Parameter.empty: diff --git a/fugue/_utils/register.py b/fugue/_utils/register.py new file mode 100644 index 00000000..9abbd063 --- /dev/null +++ b/fugue/_utils/register.py @@ -0,0 +1,17 @@ +try: + from importlib.metadata import entry_points # type:ignore +except ImportError: # pragma: no cover + from importlib_metadata import entry_points # type:ignore + + +def register_plugins(): + for plugin in entry_points().get("fugue.plugins", []): + try: + register_func = plugin.load() + assert callable(register_func), 
f"{plugin.name} is not a callable" + register_func() + except ImportError: # pragma: no cover + pass + + +register_plugins() diff --git a/fugue/execution/__init__.py b/fugue/execution/__init__.py index 0e985e34..309b1d4e 100644 --- a/fugue/execution/__init__.py +++ b/fugue/execution/__init__.py @@ -14,16 +14,21 @@ SqliteEngine, ) -register_execution_engine( - "native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore" -) -register_execution_engine( - "pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore" -) -register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore") -register_sql_engine( - "qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore" -) -register_sql_engine( - "qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore" -) + +def register(): + register_execution_engine( + "native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore" + ) + register_execution_engine( + "pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore" + ) + register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore") + register_sql_engine( + "qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore" + ) + register_sql_engine( + "qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore" + ) + + +register() diff --git a/fugue/execution/factory.py b/fugue/execution/factory.py index cb93f580..04bcaa0e 100644 --- a/fugue/execution/factory.py +++ b/fugue/execution/factory.py @@ -352,6 +352,8 @@ def make_execution_engine( # SparkExecutionEngine + S2 make_execution_engine((SparkExecutionEngine, "s")) """ + import fugue._utils.register # pylint: disable=W0611 # noqa: F401 + return _EXECUTION_ENGINE_FACTORY.make(engine, conf, **kwargs) @@ -404,4 +406,6 @@ def make_sql_engine( # SqliteEngine(engine) make_sql_engine(SqliteEngine) """ + import fugue._utils.register # pylint: disable=W0611 # noqa: F401 + return _EXECUTION_ENGINE_FACTORY.make_sql_engine(engine, execution_engine, **kwargs) diff --git a/fugue/extensions/_utils.py b/fugue/extensions/_utils.py index 72b14ab8..5aa7fa56 100644 --- a/fugue/extensions/_utils.py +++ b/fugue/extensions/_utils.py @@ -27,6 +27,8 @@ def register(self, name: str, extension: Any, on_dup="overwrite") -> None: raise ValueError(on_dup) def get(self, obj: Any) -> Any: + import fugue._utils.register # pylint: disable=W0611 # noqa: F401 + if isinstance(obj, str) and obj in self._dict: return self._dict[obj] return obj diff --git a/fugue/workflow/utils.py b/fugue/workflow/utils.py index 98c13cda..f830f897 100644 --- a/fugue/workflow/utils.py +++ b/fugue/workflow/utils.py @@ -23,4 +23,6 @@ def is_acceptable_raw_df(df: Any) -> bool: :param df: input raw dataframe :return: whether this dataframe is convertible """ + import fugue._utils.register # pylint: disable=W0611 # noqa: F401 + return any(isinstance(df, t) for t in _VALID_RAW_DF_TYPES) diff --git a/fugue_ibis/__init__.py b/fugue_ibis/__init__.py index 3b19a7be..0f0c0143 100644 --- a/fugue_ibis/__init__.py +++ b/fugue_ibis/__init__.py @@ -1,4 +1,8 @@ # flake8: noqa -from fugue_ibis.execution import pandas_backend -from fugue_ibis.execution.ibis_engine import IbisEngine -from fugue_ibis.extensions import run_ibis, as_ibis, as_fugue +from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine +from fugue_ibis.execution.pandas_backend import _to_pandas_ibis_engine +from fugue_ibis.extensions import as_fugue, as_ibis, run_ibis + + +def register(): + register_ibis_engine(1, _to_pandas_ibis_engine) diff 
--git a/fugue_ibis/execution/pandas_backend.py b/fugue_ibis/execution/pandas_backend.py index 95417af6..59c628ae 100644 --- a/fugue_ibis/execution/pandas_backend.py +++ b/fugue_ibis/execution/pandas_backend.py @@ -12,7 +12,7 @@ from triad.utils.assertion import assert_or_throw import pandas as pd -from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine +from fugue_ibis.execution.ibis_engine import IbisEngine from fugue_ibis._utils import to_schema, to_ibis_schema from ibis.backends.pandas import Backend @@ -53,6 +53,3 @@ def table(self, name: str, schema: Any = None): if schema is None and name in self._schemas else schema, ) - - -register_ibis_engine(1, _to_pandas_ibis_engine) diff --git a/fugue_notebook/env.py b/fugue_notebook/env.py index 94ef8f61..440e87de 100644 --- a/fugue_notebook/env.py +++ b/fugue_notebook/env.py @@ -5,12 +5,7 @@ import fugue_sql import pandas as pd -from fugue import ( - ExecutionEngine, - NativeExecutionEngine, - make_execution_engine, - register_execution_engine, -) +from fugue import ExecutionEngine, make_execution_engine from fugue.dataframe import YieldedDataFrame from fugue.extensions._builtins.outputters import Show from fugue_sql.exceptions import FugueSQLSyntaxError @@ -38,33 +33,6 @@ def get_pretty_print(self) -> Callable: """Fugue dataframe pretty print handler""" return _default_pretty_print - def register_execution_engines(self): - """Register execution engines with names. This will also try to register - spark and dask engines if the dependent packages are available and they - are not registered""" - register_execution_engine( - "native", - lambda conf, **kwargs: NativeExecutionEngine(conf=conf), - on_dup="ignore", - ) - - try: - import pyspark # noqa: F401 - import fugue_spark # noqa: F401 - except ImportError: - pass - - try: - import dask.dataframe # noqa: F401 - import fugue_dask # noqa: F401 - except ImportError: - pass - - try: - import fugue_duckdb # noqa: F401 - except ImportError: - pass - @magics_class class _FugueSQLMagics(Magics): @@ -151,5 +119,4 @@ def _setup_fugue_notebook( fsql_ignore_case=fsql_ignore_case, ) ipython.register_magics(magics) - s.register_execution_engines() Show.set_hook(s.get_pretty_print()) diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py index 7bbb2ef5..4c513f3b 100644 --- a/fugue_version/__init__.py +++ b/fugue_version/__init__.py @@ -1 +1 @@ -__version__ = "0.6.5" +__version__ = "0.6.6" diff --git a/setup.py b/setup.py index ad67d3fa..a1e8a0fc 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ def get_version() -> str: "sqlalchemy", "pyarrow>=0.15.1", "pandas>=1.0.2", + "importlib-metadata; python_version < '3.8'", ], extras_require={ "sql": ["antlr4-python3-runtime", "jinja2"], @@ -71,4 +72,12 @@ def get_version() -> str: ], python_requires=">=3.6", package_data={"fugue_notebook": ["nbextension/*"]}, + entry_points={ + "fugue.plugins": [ + "ibis = fugue_ibis:register", + "duckdb = fugue_duckdb:register", + "spark = fugue_spark:register", + "dask = fugue_dask:register", + ] + }, ) diff --git a/tests/fugue_dask/test_importless.py b/tests/fugue_dask/test_importless.py new file mode 100644 index 00000000..dd0112b8 --- /dev/null +++ b/tests/fugue_dask/test_importless.py @@ -0,0 +1,24 @@ +from fugue import FugueWorkflow +from fugue_sql import fsql + + +def test_importless(): + for engine in ["dask"]: + dag = FugueWorkflow() + dag.df([[0]], "a:int").show() + + dag.run(engine) + + fsql( + """ + CREATE [[0],[1]] SCHEMA a:int + SELECT * WHERE a<1 + PRINT + """ + ).run(engine) + 
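The `fugue.plugins` entry-point group declared in `setup.py` above is what `register_plugins()` in `fugue/_utils/register.py` scans. A hedged sketch of how an external package could hook into the same mechanism; every name below (`fugue-mybackend`, `fugue_mybackend`, `MyBackendExecutionEngine`) is hypothetical:

```python
# setup.py of a hypothetical third-party backend package
from setuptools import setup

setup(
    name="fugue-mybackend",  # hypothetical distribution name
    packages=["fugue_mybackend"],
    entry_points={
        "fugue.plugins": [
            # register_plugins() loads this object and calls it once
            "mybackend = fugue_mybackend:register",
        ]
    },
)
```

```python
# fugue_mybackend/__init__.py (hypothetical)
from fugue import register_execution_engine

def register():
    # import lazily so merely installing the plugin stays cheap
    from fugue_mybackend.engine import MyBackendExecutionEngine

    register_execution_engine(
        "mybackend",
        lambda conf, **kwargs: MyBackendExecutionEngine(conf),
        on_dup="ignore",
    )
```

This mirrors how the in-repo `spark`, `dask`, `duckdb`, and `ibis` plugins are wired above, and it is why the new `test_importless` suites can select engines by name without any explicit import.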
+ dag = FugueWorkflow() + idf = dag.df([[0], [1]], "a:int").as_ibis() + idf[idf.a < 1].as_fugue().show() + + dag.run(engine) \ No newline at end of file diff --git a/tests/fugue_duckdb/test_importless.py b/tests/fugue_duckdb/test_importless.py new file mode 100644 index 00000000..d15442e3 --- /dev/null +++ b/tests/fugue_duckdb/test_importless.py @@ -0,0 +1,30 @@ +from fugue import FugueWorkflow +from fugue_sql import fsql + + +def test_importless(): + for engine in ["duck", "duckdb"]: + dag = FugueWorkflow() + dag.df([[0]], "a:int").show() + + dag.run(engine) + + fsql( + """ + CREATE [[0],[1]] SCHEMA a:int + SELECT * WHERE a<1 + PRINT + """ + ).run(engine) + + dag = FugueWorkflow() + idf = dag.df([[0], [1]], "a:int").as_ibis() + idf[idf.a < 1].as_fugue().show() + + dag.run(engine) + + dag = FugueWorkflow() + tdf = dag.df([[0], [1]], "a:int") + dag.select("SELECT * FROM ", tdf, " WHERE a<1", sql_engine=engine) + + dag.run() diff --git a/tests/fugue_ibis/test_importless.py b/tests/fugue_ibis/test_importless.py new file mode 100644 index 00000000..27512b5d --- /dev/null +++ b/tests/fugue_ibis/test_importless.py @@ -0,0 +1,10 @@ +from fugue import FugueWorkflow + + +def test_importless(): + for engine in [None]: + dag = FugueWorkflow() + idf = dag.df([[0], [1]], "a:int").as_ibis() + idf[idf.a < 1].as_fugue().show() + + dag.run(engine) diff --git a/tests/fugue_spark/test_importless.py b/tests/fugue_spark/test_importless.py new file mode 100644 index 00000000..37ec1efc --- /dev/null +++ b/tests/fugue_spark/test_importless.py @@ -0,0 +1,26 @@ +from fugue import FugueWorkflow +from fugue_sql import fsql +from pyspark.sql import SparkSession + + +def test_importless(): + spark = SparkSession.builder.getOrCreate() + for engine in [spark, "spark"]: + dag = FugueWorkflow() + dag.df([[0]], "a:int").show() + + dag.run(engine) + + fsql( + """ + CREATE [[0],[1]] SCHEMA a:int + SELECT * WHERE a<1 + PRINT + """ + ).run(engine) + + dag = FugueWorkflow() + idf = dag.df([[0], [1]], "a:int").as_ibis() + idf[idf.a < 1].as_fugue().show() + + dag.run(engine) From d0c7d96da3bc8a5a3c3bd2d420fca3516d31805e Mon Sep 17 00:00:00 2001 From: Kevin Kho Date: Sat, 2 Apr 2022 15:24:35 -0400 Subject: [PATCH 08/14] upgrading black version --- .pre-commit-config.yaml | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f6d82c29..4b93ec10 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -43,7 +43,7 @@ repos: hooks: - id: pylint - repo: https://github.com/ambv/black - rev: 20.8b1 + rev: v22.3.0 hooks: - id: black types: [python] diff --git a/requirements.txt b/requirements.txt index cb91fc7c..2a0c3936 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ qpd[dask] # test requirements pre-commit -black +black>=22.3.0 mypy flake8 autopep8 From a53fc9c038bcaadafbf0ecf6943917c1f2bbfc44 Mon Sep 17 00:00:00 2001 From: Kevin Kho Date: Sat, 2 Apr 2022 15:37:21 -0400 Subject: [PATCH 09/14] fixing black version --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4b93ec10..628f2152 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -43,7 +43,7 @@ repos: hooks: - id: pylint - repo: https://github.com/ambv/black - rev: v22.3.0 + rev: 22.3.0 hooks: - id: black types: [python] From 7678f317eefa934d4f9f80f00f82c3d2c7f6ac9d Mon Sep 17 00:00:00 2001 From: Laurent Erreca Date: Sun, 27 Feb 2022 21:39:48 
+0100 Subject: [PATCH 10/14] Work in progress to fix issue 285 reported here https://github.com/fugue-project/fugue/issues/285 --- fugue/execution/native_execution_engine.py | 6 ++---- fugue_dask/execution_engine.py | 1 + fugue_test/builtin_suite.py | 14 ++++++++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index 78f42dae..6cdcccbf 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -376,10 +376,8 @@ def save_df( force_single: bool = False, **kwargs: Any, ) -> None: - if not partition_spec.empty: - self.log.warning( # pragma: no cover - "partition_spec is not respected in %s.save_df", self - ) + if not force_single and not partition_spec.empty: + kwargs['partition_cols'] = partition_spec.partition_by self.fs.makedirs(os.path.dirname(path), recreate=True) df = self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index 7e87130a..ce130db1 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -459,6 +459,7 @@ def save_df( format_hint=format_hint, mode=mode, partition_spec=partition_spec, + force_single=force_single, **kwargs, ) else: diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index 7fa33148..e0808dea 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -1175,6 +1175,7 @@ def init_tmpdir(self, tmpdir): def test_io(self): path = os.path.join(self.tmpdir, "a") path2 = os.path.join(self.tmpdir, "b.test.csv") + path3 = os.path.join(self.tmpdir, "c.partition") with self.dag() as dag: b = dag.df([[6, 1], [2, 7]], "c:int,a:long") b.partition(num=3).save(path, fmt="parquet", single=True) @@ -1185,6 +1186,19 @@ def test_io(self): a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int")) a = dag.load(path2, header=True, columns="c:int,a:long") a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long")) + with self.dag() as dag: + b = dag.df([[6, 1], [2, 7]], "c:int,a:long") + b.partition(by='c').save(path3, fmt="parquet", single=False) + b.save(path2, header=True) + assert FileSystem().isdir(path3) + assert FileSystem().isdir(os.path.join(path3, 'c=6')) + assert FileSystem().isdir(os.path.join(path3, 'c=2')) + # TODO: in test below, once issue #288 is fixed, use dag.load instead of pd.read_parquet + pd.testing.assert_frame_equal( + pd.read_parquet(path3).sort_values('a').reset_index(drop=True), + pd.DataFrame({'c': pd.Categorical([6, 2]), 'a': [1, 7]}).reset_index(drop=True), + check_like=True + ) def test_save_and_use(self): path = os.path.join(self.tmpdir, "a") From 3f1d7e46a65ce1dff6034f6aee21cd73442b2da7 Mon Sep 17 00:00:00 2001 From: Laurent Erreca Date: Mon, 28 Feb 2022 10:18:35 +0100 Subject: [PATCH 11/14] Use option partition_on in Dask execution engine to write hive partitioned dataset --- fugue_dask/execution_engine.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index ce130db1..221a523e 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -464,9 +464,7 @@ def save_df( ) else: if not partition_spec.empty: - self.log.warning( # pragma: no cover - "partition_spec is not respected in %s.save_df", self - ) + kwargs['partition_on'] = partition_spec.partition_by self.fs.makedirs(os.path.dirname(path), recreate=True) df = 
self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) From 8af3bb0749089d0890f931a1de7a3704f12fdf07 Mon Sep 17 00:00:00 2001 From: Laurent Erreca Date: Sun, 3 Apr 2022 12:42:46 +0200 Subject: [PATCH 12/14] Handle hive partitioning with Duckdb execution engine --- fugue_duckdb/_io.py | 2 +- fugue_duckdb/execution_engine.py | 2 ++ fugue_test/builtin_suite.py | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py index 91bd6c95..c792da12 100644 --- a/fugue_duckdb/_io.py +++ b/fugue_duckdb/_io.py @@ -70,7 +70,7 @@ def save_df( NotImplementedError(f"{mode} is not supported"), ) p = FileParser(uri, format_hint).assert_no_glob() - if p.file_format not in self._format_save: + if (p.file_format not in self._format_save) or ('partition_cols' in kwargs): self._fs.makedirs(os.path.dirname(uri), recreate=True) ldf = ArrowDataFrame(df.native.arrow()) return save_df( diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py index 38d3deb0..bb3b0de2 100644 --- a/fugue_duckdb/execution_engine.py +++ b/fugue_duckdb/execution_engine.py @@ -465,6 +465,8 @@ def save_df( force_single: bool = False, **kwargs: Any, ) -> None: + if not partition_spec.empty and not force_single: + kwargs['partition_cols'] = partition_spec.partition_by dio = DuckDBIO(self.fs, self.connection) dio.save_df(self.to_df(df), path, format_hint, mode, **kwargs) diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index e0808dea..cffc8088 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -1189,7 +1189,6 @@ def test_io(self): with self.dag() as dag: b = dag.df([[6, 1], [2, 7]], "c:int,a:long") b.partition(by='c').save(path3, fmt="parquet", single=False) - b.save(path2, header=True) assert FileSystem().isdir(path3) assert FileSystem().isdir(os.path.join(path3, 'c=6')) assert FileSystem().isdir(os.path.join(path3, 'c=2')) From a5617175257034e1fdea6d85f3605c0524ba22bf Mon Sep 17 00:00:00 2001 From: Laurent Erreca Date: Sun, 3 Apr 2022 22:05:59 +0200 Subject: [PATCH 13/14] Clean code with pylint --- fugue/execution/native_execution_engine.py | 2 +- fugue_dask/execution_engine.py | 2 +- fugue_duckdb/_io.py | 2 +- fugue_duckdb/execution_engine.py | 2 +- fugue_test/builtin_suite.py | 20 +++++++++++--------- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index 6cdcccbf..02a43897 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -377,7 +377,7 @@ def save_df( **kwargs: Any, ) -> None: if not force_single and not partition_spec.empty: - kwargs['partition_cols'] = partition_spec.partition_by + kwargs["partition_cols"] = partition_spec.partition_by self.fs.makedirs(os.path.dirname(path), recreate=True) df = self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index 221a523e..e58324f8 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -464,7 +464,7 @@ def save_df( ) else: if not partition_spec.empty: - kwargs['partition_on'] = partition_spec.partition_by + kwargs["partition_on"] = partition_spec.partition_by self.fs.makedirs(os.path.dirname(path), recreate=True) df = self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) diff --git a/fugue_duckdb/_io.py 
b/fugue_duckdb/_io.py
index c792da12..415e6951 100644
--- a/fugue_duckdb/_io.py
+++ b/fugue_duckdb/_io.py
@@ -70,7 +70,7 @@ def save_df(
             NotImplementedError(f"{mode} is not supported"),
         )
         p = FileParser(uri, format_hint).assert_no_glob()
-        if (p.file_format not in self._format_save) or ('partition_cols' in kwargs):
+        if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
             self._fs.makedirs(os.path.dirname(uri), recreate=True)
             ldf = ArrowDataFrame(df.native.arrow())
             return save_df(
diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py
index bb3b0de2..22f205d6 100644
--- a/fugue_duckdb/execution_engine.py
+++ b/fugue_duckdb/execution_engine.py
@@ -466,7 +466,7 @@ def save_df(
         **kwargs: Any,
     ) -> None:
         if not partition_spec.empty and not force_single:
-            kwargs['partition_cols'] = partition_spec.partition_by
+            kwargs["partition_cols"] = partition_spec.partition_by
         dio = DuckDBIO(self.fs, self.connection)
         dio.save_df(self.to_df(df), path, format_hint, mode, **kwargs)
diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py
index 9a43380a..da22bd71 100644
--- a/fugue_test/builtin_suite.py
+++ b/fugue_test/builtin_suite.py
@@ -589,8 +589,7 @@ def test_out_cotransform(self):  # noqa: C901
         def incr():
             fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True)
             fs.writetext(str(uuid4()) + ".txt", "")
-            return fs.glob("*.tx"
-                           "t").count().files
+            return fs.glob("*.tx" "t").count().files
 
         def t1(
             df: Iterable[Dict[str, Any]], df2: pd.DataFrame
@@ -1189,15 +1188,18 @@ def test_io(self):
             a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long"))
         with self.dag() as dag:
             b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
-            b.partition(by='c').save(path3, fmt="parquet", single=False)
+            b.partition(by="c").save(path3, fmt="parquet", single=False)
             assert FileSystem().isdir(path3)
-            assert FileSystem().isdir(os.path.join(path3, 'c=6'))
-            assert FileSystem().isdir(os.path.join(path3, 'c=2'))
-            # TODO: in test below, once issue #288 is fixed, use dag.load instead of pd.read_parquet
+            assert FileSystem().isdir(os.path.join(path3, "c=6"))
+            assert FileSystem().isdir(os.path.join(path3, "c=2"))
+            # TODO: in test below, once issue #288 is fixed, use dag.load
+            # instead of pd.read_parquet
             pd.testing.assert_frame_equal(
-                pd.read_parquet(path3).sort_values('a').reset_index(drop=True),
-                pd.DataFrame({'c': pd.Categorical([6, 2]), 'a': [1, 7]}).reset_index(drop=True),
-                check_like=True
+                pd.read_parquet(path3).sort_values("a").reset_index(drop=True),
+                pd.DataFrame({"c": pd.Categorical([6, 2]), "a": [1, 7]}).reset_index(
+                    drop=True
+                ),
+                check_like=True,
             )

From b3e86b507e31538431808a16e617aeea9126d8fd Mon Sep 17 00:00:00 2001
From: Laurent Erreca
Date: Sun, 3 Apr 2022 22:39:55 +0200
Subject: [PATCH 14/14] Use ArrowDataFrame(df.as_arrow()) instead of ArrowDataFrame(df.native.arrow())

---
 fugue_duckdb/_io.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py
index 415e6951..c299a50f 100644
--- a/fugue_duckdb/_io.py
+++ b/fugue_duckdb/_io.py
@@ -72,7 +72,7 @@ def save_df(
         p = FileParser(uri, format_hint).assert_no_glob()
         if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
             self._fs.makedirs(os.path.dirname(uri), recreate=True)
-            ldf = ArrowDataFrame(df.native.arrow())
+            ldf = ArrowDataFrame(df.as_arrow())
             return save_df(
                 ldf, uri=uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs
             )
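Taken together, patches 01, 02, and 10 through 13 turn `partition_spec` from an ignored-with-a-warning argument into the driver for hive-style partitioned writes on the native, Dask, and DuckDB engines. A usage sketch of the behavior the new `test_io` case exercises, on the default engine and with an illustrative output path:

```python
from fugue import FugueWorkflow

path = "/tmp/c.partition"  # illustrative output directory

with FugueWorkflow() as dag:
    df = dag.df([[6, 1], [2, 7]], "c:int,a:long")
    # partition_by is now forwarded to the writer (pandas' partition_cols,
    # Dask's partition_on, DuckDB via the arrow fallback), so the dataset
    # lands as <path>/c=6/... and <path>/c=2/... instead of a single file
    df.partition(by="c").save(path, fmt="parquet", single=False)
```

Reading the directory back with `pd.read_parquet(path)` returns the partition column as a categorical, which is why the suite compares against `pd.Categorical([6, 2])` and why the TODO defers switching to `dag.load` until issue #288 is fixed.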