-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtasks.py
61 lines (51 loc) · 2.1 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""
product definitions collector
"""
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from django.utils import timezone
from collectors.framework.models import collector
from osidb.models import PsContact, PsModule, PsProduct, PsUpdateStream
from .core import (
PRODUCT_DEFINITIONS_URL,
fetch_product_definitions,
sanitize_product_definitions,
sync_product_definitions,
)
# Module-level logger obtained from Celery's task-logger helper, named after this module.
logger = get_task_logger(__name__)
@collector(
    # Execute this every 3 hours.
    # TODO: crontab seems to be insufficient as a scheduler here, since it is
    # only capable of running the job at every fixed third hour
    # (e.g. 3:00, 6:00, 9:00, etc.). There is thus a scenario in which OSIDB
    # is started at, say, 3:01 and this job is only scheduled for 6:00, which
    # is really not what we want, since there may be other collectors
    # depending on this one.
    # TODO: Use django_celery_beat, which has PeriodicTask with IntervalSchedule.
    # What we use here is equivalent to PeriodicTask with CrontabSchedule.
    crontab=crontab(minute="0", hour="*/3"),
    data_models=[PsContact, PsModule, PsProduct, PsUpdateStream],
)
def product_definitions_collector(collector_obj) -> str:
    """Fetch, sanitize and sync product definitions, then record the run.

    The raw product definitions are fetched, split by
    ``sanitize_product_definitions`` into PS products, modules, update
    streams and contacts, and handed to ``sync_product_definitions``.
    Afterwards the current time is stored on the collector via
    ``collector_obj.store(updated_until_dt=...)``.

    Args:
        collector_obj: The collector object this task runs for; its
            ``store`` method and ``name`` attribute are used here.

    Returns:
        A human-readable message stating that the collector run finished.
    """
    # Fetch raw json data from GitLab
    logger.info(f"Fetching Product Definitions from '{PRODUCT_DEFINITIONS_URL}'")
    raw_data = fetch_product_definitions()

    (
        ps_products,
        ps_modules,
        ps_update_streams,
        ps_contacts,
    ) = sanitize_product_definitions(raw_data)

    logger.info(
        f"Fetched {len(ps_products)} PS Products, {len(ps_modules)} PS Modules, "
        f"{len(ps_update_streams)} PS Update Streams and "
        f"{len(ps_contacts)} PS Contacts, going to sync."
    )

    # Sync all product definitions in a single transaction
    sync_product_definitions(ps_products, ps_modules, ps_update_streams, ps_contacts)

    # Record the completion time only after the sync call has returned,
    # so a failed sync does not advance the collector's timestamp.
    collector_obj.store(updated_until_dt=timezone.now())
    logger.info("Product Definitions sync was successful.")
    return f"The run of {collector_obj.name} finished."