From c5a1199c6c206f432f4dc374af0de92ba6413549 Mon Sep 17 00:00:00 2001 From: Brad Rowe Date: Fri, 23 Jul 2021 08:08:23 -0700 Subject: [PATCH] add --job-class --- README.md | 3 ++ rq_dashboard/cli.py | 9 +++++ rq_dashboard/web.py | 99 ++++++++++++++++++++++++++++----------------- tests/test_basic.py | 16 ++++++++ 4 files changed, 90 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index d011b0fa..d4cbda11 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ environment variables with prefix `RQ_DASHBOARD_*`: - RQ_DASHBOARD_REDIS_URL=redis:// - RQ_DASHBOARD_USERNAME=rq - RQ_DASHBOARD_PASSWORD=password + - RQ_DASHBOARD_JOB_CLASS=mypackage.CustomJobClass See more info on how to pass environment variables in [Docker documentation](https://docs.docker.com/engine/reference/commandline/run/#set-environment-variables--e---env---env-file) @@ -79,6 +80,8 @@ Options: path) -u, --redis-url TEXT Redis URL. Can be specified multiple times. Default: redis://127.0.0.1:6379 + -j, --job-class TEXT RQ Job class to use. It will be imported at + runtime, so it must be installed. --poll-interval, --interval INTEGER Refresh interval in ms --extra-path TEXT Append specified directories to sys.path diff --git a/rq_dashboard/cli.py b/rq_dashboard/cli.py index 7342de7c..c3d2f439 100755 --- a/rq_dashboard/cli.py +++ b/rq_dashboard/cli.py @@ -112,6 +112,12 @@ def make_flask_app(config, username, password, url_prefix, compatibility_mode=Tr multiple=True, help="Redis URL. Can be specified multiple times. Default: redis://127.0.0.1:6379", ) +@click.option( + "-j", + "--job-class", + default=None, + help="RQ Job class to use. It will be imported at runtime, so it must be installed.", +) @click.option( "--redis-sentinels", default=None, @@ -166,6 +172,7 @@ def run( redis_password, redis_database, redis_url, + job_class, redis_sentinels, redis_master_name, poll_interval, @@ -196,6 +203,8 @@ def run( app.config["RQ_DASHBOARD_REDIS_URL"] = redis_url else: app.config["RQ_DASHBOARD_REDIS_URL"] = "redis://127.0.0.1:6379" + if job_class: + app.config["RQ_DASHBOARD_JOB_CLASS"] = job_class if redis_host: app.config["DEPRECATED_OPTIONS"].append("--redis-host") if redis_port: diff --git a/rq_dashboard/web.py b/rq_dashboard/web.py index 97346755..41d98f61 100644 --- a/rq_dashboard/web.py +++ b/rq_dashboard/web.py @@ -32,21 +32,14 @@ url_for, ) from redis_sentinel_url import connect as from_url -from rq import ( - VERSION as rq_version, - Queue, - Worker, - pop_connection, - push_connection, - requeue_job, -) -from rq.job import Job +from rq import VERSION as rq_version, Queue, Worker, pop_connection, push_connection from rq.registry import ( DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, ) +from rq.utils import import_attribute from six import string_types from .legacy_config import upgrade_config @@ -86,6 +79,10 @@ def push_rq_connection(): new_instance = current_app.redis_conn push_connection(new_instance) current_app.redis_conn = new_instance + job_class = current_app.config.get("RQ_DASHBOARD_JOB_CLASS", "rq.job.Job") + if isinstance(job_class, string_types): + job_class = import_attribute(job_class) + current_app.job_class = job_class @blueprint.teardown_request @@ -117,7 +114,9 @@ def serialize_queues(instance_number, queues): per_page="8", page="1", ), - failed_job_registry_count=FailedJobRegistry(q.name).count, + failed_job_registry_count=FailedJobRegistry( + q.name, job_class=current_app.job_class + ).count, failed_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -126,7 +125,9 @@ def serialize_queues(instance_number, queues): per_page="8", page="1", ), - started_job_registry_count=StartedJobRegistry(q.name).count, + started_job_registry_count=StartedJobRegistry( + q.name, job_class=current_app.job_class + ).count, started_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -135,7 +136,9 @@ def serialize_queues(instance_number, queues): per_page="8", page="1", ), - deferred_job_registry_count=DeferredJobRegistry(q.name).count, + deferred_job_registry_count=DeferredJobRegistry( + q.name, job_class=current_app.job_class + ).count, deferred_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -144,7 +147,9 @@ def serialize_queues(instance_number, queues): per_page="8", page="1", ), - finished_job_registry_count=FinishedJobRegistry(q.name).count, + finished_job_registry_count=FinishedJobRegistry( + q.name, job_class=current_app.job_class + ).count, finished_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -212,19 +217,27 @@ def favicon(): def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page): - queue = Queue(queue_name) + queue = Queue(queue_name, job_class=current_app.job_class) if registry_name != "queued": if per_page >= 0: per_page = offset + (per_page - 1) if registry_name == "failed": - current_queue = FailedJobRegistry(queue_name) + current_queue = FailedJobRegistry( + queue_name, job_class=current_app.job_class + ) elif registry_name == "deferred": - current_queue = DeferredJobRegistry(queue_name) + current_queue = DeferredJobRegistry( + queue_name, job_class=current_app.job_class + ) elif registry_name == "started": - current_queue = StartedJobRegistry(queue_name) + current_queue = StartedJobRegistry( + queue_name, job_class=current_app.job_class + ) elif registry_name == "finished": - current_queue = FinishedJobRegistry(queue_name) + current_queue = FinishedJobRegistry( + queue_name, job_class=current_app.job_class + ) else: current_queue = queue total_items = current_queue.count @@ -254,7 +267,7 @@ def queues_overview(instance_number): "rq_dashboard/queues.html", current_instance=instance_number, instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"), - queues=Queue.all(), + queues=Queue.all(job_class=current_app.job_class), rq_url_prefix=url_for(".queues_overview"), rq_dashboard_version=rq_dashboard_version, rq_version=rq_version, @@ -275,7 +288,7 @@ def workers_overview(instance_number): "rq_dashboard/workers.html", current_instance=instance_number, instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"), - workers=Worker.all(), + workers=Worker.all(job_class=current_app.job_class), rq_url_prefix=url_for(".queues_overview"), rq_dashboard_version=rq_dashboard_version, rq_version=rq_version, @@ -303,15 +316,15 @@ def workers_overview(instance_number): ) def jobs_overview(instance_number, queue_name, registry_name, per_page, page): if queue_name is None: - queue = Queue() + queue = Queue(job_class=current_app.job_class) else: - queue = Queue(queue_name) + queue = Queue(queue_name, job_class=current_app.job_class) r = make_response( render_template( "rq_dashboard/jobs.html", current_instance=instance_number, instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"), - queues=Queue.all(), + queues=Queue.all(job_class=current_app.job_class), queue=queue, per_page=per_page, page=page, @@ -331,7 +344,7 @@ def jobs_overview(instance_number, queue_name, registry_name, per_page, page): @blueprint.route("//view/job/") def job_view(instance_number, job_id): - job = Job.fetch(job_id) + job = current_app.job_class.fetch(job_id) r = make_response( render_template( "rq_dashboard/job.html", @@ -353,7 +366,7 @@ def job_view(instance_number, job_id): @blueprint.route("/job//delete", methods=["POST"]) @jsonify def delete_job_view(job_id): - job = Job.fetch(job_id) + job = current_app.job_class.fetch(job_id) job.delete() return dict(status="OK") @@ -361,18 +374,20 @@ def delete_job_view(job_id): @blueprint.route("/job//requeue", methods=["POST"]) @jsonify def requeue_job_view(job_id): - requeue_job(job_id, connection=current_app.redis_conn) + job = current_app.job_class.fetch(job_id, connection=current_app.redis_conn) + job.requeue() return dict(status="OK") @blueprint.route("/requeue/", methods=["GET", "POST"]) @jsonify def requeue_all(queue_name): - fq = Queue(queue_name).failed_job_registry + fq = Queue(queue_name, job_class=current_app.job_class).failed_job_registry job_ids = fq.get_job_ids() count = len(job_ids) for job_id in job_ids: - requeue_job(job_id, connection=current_app.redis_conn) + job = current_app.job_class.fetch(job_id, connection=current_app.redis_conn) + job.requeue() return dict(status="OK", count=count) @@ -380,22 +395,30 @@ def requeue_all(queue_name): @jsonify def empty_queue(queue_name, registry_name): if registry_name == "queued": - q = Queue(queue_name) + q = Queue(queue_name, job_class=current_app.job_class) q.empty() elif registry_name == "failed": - ids = FailedJobRegistry(queue_name).get_job_ids() + ids = FailedJobRegistry( + queue_name, job_class=current_app.job_class + ).get_job_ids() for id in ids: delete_job_view(id) elif registry_name == "deferred": - ids = DeferredJobRegistry(queue_name).get_job_ids() + ids = DeferredJobRegistry( + queue_name, job_class=current_app.job_class + ).get_job_ids() for id in ids: delete_job_view(id) elif registry_name == "started": - ids = StartedJobRegistry(queue_name).get_job_ids() + ids = StartedJobRegistry( + queue_name, job_class=current_app.job_class + ).get_job_ids() for id in ids: delete_job_view(id) elif registry_name == "finished": - ids = FinishedJobRegistry(queue_name).get_job_ids() + ids = FinishedJobRegistry( + queue_name, job_class=current_app.job_class + ).get_job_ids() for id in ids: delete_job_view(id) return dict(status="OK") @@ -404,7 +427,7 @@ def empty_queue(queue_name, registry_name): @blueprint.route("/queue//compact", methods=["POST"]) @jsonify def compact_queue(queue_name): - q = Queue(queue_name) + q = Queue(queue_name, job_class=current_app.job_class) q.compact() return dict(status="OK") @@ -412,7 +435,9 @@ def compact_queue(queue_name): @blueprint.route("//data/queues.json") @jsonify def list_queues(instance_number): - queues = serialize_queues(instance_number, sorted(Queue.all())) + queues = serialize_queues( + instance_number, sorted(Queue.all(job_class=current_app.job_class)), + ) return dict(queues=queues) @@ -512,7 +537,7 @@ def list_jobs(instance_number, queue_name, registry_name, per_page, page): @blueprint.route("//data/job/.json") @jsonify def job_info(instance_number, job_id): - job = Job.fetch(job_id) + job = current_app.job_class.fetch(job_id) return dict( id=job.id, created_at=serialize_date(job.created_at), @@ -542,7 +567,7 @@ def serialize_queue_names(worker): version=getattr(worker, "version", ""), python_version=getattr(worker, "python_version", ""), ) - for worker in Worker.all() + for worker in Worker.all(job_class=current_app.job_class) ), key=lambda w: (w["state"], w["queues"], w["name"]), ) diff --git a/tests/test_basic.py b/tests/test_basic.py index d68649f1..a7bd1067 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -107,6 +107,22 @@ def test_worker_version_field(self): w.register_death() +class CustomRqClassTestCase(BasicTestCase): + def setUp(self): + super().setUp() + self.app.config['RQ_DASHBOARD_JOB_CLASS'] = 'rq.job.Job' + + def test_invalid_job_class(self): + self.app.config['RQ_DASHBOARD_JOB_CLASS'] = 'rq.job.NotAJobClass' + + with self.assertRaises(AttributeError) as ae: + self.client.get('/0/data/queues.json') + + self.assertIn('NotAJobClass', str(ae.exception)) + + + __all__ = [ 'BasicTestCase', + 'CustomRqClassTestCase' ]