-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtasks.py
32 lines (26 loc) · 946 Bytes
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import os, time
from datetime import datetime
from subprocess import run, PIPE, STDOUT
# Celery configuration
from celery import Celery
os.environ.setdefault('C_FORCE_ROOT', 'true')
app = Celery("tasks",
broker=os.environ.get('CELERY_BROKER_URL', 'redis://redis'),
backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis'))
app.conf.CELERY_WORKER_SEND_TASK_EVENTS = True
def execR(cmd):
# Runs command and returns output (unless an error is raised)
return run(cmd, check=True, stdout=PIPE, stderr=STDOUT, text=True).stdout
# Regarding running R expressions:
# - Use 'cat(2+2)' to capture R output as a Celery result
# - If the command raises an error, the task's state will be FAILURE
@app.task
def R(cmd):
# Run R expression
return execR(["Rscript", "-e", cmd])
@app.task
def Rscript(cmd):
# Run Rscript
return execR(["Rscript", cmd])
if __name__ == "__main__":
app.start()