Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(db-scheduler): optimize all_as_schedule query #835

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions django_celery_beat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import close_old_connections, transaction
from django.db.models import Q
from django.db.utils import DatabaseError, InterfaceError
from django.utils import timezone
from kombu.utils.encoding import safe_repr, safe_str
from kombu.utils.json import dumps, loads

from .clockedschedule import clocked
from .models import (ClockedSchedule, CrontabSchedule, IntervalSchedule,
PeriodicTask, PeriodicTasks, SolarSchedule)
from .utils import NEVER_CHECK_TIMEOUT
from .utils import NEVER_CHECK_TIMEOUT, now

# This scheduler must wake up more frequently than the
# regular of 5 minutes because it needs to take external
Expand Down Expand Up @@ -251,7 +253,17 @@ def setup_schedule(self):
def all_as_schedule(self):
alirafiei75 marked this conversation as resolved.
Show resolved Hide resolved
debug('DatabaseScheduler: Fetching database schedule')
s = {}
for model in self.Model.objects.enabled():
next_five_minutes = now() + datetime.timedelta(minutes=5)
exclude_clock_tasks_query = Q(
clocked__isnull=False, clocked__clocked_time__gt=next_five_minutes
)
exclude_hours = self.get_excluded_hours_for_crontab_tasks()
exclude_cron_tasks_query = Q(
crontab__isnull=False, crontab__hour__in=exclude_hours
)
for model in self.Model.objects.enabled().exclude(
exclude_clock_tasks_query | exclude_cron_tasks_query
):
try:
s[model.name] = self.Entry(model, app=self.app)
except ValueError:
Expand Down Expand Up @@ -377,3 +389,32 @@ def schedule(self):
repr(entry) for entry in self._schedule.values()),
)
return self._schedule

@staticmethod
alirafiei75 marked this conversation as resolved.
Show resolved Hide resolved
def get_excluded_hours_for_crontab_tasks():
# Generate the full list of allowed hours for crontabs
allowed_crontab_hours = [
str(hour).zfill(2) for hour in range(24)
] + [
str(hour) for hour in range(10)
]

# Get current, next, and previous hours
current_time = timezone.localtime(now())
current_hour = current_time.hour
next_hour = (current_hour + 1) % 24
previous_hour = (current_hour - 1) % 24

# Create a set of hours to remove (both padded and non-padded versions)
hours_to_remove = {
str(current_hour).zfill(2), str(current_hour),
str(next_hour).zfill(2), str(next_hour),
str(previous_hour).zfill(2), str(previous_hour),
str(4), "04", # celery's default cleanup task
}

# Filter out 'should be considered' hours
return [
hour for hour in allowed_crontab_hours
if hour not in hours_to_remove
]
77 changes: 66 additions & 11 deletions t/unit/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,22 +456,67 @@ def setup_scheduler(self, app):
self.m4.save()
self.m4.refresh_from_db()

dt_aware = make_aware(datetime(day=26,
month=7,
year=3000,
hour=1,
minute=0)) # future time
# disabled, should not be in schedule
self.m5 = self.create_model_interval(
schedule(timedelta(seconds=1)))
self.m5.enabled = False
self.m5.save()

# near future time (should be in schedule)
now = datetime.now()
two_minutes_later = now + timedelta(minutes=2)
dt_aware = make_aware(
datetime(
day=two_minutes_later.day,
month=two_minutes_later.month,
year=two_minutes_later.year,
hour=two_minutes_later.hour,
minute=two_minutes_later.minute
)
)
self.m6 = self.create_model_clocked(
clocked(dt_aware)
)
self.m6.save()
self.m6.refresh_from_db()

# disabled, should not be in schedule
m5 = self.create_model_interval(
schedule(timedelta(seconds=1)))
m5.enabled = False
m5.save()
# distant future time (should not be in schedule)
ten_minutes_later = now + timedelta(minutes=10)
distant_dt_aware = make_aware(
datetime(
day=ten_minutes_later.day,
month=ten_minutes_later.month,
year=ten_minutes_later.year,
hour=ten_minutes_later.hour,
minute=ten_minutes_later.minute
)
)
self.m7 = self.create_model_clocked(
clocked(distant_dt_aware)
)
self.m7.save()
self.m7.refresh_from_db()

now_hour = timezone.localtime(timezone.now()).hour
# near future time (should be in schedule)
self.m8 = self.create_model_crontab(
crontab(hour=str(now_hour)))
self.m8.save()
self.m8.refresh_from_db()
self.m9 = self.create_model_crontab(
crontab(hour=str((now_hour + 1) % 24)))
self.m9.save()
self.m9.refresh_from_db()
self.m10 = self.create_model_crontab(
crontab(hour=str((now_hour - 1) % 24)))
self.m10.save()
self.m10.refresh_from_db()

# distant future time (should not be in schedule)
self.m11 = self.create_model_crontab(
crontab(hour=str((now_hour + 2) % 24)))
self.m11.save()
self.m11.refresh_from_db()

self.s = self.Scheduler(app=self.app)

Expand All @@ -483,11 +528,21 @@ def test_constructor(self):
def test_all_as_schedule(self):
sched = self.s.schedule
assert sched
assert len(sched) == 6
assert len(sched) == 9
assert 'celery.backend_cleanup' in sched
for n, e in sched.items():
assert isinstance(e, self.s.Entry)

def test_get_excluded_hours_for_crontab_tasks(self):
now_hour = timezone.localtime(timezone.now()).hour
excluded_hours = self.s.get_excluded_hours_for_crontab_tasks()

assert str(now_hour) not in excluded_hours
assert str((now_hour + 1) % 24) not in excluded_hours
assert str((now_hour - 1) % 24) not in excluded_hours
assert str((now_hour + 2) % 24) in excluded_hours
assert str((now_hour - 2) % 24) in excluded_hours

def test_schedule_changed(self):
self.m2.args = '[16, 16]'
self.m2.save()
Expand Down