Skip to content

Commit

Permalink
Redesign extension and dataframe registry (#375)
Browse files Browse the repository at this point in the history
* Redesign extension and dataframe registry

* update

* Redesign FugueWorkflow df method

* fix tests

* move all to plugin mode
  • Loading branch information
goodwanghan authored Oct 18, 2022
1 parent 7d78bf6 commit 68975b4
Show file tree
Hide file tree
Showing 30 changed files with 589 additions and 225 deletions.
12 changes: 6 additions & 6 deletions docs/api/fugue.workflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ fugue.workflow
.. |FugueDataTypes| replace:: :doc:`Fugue Data Types <tutorial:tutorials/appendix/generate_types>`


fugue.workflow.module
---------------------
fugue.workflow.input
--------------------

.. automodule:: fugue.workflow.module
.. automodule:: fugue.workflow.input
:members:
:undoc-members:
:show-inheritance:

fugue.workflow.utils
--------------------
fugue.workflow.module
---------------------

.. automodule:: fugue.workflow.utils
.. automodule:: fugue.workflow.module
:members:
:undoc-members:
:show-inheritance:
Expand Down
18 changes: 15 additions & 3 deletions fugue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,19 @@
QPDPandasEngine,
SqliteEngine,
)
from fugue.extensions.creator import Creator, creator, register_creator
from fugue.extensions.outputter import Outputter, outputter, register_outputter
from fugue.extensions.processor import Processor, processor, register_processor
from fugue.extensions.creator import Creator, creator, parse_creator, register_creator
from fugue.extensions.outputter import (
Outputter,
outputter,
parse_outputter,
register_outputter,
)
from fugue.extensions.processor import (
Processor,
parse_processor,
processor,
register_processor,
)
from fugue.extensions.transformer import (
CoTransformer,
OutputCoTransformer,
Expand All @@ -42,6 +52,8 @@
cotransformer,
output_cotransformer,
output_transformer,
parse_output_transformer,
parse_transformer,
register_output_transformer,
register_transformer,
transformer,
Expand Down
2 changes: 1 addition & 1 deletion fugue/collections/yielded.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __copy__(self) -> Any: # pragma: no cover
"""``copy`` should have no effect"""
return self

def __deepcopy__(self, memo: Any) -> Any:
def __deepcopy__(self, memo: Any) -> Any: # pragma: no cover
"""``deepcopy`` should have no effect"""
return self

Expand Down
18 changes: 15 additions & 3 deletions fugue/extensions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
# flake8: noqa
from fugue.extensions.creator import Creator, creator, register_creator
from fugue.extensions.outputter import Outputter, outputter, register_outputter
from fugue.extensions.processor import Processor, processor, register_processor
from fugue.extensions.creator import Creator, creator, parse_creator, register_creator
from fugue.extensions.outputter import (
Outputter,
outputter,
parse_outputter,
register_outputter,
)
from fugue.extensions.processor import (
Processor,
parse_processor,
processor,
register_processor,
)
from fugue.extensions.transformer import (
CoTransformer,
OutputCoTransformer,
Expand All @@ -10,6 +20,8 @@
cotransformer,
output_cotransformer,
output_transformer,
parse_output_transformer,
parse_transformer,
register_output_transformer,
register_transformer,
transformer,
Expand Down
2 changes: 1 addition & 1 deletion fugue/extensions/_builtins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# flake8: noqa
from fugue.extensions._builtins.creators import Load, LoadYielded
from fugue.extensions._builtins.creators import Load, CreateData
from fugue.extensions._builtins.outputters import (
AssertEqual,
AssertNotEqual,
Expand Down
54 changes: 48 additions & 6 deletions fugue/extensions/_builtins/creators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Any, Callable, Optional

from fugue.collections.yielded import Yielded, YieldedFile
from fugue.dataframe import DataFrame
from fugue.exceptions import FugueWorkflowCompileError
from fugue.extensions.creator import Creator
from triad import ParamDict, Schema, assert_or_throw, to_uuid


class Load(Creator):
Expand All @@ -15,10 +19,48 @@ def create(self) -> DataFrame:
)


class LoadYielded(Creator):
class CreateData(Creator):
def __init__(
    self,
    df: Any,
    schema: Any = None,
    metadata: Any = None,
    data_determiner: Optional[Callable[[Any], Any]] = None,
) -> None:
    """Store the data source and the optional conversion hints.

    :param df: the data to create a dataframe from, or a ``Yielded`` object
    :param schema: passed through ``Schema(...)`` unless it is None
    :param metadata: passed through ``ParamDict(...)`` unless it is None
    :param data_determiner: optional callable applied to ``df`` when
        computing the identity of this creator, defaults to None
    :raises FugueWorkflowCompileError: if ``df`` is ``Yielded`` and
        ``schema`` or ``metadata`` is provided
    """
    if isinstance(df, Yielded):
        # A yielded result cannot be combined with schema/metadata overrides.
        no_overrides = schema is None and metadata is None
        assert_or_throw(
            no_overrides,
            FugueWorkflowCompileError(
                "schema and metadata must be None when data is Yielded"
            ),
        )
    super().__init__()
    self._df = df
    self._schema = None if schema is None else Schema(schema)
    self._metadata = None if metadata is None else ParamDict(metadata)
    self._data_determiner = data_determiner

def create(self) -> DataFrame:
yielded = self.params.get_or_throw("yielded", Yielded)
if isinstance(yielded, YieldedFile):
return self.execution_engine.load_df(path=yielded.path)
else:
return self.execution_engine.to_df(yielded.result)
if isinstance(self._df, Yielded):
if isinstance(self._df, YieldedFile):
return self.execution_engine.load_df(path=self._df.path)
else:
return self.execution_engine.to_df(self._df.result) # type: ignore
return self.execution_engine.to_df(
self._df, schema=self._schema, metadata=self._metadata
)

def _df_uid(self):
    """Return the value that represents the wrapped data in ``__uuid__``.

    When a ``data_determiner`` was given, its result on the data is used.
    Otherwise a ``Yielded`` object stands for itself, and any other data
    is represented by the constant ``1``.
    """
    determiner = self._data_determiner
    if determiner is None:
        return self._df if isinstance(self._df, Yielded) else 1
    return determiner(self._df)

def __uuid__(self) -> str:
    """Compute the id of this creator.

    The id combines the parent class id, the data token from
    ``_df_uid`` and the stored schema and metadata.
    """
    components = (
        super().__uuid__(),
        self._df_uid(),
        self._schema,
        self._metadata,
    )
    return to_uuid(*components)
24 changes: 0 additions & 24 deletions fugue/extensions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,6 @@
from triad.utils.assertion import assert_or_throw


class ExtensionRegistry:
    """A mapping from string aliases to extension objects."""

    _ON_DUP_OPTIONS = ("raise", "throw", "overwrite", "ignore")

    def __init__(self) -> None:
        self._dict: Dict[str, Any] = {}

    def register(self, name: str, extension: Any, on_dup="overwrite") -> None:
        """Register ``extension`` under the alias ``name``.

        :param name: the alias
        :param extension: the object to associate with the alias
        :param on_dup: what to do when ``name`` is already registered:
            "overwrite" replaces the entry, "ignore" keeps the existing
            entry, "raise" or "throw" raises ``KeyError``. Defaults to
            "overwrite".
        :raises ValueError: if ``on_dup`` is not one of the values above;
            nothing is registered in this case
        :raises KeyError: if ``name`` is already registered and ``on_dup``
            is "raise" or "throw"
        """
        # Validate first so an invalid option never leaves a partial update.
        if on_dup not in self._ON_DUP_OPTIONS:
            raise ValueError(on_dup)
        if name not in self._dict:
            # First registration always succeeds; ``on_dup`` only matters
            # for duplicates, so we must stop here.
            self._dict[name] = extension
            return
        if on_dup in ("raise", "throw"):
            raise KeyError(f"{name} is already registered")
        if on_dup == "overwrite":
            self._dict[name] = extension
        # on_dup == "ignore": keep the existing entry untouched

    def get(self, obj: Any) -> Any:
        """Return the extension registered under ``obj`` when ``obj`` is a
        registered alias string; otherwise return ``obj`` unchanged.
        """
        # NOTE(review): this import is otherwise unused, so it is presumably
        # here for its side effects (populating registries) - confirm.
        import fugue._utils.register  # pylint: disable=W0611 # noqa: F401

        if isinstance(obj, str) and obj in self._dict:
            return self._dict[obj]
        return obj


def parse_validation_rules_from_comment(func: Callable) -> Dict[str, Any]:
res: Dict[str, Any] = {}
for key in [
Expand Down
7 changes: 6 additions & 1 deletion fugue/extensions/creator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# flake8: noqa
from fugue.extensions.creator.convert import _to_creator, creator, register_creator
from fugue.extensions.creator.convert import (
_to_creator,
creator,
parse_creator,
register_creator,
)
from fugue.extensions.creator.creator import Creator
56 changes: 48 additions & 8 deletions fugue/extensions/creator/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,66 @@
from typing import Any, Callable, Dict, List, Optional, no_type_check

from fugue._utils.interfaceless import FunctionWrapper, parse_output_schema_from_comment
from fugue._utils.registry import fugue_plugin
from fugue.dataframe import DataFrame
from fugue.exceptions import FugueInterfacelessError
from fugue.extensions._utils import ExtensionRegistry
from fugue.extensions.creator.creator import Creator
from triad import ParamDict
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from triad.utils.hash import to_uuid

_CREATOR_REGISTRY = ExtensionRegistry()
_CREATOR_REGISTRY = ParamDict()


def register_creator(alias: str, obj: Any, on_dup: str = "overwrite") -> None:
"""Register creator with an alias.
@fugue_plugin
def parse_creator(obj: Any) -> Any:
    """Resolve ``obj`` into an object that can be converted to a Fugue
    :class:`~fugue.extensions.creator.creator.Creator`.

    This default implementation replaces a string that is a key of the
    creator registry with the registered object, and returns anything else
    unchanged.

    .. admonition:: Examples

        .. code-block:: python

            from fugue import Creator, parse_creator, FugueWorkflow
            from triad import to_uuid

            class My(Creator):
                def __init__(self, x):
                    self.x = x

                def create(self):
                    raise NotImplementedError

                def __uuid__(self) -> str:
                    return to_uuid(super().__uuid__(), self.x)

            @parse_creator.candidate(
                lambda x: isinstance(x, str) and x.startswith("-*"))
            def _parse(obj):
                return My(obj)

            dag = FugueWorkflow()
            dag.create("-*abc").show()
            # == dag.create(My("-*abc")).show()
            dag.run()
    """
    # Only strings can be aliases; everything else passes straight through.
    if not isinstance(obj, str) or obj not in _CREATOR_REGISTRY:
        return obj
    return _CREATOR_REGISTRY[obj]


def register_creator(alias: str, obj: Any, on_dup: int = ParamDict.OVERWRITE) -> None:
"""Register creator with an alias. This is a simplified version of
:func:`~.parse_creator`
:param alias: alias of the creator
:param obj: the object that can be converted to
:class:`~fugue.extensions.creator.creator.Creator`
:param on_dup: action on duplicated ``alias``. It can be "overwrite", "ignore"
(not overwriting) or "throw" (throw exception), defaults to "overwrite".
:param on_dup: see :meth:`triad.collections.dict.ParamDict.update`
, defaults to ``ParamDict.OVERWRITE``
.. tip::
Expand Down Expand Up @@ -78,7 +118,7 @@ def register_extensions():
dag.create("mc").show() # use my_creator by alias
dag.run()
"""
_CREATOR_REGISTRY.register(alias, obj, on_dup=on_dup)
_CREATOR_REGISTRY.update({alias: obj}, on_dup=on_dup)


def creator(schema: Any = None) -> Callable[[Any], "_FuncAsCreator"]:
Expand All @@ -101,7 +141,7 @@ def _to_creator(
local_vars: Optional[Dict[str, Any]] = None,
) -> Creator:
global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
obj = _CREATOR_REGISTRY.get(obj)
obj = parse_creator(obj)
exp: Optional[Exception] = None
try:
return copy.copy(
Expand Down
1 change: 1 addition & 0 deletions fugue/extensions/outputter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from fugue.extensions.outputter.convert import (
_to_outputter,
outputter,
parse_outputter,
register_outputter,
)
from fugue.extensions.outputter.outputter import Outputter
54 changes: 46 additions & 8 deletions fugue/extensions/outputter/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,66 @@
from typing import Any, Callable, Dict, List, Optional, no_type_check

from fugue._utils.interfaceless import FunctionWrapper
from fugue._utils.registry import fugue_plugin
from fugue.dataframe import DataFrames
from fugue.exceptions import FugueInterfacelessError
from fugue.extensions._utils import (
ExtensionRegistry,
parse_validation_rules_from_comment,
to_validation_rules,
)
from fugue.extensions.outputter.outputter import Outputter
from triad import ParamDict, to_uuid
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from triad.utils.hash import to_uuid

_OUTPUTTER_REGISTRY = ExtensionRegistry()
_OUTPUTTER_REGISTRY = ParamDict()


def register_outputter(alias: str, obj: Any, on_dup: str = "overwrite") -> None:
@fugue_plugin
def parse_outputter(obj: Any) -> Any:
    """Resolve ``obj`` into an object that can be converted to a Fugue
    :class:`~fugue.extensions.outputter.outputter.Outputter`.

    This default implementation replaces a string that is a key of the
    outputter registry with the registered object, and returns anything
    else unchanged.

    .. admonition:: Examples

        .. code-block:: python

            from fugue import Outputter, parse_outputter, FugueWorkflow
            from triad import to_uuid

            class My(Outputter):
                def __init__(self, x):
                    self.x = x

                def process(self, dfs):
                    raise NotImplementedError

                def __uuid__(self) -> str:
                    return to_uuid(super().__uuid__(), self.x)

            @parse_outputter.candidate(
                lambda x: isinstance(x, str) and x.startswith("-*"))
            def _parse(obj):
                return My(obj)

            dag = FugueWorkflow()
            dag.df([[0]], "a:int").output("-*abc")
            # == dag.df([[0]], "a:int").output(My("-*abc"))
            dag.run()
    """
    # Only strings can be aliases; everything else passes straight through.
    if not isinstance(obj, str) or obj not in _OUTPUTTER_REGISTRY:
        return obj
    return _OUTPUTTER_REGISTRY[obj]


def register_outputter(alias: str, obj: Any, on_dup: int = ParamDict.OVERWRITE) -> None:
"""Register outputter with an alias.
:param alias: alias of the outputter
:param obj: the object that can be converted to
:class:`~fugue.extensions.outputter.outputter.Outputter`
:param on_dup: action on duplicated ``alias``. It can be "overwrite", "ignore"
(not overwriting) or "throw" (throw exception), defaults to "overwrite".
:param on_dup: see :meth:`triad.collections.dict.ParamDict.update`
, defaults to ``ParamDict.OVERWRITE``
.. tip::
Expand Down Expand Up @@ -81,7 +119,7 @@ def register_extensions():
dag.df([[0]],"a:int").output("mo")
dag.run()
"""
_OUTPUTTER_REGISTRY.register(alias, obj, on_dup=on_dup)
_OUTPUTTER_REGISTRY.update({alias: obj}, on_dup=on_dup)


def outputter(**validation_rules: Any) -> Callable[[Any], "_FuncAsOutputter"]:
Expand All @@ -106,7 +144,7 @@ def _to_outputter(
validation_rules: Optional[Dict[str, Any]] = None,
) -> Outputter:
global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
obj = _OUTPUTTER_REGISTRY.get(obj)
obj = parse_outputter(obj)
exp: Optional[Exception] = None
if validation_rules is None:
validation_rules = {}
Expand Down
Loading

0 comments on commit 68975b4

Please sign in to comment.