Careful assertion on empty dump (#165)
* ⭐ TaskOnKart.fail_on_empty_dump

* 👕 yapf

* 📝 TaskOnKart.fail_on_empty_dump

* 📝 TaskOnKart.fail_on_empty_dump
Hi-king authored Jan 12, 2021
1 parent 611627f commit 6088284
Showing 3 changed files with 45 additions and 20 deletions.
50 changes: 30 additions & 20 deletions docs/task_on_kart.rst
@@ -100,26 +100,6 @@ The `load` method loads individual task input by passing a key of an input dicti
data_b = self.load('b')
TaskOnKart.load_generator
----------------
The :func:`~gokart.task.TaskOnKart.load_generator` method is used to load input data with a generator.
For instance, an example implementation could be as follows:

.. code:: python

    def requires(self):
        return TaskA(param='called by TaskB')

    def run(self):
        for data in self.load_generator():
            any_process(data)

Usage is the same as `TaskOnKart.generator`.
`load_generator` reads the divided files one at a time as the generator is iterated.
This is useful when the data does not fit in memory, because `load_generator` does not load all files at once.


TaskOnKart.dump
----------------
The :func:`~gokart.task.TaskOnKart.dump` method is used to dump results of tasks.
Expand Down Expand Up @@ -147,3 +127,33 @@ In the case that a task has 2 or more output, it is possible to specify output t
b_data = do_something_b(self.load())
self.dump(a_data, 'a')
self.dump(b_data, 'b')
Advanced Features
---------------------

TaskOnKart.load_generator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The :func:`~gokart.task.TaskOnKart.load_generator` method is used to load input data with a generator.
For instance, an example implementation could be as follows:

.. code:: python

    def requires(self):
        return TaskA(param='called by TaskB')

    def run(self):
        for data in self.load_generator():
            any_process(data)

Usage is the same as `TaskOnKart.generator`.
`load_generator` reads the divided files one at a time as the generator is iterated.
This is useful when the data does not fit in memory, because `load_generator` does not load all files at once.
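
The memory behaviour described here can be illustrated without gokart. The following is a minimal sketch, not gokart's implementation: `load_chunks` and `write_chunks` are hypothetical helpers, and the chunk files are plain JSON written to a temporary directory. It only shows why a generator needs to hold one chunk at a time.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator, List


def load_chunks(paths: List[Path]) -> Iterator[list]:
    """Yield the content of each chunk file lazily, one file per iteration."""
    for path in paths:
        with path.open() as f:
            yield json.load(f)  # only this chunk is read at this point


def write_chunks(directory: Path, chunks: List[list]) -> List[Path]:
    """Write each chunk to its own file and return the paths in order."""
    paths = []
    for i, chunk in enumerate(chunks):
        path = directory / f'chunk_{i}.json'
        path.write_text(json.dumps(chunk))
        paths.append(path)
    return paths


with tempfile.TemporaryDirectory() as tmp:
    paths = write_chunks(Path(tmp), [[1, 2], [3, 4], [5]])
    # Each chunk is processed and can be discarded before the next one is read.
    totals = [sum(chunk) for chunk in load_chunks(paths)]
```

Here `totals` holds one small result per chunk, while the chunks themselves are never all in memory together.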


TaskOnKart.fail_on_empty_dump
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Raises `AssertionError` when a task tries to dump an empty DataFrame.

Empty caches can hide bugs and cost a lot of debugging time. This feature surfaces such bugs (including wrong data sources) at an early stage.
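
The effect of the flag can be sketched outside gokart. In the example below, `dump_with_guard` and the `cache` list are hypothetical stand-ins for `TaskOnKart.dump` and its output target; only the guard condition is taken from this commit.

```python
import pandas as pd


def dump_with_guard(obj, sink: list, fail_on_empty_dump: bool = False) -> None:
    """Append obj to sink, optionally refusing empty DataFrames.

    Hypothetical stand-in for the guard in this commit; not gokart's API.
    """
    if fail_on_empty_dump and isinstance(obj, pd.DataFrame):
        assert not obj.empty, 'refusing to dump an empty DataFrame'
    sink.append(obj)


cache: list = []

# Flag off (the default): an empty DataFrame is written silently.
dump_with_guard(pd.DataFrame(), cache)

# Flag on: the same call fails before anything is written.
try:
    dump_with_guard(pd.DataFrame(), cache, fail_on_empty_dump=True)
    raised = False
except AssertionError:
    raised = True

# Non-DataFrame objects are never checked, even when they are "empty".
dump_with_guard([], cache, fail_on_empty_dump=True)
```

Because the check is a plain `assert` statement, Python skips it when run with the `-O` option, so the guard is best treated as a development-time safety net.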
3 changes: 3 additions & 0 deletions gokart/task.py
@@ -52,6 +52,7 @@ class TaskOnKart(luigi.Task):
    redis_host = luigi.Parameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
    redis_port = luigi.Parameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
    redis_timeout = luigi.IntParameter(default=180, description='Redis lock will be released after `redis_timeout` seconds', significant=False)
    fail_on_empty_dump: bool = gokart.ExplicitBoolParameter(default=False, description='Fail when task dumps empty DF', significant=False)

    def __init__(self, *args, **kwargs):
        self._add_configuration(kwargs, 'TaskOnKart')
@@ -227,6 +228,8 @@ def _flatten_recursively(dfs):

    def dump(self, obj, target: Union[None, str, TargetOnKart] = None) -> None:
        PandasTypeConfigMap().check(obj, task_namespace=self.task_namespace)
        if self.fail_on_empty_dump and isinstance(obj, pd.DataFrame):
            assert not obj.empty
        self._get_output_target(target).dump(obj)

    def make_unique_id(self):
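
The guard in `dump` relies on `pandas.DataFrame.empty`, so it helps to see which objects it would reject. The snippet below is a small illustration using only pandas; the variable names are made up for the example.

```python
import pandas as pd

no_rows_no_columns = pd.DataFrame()
columns_but_no_rows = pd.DataFrame({'a': [], 'b': []})
only_missing_values = pd.DataFrame({'a': [float('nan')]})
one_row = pd.DataFrame({'a': [1]})

# `empty` is True when any axis has length 0; NaN values still count as data.
is_empty = {
    'no_rows_no_columns': no_rows_no_columns.empty,
    'columns_but_no_rows': columns_but_no_rows.empty,
    'only_missing_values': only_missing_values.empty,
    'one_row': one_row.empty,
}

# The guard only inspects DataFrames, so an empty Series passes unchecked.
series_is_dataframe = isinstance(pd.Series(dtype='float64'), pd.DataFrame)
```

With `fail_on_empty_dump=True`, a filter that removes every row would therefore be caught, while a frame containing only missing values or an empty `Series` would still be dumped.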
12 changes: 12 additions & 0 deletions test/test_task_on_kart.py
@@ -257,6 +257,18 @@ def test_dump(self):
        task.dump(1)
        target.dump.assert_called_once()

    def test_fail_on_empty_dump(self):
        # do not fail
        task = _DummyTask(fail_on_empty_dump=False)
        target = MagicMock(spec=TargetOnKart)
        task.output = MagicMock(return_value=target)
        task.dump(pd.DataFrame())
        target.dump.assert_called_once()

        # fail
        task = _DummyTask(fail_on_empty_dump=True)
        self.assertRaises(AssertionError, lambda: task.dump(pd.DataFrame()))

    @patch('luigi.configuration.get_config')
    def test_add_configuration(self, mock_config: MagicMock):
        mock_config.return_value = {'_DummyTask': {'list_param': '["c", "d"]', 'param': '3', 'bool_param': 'True'}}