diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f6d82c29..628f2152 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -43,7 +43,7 @@ repos:
     hooks:
       - id: pylint
   - repo: https://github.com/ambv/black
-    rev: 20.8b1
+    rev: 22.3.0
     hooks:
       - id: black
         types: [python]
diff --git a/README.md b/README.md
index d3ed930e..db3c20d9 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,7 @@
 [![PyPI license](https://img.shields.io/pypi/l/fugue.svg)](https://pypi.python.org/pypi/fugue/)
 [![codecov](https://codecov.io/gh/fugue-project/fugue/branch/master/graph/badge.svg?token=ZO9YD5N3IA)](https://codecov.io/gh/fugue-project/fugue)
 [![Codacy Badge](https://app.codacy.com/project/badge/Grade/4fa5f2f53e6f48aaa1218a89f4808b91)](https://www.codacy.com/gh/fugue-project/fugue/dashboard?utm_source=github.com&utm_medium=referral&utm_content=fugue-project/fugue&utm_campaign=Badge_Grade)
+[![Downloads](https://pepy.tech/badge/fugue)](https://pepy.tech/project/fugue)
 
 | Documentation | Tutorials | Chat with us on slack! |
 | --- | --- | --- |
@@ -19,6 +20,8 @@
 * Big data practitioners finding **testing code** to be costly and slow
 * Data teams with big data projects that **struggle maintaining code**
 
+For a more comprehensive overview of Fugue, read [this](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de) article.
+
 ## Select Features
 
 * **Cross-framework code**: Write code once in native Python, SQL, or pandas then execute it on Dask or Spark with no rewrites. Logic and execution are decoupled through Fugue, enabling users to leverage the Spark and Dask engines without learning the specific framework syntax.
@@ -46,13 +49,13 @@ Now, the `map_letter_to_food()` function is brought to the Spark execution engin
 
 ```python
 from fugue import transform
-from fugue_spark import SparkExecutionEngine
+import fugue_spark
 
 df = transform(input_df,
                map_letter_to_food,
                schema="*",
                params=dict(mapping=map_dict),
-               engine=SparkExecutionEngine
+               engine="spark"
                )
 df.show()
 ```
@@ -185,22 +188,41 @@ docker run -p 8888:8888 fugueproject/tutorials:latest
 
 For the API docs, [click here](https://fugue.readthedocs.org)
 
+## Ecosystem
+
+By being an abstraction layer, Fugue can be used with a lot of other open-source projects seamlessly.
+
+Fugue can use the following projects as backends:
+
+* [Spark](https://github.com/apache/spark)
+* [Dask](https://github.com/dask/dask)
+* [Duckdb](https://github.com/duckdb/duckdb) - in-process SQL OLAP database management
+* [Ibis](https://github.com/ibis-project/ibis/) - pandas-like interface for SQL engines
+* [blazing-sql](https://github.com/BlazingDB/blazingsql) - GPU accelerated SQL engine based on cuDF
+* [dask-sql](https://github.com/dask-contrib/dask-sql) - SQL interface for Dask
+
+Fugue is available as a backend or can integrate with the following projects:
+
+* [PyCaret](https://github.com/pycaret/pycaret) - low code machine learning
+* [Pandera](https://github.com/pandera-dev/pandera) - data validation
+
+
 ## Further Resources
 
 View some of our latest conferences presentations and content. For a more complete list, check the [Resources](https://fugue-tutorials.readthedocs.io/en/latest/tutorials/resources.html) page in the tutorials.
 
 ### Blogs
 
-* [Fugue: Reducing Spark Developer Friction (James Le)](https://jameskle.com/writes/fugue)
+* [Introducing Fugue - Reducing PySpark Developer Friction](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de)
 * [Introducing FugueSQL — SQL for Pandas, Spark, and Dask DataFrames (Towards Data Science by Khuyen Tran)](https://towardsdatascience.com/introducing-fuguesql-sql-for-pandas-spark-and-dask-dataframes-63d461a16b27)
 * [Interoperable Python and SQL in Jupyter Notebooks (Towards Data Science)](https://towardsdatascience.com/interoperable-python-and-sql-in-jupyter-notebooks-86245e711352)
 * [Using Pandera on Spark for Data Validation through Fugue (Towards Data Science)](https://towardsdatascience.com/using-pandera-on-spark-for-data-validation-through-fugue-72956f274793)
 
 ### Conferences
 
-* [Large Scale Data Validation with Spark and Dask (PyCon US 2021)](https://www.youtube.com/watch?v=2AdvBgjO_3Q)
-* [Dask SQL Query Engines (Dask Summit 2021)](https://www.youtube.com/watch?v=bQDN41Bc3bw)
-* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon 2021)](https://www.youtube.com/watch?v=fDIRMiwc0aA)
+* [Large Scale Data Validation with Spark and Dask (PyCon US)](https://www.youtube.com/watch?v=2AdvBgjO_3Q)
+* [FugueSQL - The Enhanced SQL Interface for Pandas, Spark, and Dask DataFrames (PyData Global)](https://www.youtube.com/watch?v=OBpnGYjNBBI)
+* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon)](https://www.youtube.com/watch?v=fDIRMiwc0aA)
 
 ## Community and Contributing
diff --git a/RELEASE.md b/RELEASE.md
index 52bb5b43..fe77c543 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -6,6 +6,8 @@
 - Enable DaskExecutionEngine to transform dataframes with [nested](https://github.com/fugue-project/fugue/issues/299) columns
 - A [smarter](https://github.com/fugue-project/fugue/issues/304) way to determine default npartitions in Dask
 - Support [even partitioning](https://github.com/fugue-project/fugue/issues/303) on Dask
+- Add handling of [nested ArrayType](https://github.com/fugue-project/fugue/issues/308) on Spark
+- Change to [plugin approach](https://github.com/fugue-project/fugue/issues/310) to avoid explicit import
 
 ## 0.6.5
diff --git a/fugue/_utils/interfaceless.py b/fugue/_utils/interfaceless.py
index 337d9d02..bb210fa1 100644
--- a/fugue/_utils/interfaceless.py
+++ b/fugue/_utils/interfaceless.py
@@ -263,6 +263,8 @@ def _parse_param(  # noqa: C901
     param: Optional[inspect.Parameter],
     none_as_other: bool = True,
 ) -> "_FuncParam":
+    import fugue._utils.register  # pylint: disable=W0611  # noqa: F401
+
     if annotation == type(None):  # noqa: E721
         return _NoneParam(param)
     if annotation == inspect.Parameter.empty:
diff --git a/fugue/_utils/register.py b/fugue/_utils/register.py
new file mode 100644
index 00000000..9abbd063
--- /dev/null
+++ b/fugue/_utils/register.py
@@ -0,0 +1,17 @@
+try:
+    from importlib.metadata import entry_points  # type:ignore
+except ImportError:  # pragma: no cover
+    from importlib_metadata import entry_points  # type:ignore
+
+
+def register_plugins():
+    for plugin in entry_points().get("fugue.plugins", []):
+        try:
+            register_func = plugin.load()
+            assert callable(register_func), f"{plugin.name} is not a callable"
+            register_func()
+        except ImportError:  # pragma: no cover
+            pass
+
+
+register_plugins()
diff --git a/fugue/execution/__init__.py b/fugue/execution/__init__.py
index 0e985e34..309b1d4e 100644
--- a/fugue/execution/__init__.py
+++ b/fugue/execution/__init__.py
@@ -14,16 +14,21 @@
     SqliteEngine,
 )
 
-register_execution_engine(
-    "native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
-)
-register_execution_engine(
-    "pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
-)
-register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore")
-register_sql_engine(
-    "qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
-)
-register_sql_engine(
-    "qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
-)
+
+def register():
+    register_execution_engine(
+        "native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
+    )
+    register_execution_engine(
+        "pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
+    )
+    register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore")
+    register_sql_engine(
+        "qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
+    )
+    register_sql_engine(
+        "qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
+    )
+
+
+register()
diff --git a/fugue/execution/factory.py b/fugue/execution/factory.py
index cb93f580..04bcaa0e 100644
--- a/fugue/execution/factory.py
+++ b/fugue/execution/factory.py
@@ -352,6 +352,8 @@ def make_execution_engine(
         # SparkExecutionEngine + S2
         make_execution_engine((SparkExecutionEngine, "s"))
     """
+    import fugue._utils.register  # pylint: disable=W0611  # noqa: F401
+
     return _EXECUTION_ENGINE_FACTORY.make(engine, conf, **kwargs)
 
 
@@ -404,4 +406,6 @@ def make_sql_engine(
         # SqliteEngine(engine)
         make_sql_engine(SqliteEngine)
     """
+    import fugue._utils.register  # pylint: disable=W0611  # noqa: F401
+
     return _EXECUTION_ENGINE_FACTORY.make_sql_engine(engine, execution_engine, **kwargs)
diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py
index 78f42dae..02a43897 100644
--- a/fugue/execution/native_execution_engine.py
+++ b/fugue/execution/native_execution_engine.py
@@ -376,10 +376,8 @@ def save_df(
         force_single: bool = False,
         **kwargs: Any,
     ) -> None:
-        if not partition_spec.empty:
-            self.log.warning(  # pragma: no cover
-                "partition_spec is not respected in %s.save_df", self
-            )
+        if not force_single and not partition_spec.empty:
+            kwargs["partition_cols"] = partition_spec.partition_by
         self.fs.makedirs(os.path.dirname(path), recreate=True)
         df = self.to_df(df)
         save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)
diff --git a/fugue/extensions/_utils.py b/fugue/extensions/_utils.py
index 72b14ab8..5aa7fa56 100644
--- a/fugue/extensions/_utils.py
+++ b/fugue/extensions/_utils.py
@@ -27,6 +27,8 @@ def register(self, name: str, extension: Any, on_dup="overwrite") -> None:
         raise ValueError(on_dup)
 
     def get(self, obj: Any) -> Any:
+        import fugue._utils.register  # pylint: disable=W0611  # noqa: F401
+
         if isinstance(obj, str) and obj in self._dict:
             return self._dict[obj]
         return obj
diff --git a/fugue/workflow/utils.py b/fugue/workflow/utils.py
index 98c13cda..f830f897 100644
--- a/fugue/workflow/utils.py
+++ b/fugue/workflow/utils.py
@@ -23,4 +23,6 @@ def is_acceptable_raw_df(df: Any) -> bool:
     :param df: input raw dataframe
     :return: whether this dataframe is convertible
     """
+    import fugue._utils.register  # pylint: disable=W0611  # noqa: F401
+
     return any(isinstance(df, t) for t in _VALID_RAW_DF_TYPES)
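The `register_plugins()` helper above scans the `fugue.plugins` entry-point group, so any installed package can contribute engines without being imported explicitly by user code. Below is a minimal sketch of how a third-party backend could hook in; the package name `my_fugue_backend`, the engine name `"my_engine"`, and the reuse of `NativeExecutionEngine` are hypothetical placeholders, while the entry-point group and the `register_execution_engine` call come from the code above.

```python
# Hypothetical plugin package, e.g. my_fugue_backend/__init__.py.
# Its setup.py would advertise the zero-argument callable defined below:
#
#     entry_points={"fugue.plugins": ["my_backend = my_fugue_backend:register"]}
#
from fugue import NativeExecutionEngine, register_execution_engine


def register():
    # Called by fugue's register_plugins(); afterwards the engine can be
    # referenced by name, e.g. transform(..., engine="my_engine"),
    # without the user ever importing this package.
    register_execution_engine(
        "my_engine",
        lambda conf, **kwargs: NativeExecutionEngine(conf=conf),
        on_dup="ignore",
    )
```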
diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py
index 7e87130a..e58324f8 100644
--- a/fugue_dask/execution_engine.py
+++ b/fugue_dask/execution_engine.py
@@ -459,13 +459,12 @@ def save_df(
                 format_hint=format_hint,
                 mode=mode,
                 partition_spec=partition_spec,
+                force_single=force_single,
                 **kwargs,
             )
         else:
             if not partition_spec.empty:
-                self.log.warning(  # pragma: no cover
-                    "partition_spec is not respected in %s.save_df", self
-                )
+                kwargs["partition_on"] = partition_spec.partition_by
             self.fs.makedirs(os.path.dirname(path), recreate=True)
             df = self.to_df(df)
             save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)
diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py
index 91bd6c95..c299a50f 100644
--- a/fugue_duckdb/_io.py
+++ b/fugue_duckdb/_io.py
@@ -70,9 +70,9 @@ def save_df(
             NotImplementedError(f"{mode} is not supported"),
         )
         p = FileParser(uri, format_hint).assert_no_glob()
-        if p.file_format not in self._format_save:
+        if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
             self._fs.makedirs(os.path.dirname(uri), recreate=True)
-            ldf = ArrowDataFrame(df.native.arrow())
+            ldf = ArrowDataFrame(df.as_arrow())
             return save_df(
                 ldf, uri=uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs
             )
diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py
index 38d3deb0..22f205d6 100644
--- a/fugue_duckdb/execution_engine.py
+++ b/fugue_duckdb/execution_engine.py
@@ -465,6 +465,8 @@ def save_df(
         force_single: bool = False,
         **kwargs: Any,
     ) -> None:
+        if not partition_spec.empty and not force_single:
+            kwargs["partition_cols"] = partition_spec.partition_by
         dio = DuckDBIO(self.fs, self.connection)
         dio.save_df(self.to_df(df), path, format_hint, mode, **kwargs)
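With the warnings above replaced by real forwarding (`partition_on` for Dask, `partition_cols` for the native and DuckDB engines), a partition spec on save now produces a partitioned dataset instead of being silently dropped. A rough sketch of the user-facing effect, mirroring the new test added to `fugue_test/builtin_suite.py` below; the output path here is illustrative only.

```python
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[6, 1], [2, 7]], "c:int,a:long")
    # Expected to write a directory tree like /tmp/out.parquet/c=6/... and /c=2/...
    df.partition(by="c").save("/tmp/out.parquet", fmt="parquet", single=False)
```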
diff --git a/fugue_ibis/__init__.py b/fugue_ibis/__init__.py
index 3b19a7be..0f0c0143 100644
--- a/fugue_ibis/__init__.py
+++ b/fugue_ibis/__init__.py
@@ -1,4 +1,8 @@
 # flake8: noqa
-from fugue_ibis.execution import pandas_backend
-from fugue_ibis.execution.ibis_engine import IbisEngine
-from fugue_ibis.extensions import run_ibis, as_ibis, as_fugue
+from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
+from fugue_ibis.execution.pandas_backend import _to_pandas_ibis_engine
+from fugue_ibis.extensions import as_fugue, as_ibis, run_ibis
+
+
+def register():
+    register_ibis_engine(1, _to_pandas_ibis_engine)
diff --git a/fugue_ibis/execution/pandas_backend.py b/fugue_ibis/execution/pandas_backend.py
index 95417af6..59c628ae 100644
--- a/fugue_ibis/execution/pandas_backend.py
+++ b/fugue_ibis/execution/pandas_backend.py
@@ -12,7 +12,7 @@
 from triad.utils.assertion import assert_or_throw
 import pandas as pd
 
-from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
+from fugue_ibis.execution.ibis_engine import IbisEngine
 from fugue_ibis._utils import to_schema, to_ibis_schema
 from ibis.backends.pandas import Backend
 
@@ -53,6 +53,3 @@ def table(self, name: str, schema: Any = None):
             if schema is None and name in self._schemas
             else schema,
         )
-
-
-register_ibis_engine(1, _to_pandas_ibis_engine)
diff --git a/fugue_notebook/env.py b/fugue_notebook/env.py
index 94ef8f61..440e87de 100644
--- a/fugue_notebook/env.py
+++ b/fugue_notebook/env.py
@@ -5,12 +5,7 @@
 import fugue_sql
 import pandas as pd
-from fugue import (
-    ExecutionEngine,
-    NativeExecutionEngine,
-    make_execution_engine,
-    register_execution_engine,
-)
+from fugue import ExecutionEngine, make_execution_engine
 from fugue.dataframe import YieldedDataFrame
 from fugue.extensions._builtins.outputters import Show
 from fugue_sql.exceptions import FugueSQLSyntaxError
@@ -38,33 +33,6 @@ def get_pretty_print(self) -> Callable:
         """Fugue dataframe pretty print handler"""
         return _default_pretty_print
 
-    def register_execution_engines(self):
-        """Register execution engines with names. This will also try to register
-        spark and dask engines if the dependent packages are available and they
-        are not registered"""
-        register_execution_engine(
-            "native",
-            lambda conf, **kwargs: NativeExecutionEngine(conf=conf),
-            on_dup="ignore",
-        )
-
-        try:
-            import pyspark  # noqa: F401
-            import fugue_spark  # noqa: F401
-        except ImportError:
-            pass
-
-        try:
-            import dask.dataframe  # noqa: F401
-            import fugue_dask  # noqa: F401
-        except ImportError:
-            pass
-
-        try:
-            import fugue_duckdb  # noqa: F401
-        except ImportError:
-            pass
-
 
 @magics_class
 class _FugueSQLMagics(Magics):
@@ -151,5 +119,4 @@ def _setup_fugue_notebook(
         fsql_ignore_case=fsql_ignore_case,
     )
     ipython.register_magics(magics)
-    s.register_execution_engines()
     Show.set_hook(s.get_pretty_print())
diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py
index 86125dc1..89c7a315 100644
--- a/fugue_spark/_utils/convert.py
+++ b/fugue_spark/_utils/convert.py
@@ -124,6 +124,8 @@ def _to_arrow_type(dt: pt.DataType) -> pa.DataType:
             for field in dt
         ]
         return pa.struct(fields)
+    if isinstance(dt, pt.ArrayType):
+        return pa.list_(_to_arrow_type(dt.elementType))
     return to_arrow_type(dt)
diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py
index 7fa33148..da22bd71 100644
--- a/fugue_test/builtin_suite.py
+++ b/fugue_test/builtin_suite.py
@@ -589,7 +589,7 @@ def test_out_cotransform(self):  # noqa: C901
         def incr():
             fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True)
             fs.writetext(str(uuid4()) + ".txt", "")
-            return fs.glob("*.txt").count().files
+            return fs.glob("*.tx" "t").count().files
 
         def t1(
             df: Iterable[Dict[str, Any]], df2: pd.DataFrame
@@ -1175,6 +1175,7 @@ def init_tmpdir(self, tmpdir):
     def test_io(self):
         path = os.path.join(self.tmpdir, "a")
         path2 = os.path.join(self.tmpdir, "b.test.csv")
+        path3 = os.path.join(self.tmpdir, "c.partition")
         with self.dag() as dag:
             b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
             b.partition(num=3).save(path, fmt="parquet", single=True)
@@ -1185,6 +1186,21 @@ def test_io(self):
             a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int"))
             a = dag.load(path2, header=True, columns="c:int,a:long")
             a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long"))
+        with self.dag() as dag:
+            b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
+            b.partition(by="c").save(path3, fmt="parquet", single=False)
+        assert FileSystem().isdir(path3)
+        assert FileSystem().isdir(os.path.join(path3, "c=6"))
+        assert FileSystem().isdir(os.path.join(path3, "c=2"))
+        # TODO: in test below, once issue #288 is fixed, use dag.load
+        # instead of pd.read_parquet
+        pd.testing.assert_frame_equal(
+            pd.read_parquet(path3).sort_values("a").reset_index(drop=True),
+            pd.DataFrame({"c": pd.Categorical([6, 2]), "a": [1, 7]}).reset_index(
+                drop=True
+            ),
+            check_like=True,
+        )
 
     def test_save_and_use(self):
         path = os.path.join(self.tmpdir, "a")
diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py
index 7bbb2ef5..4c513f3b 100644
--- a/fugue_version/__init__.py
+++ b/fugue_version/__init__.py
@@ -1 +1 @@
-__version__ = "0.6.5"
+__version__ = "0.6.6"
diff --git a/requirements.txt b/requirements.txt
index cb91fc7c..2a0c3936 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,7 @@ qpd[dask]
 
 # test requirements
 pre-commit
-black
+black>=22.3.0
 mypy
 flake8
 autopep8
diff --git a/setup.py b/setup.py
index ad67d3fa..a1e8a0fc 100644
--- a/setup.py
+++ b/setup.py
@@ -35,6 +35,7 @@ def get_version() -> str:
         "sqlalchemy",
         "pyarrow>=0.15.1",
         "pandas>=1.0.2",
+        "importlib-metadata; python_version < '3.8'",
     ],
     extras_require={
         "sql": ["antlr4-python3-runtime", "jinja2"],
@@ -71,4 +72,12 @@ def get_version() -> str:
     ],
     python_requires=">=3.6",
     package_data={"fugue_notebook": ["nbextension/*"]},
+    entry_points={
+        "fugue.plugins": [
+            "ibis = fugue_ibis:register",
+            "duckdb = fugue_duckdb:register",
+            "spark = fugue_spark:register",
+            "dask = fugue_dask:register",
+        ]
+    },
 )
diff --git a/tests/fugue_dask/test_importless.py b/tests/fugue_dask/test_importless.py
new file mode 100644
index 00000000..dd0112b8
--- /dev/null
+++ b/tests/fugue_dask/test_importless.py
@@ -0,0 +1,24 @@
+from fugue import FugueWorkflow
+from fugue_sql import fsql
+
+
+def test_importless():
+    for engine in ["dask"]:
+        dag = FugueWorkflow()
+        dag.df([[0]], "a:int").show()
+
+        dag.run(engine)
+
+        fsql(
+            """
+        CREATE [[0],[1]] SCHEMA a:int
+        SELECT * WHERE a<1
+        PRINT
+        """
+        ).run(engine)
+
+        dag = FugueWorkflow()
+        idf = dag.df([[0], [1]], "a:int").as_ibis()
+        idf[idf.a < 1].as_fugue().show()
+
+        dag.run(engine)
\ No newline at end of file
diff --git a/tests/fugue_duckdb/test_importless.py b/tests/fugue_duckdb/test_importless.py
new file mode 100644
index 00000000..d15442e3
--- /dev/null
+++ b/tests/fugue_duckdb/test_importless.py
@@ -0,0 +1,30 @@
+from fugue import FugueWorkflow
+from fugue_sql import fsql
+
+
+def test_importless():
+    for engine in ["duck", "duckdb"]:
+        dag = FugueWorkflow()
+        dag.df([[0]], "a:int").show()
+
+        dag.run(engine)
+
+        fsql(
+            """
+        CREATE [[0],[1]] SCHEMA a:int
+        SELECT * WHERE a<1
+        PRINT
+        """
+        ).run(engine)
+
+        dag = FugueWorkflow()
+        idf = dag.df([[0], [1]], "a:int").as_ibis()
+        idf[idf.a < 1].as_fugue().show()
+
+        dag.run(engine)
+
+        dag = FugueWorkflow()
+        tdf = dag.df([[0], [1]], "a:int")
+        dag.select("SELECT * FROM ", tdf, " WHERE a<1", sql_engine=engine)
+
+        dag.run()
diff --git a/tests/fugue_ibis/test_importless.py b/tests/fugue_ibis/test_importless.py
new file mode 100644
index 00000000..27512b5d
--- /dev/null
+++ b/tests/fugue_ibis/test_importless.py
@@ -0,0 +1,10 @@
+from fugue import FugueWorkflow
+
+
+def test_importless():
+    for engine in [None]:
+        dag = FugueWorkflow()
+        idf = dag.df([[0], [1]], "a:int").as_ibis()
+        idf[idf.a < 1].as_fugue().show()
+
+        dag.run(engine)
diff --git a/tests/fugue_spark/test_importless.py b/tests/fugue_spark/test_importless.py
new file mode 100644
index 00000000..37ec1efc
--- /dev/null
+++ b/tests/fugue_spark/test_importless.py
@@ -0,0 +1,26 @@
+from fugue import FugueWorkflow
+from fugue_sql import fsql
+from pyspark.sql import SparkSession
+
+
+def test_importless():
+    spark = SparkSession.builder.getOrCreate()
+    for engine in [spark, "spark"]:
+        dag = FugueWorkflow()
+        dag.df([[0]], "a:int").show()
+
+        dag.run(engine)
+
+        fsql(
+            """
+        CREATE [[0],[1]] SCHEMA a:int
+        SELECT * WHERE a<1
+        PRINT
+        """
+        ).run(engine)
+
+        dag = FugueWorkflow()
+        idf = dag.df([[0], [1]], "a:int").as_ibis()
+        idf[idf.a < 1].as_fugue().show()
+
+        dag.run(engine)
diff --git a/tests/fugue_spark/utils/test_convert.py b/tests/fugue_spark/utils/test_convert.py
index d888d78e..bd7159bd 100644
--- a/tests/fugue_spark/utils/test_convert.py
+++ b/tests/fugue_spark/utils/test_convert.py
@@ -9,7 +9,7 @@ def test(expr):
 
     test("a:int,b:long,c:[int],d:datetime,e:date,f:decimal(3,4),g:str")
     test("a:{a:[int],b:[str]}")
-    # test("a:[{a:int}]") TODO: this is not supported by spark, should we support?
+    test("a:[{a:int}]")
 
     s = to_spark_schema(to_spark_schema("a:int"))
     assert to_spark_schema(s) is s
@@ -18,6 +18,12 @@
     assert to_schema(df) == "a:int"
     assert to_schema(dict(a=str)) == "a:str"
 
+    from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
+    schema = StructType([StructField("name", ArrayType(StructType([StructField("nest_name", StringType(), True), StructField("nest_value", IntegerType(), True)]), True), True)])
+    df = spark_session.createDataFrame([[[("a", 1), ("b", 2)]]], schema)
+    assert to_schema(df) == "name:[{nest_name:str,nest_value:int}]"
+    assert to_spark_schema("name:[{nest_name:str,nest_value:int}]") == schema
+
 
 def test_to_cast_expression():
     # length mismatch