Skip to content

Commit

Permalink
Handle backfill for DAGs with no schedule (#45709)
Browse files Browse the repository at this point in the history
  • Loading branch information
vatsrahul1001 authored Jan 16, 2025
1 parent 73342f9 commit a63b652
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 11 deletions.
33 changes: 23 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
AlreadyRunningBackfill,
Backfill,
BackfillDagRun,
DagNoScheduleException,
_create_backfill,
_do_dry_run,
)
Expand Down Expand Up @@ -209,6 +210,11 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{backfill_request.dag_id} has no schedule",
)


@backfills_router.post(
Expand All @@ -227,14 +233,21 @@ def create_backfill_dry_run(
from_date = timezone.coerce_datetime(body.from_date)
to_date = timezone.coerce_datetime(body.to_date)

backfills_dry_run = _do_dry_run(
dag_id=body.dag_id,
from_date=from_date,
to_date=to_date,
reverse=body.run_backwards,
reprocess_behavior=body.reprocess_behavior,
session=session,
)
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]
try:
backfills_dry_run = _do_dry_run(
dag_id=body.dag_id,
from_date=from_date,
to_date=to_date,
reverse=body.run_backwards,
reprocess_behavior=body.reprocess_behavior,
session=session,
)
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]

return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))

return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{body.dag_id} has no schedule",
)
22 changes: 21 additions & 1 deletion airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ class AlreadyRunningBackfill(AirflowException):
"""


class DagNoScheduleException(AirflowException):
    """
    Raised when attempting to create a backfill for a DAG with no schedule.

    ``_do_dry_run`` and ``_create_backfill`` raise this when the DAG's
    ``DagModel.timetable_summary`` equals ``"None"``; the backfill API routes
    translate it into an HTTP 409 response.

    :meta private:
    """


class ReprocessBehavior(str, Enum):
"""
Internal enum for setting reprocess behavior in a backfill.
Expand Down Expand Up @@ -197,12 +205,19 @@ def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavio


def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, session) -> list[datetime]:
from airflow.models import DagModel
from airflow.models.serialized_dag import SerializedDagModel

serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
dag = serdag.dag
_validate_backfill_params(dag, reverse, reprocess_behavior)

no_schedule = session.scalar(
select(func.count()).where(DagModel.timetable_summary == "None", DagModel.dag_id == dag_id)
)
if no_schedule:
raise DagNoScheduleException(f"{dag_id} has no schedule")

dagrun_info_list = _get_info_list(
dag=dag,
from_date=from_date,
Expand Down Expand Up @@ -317,7 +332,12 @@ def _create_backfill(
serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
if not serdag:
raise NotFound(f"Could not find dag {dag_id}")
# todo: if dag has no schedule, raise
no_schedule = session.scalar(
select(func.count()).where(DagModel.timetable_summary == "None", DagModel.dag_id == dag_id)
)
if no_schedule:
raise DagNoScheduleException(f"{dag_id} has no schedule")

num_active = session.scalar(
select(func.count()).where(
Backfill.dag_id == dag_id,
Expand Down
27 changes: 27 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,33 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl
"updated_at": mock.ANY,
}

def test_no_schedule_dag(self, session, dag_maker, test_client):
    """Posting a backfill for a DAG that has no schedule must be rejected with HTTP 409."""
    with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="None") as dag:
        EmptyOperator(task_id="mytask")
    session.query(DagModel).all()
    session.commit()

    # Backfill window covering January 2024, serialized for the JSON payload.
    from_date_iso = to_iso(pendulum.parse("2024-01-01"))
    to_date_iso = to_iso(pendulum.parse("2024-02-01"))

    payload = {
        "dag_id": dag.dag_id,
        "from_date": f"{from_date_iso}",
        "to_date": f"{to_date_iso}",
        "max_active_runs": 5,
        "run_backwards": False,
        "dag_run_conf": {"param1": "val1", "param2": True},
        "dry_run": False,
        "reprocess_behavior": ReprocessBehavior.NONE,
    }
    response = test_client.post(url="/public/backfills", json=payload)

    assert response.status_code == 409
    assert response.json().get("detail") == f"{dag.dag_id} has no schedule"


class TestCreateBackfillDryRun(TestBackfillEndpoint):
@pytest.mark.parametrize(
Expand Down

0 comments on commit a63b652

Please sign in to comment.