From f365a68dc5df816fc1ad9860bf546013ea92bb87 Mon Sep 17 00:00:00 2001 From: Eduard Carreras Date: Fri, 5 Jan 2018 14:41:59 +0100 Subject: [PATCH 1/3] Only Spawn self.max_procs - Workers.count(queue) --- autoworker/__init__.py | 22 ++++++++++++---------- requirements.txt | 2 +- setup.py | 3 +-- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/autoworker/__init__.py b/autoworker/__init__.py index 4968290..7c3d115 100644 --- a/autoworker/__init__.py +++ b/autoworker/__init__.py @@ -3,6 +3,8 @@ from redis import StrictRedis from rq.contrib.legacy import cleanup_ghosts +from rq.worker import Worker +from rq.utils import import_attribute from osconf import config_from_environment @@ -19,9 +21,7 @@ class AutoWorker(object): def __init__(self, queue=None, max_procs=None, skip_failed=True, default_result_ttl=None): if queue is None: - self.queue = 'default' - else: - self.queue = queue + queue = 'default' if max_procs is None: self.max_procs = MAX_PROCS elif 1 <= max_procs < MAX_PROCS + 1: @@ -38,22 +38,23 @@ def __init__(self, queue=None, max_procs=None, skip_failed=True, ) self.skip_failed = skip_failed self.default_result_ttl = default_result_ttl + self.connection = StrictRedis.from_url(self.config['redis_url']) + queue_class = import_attribute(self.config['queue_class']) + self.queue = queue_class(queue, connection=self.connection) def worker(self): """Internal target to use in multiprocessing """ - from rq.utils import import_attribute - conn = StrictRedis.from_url(self.config['redis_url']) - cleanup_ghosts(conn) + cleanup_ghosts(self.connection) worker_class = import_attribute(self.config['worker_class']) - queue_class = import_attribute(self.config['queue_class']) - q = [queue_class(self.queue, connection=conn)] if self.skip_failed: exception_handlers = [] else: exception_handlers = None + worker = worker_class( - q, connection=conn, exception_handlers=exception_handlers, + [self.queue], connection=self.connection, + exception_handlers=exception_handlers, default_result_ttl=self.default_result_ttl ) worker._name = '{}-auto'.format(worker.name) @@ -68,8 +69,9 @@ def work(self): """Spawn the multiple workers using multiprocessing and `self.worker`_ targget """ + max_procs = self.max_procs - Worker.count(self.queue) self.processes = [ - mp.Process(target=self._create_worker) for _ in range(0, self.max_procs) + mp.Process(target=self._create_worker) for _ in range(0, max_procs) ] for proc in self.processes: proc.daemon = True diff --git a/requirements.txt b/requirements.txt index 75dd785..61d2f66 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -rq +rq>=0.10.0 diff --git a/setup.py b/setup.py index 7ca221d..29ece95 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ -import sys from setuptools import setup, find_packages -INSTALL_REQUIRES = ['rq', 'osconf'] +INSTALL_REQUIRES = ['rq>=0.10.0', 'osconf'] setup( name='autoworker', From 308ac5eff11741497bd26143197c2f87351eed92 Mon Sep 17 00:00:00 2001 From: Eduard Carreras Date: Sun, 7 Jan 2018 11:53:41 +0100 Subject: [PATCH 2/3] Fix tests comparing queues --- spec/autoworker_spec.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/spec/autoworker_spec.py b/spec/autoworker_spec.py index 7bace6c..8a263ed 100644 --- a/spec/autoworker_spec.py +++ b/spec/autoworker_spec.py @@ -1,7 +1,9 @@ import os from autoworker import AutoWorker +from rq.queue import Queue from expects import * +from mamba import * # Setup environment variable os.environ['AUTOWORKER_REDIS_URL'] = 'redis://localhost:6379/0' @@ -27,14 +29,16 @@ def callback(): expect(callback).to(raise_error(ValueError)) - with context('if no queue is defient'): + with context('if no queue is defined'): with it('must be "default" queue'): a = AutoWorker() - expect(a.queue).to(equal('default')) + q = Queue('default', connection=a.connection) + expect(a.queue).to(equal(q)) with context('if a queue is defined'): with it('have to be the same value'): a = AutoWorker('low') - expect(a.queue).to(equal('low')) + q = Queue('low', connection=a.connection) + expect(a.queue).to(equal(q)) with description('An instance of a AutoWorker'): From 4597c5fdc5a8bcd0a2ffb61bfb2b53f9ce46636f Mon Sep 17 00:00:00 2001 From: Eduard Carreras Date: Sun, 7 Jan 2018 11:56:12 +0100 Subject: [PATCH 3/3] Fix Workers.count() pass queue as named arg --- autoworker/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autoworker/__init__.py b/autoworker/__init__.py index 7c3d115..74a2316 100644 --- a/autoworker/__init__.py +++ b/autoworker/__init__.py @@ -69,7 +69,7 @@ def work(self): """Spawn the multiple workers using multiprocessing and `self.worker`_ targget """ - max_procs = self.max_procs - Worker.count(self.queue) + max_procs = self.max_procs - Worker.count(queue=self.queue) self.processes = [ mp.Process(target=self._create_worker) for _ in range(0, max_procs) ]