diff --git a/arq/worker.py b/arq/worker.py index b19a3f78..81afd5b7 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -382,14 +382,14 @@ async def _poll_iteration(self) -> None: await self.start_jobs(job_ids) - if self.allow_abort_jobs: - await self._cancel_aborted_jobs() - - for job_id, t in list(self.tasks.items()): - if t.done(): - del self.tasks[job_id] - # required to make sure errors in run_job get propagated - t.result() + if self.allow_abort_jobs: + await self._cancel_aborted_jobs() + + for job_id, t in list(self.tasks.items()): + if t.done(): + del self.tasks[job_id] + # required to make sure errors in run_job get propagated + t.result() await self.heart_beat() diff --git a/tests/test_worker.py b/tests/test_worker.py index 28a9fda0..aa56085b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -116,19 +116,23 @@ async def test_worker_signal_completes_job_before_shutting_down(caplog, arq_redi async def sleep_job(ctx, time): await asyncio.sleep(time) - await arq_redis.enqueue_job('sleep_job', 0.2, _job_id='short_sleep') # should be cancelled + await arq_redis.enqueue_job('sleep_job', 0.2, _job_id='short_sleep') # should be completed await arq_redis.enqueue_job('sleep_job', 5, _job_id='long_sleep') # should be cancelled worker = worker( functions=[func(sleep_job, name='sleep_job', max_tries=1)], - job_completion_wait=0.4, + job_completion_wait=0.5, job_timeout=10, ) assert worker.jobs_complete == 0 asyncio.create_task(worker.main()) await asyncio.sleep(0.1) worker.handle_sig_wait_for_completion(signal.SIGINT) + assert len(worker.tasks) == 2 # should be two tasks when sigint is sent assert worker.allow_pick_jobs is False - await asyncio.sleep(0.5) + await asyncio.sleep(0.3) + assert len(worker.tasks) == 1 # slept a bit, first job should now be complete and self.tasks should be updated + await asyncio.sleep(0.3) + assert len(worker.tasks) == 0 # slept longer than `job_completion_wait`, task should be cancelled and updated logs = [rec.message for rec in caplog.records] assert 'shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 0 retries ◆ 2 to be completed' in logs assert 'shutdown on SIGINT, wait complete ◆ 1 jobs complete ◆ 0 failed ◆ 0 retries ◆ 1 ongoing to cancel' in logs