From 4eb11cfebc9c851634e25c271b25bc7b378f22ce Mon Sep 17 00:00:00 2001 From: 6syun9 <6syun9@gmail.com> Date: Sun, 1 Sep 2019 19:27:53 +0900 Subject: [PATCH 1/5] add module versions log --- gokart/task.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/gokart/task.py b/gokart/task.py index 8c4b78c6..d61867dc 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -1,5 +1,7 @@ import hashlib import os +import sys +from importlib import import_module from logging import getLogger from typing import Union, List, Any, Callable, Set, Optional, Dict @@ -268,3 +270,18 @@ def restore(cls, unique_id): @luigi.Task.event_handler(luigi.Event.FAILURE) def _log_unique_id(self, exception): logger.info(f'FAILURE:\n task name={type(self).__name__}\n unique id={self.make_unique_id()}') + + @luigi.Task.event_handler(luigi.Event.START) + def _dump_module_versions(self): + self.dump(self.get_module_versions(), self._get_module_versions_target()) + + def _get_module_versions_target(self): + return self.make_target(f'log/module_versions/{type(self).__name__}.txt') + + def get_module_versions(self) -> str: + module_versions = [] + for x in set([x.split('.')[0] for x in sys.modules.keys() if '_' not in x]): + module = import_module(x) + if '__version__' in dir(module): + module_versions.append(f'{x}=={module.__version__.split(" ")[0]}') + return '\n'.join(module_versions) From ac2db4b07e70cfcaa5c3564f61a2d1b1b6570cc3 Mon Sep 17 00:00:00 2001 From: 6syun9 <6syun9@gmail.com> Date: Tue, 3 Sep 2019 12:23:55 +0900 Subject: [PATCH 2/5] change _get_module_version --- gokart/task.py | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/gokart/task.py b/gokart/task.py index d61867dc..831b7b70 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -27,18 +27,18 @@ class TaskOnKart(luigi.Task): * :py:meth:`dump` - this save a object as output of this task. """ - workspace_directory = luigi.Parameter( - default='./resources/', description='A directory to set outputs on. Please use a path starts with s3:// when you use s3.', - significant=False) # type: str + workspace_directory = luigi.Parameter(default='./resources/', + description='A directory to set outputs on. Please use a path starts with s3:// when you use s3.', + significant=False) # type: str local_temporary_directory = luigi.Parameter(default='./resources/tmp/', description='A directory to save temporary files.', significant=False) # type: str rerun = luigi.BoolParameter(default=False, description='If this is true, this task will run even if all output files exist.', significant=False) - strict_check = luigi.BoolParameter( - default=False, description='If this is true, this task will not run only if all input and output files exits.', significant=False) - modification_time_check = luigi.BoolParameter( - default=False, - description='If this is true, this task will not run only if all input and output files exits,' - ' and all input files are modified before output file are modified.', - significant=False) + strict_check = luigi.BoolParameter(default=False, + description='If this is true, this task will not run only if all input and output files exits.', + significant=False) + modification_time_check = luigi.BoolParameter(default=False, + description='If this is true, this task will not run only if all input and output files exits,' + ' and all input files are modified before output file are modified.', + significant=False) delete_unnecessary_output_files = luigi.BoolParameter(default=False, description='If this is true, delete unnecessary output files.', significant=False) def __init__(self, *args, **kwargs): @@ -107,12 +107,11 @@ def make_target(self, relative_file_path: str, use_unique_id: bool = True, proce def make_large_data_frame_target(self, relative_file_path: str, use_unique_id: bool = True, max_byte=int(2**26)) -> TargetOnKart: file_path = os.path.join(self.workspace_directory, relative_file_path) unique_id = self.make_unique_id() if use_unique_id else None - return gokart.target.make_model_target( - file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, - load_function=gokart.target.LargeDataFrameProcessor.load) + return gokart.target.make_model_target(file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, + load_function=gokart.target.LargeDataFrameProcessor.load) def make_model_target(self, relative_file_path: str, @@ -130,12 +129,11 @@ def make_model_target(self, file_path = os.path.join(self.workspace_directory, relative_file_path) assert relative_file_path[-3:] == 'zip', f'extension must be zip, but {relative_file_path} is passed.' unique_id = self.make_unique_id() if use_unique_id else None - return gokart.target.make_model_target( - file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=save_function, - load_function=load_function) + return gokart.target.make_model_target(file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=save_function, + load_function=load_function) def load(self, target: Union[None, str, TargetOnKart] = None) -> Any: def _load(targets): @@ -163,11 +161,13 @@ def _load(targets): def load_data_frame(self, target: Union[None, str, TargetOnKart] = None, required_columns: Optional[Set[str]] = None) -> pd.DataFrame: data = self.load(target=target) if isinstance(data, list): + def _pd_concat(dfs): if isinstance(dfs, list): return pd.concat([_pd_concat(df) for df in dfs]) else: return dfs + data = _pd_concat(data) required_columns = required_columns or set() @@ -273,12 +273,12 @@ def _log_unique_id(self, exception): @luigi.Task.event_handler(luigi.Event.START) def _dump_module_versions(self): - self.dump(self.get_module_versions(), self._get_module_versions_target()) + self.dump(self._get_module_versions(), self._get_module_versions_target()) def _get_module_versions_target(self): return self.make_target(f'log/module_versions/{type(self).__name__}.txt') - def get_module_versions(self) -> str: + def _get_module_versions(self) -> str: module_versions = [] for x in set([x.split('.')[0] for x in sys.modules.keys() if '_' not in x]): module = import_module(x) From b2bfc5b069d4672d1d70101f40670aeec232a767 Mon Sep 17 00:00:00 2001 From: 6syun9 <6syun9@gmail.com> Date: Fri, 20 Sep 2019 16:40:10 +0900 Subject: [PATCH 3/5] format --- gokart/task.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/gokart/task.py b/gokart/task.py index 93f7c5bc..46f1d68c 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -27,9 +27,9 @@ class TaskOnKart(luigi.Task): * :py:meth:`dump` - this save a object as output of this task. """ - workspace_directory = luigi.Parameter(default='./resources/', - description='A directory to set outputs on. Please use a path starts with s3:// when you use s3.', - significant=False) # type: str + workspace_directory = luigi.Parameter( + default='./resources/', description='A directory to set outputs on. Please use a path starts with s3:// when you use s3.', + significant=False) # type: str local_temporary_directory = luigi.Parameter(default='./resources/tmp/', description='A directory to save temporary files.', significant=False) # type: str rerun = luigi.BoolParameter(default=False, description='If this is true, this task will run even if all output files exist.', significant=False) strict_check = luigi.BoolParameter( @@ -111,11 +111,12 @@ def make_target(self, relative_file_path: str, use_unique_id: bool = True, proce def make_large_data_frame_target(self, relative_file_path: str, use_unique_id: bool = True, max_byte=int(2**26)) -> TargetOnKart: file_path = os.path.join(self.workspace_directory, relative_file_path) unique_id = self.make_unique_id() if use_unique_id else None - return gokart.target.make_model_target(file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, - load_function=gokart.target.LargeDataFrameProcessor.load) + return gokart.target.make_model_target( + file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, + load_function=gokart.target.LargeDataFrameProcessor.load) def make_model_target(self, relative_file_path: str, @@ -133,11 +134,12 @@ def make_model_target(self, file_path = os.path.join(self.workspace_directory, relative_file_path) assert relative_file_path[-3:] == 'zip', f'extension must be zip, but {relative_file_path} is passed.' unique_id = self.make_unique_id() if use_unique_id else None - return gokart.target.make_model_target(file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=save_function, - load_function=load_function) + return gokart.target.make_model_target( + file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=save_function, + load_function=load_function) def load(self, target: Union[None, str, TargetOnKart] = None) -> Any: def _load(targets): @@ -165,13 +167,11 @@ def _load(targets): def load_data_frame(self, target: Union[None, str, TargetOnKart] = None, required_columns: Optional[Set[str]] = None) -> pd.DataFrame: data = self.load(target=target) if isinstance(data, list): - def _pd_concat(dfs): if isinstance(dfs, list): return pd.concat([_pd_concat(df) for df in dfs]) else: return dfs - data = _pd_concat(data) required_columns = required_columns or set() From 94e98f8353dbe2c2cb31ab12eb0731cec8506d93 Mon Sep 17 00:00:00 2001 From: 6syun9 <6syun9@gmail.com> Date: Fri, 20 Sep 2019 16:42:55 +0900 Subject: [PATCH 4/5] rm blank space --- gokart/task.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/gokart/task.py b/gokart/task.py index 46f1d68c..cef4966f 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -28,8 +28,8 @@ class TaskOnKart(luigi.Task): """ workspace_directory = luigi.Parameter( - default='./resources/', description='A directory to set outputs on. Please use a path starts with s3:// when you use s3.', - significant=False) # type: str + default='./resources/', description='A directory to set outputs on. Please use a path starts with s3:// when you use s3.', + significant=False) # type: str local_temporary_directory = luigi.Parameter(default='./resources/tmp/', description='A directory to save temporary files.', significant=False) # type: str rerun = luigi.BoolParameter(default=False, description='If this is true, this task will run even if all output files exist.', significant=False) strict_check = luigi.BoolParameter( @@ -112,11 +112,11 @@ def make_large_data_frame_target(self, relative_file_path: str, use_unique_id: b file_path = os.path.join(self.workspace_directory, relative_file_path) unique_id = self.make_unique_id() if use_unique_id else None return gokart.target.make_model_target( - file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, - load_function=gokart.target.LargeDataFrameProcessor.load) + file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, + load_function=gokart.target.LargeDataFrameProcessor.load) def make_model_target(self, relative_file_path: str, @@ -135,11 +135,11 @@ def make_model_target(self, assert relative_file_path[-3:] == 'zip', f'extension must be zip, but {relative_file_path} is passed.' unique_id = self.make_unique_id() if use_unique_id else None return gokart.target.make_model_target( - file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=save_function, - load_function=load_function) + file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=save_function, + load_function=load_function) def load(self, target: Union[None, str, TargetOnKart] = None) -> Any: def _load(targets): From 7fedd9db4b7f9866caf1eb0e4adb85d605ed7ab8 Mon Sep 17 00:00:00 2001 From: 6syun9 <6syun9@gmail.com> Date: Fri, 20 Sep 2019 16:44:17 +0900 Subject: [PATCH 5/5] fix yapf --- gokart/task.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/gokart/task.py b/gokart/task.py index cef4966f..6b71bc1a 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -112,11 +112,11 @@ def make_large_data_frame_target(self, relative_file_path: str, use_unique_id: b file_path = os.path.join(self.workspace_directory, relative_file_path) unique_id = self.make_unique_id() if use_unique_id else None return gokart.target.make_model_target( - file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, - load_function=gokart.target.LargeDataFrameProcessor.load) + file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save, + load_function=gokart.target.LargeDataFrameProcessor.load) def make_model_target(self, relative_file_path: str, @@ -135,11 +135,11 @@ def make_model_target(self, assert relative_file_path[-3:] == 'zip', f'extension must be zip, but {relative_file_path} is passed.' unique_id = self.make_unique_id() if use_unique_id else None return gokart.target.make_model_target( - file_path=file_path, - temporary_directory=self.local_temporary_directory, - unique_id=unique_id, - save_function=save_function, - load_function=load_function) + file_path=file_path, + temporary_directory=self.local_temporary_directory, + unique_id=unique_id, + save_function=save_function, + load_function=load_function) def load(self, target: Union[None, str, TargetOnKart] = None) -> Any: def _load(targets):