Skip to content

Commit

Permalink
Use the backend-configured model (apache#12336)
Browse files Browse the repository at this point in the history
Rather than import the backend Task model directly, use the class that the backend actually uses. This could have been customised, and there is no reason not to use this reference.
  • Loading branch information
mjpieters authored Nov 13, 2020
1 parent 458ad93 commit d54f087
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from celery import Celery, Task, states as celery_states
from celery.backends.base import BaseKeyValueStoreBackend
from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cleanup
from celery.backends.database import DatabaseBackend, session_cleanup
from celery.result import AsyncResult
from celery.signals import import_modules as celery_import_modules
from setproctitle import setproctitle # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -562,8 +562,9 @@ def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValu
def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]:
task_ids = _tasks_list_to_task_ids(async_tasks)
session = app.backend.ResultSession()
task_cls = app.backend.task_cls
with session_cleanup(session):
tasks = session.query(TaskDb).filter(TaskDb.task_id.in_(task_ids)).all()
tasks = session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()

task_results = [app.backend.meta_from_decoded(task.to_dict()) for task in tasks]
task_results_by_task_id = {task_result["task_id"]: task_result for task_result in task_results}
Expand Down

0 comments on commit d54f087

Please sign in to comment.