Commit: Fix race condition
Marishka17 committed Feb 13, 2025
1 parent 7ca3a31 commit 7e97810
Showing 2 changed files with 169 additions and 152 deletions.
28 changes: 19 additions & 9 deletions cvat/apps/engine/utils.py
@@ -36,7 +36,8 @@
 from PIL import Image
 from redis.lock import Lock
 from rest_framework.reverse import reverse as _reverse
-from rq.job import Dependency, Job
+from rq.job import Dependency as RQDependency, Job as RQJob
+from rq.registry import BaseRegistry as RQBaseRegistry

 Import = namedtuple("Import", ["module", "name", "alias"])

@@ -148,7 +149,7 @@ def parse_exception_message(msg):
         pass
     return parsed_msg

-def process_failed_job(rq_job: Job):
+def process_failed_job(rq_job: RQJob):
     exc_info = str(rq_job.exc_info or '')
     rq_job.delete()

@@ -163,41 +164,50 @@ def define_dependent_job(
     user_id: int,
     should_be_dependent: bool = settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER,
     *,
-    rq_id: Optional[str] = None,
-) -> Optional[Dependency]:
+    rq_id: str | None = None,
+) -> RQDependency | None:
     if not should_be_dependent:
         return None

-    queues = [queue.deferred_job_registry, queue, queue.started_job_registry]
+    queues: list[RQBaseRegistry | DjangoRQ] = [queue.deferred_job_registry, queue, queue.started_job_registry]
     # Since there is no cleanup implementation in DeferredJobRegistry,
     # this registry can contain "outdated" jobs that weren't deleted from it
     # but were added to another registry. Such a situation can occur, for example,
     # when the worker container is restarted while there are active or deferred jobs.
     filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True]
-    all_user_jobs = []
+    all_user_jobs: list[RQJob] = []
     for q, f in zip(queues, filters):
         job_ids = q.get_job_ids()
         jobs = q.job_class.fetch_many(job_ids, q.connection)
         jobs = filter(lambda job: job and job.meta.get("user", {}).get("id") == user_id and f(job), jobs)
         all_user_jobs.extend(jobs)

-    # prevent possible cyclic dependencies
     if rq_id:
+        # Prevent cases where an RQ job depends on itself.
+        # Redis cannot contain multiple RQ jobs with the same ID.
+        # However, two parallel requests can still try to enqueue
+        # RQ jobs with the same ID if an RQ job with a defined ID
+        # is fetched without a lock, while the lock is only taken
+        # when defining the dependent job.
+        if rq_id in {job.id for job in all_user_jobs}:
+            return None
+
+        # prevent possible cyclic dependencies
         all_job_dependency_ids = {
             dep_id.decode()
             for job in all_user_jobs
             for dep_id in job.dependency_ids or ()
         }

-        if Job.redis_job_namespace_prefix + rq_id in all_job_dependency_ids:
+        if RQJob.redis_job_namespace_prefix + rq_id in all_job_dependency_ids:
             return None

     user_jobs = [
         job for job in all_user_jobs
         if not job.meta.get(KEY_TO_EXCLUDE_FROM_DEPENDENCY)
     ]

-    return Dependency(jobs=[sorted(user_jobs, key=lambda job: job.created_at)[-1]], allow_failure=True) if user_jobs else None
+    return RQDependency(jobs=[sorted(user_jobs, key=lambda job: job.created_at)[-1]], allow_failure=True) if user_jobs else None


 def get_rq_lock_by_user(queue: DjangoRQ, user_id: int, *, timeout: Optional[int] = 30, blocking_timeout: Optional[int] = None) -> Union[Lock, nullcontext]:
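
A note on the cyclic-dependency check above: Job.dependency_ids in RQ returns the full Redis keys of a job's dependencies as bytes, already prefixed with Job.redis_job_namespace_prefix ("rq:job:"), which is why the code decodes each entry and compares against the prefixed rq_id. A minimal sketch against a local Redis instance; the jobs here are throwaway examples, not part of the commit:

    from redis import Redis
    from rq import Queue
    from rq.job import Job

    queue = Queue(connection=Redis())

    parent = queue.enqueue(print, "parent")  # throwaway jobs for illustration
    child = queue.enqueue(print, "child", depends_on=parent)

    # Full Redis keys as bytes, e.g. [b"rq:job:<parent-id>"]:
    print(child.dependency_ids)

    # Hence the comparison above uses the prefixed form of the ID:
    print(Job.redis_job_namespace_prefix + parent.id)  # "rq:job:<parent-id>"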
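For context, the race being fixed arises on the caller side: if an RQ job with a known ID is fetched without holding the per-user lock, two parallel requests can both observe that no such job exists and enqueue duplicates, and the new guard keeps define_dependent_job from producing a self-dependency in that case. Below is a sketch of the intended fetch-and-enqueue pattern under the lock; enqueue_export_job, the "export" queue name, and my_export_function are hypothetical illustrations, not CVAT's actual call site:

    from django_rq import get_queue

    from cvat.apps.engine.utils import define_dependent_job, get_rq_lock_by_user

    def my_export_function() -> None:  # hypothetical job body
        ...

    def enqueue_export_job(user_id: int, rq_id: str) -> None:
        queue = get_queue("export")  # hypothetical queue name

        # Fetching and enqueuing must happen under the same per-user lock;
        # fetching the job outside of it is what allowed the race.
        with get_rq_lock_by_user(queue, user_id):
            if queue.fetch_job(rq_id) is not None:
                return  # a job with this ID already exists

            queue.enqueue(
                my_export_function,
                job_id=rq_id,
                meta={"user": {"id": user_id}},  # matches the meta shape read above
                depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
            )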
