This repository has been archived by the owner on Aug 6, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathworkflow.py
53 lines (44 loc) · 1.84 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import os
import luigi
from numerapi.numerapi import NumerAPI
from tasks.numerai_fetch_training_data import FetchAndExtractData
from tasks.numerai_train_and_predict import TrainAndPredict
from tasks.numerai_upload_predictions import UploadPredictions
class Workflow(luigi.Task):
    """
    A luigi task pipeline that trains a model on the tournament data and
    submits the resulting predictions to the numer.ai website.

    :param: output_path (str):
        path where the data shall be stored, defaults to ``./data/``.
    :param: public_id (str):
        public_id from the API credentials
    :param: secret (str):
        secret from the API credentials
    """
    output_path = luigi.Parameter(default='./data/')
    public_id = luigi.Parameter()
    secret = luigi.Parameter()

    def requires(self):
        """
        Formulates the incoming dependencies of the workflow.

        The only declared dependency is
        :py:class:`tasks.numerai_train_and_predict.TrainAndPredict`, configured
        with the same ``output_path`` as this task.

        :return: a one-element list holding the ``TrainAndPredict`` task
        """
        # Every entry returned here is uploaded by ``run`` (it submits the
        # ``path`` of each input), so only tasks that produce a predictions
        # file belong in this list.
        # NOTE(review): FetchAndExtractData is intentionally not listed;
        # presumably TrainAndPredict requires it itself -- confirm in
        # tasks/numerai_train_and_predict.py.
        return [TrainAndPredict(output_path=self.output_path)]

    def run(self):
        """
        Submits the predictions once the incoming dependencies declared in
        :py:meth:`requires` are done.

        For every entry of ``self.input()`` an
        :py:class:`tasks.numerai_upload_predictions.UploadPredictions` task is
        yielded from within ``run``, receiving the entry's ``path`` together
        with the API credentials of this task.
        """
        # ``self.input()`` is iterated as a list, mirroring the list returned
        # by ``requires``; each entry must expose a ``path`` attribute.
        for target in self.input():
            yield UploadPredictions(
                filepath=target.path,
                public_id=self.public_id,
                secret=self.secret,
            )