Skip to content

Commit

Permalink
Merge pull request #10 from gisce/only_spawn_required_num_workers
Browse files Browse the repository at this point in the history
Only Spawn self.max_procs - Workers.count(queue)
  • Loading branch information
ecarreras authored Jan 7, 2018
2 parents f429f8d + 4597c5f commit 9855646
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
22 changes: 12 additions & 10 deletions autoworker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from redis import StrictRedis
from rq.contrib.legacy import cleanup_ghosts
from rq.worker import Worker
from rq.utils import import_attribute
from osconf import config_from_environment


Expand All @@ -19,9 +21,7 @@ class AutoWorker(object):
def __init__(self, queue=None, max_procs=None, skip_failed=True,
default_result_ttl=None):
if queue is None:
self.queue = 'default'
else:
self.queue = queue
queue = 'default'
if max_procs is None:
self.max_procs = MAX_PROCS
elif 1 <= max_procs < MAX_PROCS + 1:
Expand All @@ -38,22 +38,23 @@ def __init__(self, queue=None, max_procs=None, skip_failed=True,
)
self.skip_failed = skip_failed
self.default_result_ttl = default_result_ttl
self.connection = StrictRedis.from_url(self.config['redis_url'])
queue_class = import_attribute(self.config['queue_class'])
self.queue = queue_class(queue, connection=self.connection)

def worker(self):
"""Internal target to use in multiprocessing
"""
from rq.utils import import_attribute
conn = StrictRedis.from_url(self.config['redis_url'])
cleanup_ghosts(conn)
cleanup_ghosts(self.connection)
worker_class = import_attribute(self.config['worker_class'])
queue_class = import_attribute(self.config['queue_class'])
q = [queue_class(self.queue, connection=conn)]
if self.skip_failed:
exception_handlers = []
else:
exception_handlers = None

worker = worker_class(
q, connection=conn, exception_handlers=exception_handlers,
[self.queue], connection=self.connection,
exception_handlers=exception_handlers,
default_result_ttl=self.default_result_ttl
)
worker._name = '{}-auto'.format(worker.name)
Expand All @@ -68,8 +69,9 @@ def work(self):
"""Spawn the multiple workers using multiprocessing and `self.worker`_
target
"""
max_procs = self.max_procs - Worker.count(queue=self.queue)
self.processes = [
mp.Process(target=self._create_worker) for _ in range(0, self.max_procs)
mp.Process(target=self._create_worker) for _ in range(0, max_procs)
]
for proc in self.processes:
proc.daemon = True
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rq
rq>=0.10.0
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import sys
from setuptools import setup, find_packages

INSTALL_REQUIRES = ['rq', 'osconf']
INSTALL_REQUIRES = ['rq>=0.10.0', 'osconf']

setup(
name='autoworker',
Expand Down
10 changes: 7 additions & 3 deletions spec/autoworker_spec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
from autoworker import AutoWorker
from rq.queue import Queue

from expects import *
from mamba import *

# Setup environment variable
os.environ['AUTOWORKER_REDIS_URL'] = 'redis://localhost:6379/0'
Expand All @@ -27,14 +29,16 @@ def callback():

expect(callback).to(raise_error(ValueError))

with context('if no queue is defient'):
with context('if no queue is defined'):
with it('must be "default" queue'):
a = AutoWorker()
expect(a.queue).to(equal('default'))
q = Queue('default', connection=a.connection)
expect(a.queue).to(equal(q))
with context('if a queue is defined'):
with it('have to be the same value'):
a = AutoWorker('low')
expect(a.queue).to(equal('low'))
q = Queue('low', connection=a.connection)
expect(a.queue).to(equal(q))


with description('An instance of a AutoWorker'):
Expand Down

0 comments on commit 9855646

Please sign in to comment.