diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index 08ee1726ccc22..8a90438af700a 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -42,6 +42,7 @@ def _trigger_dag( dag_bag: DagBag, *, triggered_by: DagRunTriggeredByType, + run_after: datetime, run_id: str | None = None, conf: dict | str | None = None, logical_date: datetime | None = None, @@ -54,6 +55,7 @@ def _trigger_dag( :param dag_id: DAG ID :param dag_bag: DAG Bag model :param triggered_by: the entity which triggers the dag_run + :param run_after: the datetime before which dag cannot run. :param run_id: ID of the run :param conf: configuration :param logical_date: logical date of the run @@ -65,26 +67,30 @@ def _trigger_dag( if dag is None or dag_id not in dag_bag.dags: raise DagNotFound(f"Dag id {dag_id} not found") - logical_date = logical_date or timezone.utcnow() - - if not timezone.is_localized(logical_date): - raise ValueError("The logical date should be localized") - - if replace_microseconds: - logical_date = logical_date.replace(microsecond=0) - - if dag.default_args and "start_date" in dag.default_args: - min_dag_start_date = dag.default_args["start_date"] - if min_dag_start_date and logical_date < min_dag_start_date: - raise ValueError( - f"Logical date [{logical_date.isoformat()}] should be >= start_date " - f"[{min_dag_start_date.isoformat()}] from DAG's default_args" - ) - coerced_logical_date = timezone.coerce_datetime(logical_date) - - data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date) - run_id = run_id or dag.timetable.generate_run_id( - run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, data_interval=data_interval + if logical_date: + if not timezone.is_localized(logical_date): + raise ValueError("The logical date should be localized") + + if replace_microseconds: + logical_date = logical_date.replace(microsecond=0) + + if dag.default_args and "start_date" in dag.default_args: + min_dag_start_date = dag.default_args["start_date"] + if min_dag_start_date and logical_date < min_dag_start_date: + raise ValueError( + f"Logical date [{logical_date.isoformat()}] should be >= start_date " + f"[{min_dag_start_date.isoformat()}] from DAG's default_args" + ) + coerced_logical_date = timezone.coerce_datetime(logical_date) + data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date) + else: + coerced_logical_date = None + data_interval = None + + run_id = run_id or DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=coerced_logical_date, + run_after=timezone.coerce_datetime(run_after), ) # This intentionally does not use 'session' in the current scope because it @@ -102,7 +108,7 @@ def _trigger_dag( run_id=run_id, logical_date=logical_date, data_interval=data_interval, - run_after=data_interval.end, + run_after=run_after, conf=run_conf, run_type=DagRunType.MANUAL, triggered_by=triggered_by, @@ -120,6 +126,7 @@ def trigger_dag( dag_id: str, *, triggered_by: DagRunTriggeredByType, + run_after: datetime | None = None, run_id: str | None = None, conf: dict | str | None = None, logical_date: datetime | None = None, @@ -131,6 +138,7 @@ def trigger_dag( :param dag_id: DAG ID :param triggered_by: the entity which triggers the dag_run + :param run_after: the datetime before which dag won't run. :param run_id: ID of the dag_run :param conf: configuration :param logical_date: date of execution @@ -147,6 +155,7 @@ def trigger_dag( dag_id=dag_id, dag_bag=dagbag, run_id=run_id, + run_after=run_after or timezone.utcnow(), conf=conf, logical_date=logical_date, replace_microseconds=replace_microseconds, diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 28e85cfe61a2a..cddf67261d5c5 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -327,7 +327,8 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: except ValidationError as err: raise BadRequest(detail=str(err)) - logical_date = pendulum.instance(post_body["logical_date"]) + logical_date = pendulum.instance(post_body["logical_date"]) if post_body.get("logical_date") else None + run_after = pendulum.instance(post_body["run_after"]) run_id = post_body["run_id"] dagrun_instance = session.scalar( select(DagRun) @@ -352,12 +353,14 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: end=pendulum.instance(data_interval_end), ) else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + data_interval = ( + dag.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None + ) dag_run = dag.create_dagrun( run_id=run_id, logical_date=logical_date, data_interval=data_interval, - run_after=data_interval.end, + run_after=run_after, conf=post_body.get("conf"), run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.REST_API, diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index c2560613def70..8b3a8df54c8c4 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -63,7 +63,8 @@ class Meta: run_id = auto_field(data_key="dag_run_id") dag_id = auto_field(dump_only=True) - logical_date = auto_field(data_key="logical_date", validate=validate_istimezone) + logical_date = auto_field(data_key="logical_date", allow_none=True, validate=validate_istimezone) + run_after = auto_field(data_key="run_after", validate=validate_istimezone) start_date = auto_field(dump_only=True) end_date = auto_field(dump_only=True) state = DagStateField(dump_only=True) @@ -78,17 +79,23 @@ class Meta: @pre_load def autogenerate(self, data, **kwargs): - """Auto generate run_id and logical_date if they are not provided.""" - logical_date = data.get("logical_date", _MISSING) + """Auto generate run_id and run_after if they are not provided.""" + run_after = data.get("run_after", _MISSING) - # Auto-generate logical_date if missing - if logical_date is _MISSING: - data["logical_date"] = str(timezone.utcnow()) + # Auto-generate run_after if missing + if run_after is _MISSING: + data["run_after"] = str(timezone.utcnow()) if "dag_run_id" not in data: try: + if logical_date_str := data.get("logical_date"): + logical_date = timezone.parse(logical_date_str) + else: + logical_date = None data["dag_run_id"] = DagRun.generate_run_id( - DagRunType.MANUAL, timezone.parse(data["logical_date"]) + run_type=DagRunType.MANUAL, + logical_date=logical_date, + run_after=timezone.parse(data["run_after"]), ) except (ParserError, TypeError) as err: raise BadRequest("Incorrect datetime argument", detail=str(err)) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 18be129a1952c..295eedca4240b 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,11 +20,11 @@ from datetime import datetime from enum import Enum -import pendulum from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel from airflow.models import DagRun +from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -82,9 +82,11 @@ class TriggerDAGRunPostBody(StrictBaseModel): """Trigger DAG Run Serializer for POST body.""" dag_run_id: str | None = None - logical_date: AwareDatetime | None data_interval_start: AwareDatetime | None = None data_interval_end: AwareDatetime | None = None + logical_date: AwareDatetime | None + run_after: datetime = Field(default_factory=timezone.utcnow) + conf: dict = Field(default_factory=dict) note: str | None = None @@ -102,7 +104,7 @@ def check_data_intervals(cls, values): def validate_dag_run_id(self): if not self.dag_run_id: self.dag_run_id = DagRun.generate_run_id( - DagRunType.MANUAL, self.logical_date or pendulum.now("UTC") + run_type=DagRunType.MANUAL, logical_date=self.logical_date, run_after=self.run_after ) return self diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 6d2223e6c25f2..ade0da25b5dce 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -10501,12 +10501,6 @@ components: - type: string - type: 'null' title: Dag Run Id - logical_date: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Logical Date data_interval_start: anyOf: - type: string @@ -10519,6 +10513,16 @@ components: format: date-time - type: 'null' title: Data Interval End + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + run_after: + type: string + format: date-time + title: Run After conf: type: object title: Conf diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 19bf526efc98d..afce667e8cfb0 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -347,7 +347,6 @@ def trigger_dag_run( ) -> DAGRunResponse: """Trigger a DAG.""" dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1)) - now = pendulum.now("UTC") if not dm: raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found") @@ -359,6 +358,7 @@ def trigger_dag_run( logical_date = timezone.coerce_datetime(body.logical_date) coerced_logical_date = timezone.coerce_datetime(logical_date) + run_after = timezone.coerce_datetime(body.run_after) try: dag: DAG = request.app.state.dag_bag.get_dag(dag_id) @@ -369,13 +369,26 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now) + if body.logical_date: + data_interval = dag.timetable.infer_manual_data_interval( + run_after=coerced_logical_date or run_after + ) + run_after = data_interval.end + else: + data_interval = None + + if body.dag_run_id: + run_id = body.dag_run_id + else: + run_id = DagRun.generate_run_id( + run_type=DagRunType.SCHEDULED, logical_date=coerced_logical_date, run_after=run_after + ) dag_run = dag.create_dagrun( - run_id=cast(str, body.dag_run_id), + run_id=run_id, logical_date=coerced_logical_date, data_interval=data_interval, - run_after=data_interval.end, + run_after=run_after, conf=body.conf, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.REST_API, diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index df1cb5ab52907..f340893e5aef7 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1286,7 +1286,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, - logical_date=dag_model.next_dagrun, + run_after=dag_model.next_dagrun, data_interval=data_interval, ), logical_date=dag_model.next_dagrun, @@ -1394,12 +1394,10 @@ def _create_dag_runs_asset_triggered( data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events) dag_run = dag.create_dagrun( - run_id=dag.timetable.generate_run_id( + run_id=DagRun.generate_run_id( run_type=DagRunType.ASSET_TRIGGERED, logical_date=logical_date, - data_interval=data_interval, - session=session, - events=asset_events, + run_after=max(logical_dates.values()), ), logical_date=logical_date, data_interval=data_interval, diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 735f09f3a3f74..b4b15a6e613c7 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -286,6 +286,8 @@ def _create_backfill_dag_run( backfill_sort_ordinal, session, ): + from airflow.models.dagrun import DagRun + with session.begin_nested(): should_skip_create_backfill = should_create_backfill_dag_run( info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session @@ -296,10 +298,8 @@ def _create_backfill_dag_run( dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) try: dr = dag.create_dagrun( - run_id=dag.timetable.generate_run_id( - run_type=DagRunType.BACKFILL_JOB, - logical_date=info.logical_date, - data_interval=info.data_interval, + run_id=DagRun.generate_run_id( + run_type=DagRunType.BACKFILL_JOB, logical_date=info.logical_date, run_after=info.run_after ), logical_date=info.logical_date, data_interval=info.data_interval, diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 3361fe33df6f7..ea6a7c34b1149 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -628,7 +628,11 @@ def run( # This is _mostly_ only used in tests dr = DagRun( dag_id=self.dag_id, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=info.logical_date, + run_after=info.run_after, + ), run_type=DagRunType.MANUAL, logical_date=info.logical_date, data_interval=info.data_interval, diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7ebbb48b4b40d..111852f2728ee 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1602,6 +1602,7 @@ def cli(self): @provide_session def test( self, + run_after: datetime | None = None, logical_date: datetime | None = None, run_conf: dict[str, Any] | None = None, conn_file_path: str | None = None, @@ -1613,6 +1614,7 @@ def test( """ Execute one single DagRun for a given DAG and logical date. + :param run_after: the datetime before which to Dag won't run. :param logical_date: logical date for the DAG run :param run_conf: configuration to pass to newly created dagrun :param conn_file_path: file path to a connection file in either yaml or json @@ -1652,7 +1654,6 @@ def add_logger_if_needed(ti: TaskInstance): exit_stack.callback(lambda: secrets_backend_list.pop(0)) with exit_stack: - logical_date = logical_date or timezone.utcnow() self.validate() self.log.debug("Clearing existing task instances for logical date %s", logical_date) self.clear( @@ -1663,16 +1664,23 @@ def add_logger_if_needed(ti: TaskInstance): ) self.log.debug("Getting dagrun for dag %s", self.dag_id) logical_date = timezone.coerce_datetime(logical_date) - data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date) + run_after = timezone.coerce_datetime(run_after) or timezone.coerce_datetime(timezone.utcnow()) + data_interval = ( + self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None + ) scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self)) dr: DagRun = _get_or_create_dagrun( dag=scheduler_dag, - start_date=logical_date, + start_date=logical_date or run_after, logical_date=logical_date, data_interval=data_interval, - run_after=data_interval.end, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_after=run_after, + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=logical_date, + run_after=run_after, + ), session=session, conf=run_conf, triggered_by=DagRunTriggeredByType.TEST, @@ -1756,8 +1764,8 @@ def create_dagrun( self, *, run_id: str, - logical_date: datetime | None, - data_interval: tuple[datetime, datetime], + logical_date: datetime | None = None, + data_interval: tuple[datetime, datetime] | None = None, run_after: datetime, conf: dict | None = None, run_type: DagRunType, @@ -2485,8 +2493,8 @@ def _get_or_create_dagrun( *, dag: DAG, run_id: str, - logical_date: datetime, - data_interval: tuple[datetime, datetime], + logical_date: datetime | None, + data_interval: tuple[datetime, datetime] | None, run_after: datetime, conf: dict | None, triggered_by: DagRunTriggeredByType, diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index d9bf14c3f983b..199762d69ef7c 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -78,6 +78,7 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks from airflow.utils.state import DagRunState, State, TaskInstanceState +from airflow.utils.strings import get_random_string from airflow.utils.types import NOTSET, DagRunTriggeredByType, DagRunType if TYPE_CHECKING: @@ -621,10 +622,20 @@ def find_duplicate(cls, dag_id: str, run_id: str, *, session: Session = NEW_SESS return session.scalars(select(cls).where(cls.dag_id == dag_id, cls.run_id == run_id)).one_or_none() @staticmethod - def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str: - """Generate Run ID based on Run Type and logical Date.""" + def generate_run_id( + *, run_type: DagRunType, logical_date: datetime | None = None, run_after: datetime + ) -> str: + """ + Generate Run ID based on Run Type, run_after and logical Date. + + :param run_type: type of DagRun + :param logical_date: the logical date + :param run_after: the date before which dag run won't start. + """ # _Ensure_ run_type is a DagRunType, not just a string from user code - return DagRunType(run_type).generate_run_id(logical_date) + if logical_date: + return DagRunType(run_type).generate_run_id(suffix=run_after.isoformat()) + return DagRunType(run_type).generate_run_id(suffix=f"{run_after.isoformat()}_{get_random_string()}") @staticmethod @provide_session diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py index da81090ff9a6d..d5d863c298491 100644 --- a/airflow/timetables/base.py +++ b/airflow/timetables/base.py @@ -281,8 +281,8 @@ def generate_run_id( self, *, run_type: DagRunType, - logical_date: DateTime, + run_after: DateTime, data_interval: DataInterval | None, **extra, ) -> str: - return run_type.generate_run_id(logical_date) + return run_type.generate_run_id(suffix=run_after.isoformat()) diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py index 20e8085fe0d37..45574daa37e53 100644 --- a/airflow/timetables/simple.py +++ b/airflow/timetables/simple.py @@ -24,7 +24,6 @@ if TYPE_CHECKING: from pendulum import DateTime - from sqlalchemy import Session from airflow.models.asset import AssetEvent from airflow.sdk.definitions.asset import BaseAsset @@ -186,15 +185,22 @@ def generate_run_id( self, *, run_type: DagRunType, - logical_date: DateTime, data_interval: DataInterval | None, - session: Session | None = None, - events: Collection[AssetEvent] | None = None, + run_after: DateTime, **extra, ) -> str: + """ + Generate Run ID based on Run Type, run_after and logical Date. + + :param run_type: type of DagRun + :param data_interval: the data interval + :param run_after: the date before which dag run won't start. + """ from airflow.models.dagrun import DagRun - return DagRun.generate_run_id(run_type, logical_date) + logical_date = data_interval.start if data_interval is not None else run_after + + return DagRun.generate_run_id(run_type=run_type, logical_date=logical_date, run_after=run_after) def data_interval_for_events( self, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 3e69736e87d58..7d307ac521770 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5816,7 +5816,7 @@ export const $TriggerDAGRunPostBody = { ], title: "Dag Run Id", }, - logical_date: { + data_interval_start: { anyOf: [ { type: "string", @@ -5826,9 +5826,9 @@ export const $TriggerDAGRunPostBody = { type: "null", }, ], - title: "Logical Date", + title: "Data Interval Start", }, - data_interval_start: { + data_interval_end: { anyOf: [ { type: "string", @@ -5838,9 +5838,9 @@ export const $TriggerDAGRunPostBody = { type: "null", }, ], - title: "Data Interval Start", + title: "Data Interval End", }, - data_interval_end: { + logical_date: { anyOf: [ { type: "string", @@ -5850,7 +5850,12 @@ export const $TriggerDAGRunPostBody = { type: "null", }, ], - title: "Data Interval End", + title: "Logical Date", + }, + run_after: { + type: "string", + format: "date-time", + title: "Run After", }, conf: { type: "object", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 9a2ed62122ada..c3bff10bcd6e8 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1416,9 +1416,10 @@ export type TimeDelta = { */ export type TriggerDAGRunPostBody = { dag_run_id?: string | null; - logical_date: string | null; data_interval_start?: string | null; data_interval_end?: string | null; + logical_date: string | null; + run_after?: string; conf?: { [key: string]: unknown; }; diff --git a/airflow/utils/types.py b/airflow/utils/types.py index 46f295c4ee21a..28822919e382e 100644 --- a/airflow/utils/types.py +++ b/airflow/utils/types.py @@ -22,8 +22,6 @@ import airflow.sdk.definitions._internal.types if TYPE_CHECKING: - from datetime import datetime - from airflow.typing_compat import TypeAlias ArgNotSet: TypeAlias = airflow.sdk.definitions._internal.types.ArgNotSet @@ -42,8 +40,13 @@ class DagRunType(str, enum.Enum): def __str__(self) -> str: return self.value - def generate_run_id(self, logical_date: datetime) -> str: - return f"{self}__{logical_date.isoformat()}" + def generate_run_id(self, *, suffix: str) -> str: + """ + Generate a string for DagRun based on suffix string. + + :param suffix: Generate run_id from suffix. + """ + return f"{self}__{suffix}" @staticmethod def from_run_id(run_id: str) -> DagRunType: diff --git a/airflow/www/views.py b/airflow/www/views.py index f99dcd9161f33..b3fae31fefe0d 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2032,7 +2032,8 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): origin = get_safe_url(request.values.get("origin")) unpause = request.values.get("unpause") request_conf = request.values.get("conf") - request_logical_date = request.values.get("logical_date", default=timezone.utcnow().isoformat()) + request_logical_date = request.values.get("logical_date") + request_run_after = request.values.get("run_after", default=timezone.utcnow().isoformat()) is_dag_run_conf_overrides_params = conf.getboolean("core", "dag_run_conf_overrides_params") dag = get_airflow_app().dag_bag.get_dag(dag_id) dag_orm: DagModel = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1)) @@ -2148,7 +2149,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): ) try: - logical_date = timezone.parse(request_logical_date, strict=True) + logical_date = timezone.parse(request_logical_date, strict=True) if request_logical_date else None except ParserError: flash("Invalid logical date", "error") form = DateTimeForm(data={"logical_date": timezone.utcnow().isoformat()}) @@ -2160,6 +2161,19 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): form=form, ) + try: + run_after = timezone.parse(request_run_after, strict=True) + except ParserError: + flash("Invalid run_after", "error") + form = DateTimeForm(data={"run_after": timezone.utcnow().isoformat()}) + return self.render_template( + "airflow/trigger.html", + form_fields=form_fields, + **render_params, + conf=request_conf or {}, + form=form, + ) + dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, session=session) if dr: if dr.run_id == run_id: @@ -2224,12 +2238,12 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): "warning", ) - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after) if not run_id: - run_id = dag.timetable.generate_run_id( - logical_date=logical_date, - data_interval=data_interval, + run_id = DagRun.generate_run_id( run_type=DagRunType.MANUAL, + logical_date=logical_date, + run_after=run_after, ) try: diff --git a/docker_tests/test_docker_compose_quick_start.py b/docker_tests/test_docker_compose_quick_start.py index 4b6335facff51..15f346e57e3d7 100644 --- a/docker_tests/test_docker_compose_quick_start.py +++ b/docker_tests/test_docker_compose_quick_start.py @@ -101,7 +101,11 @@ def test_trigger_dag_and_wait_for_result(default_docker_image, tmp_path_factory, compose.execute(service="airflow-scheduler", command=["airflow", "scheduler", "-n", "50"]) api_request("PATCH", path=f"dags/{DAG_ID}", json={"is_paused": False}) - api_request("POST", path=f"dags/{DAG_ID}/dagRuns", json={"dag_run_id": DAG_RUN_ID}) + api_request( + "POST", + path=f"dags/{DAG_ID}/dagRuns", + json={"dag_run_id": DAG_RUN_ID, "logical_date": "2020-06-11T18:00:00+00:00"}, + ) wait_for_terminal_dag_state(dag_id=DAG_ID, dag_run_id=DAG_RUN_ID) dag_state = api_request("GET", f"dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}").get("state") diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py index cadd8954cfb5e..2e0a96620a1de 100644 --- a/kubernetes_tests/test_base.py +++ b/kubernetes_tests/test_base.py @@ -264,8 +264,10 @@ def start_dag(self, dag_id, host): assert result.status_code == 200, f"Could not enable DAG: {result_json}" post_string = f"http://{host}/api/v1/dags/{dag_id}/dagRuns" print(f"Calling [start_dag]#2 {post_string}") + + logical_date = datetime.now(timezone.utc).isoformat() # Trigger a new dagrun - result = self.session.post(post_string, json={}) + result = self.session.post(post_string, json={"logical_date": logical_date}) try: result_json = result.json() except ValueError: @@ -292,7 +294,8 @@ def start_job_in_kubernetes(self, dag_id, host): for dag_run in dag_runs: if dag_run["dag_id"] == dag_id: logical_date = dag_run["logical_date"] + run_after = dag_run["run_after"] dag_run_id = dag_run["dag_run_id"] break - assert logical_date is not None, f"No logical_date can be found for the dag with {dag_id}" + assert run_after is not None, f"No run_after can be found for the dag with {dag_id}" return dag_run_id, logical_date diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 9e4cd9e43727b..e5637b5a9db17 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -50,13 +50,30 @@ def create_context(task) -> Context: + from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + dag = DAG(dag_id="dag", schedule=None) logical_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=timezone.parse_timezone("Europe/Amsterdam")) - dag_run = DagRun( - dag_id=dag.dag_id, - logical_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), - ) + + if AIRFLOW_V_3_0_PLUS: + dag_run = DagRun( + dag_id=dag.dag_id, + logical_date=logical_date, + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=logical_date, + run_after=logical_date, + ), + ) + else: + dag_run = DagRun( + dag_id=dag.dag_id, + logical_date=logical_date, + run_id=DagRun.generate_run_id( # type: ignore[call-arg] + run_type=DagRunType.MANUAL, + logical_date=logical_date, + ), + ) task_instance = TaskInstance(task=task) task_instance.dag_run = dag_run task_instance.dag_id = dag.dag_id diff --git a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py index 5342363f0939d..691a549696116 100644 --- a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py +++ b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py @@ -37,6 +37,8 @@ from airflow.utils.session import create_session from airflow.utils.types import DagRunType +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0) JOB_OPERATORS_PATH = "airflow.providers.cncf.kubernetes.operators.job.{}" HOOK_CLASS = JOB_OPERATORS_PATH.format("KubernetesHook") @@ -57,11 +59,20 @@ def create_context(task, persist_to_db=False, map_index=None): else: dag = DAG(dag_id="dag", schedule=None, start_date=pendulum.now()) dag.add_task(task) - dag_run = DagRun( - run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE), - run_type=DagRunType.MANUAL, - dag_id=dag.dag_id, - ) + if AIRFLOW_V_3_0_PLUS: + dag_run = DagRun( + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE + ), + run_type=DagRunType.MANUAL, + dag_id=dag.dag_id, + ) + else: + dag_run = DagRun( + run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE), + run_type=DagRunType.MANUAL, + dag_id=dag.dag_id, + ) task_instance = TaskInstance(task=task, run_id=dag_run.run_id) task_instance.dag_run = dag_run if map_index is not None: diff --git a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py index 309d2abe642e1..f447651585e60 100644 --- a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py @@ -100,7 +100,9 @@ def create_context(task, persist_to_db=False, map_index=None): now = timezone.utcnow() if AIRFLOW_V_3_0_PLUS: dag_run = DagRun( - run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE + ), run_type=DagRunType.MANUAL, dag_id=dag.dag_id, logical_date=now, diff --git a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py index c6814222c4775..b7172f873461a 100644 --- a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -177,7 +177,9 @@ def create_context(task): dag_run = DagRun( dag_id=dag.dag_id, logical_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date, run_after=logical_date + ), ) else: dag_run = DagRun( diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py index 02057310907f3..26e607c8c710b 100644 --- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py +++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py @@ -208,6 +208,7 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "end_date": None, "state": "running", "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -224,6 +225,7 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "end_date": None, "state": "running", "logical_date": self.default_time_2, + "run_after": self.default_time_2, "external_trigger": True, "start_date": self.default_time, "conf": {}, diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py index 852c4288bbe31..9f4c93549ce88 100644 --- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py +++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py @@ -128,12 +128,22 @@ def test_should_respond_200_with_tilde_and_granular_dag_access(self): task_id_1 = "test-task-id-1" logical_date = "2005-04-02T00:00:00+00:00" logical_date_parsed = timezone.parse(logical_date) - dag_run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after = "2005-04-02T00:00:00+00:00" + run_after_parsed = timezone.parse(run_after) + dag_run_id_1 = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=logical_date_parsed, + run_after=run_after_parsed, + ) self._create_xcom_entries(dag_id_1, dag_run_id_1, logical_date_parsed, task_id_1) dag_id_2 = "test-dag-id-2" task_id_2 = "test-task-id-2" - run_id_2 = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_id_2 = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=logical_date_parsed, + run_after=run_after_parsed, + ) self._create_xcom_entries(dag_id_2, run_id_2, logical_date_parsed, task_id_2) self._create_invalid_xcom_entries(logical_date_parsed) response = self.client.get( diff --git a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py index 9eb8a68265679..1a5721e9d08c6 100644 --- a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py +++ b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py @@ -327,7 +327,9 @@ def create_context(self, task, dag=None): dag_run = DagRun( dag_id=dag.dag_id, logical_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date, run_after=logical_date + ), ) else: dag_run = DagRun( diff --git a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py index e05f9454c9549..f89b2319d117f 100644 --- a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py +++ b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py @@ -119,7 +119,9 @@ def create_context(self, task, dag=None): dag_run = DagRun( dag_id=dag.dag_id, logical_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date, run_after=logical_date + ), ) task_instance = TaskInstance(task=task) @@ -262,7 +264,9 @@ def create_context(self, task, dag=None): dag_run = DagRun( dag_id=dag.dag_id, logical_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date, run_after=logical_date + ), ) task_instance = TaskInstance(task=task) diff --git a/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py b/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py index 43795f22ace6f..be66ff67ca899 100644 --- a/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py +++ b/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py @@ -183,7 +183,9 @@ def create_context(task, dag=None): dag_run = DagRun( dag_id=dag.dag_id, logical_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_id=DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date, run_after=logical_date + ), ) else: dag_run = DagRun( diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 053a6011b63f1..b5ac483d1f216 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -192,7 +192,11 @@ def execute(self, context: Context): if self.trigger_run_id: run_id = str(self.trigger_run_id) else: - run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=parsed_logical_date, + run_after=parsed_logical_date, + ) try: dag_run = trigger_dag( diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 3ce4019c824d9..94028976e1502 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -109,6 +109,7 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi run_id=f"TEST_DAG_RUN_ID_{i}", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(days=i - 1), + run_after=timezone.parse(self.default_time) + timedelta(days=i - 1), start_date=timezone.parse(self.default_time), external_trigger=True, state=state, @@ -126,6 +127,7 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi run_id=f"TEST_DAG_RUN_ID_{i}", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time_2), + run_after=timezone.parse(self.default_time_2), start_date=timezone.parse(self.default_time), external_trigger=True, state=state, @@ -190,6 +192,7 @@ def test_should_respond_200(self, session): run_id="TEST_DAG_RUN_ID", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), + run_after=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), external_trigger=True, state="running", @@ -201,6 +204,7 @@ def test_should_respond_200(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -240,6 +244,7 @@ def test_should_raises_401_unauthenticated(self, session): run_id="TEST_DAG_RUN_ID", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), + run_after=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), external_trigger=True, ) @@ -263,6 +268,7 @@ def test_should_return_specified_fields(self, session, fields): run_id="TEST_DAG_RUN_ID", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), + run_after=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), external_trigger=True, state="running", @@ -287,6 +293,7 @@ def test_should_respond_400_with_not_exists_fields(self, session): run_id="TEST_DAG_RUN_ID", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), + run_after=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), external_trigger=True, state="running", @@ -312,6 +319,7 @@ def test_should_respond_200(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -328,6 +336,7 @@ def test_should_respond_200(self, session): "end_date": None, "state": "running", "logical_date": self.default_time_2, + "run_after": self.default_time_2, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -382,6 +391,7 @@ def test_return_correct_results_with_order_by(self, session): "end_date": None, "state": "running", "logical_date": self.default_time_2, + "run_after": self.default_time_2, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -398,6 +408,7 @@ def test_return_correct_results_with_order_by(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -550,6 +561,7 @@ def _create_dag_runs(self, count): run_id=f"TEST_DAG_RUN_ID{i}", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(minutes=i), + run_after=timezone.parse(self.default_time) + timedelta(minutes=i), start_date=timezone.parse(self.default_time), external_trigger=True, ) @@ -655,6 +667,7 @@ def _create_dag_runs(self): run_id=f"TEST_START_EXEC_DAY_1{i}", run_type=DagRunType.MANUAL, logical_date=timezone.parse(dates[i]), + run_after=timezone.parse(dates[i]), start_date=timezone.parse(dates[i]), external_trigger=True, state=DagRunState.SUCCESS, @@ -699,6 +712,7 @@ def test_should_respond_200(self): "end_date": None, "state": "running", "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -716,6 +730,7 @@ def test_should_respond_200(self): "end_date": None, "state": "running", "logical_date": self.default_time_2, + "run_after": self.default_time_2, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -777,6 +792,7 @@ def test_order_by_descending_works(self): "end_date": None, "state": "running", "logical_date": self.default_time_2, + "run_after": self.default_time_2, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -793,6 +809,7 @@ def test_order_by_descending_works(self): "end_date": None, "state": "running", "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, @@ -925,6 +942,7 @@ def _create_dag_runs(self, count): state="running", run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(minutes=i), + run_after=timezone.parse(self.default_time) + timedelta(minutes=i), start_date=timezone.parse(self.default_time), external_trigger=True, ) @@ -1008,6 +1026,7 @@ def _create_dag_runs(self): run_id=f"TEST_START_EXEC_DAY_1{i}", run_type=DagRunType.MANUAL, logical_date=timezone.parse(date), + run_after=timezone.parse(date), start_date=timezone.parse(date), external_trigger=True, state="success", @@ -1084,21 +1103,36 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids): class TestPostDagRun(TestDagRunEndpoint): @time_machine.travel(timezone.utcnow(), tick=False) @pytest.mark.parametrize( - "dag_run_id, logical_date, note, data_interval_start, data_interval_end", + "dag_run_id, logical_date, run_after, note, data_interval_start, data_interval_end", [ pytest.param( - "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", None, None, id="all-present" + "TEST_DAG_RUN", + "2020-06-11T18:00:00+00:00", + "2020-06-11T18:00:00+00:00", + "test-note", + None, + None, + id="all-present", ), pytest.param( "TEST_DAG_RUN", "2024-06-11T18:00:00+00:00", + "2024-06-11T18:00:00+00:00", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00", id="all-present-with-dates", ), - pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None, id="only-date"), - pytest.param(None, None, None, None, None, id="all-missing"), + pytest.param( + None, + "2020-06-11T18:00:00+00:00", + "2020-06-11T18:00:00+00:00", + None, + None, + None, + id="only-date", + ), + pytest.param(None, None, "2020-06-11T18:00:00+00:00", None, None, None, id="all-missing"), ], ) def test_should_respond_200( @@ -1106,20 +1140,18 @@ def test_should_respond_200( session, dag_run_id, logical_date, + run_after, note, data_interval_start, data_interval_end, ): self._create_dag("TEST_DAG_ID") - - # We freeze time for this test, so we could check it into the returned dates. - fixed_now = timezone.utcnow() - # raise NotImplementedError("TODO: Add tests for data_interval_start and data_interval_end") request_json = {} if logical_date is not None: request_json["logical_date"] = logical_date + request_json["run_after"] = run_after if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id if data_interval_start is not None: @@ -1136,12 +1168,11 @@ def test_should_respond_200( assert response.status_code == 200 - if logical_date is None: - expected_logical_date = fixed_now.isoformat() - else: - expected_logical_date = logical_date + expected_logical_date = logical_date if logical_date is not None else None + + # when logical_date is null, run_id is run_after + random string. if dag_run_id is None: - expected_dag_run_id = f"manual__{expected_logical_date}" + expected_dag_run_id = f"manual__{run_after}" else: expected_dag_run_id = dag_run_id @@ -1157,6 +1188,7 @@ def test_should_respond_200( "dag_run_id": expected_dag_run_id, "end_date": None, "logical_date": expected_logical_date, + "run_after": run_after, "external_trigger": True, "start_date": None, "state": "queued", @@ -1167,8 +1199,15 @@ def test_should_respond_200( "note": note, } expected_response_json.update({"triggered_by": "rest_api"} if AIRFLOW_V_3_0_PLUS else {}) + response_json = response.json + for key in expected_response_json: + if key != "dag_run_id": + assert response_json[key] == expected_response_json[key], f"Mismatch on key {key}" - assert response.json == expected_response_json + assert response_json["dag_run_id"].startswith(expected_dag_run_id), ( + f"dag_run_id '{response_json['dag_run_id']}' does not start with expected prefix " + f"'{expected_dag_run_id}'" + ) _check_last_log(session, dag_id="TEST_DAG_ID", event="api.post_dag_run", logical_date=None) def test_raises_validation_error_for_invalid_request(self): @@ -1255,6 +1294,7 @@ def test_should_response_200_for_matching_logical_date(self): "api/v1/dags/TEST_DAG_ID/dagRuns", json={ "logical_date": logical_date, + "run_after": logical_date, }, environ_overrides={"REMOTE_USER": "test"}, ) @@ -1266,6 +1306,7 @@ def test_should_response_200_for_matching_logical_date(self): "dag_run_id": dag_run_id, "end_date": None, "logical_date": logical_date, + "run_after": logical_date, "external_trigger": True, "start_date": None, "state": "queued", @@ -1512,6 +1553,7 @@ def test_should_respond_200(self, state, run_type, dag_maker, session): "dag_run_id": dag_run_id, "end_date": dr.end_date.isoformat() if state != State.QUEUED else None, "logical_date": dr.logical_date.isoformat(), + "run_after": dr.run_after.isoformat(), "external_trigger": False, "start_date": dr.start_date.isoformat() if state != State.QUEUED else None, "state": state, @@ -1686,6 +1728,7 @@ def test_should_respond_200(self, dag_maker, session): "end_date": None, "external_trigger": False, "logical_date": dr.logical_date.isoformat(), + "run_after": dr.run_after.isoformat(), "start_date": None, "state": "queued", "data_interval_start": dr.data_interval_start.isoformat(), @@ -1908,6 +1951,7 @@ def test_should_respond_200(self, dag_maker, session): "end_date": dr.end_date.isoformat(), "external_trigger": True, "logical_date": self.default_time, + "run_after": self.default_time, "start_date": self.default_time, "state": "success", "data_interval_start": None, @@ -1936,6 +1980,7 @@ def test_should_respond_200(self, dag_maker, session): "dag_run_id": dr.run_id, "end_date": dr.end_date.isoformat(), "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "start_date": self.default_time, "state": "success", diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index 784bc6142d40d..598d3a99546ad 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -110,9 +110,13 @@ def test_should_respond_200_stringify(self): dag_id = "test-dag-id" task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, xcom_key, {"key": "value"}) response = self.client.get( f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", @@ -136,9 +140,13 @@ def test_should_respond_200_native(self): dag_id = "test-dag-id" task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, xcom_key, {"key": "value"}) response = self.client.get( f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false", @@ -162,9 +170,13 @@ def test_should_raise_404_for_non_existent_xcom(self): dag_id = "test-dag-id" task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, xcom_key) response = self.client.get( f"/api/v1/dags/nonexistentdagid/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", @@ -177,9 +189,13 @@ def test_should_raises_401_unauthenticated(self): dag_id = "test-dag-id" task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, xcom_key) response = self.client.get( f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}" @@ -191,9 +207,13 @@ def test_should_raise_403_forbidden(self): dag_id = "test-dag-id" task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, xcom_key) response = self.client.get( @@ -303,8 +323,12 @@ def test_should_respond_200(self): dag_id = "test-dag-id" task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entries(dag_id, run_id, logical_date_parsed, task_id) response = self.client.get( @@ -345,13 +369,19 @@ def test_should_respond_200_with_tilde_and_access_to_all_dags(self): dag_id_1 = "test-dag-id-1" task_id_1 = "test-task-id-1" logical_date = "2005-04-02T00:00:00+00:00" + run_after = "2005-04-02T00:00:00+00:00" logical_date_parsed = timezone.parse(logical_date) - run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after_parsed = timezone.parse(run_after) + run_id_1 = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entries(dag_id_1, run_id_1, logical_date_parsed, task_id_1) dag_id_2 = "test-dag-id-2" task_id_2 = "test-task-id-2" - run_id_2 = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_id_2 = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entries(dag_id_2, run_id_2, logical_date_parsed, task_id_2) response = self.client.get( @@ -409,7 +439,11 @@ def test_should_respond_200_with_map_index(self): task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" logical_date_parsed = timezone.parse(logical_date) - dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after = "2005-04-02T00:00:00+00:00" + run_after_parsed = timezone.parse(run_after) + dag_run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entries(dag_id, dag_run_id, logical_date_parsed, task_id, mapped_ti=True) def assert_expected_result(expected_entries, map_index=None): @@ -453,7 +487,11 @@ def test_should_respond_200_with_xcom_key(self): task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" logical_date_parsed = timezone.parse(logical_date) - dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after = "2005-04-02T00:00:00+00:00" + run_after_parsed = timezone.parse(run_after) + dag_run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entries(dag_id, dag_run_id, logical_date_parsed, task_id, mapped_ti=True) def assert_expected_result(expected_entries, key=None): @@ -495,7 +533,11 @@ def test_should_raises_401_unauthenticated(self): task_id = "test-task-id" logical_date = "2005-04-02T00:00:00+00:00" logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) + run_after = "2005-04-02T00:00:00+00:00" + run_after_parsed = timezone.parse(run_after) + run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed + ) self._create_xcom_entries(dag_id, run_id, logical_date_parsed, task_id) response = self.client.get( @@ -580,7 +622,11 @@ def setup_method(self): self.task_id = "test-task-id" self.logical_date = "2005-04-02T00:00:00+00:00" self.logical_date_parsed = timezone.parse(self.logical_date) - self.run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date_parsed) + run_after = "2005-04-02T00:00:00+00:00" + run_after_parsed = timezone.parse(run_after) + self.run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=self.logical_date_parsed, run_after=run_after_parsed + ) @pytest.mark.parametrize( "query_params, expected_xcom_ids", diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py index 61b3dac6350a8..251946d17fa68 100644 --- a/tests/api_connexion/schemas/test_dag_run_schema.py +++ b/tests/api_connexion/schemas/test_dag_run_schema.py @@ -63,6 +63,7 @@ def test_serialize(self, session): state="running", run_type=DagRunType.MANUAL.value, logical_date=timezone.parse(self.default_time), + run_after=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', **triggered_by_kwargs, @@ -85,6 +86,7 @@ def test_serialize(self, session): "last_scheduling_decision": None, "run_type": "manual", "note": None, + "run_after": self.default_time, } expected_deserialized_dagrun.update({"triggered_by": "test"} if AIRFLOW_V_3_0_PLUS else {}) @@ -94,30 +96,30 @@ def test_serialize(self, session): "serialized_dagrun, expected_result", [ ( # Conf not provided - {"dag_run_id": "my-dag-run", "logical_date": DEFAULT_TIME}, - {"run_id": "my-dag-run", "logical_date": parse(DEFAULT_TIME)}, + {"dag_run_id": "my-dag-run", "run_after": DEFAULT_TIME}, + {"run_id": "my-dag-run", "run_after": parse(DEFAULT_TIME)}, ), ( { "dag_run_id": "my-dag-run", - "logical_date": DEFAULT_TIME, + "run_after": DEFAULT_TIME, "conf": {"start": "stop"}, }, { "run_id": "my-dag-run", - "logical_date": parse(DEFAULT_TIME), + "run_after": parse(DEFAULT_TIME), "conf": {"start": "stop"}, }, ), ( { "dag_run_id": "my-dag-run", - "logical_date": DEFAULT_TIME, + "run_after": DEFAULT_TIME, "conf": '{"start": "stop"}', }, { "run_id": "my-dag-run", - "logical_date": parse(DEFAULT_TIME), + "run_after": parse(DEFAULT_TIME), "conf": {"start": "stop"}, }, ), @@ -131,7 +133,7 @@ def test_autofill_fields(self): """Dag_run_id and logical_date fields are autogenerated if missing""" serialized_dagrun = {} result = dagrun_schema.load(serialized_dagrun) - assert result == {"logical_date": result["logical_date"], "run_id": result["run_id"]} + assert result == {"run_after": result["run_after"], "run_id": result["run_id"]} def test_invalid_logical_date_raises(self): serialized_dagrun = {"logical_date": "mydate"} @@ -151,6 +153,7 @@ def test_serialize(self, session): run_id="my-dag-run", state="running", logical_date=timezone.parse(self.default_time), + run_after=timezone.parse(self.default_time), run_type=DagRunType.MANUAL.value, start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', @@ -176,6 +179,7 @@ def test_serialize(self, session): "dag_run_id": "my-dag-run", "end_date": None, "logical_date": self.default_time, + "run_after": self.default_time, "external_trigger": True, "state": "running", "start_date": self.default_time, @@ -195,6 +199,7 @@ def test_serialize(self, session): "end_date": None, "state": "running", "logical_date": self.second_time, + "run_after": self.second_time, "external_trigger": True, "start_date": self.default_time, "conf": {}, diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py b/tests/api_fastapi/core_api/routes/public/test_xcom.py index e3d9b3641a91b..dd5a073c1baae 100644 --- a/tests/api_fastapi/core_api/routes/public/test_xcom.py +++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py @@ -49,7 +49,10 @@ logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE) logical_date_formatted = logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ") -run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) +run_after_parsed = timezone.parse(TEST_EXECUTION_DATE) +run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, run_after=run_after_parsed +) @provide_session diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 8d82a249aec2d..76b1ceb0e9a7a 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -138,16 +138,17 @@ def _create_dagrun( state: DagRunState = DagRunState.RUNNING, start_date: datetime | None = None, ) -> DagRun: - run_id = dag.timetable.generate_run_id( + run_after = logical_date or timezone.utcnow() + run_id = DagRun.generate_run_id( run_type=run_type, logical_date=logical_date, - data_interval=data_interval, + run_after=run_after, ) return dag.create_dagrun( run_id=run_id, logical_date=logical_date, data_interval=data_interval, - run_after=data_interval.end, + run_after=run_after, run_type=run_type, state=state, start_date=start_date, diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index af815279737dd..aee56bee883e3 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -156,7 +156,7 @@ def _create_dagrun( data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) run_id = dag.timetable.generate_run_id( run_type=run_type, - logical_date=logical_date, # type: ignore + run_after=logical_date or data_interval.end, # type: ignore data_interval=data_interval, ) return dag.create_dagrun( @@ -2974,7 +2974,7 @@ def test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker ) def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None: dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily") - run_id = run_id_type.generate_run_id(DEFAULT_DATE) + run_id = DagRun.generate_run_id(run_type=run_id_type, run_after=DEFAULT_DATE, logical_date=DEFAULT_DATE) with pytest.raises(ValueError) as ctx: dag.create_dagrun( diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index f4324abf4850a..b7328ba3713ed 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -111,7 +111,7 @@ def create_dag_run( dag_run = dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=run_type, - logical_date=logical_date, + run_after=logical_date, data_interval=data_interval, ), run_type=run_type, @@ -863,7 +863,7 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state): dr = dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, - logical_date=DEFAULT_DATE, + run_after=DEFAULT_DATE, data_interval=dag.infer_automated_data_interval(DEFAULT_DATE), ), run_type=DagRunType.SCHEDULED, @@ -941,7 +941,7 @@ def test_emit_scheduling_delay(self, session, schedule, expected): dag_run = dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, - logical_date=dag.start_date, + run_after=dag.start_date, data_interval=dag.infer_automated_data_interval(dag.start_date), ), run_type=DagRunType.SCHEDULED, @@ -1051,7 +1051,7 @@ def test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type, dag_id=dag.dag_id, run_type=run_type, logical_date=DEFAULT_DATE, - run_id=DagRun.generate_run_id(run_type, DEFAULT_DATE), + run_id=DagRun.generate_run_id(run_type=run_type, logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE), ) dag_run.dag = dag diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 3d985394646d0..2d9d1a6edcb1a 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -3611,7 +3611,12 @@ def _env_var_check_callback(self): assert os.environ["AIRFLOW_CTX_DAG_ID"] == "test_echo_env_variables" assert os.environ["AIRFLOW_CTX_TASK_ID"] == "hive_in_python_op" assert DEFAULT_DATE.isoformat() == os.environ["AIRFLOW_CTX_LOGICAL_DATE"] - assert DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE) == os.environ["AIRFLOW_CTX_DAG_RUN_ID"] + assert ( + DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE + ) + == os.environ["AIRFLOW_CTX_DAG_RUN_ID"] + ) def test_echo_env_variables(self, dag_maker): with dag_maker( diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py index ea0d95290f02a..45d7b264c44bd 100644 --- a/tests/models/test_xcom.py +++ b/tests/models/test_xcom.py @@ -58,7 +58,9 @@ def reset_db(): @pytest.fixture def task_instance_factory(request, session: Session): def func(*, dag_id, task_id, logical_date): - run_id = DagRun.generate_run_id(DagRunType.SCHEDULED, logical_date) + run_id = DagRun.generate_run_id( + run_type=DagRunType.SCHEDULED, logical_date=logical_date, run_after=logical_date + ) run = DagRun( dag_id=dag_id, run_type=DagRunType.SCHEDULED, diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index db86eb0fa7de9..1c72dae332a0e 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -125,7 +125,9 @@ def test_trigger_dagrun(self, dag_maker): dagrun = dag_maker.session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() assert dagrun.external_trigger - assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.logical_date) + assert dagrun.run_id == DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=dagrun.logical_date, run_after=dagrun.logical_date + ) self.assert_extra_link(dagrun, task, dag_maker.session) def test_trigger_dagrun_custom_run_id(self, dag_maker): @@ -167,7 +169,9 @@ def test_trigger_dagrun_with_logical_date(self, dag_maker): dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() assert dagrun.external_trigger assert dagrun.logical_date == custom_logical_date - assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_logical_date) + assert dagrun.run_id == DagRun.generate_run_id( + run_type=DagRunType.MANUAL, logical_date=custom_logical_date, run_after=custom_logical_date + ) self.assert_extra_link(dagrun, task, session) def test_trigger_dagrun_twice(self, dag_maker): diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index d6bf001c5bd76..239eca8cdc165 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -115,7 +115,7 @@ def setup_method(self): self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) self.args = {"owner": "airflow", "start_date": DEFAULT_DATE} self.dag = DAG(TEST_DAG_ID, schedule=None, default_args=self.args) - self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE) + self.dag_run_id = DagRunType.MANUAL.generate_run_id(suffix=DEFAULT_DATE.isoformat()) def add_time_sensor(self, task_id=TEST_TASK_ID): op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag) @@ -1239,7 +1239,7 @@ def run_tasks( runs[dag.dag_id] = dagrun = dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.MANUAL, - logical_date=logical_date, + run_after=logical_date, data_interval=data_interval, ), logical_date=logical_date, diff --git a/tests/timetables/test_assets_timetable.py b/tests/timetables/test_assets_timetable.py index 541ef2abb6ea6..0158ddfd5cf38 100644 --- a/tests/timetables/test_assets_timetable.py +++ b/tests/timetables/test_assets_timetable.py @@ -208,7 +208,11 @@ def test_generate_run_id(asset_timetable: AssetOrTimeSchedule) -> None: :param asset_timetable: The AssetOrTimeSchedule instance to test. """ run_id = asset_timetable.generate_run_id( - run_type=DagRunType.MANUAL, extra_args="test", logical_date=DateTime.now(), data_interval=None + run_type=DagRunType.MANUAL, + extra_args="test", + logical_date=DateTime.now(), + run_after=DateTime.now(), + data_interval=None, ) assert isinstance(run_id, str) diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py index de393f38c3516..035740db46fec 100644 --- a/tests/utils/test_state.py +++ b/tests/utils/test_state.py @@ -41,7 +41,7 @@ def test_dagrun_state_enum_escape(): dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, - logical_date=DEFAULT_DATE, + run_after=DEFAULT_DATE, data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), ), run_type=DagRunType.SCHEDULED, diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 150b48f7459ba..76ddb08228336 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -929,11 +929,18 @@ def create_dagrun(self, *, logical_date=None, **kwargs): if "run_type" not in kwargs: kwargs["run_id"] = "test" else: - kwargs["run_id"] = dag.timetable.generate_run_id( - run_type=run_type, - logical_date=logical_date, - data_interval=data_interval, - ) + if AIRFLOW_V_3_0_PLUS: + kwargs["run_id"] = dag.timetable.generate_run_id( + run_type=run_type, + run_after=logical_date or timezone.coerce_datetime(timezone.utcnow()), + data_interval=data_interval, + ) + else: + kwargs["run_id"] = dag.timetable.generate_run_id( + run_type=run_type, + logical_date=logical_date or timezone.coerce_datetime(timezone.utcnow()), + data_interval=data_interval, + ) kwargs["run_type"] = run_type if AIRFLOW_V_3_0_PLUS: