Skip to content

Commit

Permalink
Change manual run data interval behavior based on logical_date (apach…
Browse files Browse the repository at this point in the history
…e#46512)

* Add logic for if logical date null, there should be no data interval

* When logical_date is None, data_interval should be None

* fix the tests

* fix the tests

* fix the tests

* fix the assignment

* Fix the PR comments
  • Loading branch information
sunank200 authored Feb 13, 2025
1 parent 8dd900b commit f50f1ce
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 50 deletions.
2 changes: 1 addition & 1 deletion airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _trigger_dag(
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
coerced_logical_date = timezone.coerce_datetime(logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=run_after)
else:
coerced_logical_date = None
data_interval = None
Expand Down
19 changes: 10 additions & 9 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,16 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:

data_interval_start = post_body.get("data_interval_start")
data_interval_end = post_body.get("data_interval_end")
if data_interval_start and data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(data_interval_start),
end=pendulum.instance(data_interval_end),
)
else:
data_interval = (
dag.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
)
data_interval = None
if logical_date:
if data_interval_start and data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(data_interval_start),
end=pendulum.instance(data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=run_after)

dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
Expand Down
2 changes: 0 additions & 2 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ def check_data_intervals(cls, values):
)
return values

## when logical date is null, the run id should be generated from run_after + random string.
# TODO: AIP83: we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged
@model_validator(mode="after")
def validate_dag_run_id(self):
if not self.dag_run_id:
Expand Down
18 changes: 8 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,20 +362,18 @@ def trigger_dag_run(

try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)

if body.data_interval_start and body.data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(body.data_interval_start),
end=pendulum.instance(body.data_interval_end),
)
else:
if body.logical_date:
data_interval = None
if body.logical_date:
if body.data_interval_start and body.data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(body.data_interval_start),
end=pendulum.instance(body.data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=coerced_logical_date or run_after
)
run_after = data_interval.end
else:
data_interval = None

if body.dag_run_id:
run_id = body.dag_run_id
Expand Down
16 changes: 9 additions & 7 deletions airflow/cli/commands/remote_commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, cast

import pendulum

from airflow import settings
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
Expand Down Expand Up @@ -129,28 +127,32 @@ def _get_dag_run(
f"of {logical_date_or_run_id!r} not found"
)

dag_run_logical_date = pendulum.instance(logical_date or timezone.utcnow())
dag_run_logical_date = timezone.coerce_datetime(logical_date)
data_interval = (
dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
if dag_run_logical_date
else None
)
run_after = data_interval.end if data_interval else timezone.utcnow()
if create_if_necessary == "memory":
data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = DagRun(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
external_trigger=True,
logical_date=dag_run_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
triggered_by=DagRunTriggeredByType.CLI,
state=DagRunState.RUNNING,
)
return dag_run, True
elif create_if_necessary == "db":
data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = dag.create_dagrun(
run_id=_generate_temporary_run_id(),
logical_date=dag_run_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.CLI,
dag_version=None,
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _create_backfill_dag_run(
run_type=DagRunType.BACKFILL_JOB, logical_date=info.logical_date, run_after=info.run_after
),
logical_date=info.logical_date,
data_interval=info.data_interval,
data_interval=info.data_interval if info.logical_date else None,
run_after=info.run_after,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ def test(
"""
Execute one single DagRun for a given DAG and logical date.
:param run_after: the datetime before which to Dag won't run.
:param run_after: the datetime before which to Dag cannot run.
:param logical_date: logical date for the DAG run
:param run_conf: configuration to pass to newly created dagrun
:param conn_file_path: file path to a connection file in either yaml or json
Expand Down Expand Up @@ -1786,6 +1786,9 @@ def create_dagrun(
:meta private:
"""
logical_date = timezone.coerce_datetime(logical_date)
# For manual runs where logical_date is None, ensure no data_interval is set.
if logical_date is None and data_interval is not None:
raise ValueError("data_interval must be None when logical_date is None")

if data_interval and not isinstance(data_interval, DataInterval):
data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval))
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,15 @@ def __init__(
dag_version: DagVersion | None = None,
bundle_version: str | None = None,
):
# For manual runs where logical_date is None, ensure no data_interval is set.
if logical_date is None and data_interval is not None:
raise ValueError("data_interval must be None if logical_date is None")

if data_interval is None:
# Legacy: Only happen for runs created prior to Airflow 2.2.
self.data_interval_start = self.data_interval_end = None
else:
self.data_interval_start, self.data_interval_end = data_interval

self.bundle_version = bundle_version
self.dag_id = dag_id
self.run_id = run_id
Expand Down
7 changes: 7 additions & 0 deletions airflow/timetables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,11 @@ def generate_run_id(
data_interval: DataInterval | None,
**extra,
) -> str:
"""
Generate a unique run ID.
:param run_type: The type of DAG run.
:param run_after: the datetime before which to Dag cannot run.
:param data_interval: The data interval of the DAG run.
"""
return run_type.generate_run_id(suffix=run_after.isoformat())
9 changes: 7 additions & 2 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,12 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
"warning",
)

data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after)
if logical_date:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after)
run_after = data_interval.end
else:
data_interval = None

if not run_id:
run_id = DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
Expand All @@ -2251,7 +2256,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.UI,
Expand Down
11 changes: 9 additions & 2 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,12 @@ def _dags_for_trigger_tests(self, session=None):
[
("dag_run_5", "test-note", None, None),
("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00"),
(None, None, None, None),
(
None,
None,
None,
None,
),
],
)
def test_should_respond_200(
Expand All @@ -1157,6 +1162,7 @@ def test_should_respond_200(
request_json["data_interval_start"] = data_interval_start
if data_interval_end is not None:
request_json["data_interval_end"] = data_interval_end
request_json["logical_date"] = fixed_now

response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
Expand All @@ -1174,13 +1180,14 @@ def test_should_respond_200(
if data_interval_start is not None and data_interval_end is not None:
expected_data_interval_start = data_interval_start.replace("+00:00", "Z")
expected_data_interval_end = data_interval_end.replace("+00:00", "Z")
expected_logical_date = fixed_now.replace("+00:00", "Z")

expected_response_json = {
"conf": {},
"dag_id": DAG1_ID,
"dag_run_id": expected_dag_run_id,
"end_date": None,
"logical_date": fixed_now.replace("+00:00", "Z"),
"logical_date": expected_logical_date,
"run_after": fixed_now.replace("+00:00", "Z"),
"external_trigger": True,
"start_date": None,
Expand Down
3 changes: 2 additions & 1 deletion tests/cli/commands/remote_commands/test_asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None:
"data_interval_start": None,
"logical_date": None,
"queued_at": None,
"run_after": None,
"run_after": "2025-02-12T19:27:59.066046Z",
}

assert run_list[0] | undeterministic == undeterministic | {
Expand All @@ -146,4 +146,5 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None:
"start_date": None,
"state": "queued",
"triggered_by": "cli",
"run_after": "2025-02-12T19:27:59.066046Z",
}
12 changes: 5 additions & 7 deletions tests/cli/commands/remote_commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ def test_trigger_dag(self):
"trigger",
"example_bash_operator",
"--run-id=test_trigger_dag",
"--exec-date=2021-06-04T09:00:00+08:00",
'--conf={"foo": "bar"}',
],
),
Expand All @@ -442,13 +441,12 @@ def test_trigger_dag(self):
assert dagrun.external_trigger
assert dagrun.conf == {"foo": "bar"}

# Coerced to UTC.
assert dagrun.logical_date.isoformat(timespec="seconds") == "2021-06-04T01:00:00+00:00"
# logical_date is None as it's not provided
assert dagrun.logical_date is None

# example_bash_operator runs every day at midnight, so the data interval
# should be aligned to the previous day.
assert dagrun.data_interval_start.isoformat(timespec="seconds") == "2021-06-03T00:00:00+00:00"
assert dagrun.data_interval_end.isoformat(timespec="seconds") == "2021-06-04T00:00:00+00:00"
# data_interval is None as logical_date is None
assert dagrun.data_interval_start is None
assert dagrun.data_interval_end is None

def test_trigger_dag_with_microseconds(self):
dag_command.dag_trigger(
Expand Down
9 changes: 3 additions & 6 deletions tests/cli/commands/remote_commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,16 @@ def test_test(self):
# Check that prints, and log messages, are shown
assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()

@mock.patch("airflow.utils.timezone.utcnow")
def test_test_no_logical_date(self, mock_utcnow):
def test_test_no_logical_date(self):
"""Test the `airflow test` command"""
now = pendulum.now("UTC")
mock_utcnow.return_value = now
ds = now.strftime("%Y%m%d")
args = self.parser.parse_args(["tasks", "test", "example_python_operator", "print_the_context"])

with redirect_stdout(io.StringIO()) as stdout:
task_command.task_test(args)

# Check that prints, and log messages, are shown
assert f"'example_python_operator__print_the_context__{ds}'" in stdout.getvalue()
assert "example_python_operator" in stdout.getvalue()
assert "print_the_context" in stdout.getvalue()

def test_cli_test_different_path(self, session, tmp_path):
"""
Expand Down
12 changes: 12 additions & 0 deletions tests/www/views/test_views_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ def test_trigger_dag_logical_date_data_interval(admin_client):
assert run.data_interval_end == today_midnight


def test_trigger_dag_logical_date_as_none(admin_client):
test_dag_id = "example_bash_operator"

admin_client.post(f"dags/{test_dag_id}/trigger", data={"conf": "{}"})

with create_session() as session:
run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first()
assert run is not None
assert DagRunType.MANUAL in run.run_id
assert run.run_type == DagRunType.MANUAL


def test_trigger_dag_form(admin_client):
test_dag_id = "example_bash_operator"
resp = admin_client.get(f"dags/{test_dag_id}/trigger")
Expand Down

0 comments on commit f50f1ce

Please sign in to comment.