
Commit

Reorganize
- Force "args" to be a tuple
- Added kwargs
crispyDyne committed Jul 29, 2020
1 parent f6ca3d9 commit 6db2d55
Showing 10 changed files with 60 additions and 44 deletions.
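
In practice, the two points in the commit message mean that a job's 'args' value is normalized to a tuple before enqueueing, and that an optional 'kwargs' dict is now forwarded to the worker. A minimal sketch of a job definition that uses both is shown below; the `scale` task is hypothetical and used only for illustration:

```python
# Hypothetical task, for illustration only -- not part of the repository.
def scale(x, factor=2):
    return factor * x

# After this commit the manager wraps a bare 'args' value into a tuple,
# and an optional 'kwargs' dict is passed through to q.enqueue().
job = {
    'func': scale,
    'args': (3,),             # a bare value such as 3 would also be wrapped into (3,)
    'kwargs': {'factor': 5},  # new: keyword arguments forwarded to the task
}
```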
7 changes: 3 additions & 4 deletions .gitignore
@@ -1,8 +1,7 @@
.vscode

super_logs.conf
supervisord.log
supervisord.pid
*.log
*.pid
*.rdb

# Byte-compiled / optimized / DLL files
__pycache__/
6 changes: 2 additions & 4 deletions exampleTasks.py → examples/exampleTasks.py
@@ -1,20 +1,18 @@


def simpleTask(x):
return 2*x


def addJobs(x):
newJobs = []
for i in range(x):
newJobs.append({'func':simpleTask,'args': i})
newJobs.append({'func':simpleTask,'args': (2*x + i)})

return {'result':2*x, 'addJobs':newJobs}


def addSubJob(x):
newJobs = []
for i in range(x):
newJobs.append({'func':simpleTask,'args': i})
newJobs.append({'func':simpleTask,'args': (2*x + i)})

return {'result':2*x, 'addJobs':{'jobs':newJobs }}
28 changes: 14 additions & 14 deletions examples.py → examples/examples.py
@@ -1,9 +1,9 @@
from rq import Queue
from redis import Redis

from manager import manager, waitForProjectResults

from rq_manager import manager, getProjectResults
from exampleTasks import addSubJob, simpleTask, addJobs, addSubJob

import json
from pprint import pprint

@@ -14,14 +14,14 @@ def jsonCopy(d):
redis_conn = Redis()
q = Queue(connection=redis_conn,is_async=True) # no args implies the default queue

### Run jobs in parallel
### Run jobs in parallel:
project = {'jobs':[
{'func':simpleTask,'args': 1},
{'func':simpleTask,'args': 2}]
}

managerJob = q.enqueue(manager,project)
projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults)

### Run jobs in series:
@@ -31,18 +31,18 @@ def jsonCopy(d):
}

managerJob = q.enqueue(manager,project)
projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults)

### Run with dependent arguments:
project = {'jobs':[
{'func':simpleTask,'args': 1},
{'func':simpleTask, 'previousJobArgs': True}, # this job will wait to start until the previous job is finished
{'func':simpleTask, 'previousJobArgs': True}, # this job will wait
{'func':simpleTask,'args': 3}] # this job will NOT wait
}

managerJob = q.enqueue(manager,project)
projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults)

### Run jobs with multiple dependencies:
@@ -59,10 +59,10 @@
]}

managerJob = q.enqueue(manager,project)
projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults)

### Create Jobs as you go.
### Add jobs as you go
project = {'jobs':[
{
'blocking':True,
@@ -76,22 +76,22 @@
}

managerJob = q.enqueue(manager,project)
projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults)

###
### Add a job with child jobs as you go
project = {'jobs':[
{
'blocking':True,
'jobs':[ # these two jobs will be run first
{'func':simpleTask,'args': 1},
{'func':addSubJob,'args': 2} # This job adds a new job with subjobs
{'func':addSubJob,'args': 2} # This job adds a new job with child jobs
# {'jobs': New Jobs are placed here}
],
},
{'func':simpleTask,'args': 3}]
{'func':simpleTask,'args': 2}]
}

managerJob = q.enqueue(manager,project)
projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults)
4 changes: 2 additions & 2 deletions managerTest.py → examples/managerTest.py
@@ -2,7 +2,7 @@
from rq import Queue
from redis import Redis

from manager import manager, waitForProjectResults
from rq_manager import manager, getProjectResults
import tasks

import json
@@ -62,6 +62,6 @@ def jsonCopy(d):

managerJob = q.enqueue(manager,project)

projectResults = waitForProjectResults(managerJob)
projectResults = getProjectResults(managerJob)
pprint(projectResults[-1])

8 changes: 5 additions & 3 deletions supervisord.conf → examples/supervisord.conf
@@ -5,10 +5,12 @@ nodaemon=true
process_name=%(program_name)s_%(process_num)02d
command=rq worker
numprocs=10
stdout_logfile=super_logs.conf
stdout_logfile=super_logs.log
loglevel = debug
redirect_stderr=true
# autostart=true
# autorestart=true
stdout_logfile_maxbytes=0
autostart=true
autorestart=true
# # user=root
# startretries=50
# stopsignal=INT
File renamed without changes.
17 changes: 9 additions & 8 deletions readme.md
@@ -4,16 +4,17 @@ Manage jobs with multiple or tree-like dependency.

Create a "project", which is a tree of jobs, then give it to the "manager" to complete it!

managedProject = q.enque(manager,project)
    managerJob = q.enqueue(manager,project)
projectResults = getProjectResults(managerJob)
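
For context, a complete, runnable version of the snippet above could look like the sketch below. It assumes a local Redis server, at least one `rq worker` running, and that it is run from the examples/ directory so `exampleTasks` is importable; the setup mirrors examples/examples.py:

```python
from redis import Redis
from rq import Queue

from rq_manager import manager, getProjectResults
from exampleTasks import simpleTask  # task functions must be importable by the worker

redis_conn = Redis()
q = Queue(connection=redis_conn)  # default queue

# A project is a tree of jobs; the manager job runs until every job has finished.
project = {'jobs': [
    {'func': simpleTask, 'args': 1},
    {'func': simpleTask, 'args': 2}]
}

managerJob = q.enqueue(manager, project)
projectResults = getProjectResults(managerJob)  # polls until the project is done
print(projectResults)
```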

## A few trivial examples.
## A few examples
Define a simple job:
```python
def simpleJob(x):
return 2*x
```

### Run jobs in parallel
### Run jobs in parallel:
Jobs in an array will be started immediately and run in parallel.
```Python
project = {'jobs':[
@@ -46,7 +47,7 @@ A job can have child jobs. The parent job is not finished until all of the child
```Python
project = {'jobs':[
{
'blocking':True # this job, and its child jobs, must finished before moving on.
'blocking':True # this job, and its child jobs, must finish before moving on.
'jobs':[
{'func':simpleJob,'args': 1},
{'func':simpleJob,'args': 2}],
@@ -71,7 +72,7 @@ Jobs will be appended to the current job array
project = {'jobs':[
{
'blocking':True,
'jobs':[ # these two jobs will be run first
'jobs':[ # these two jobs will run first
{'func':simpleTask,'args': 2},
{'func':addJobs,'args': 4} # This job adds new jobs
# New Jobs are placed here
@@ -81,8 +82,8 @@ project = {'jobs':[
}
```

### Add sub jobs as you go
Define a job that creates new subjob filled with jobs.
### Add job with child jobs as you go
Define a job that creates a new job filled with child jobs.
```Python
def addSubJob(n):
newJobArray = []
@@ -99,7 +100,7 @@ project = {'jobs':[
'blocking':True,
'jobs':[ # these two jobs will be run first
{'func':simpleTask,'args': 2},
{'func':addSubJob,'args': 4} # This job adds a new job with subjobs
{'func':addSubJob,'args': 4} # This job adds a new job with child jobs
# {'jobs': New Jobs are placed here}
],
},
1 change: 1 addition & 0 deletions rq_manager/__init__.py
@@ -0,0 +1 @@
from .manager import *
24 changes: 15 additions & 9 deletions manager.py → rq_manager/manager.py
@@ -1,6 +1,6 @@
import collections
import time
# from pprint import pprint
from pprint import pprint

from rq import Queue
from redis import Redis
@@ -22,8 +22,8 @@ def manager(project):
else:
# Assign defaults if not set.
if 'loop' not in project: project['loop'] = 0
if 'maxLoop' not in project: project['maxLoop'] = 100
if 'sleepTime' not in project: project['sleepTime'] = 0.5
if 'maxLoop' not in project: project['maxLoop'] = 600
if 'sleepTime' not in project: project['sleepTime'] = 1

if (project['loop'] < project['maxLoop']):
project['loop'] += 1 # keep track of the number of loops
@@ -60,11 +60,16 @@ def runJobs(jobs):
args = getJobResults(previousJob)
else:
finished = False
continue
else: # or simply use the provided arguments
args = jsonCopy(job['args'])
continue

job_temp = q.enqueue(job['func'],args) # queue the job.
else: # or simply use the provided arguments
args = job['args']

if not isinstance(args,tuple):
args = (args,)

kwargs = job.get('kwargs',{})
job_temp = q.enqueue(job['func'],args=args,kwargs=kwargs) # queue the job.
job['id'] = job_temp.id # store the job id so we can keep track of its progress.

finished = False # The jobs are not finished, we just started a job!
@@ -97,7 +102,7 @@ def runJobs(jobs):
return finished

def getJobResults(job):
# Get results of a job. If a job has subjobs, then an array of results will be returned.
# Get results of a job. If a job has child jobs, then an array of the child results will be returned.
if 'func' in job:
results = job['result']
else:
@@ -107,7 +112,8 @@

return results

def waitForProjectResults(managerJob):

def getProjectResults(managerJob):
project = None
finished = False
while not finished:
9 changes: 9 additions & 0 deletions setup.py
@@ -0,0 +1,9 @@
from setuptools import setup, find_packages
setup(
name="rq-manager",
version="0.1",
description='Manager for complex rq ',
author='Chris Patton',
author_email='[email protected]',
packages=['rq_manager']
)
