Skip to content

Commit

Permalink
Refactor DAG not found handling for backfills (#45707)
Browse files Browse the repository at this point in the history
Currently, the FastAPI endpoint returns an HTTP 500 when we try to create a backfill for a non-existent DAG. This change handles that case properly by raising DagNotFound and returning an HTTP 404 instead.
  • Loading branch information
vatsrahul1001 authored Jan 17, 2025
1 parent f3666e7 commit b8ba8c5
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
13 changes: 12 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.exceptions import DagNotFound
from airflow.models import DagRun
from airflow.models.backfill import (
AlreadyRunningBackfill,
Expand Down Expand Up @@ -216,6 +217,12 @@ def create_backfill(
detail=f"{backfill_request.dag_id} has no schedule",
)

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {backfill_request.dag_id}",
)


@backfills_router.post(
path="/dry_run",
Expand Down Expand Up @@ -245,7 +252,11 @@ def create_backfill_dry_run(
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]

return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {body.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
Expand Down
7 changes: 4 additions & 3 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
from sqlalchemy.orm import relationship, validates
from sqlalchemy_jsonfield import JSONField

from airflow.api_connexion.exceptions import NotFound
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.base import Base, StringID
from airflow.models.dag_version import DagVersion
from airflow.settings import json
Expand Down Expand Up @@ -209,6 +208,8 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
from airflow.models.serialized_dag import SerializedDagModel

serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
if not serdag:
raise DagNotFound(f"Could not find dag {dag_id}")
dag = serdag.dag
_validate_backfill_params(dag, reverse, reprocess_behavior)

Expand Down Expand Up @@ -331,7 +332,7 @@ def _create_backfill(
with create_session() as session:
serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
if not serdag:
raise NotFound(f"Could not find dag {dag_id}")
raise DagNotFound(f"Could not find dag {dag_id}")
no_schedule = session.scalar(
select(func.count()).where(DagModel.timetable_summary == "None", DagModel.dag_id == dag_id)
)
Expand Down
25 changes: 25 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,31 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl
"updated_at": mock.ANY,
}

def test_dag_not_exist(self, session, test_client):
    """Creating a backfill for an unknown dag_id must return 404 with a clear detail message."""
    # NOTE(review): the query result is discarded, so this only issues a SELECT;
    # possibly intended to clear DagModel rows before the request -- confirm.
    session.query(DagModel).all()
    session.commit()

    # Backfill window: January 2024, serialized to ISO strings for the JSON payload.
    window_start_iso = to_iso(pendulum.parse("2024-01-01"))
    window_end_iso = to_iso(pendulum.parse("2024-02-01"))

    payload = {
        "dag_id": "DAG_NOT_EXIST",
        "from_date": f"{window_start_iso}",
        "to_date": f"{window_end_iso}",
        "max_active_runs": 5,
        "run_backwards": False,
        "dag_run_conf": {"param1": "val1", "param2": True},
        "dry_run": False,
        "reprocess_behavior": ReprocessBehavior.NONE,
    }

    response = test_client.post(url="/public/backfills", json=payload)

    # The route translates DagNotFound into a 404 instead of an unhandled 500.
    assert response.status_code == 404
    error_body = response.json()
    assert error_body.get("detail") == "Could not find dag DAG_NOT_EXIST"

def test_no_schedule_dag(self, session, dag_maker, test_client):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="None") as dag:
EmptyOperator(task_id="mytask")
Expand Down

0 comments on commit b8ba8c5

Please sign in to comment.