Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
Browse files Browse the repository at this point in the history
…AIP-84/list_dag_runs_batch
  • Loading branch information
rawwar committed Nov 20, 2024
2 parents 5c2ab68 + 8554001 commit dd2e70a
Show file tree
Hide file tree
Showing 241 changed files with 9,264 additions and 2,580 deletions.
2 changes: 1 addition & 1 deletion .github/actions/install-pre-commit/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ inputs:
default: 3.9
uv-version:
description: 'uv version to use'
default: 0.5.2
default: 0.5.3
pre-commit-version:
description: 'pre-commit version to use'
default: 4.0.1
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,7 @@ jobs:
- tests-with-lowest-direct-resolution
- additional-prod-image-tests
- tests-kubernetes
- tests-task-sdk
- finalize-tests
if: github.event_name == 'schedule' && failure()
runs-on: ["ubuntu-22.04"]
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.2
ARG AIRFLOW_UV_VERSION=0.5.3
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.2
ARG AIRFLOW_UV_VERSION=0.5.3
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
Expand Down
4 changes: 2 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@
"version": (".version", "", False),
# Deprecated lazy imports
"AirflowException": (".exceptions", "AirflowException", True),
"Dataset": (".assets", "Dataset", True),
"Dataset": (".sdk.definitions.asset", "Dataset", True),
}
if TYPE_CHECKING:
# These objects are imported by PEP-562, however, static analyzers and IDE's
# have no idea about typing of these objects.
# Add it under TYPE_CHECKING block should help with it.
from airflow.assets import Asset, Dataset
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.sdk.definitions.asset import Asset, Dataset


def __getattr__(name: str):
Expand Down
57 changes: 0 additions & 57 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.helpers import exactly_one
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down Expand Up @@ -87,7 +86,6 @@ def set_state(
*,
tasks: Collection[Operator | tuple[Operator, int]],
run_id: str | None = None,
logical_date: datetime | None = None,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
Expand All @@ -107,7 +105,6 @@ def set_state(
:param tasks: the iterable of tasks or (task, map_index) tuples from which to work.
``task.dag`` needs to be set
:param run_id: the run_id of the dagrun to start looking from
:param logical_date: the logical date from which to start looking (deprecated)
:param upstream: Mark all parents (upstream tasks)
:param downstream: Mark all siblings (downstream tasks) of task_id
:param future: Mark all future tasks on the interval of the dag up until
Expand All @@ -121,21 +118,12 @@ def set_state(
if not tasks:
return []

if not exactly_one(logical_date, run_id):
raise ValueError("Exactly one of dag_run_id and logical_date must be set")

if logical_date and not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")

task_dags = {task[0].dag if isinstance(task, tuple) else task.dag for task in tasks}
if len(task_dags) > 1:
raise ValueError(f"Received tasks from multiple DAGs: {task_dags}")
dag = next(iter(task_dags))
if dag is None:
raise ValueError("Received tasks with no DAG")

if logical_date:
run_id = dag.get_dagrun(logical_date=logical_date, session=session).run_id
if not run_id:
raise ValueError("Received tasks with no run_id")

Expand Down Expand Up @@ -279,7 +267,6 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
def set_dag_run_state_to_success(
*,
dag: DAG,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -290,27 +277,15 @@ def set_dag_run_state_to_success(
Set for a specific logical date and its task instances to success.
:param dag: the DAG of which to alter state
:param logical_date: the logical date from which to start looking(deprecated)
:param run_id: the run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: ValueError if dag or logical_date is invalid
"""
if not exactly_one(logical_date, run_id):
return []

if not dag:
return []

if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")
# Mark the dag run to success.
Expand All @@ -333,7 +308,6 @@ def set_dag_run_state_to_success(
def set_dag_run_state_to_failed(
*,
dag: DAG,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -344,27 +318,14 @@ def set_dag_run_state_to_failed(
Set for a specific logical date and its task instances to failed.
:param dag: the DAG of which to alter state
:param logical_date: the logical date from which to start looking(deprecated)
:param run_id: the DAG run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: AssertionError if dag or logical_date is invalid
"""
if not exactly_one(logical_date, run_id):
return []
if not dag:
return []

if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id

if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")

Expand Down Expand Up @@ -429,7 +390,6 @@ def __set_dag_run_state_to_running_or_queued(
*,
new_state: DagRunState,
dag: DAG,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession,
Expand All @@ -438,28 +398,15 @@ def __set_dag_run_state_to_running_or_queued(
Set the dag run for a specific logical date to running.
:param dag: the DAG of which to alter state
:param logical_date: the logical date from which to start looking
:param run_id: the id of the DagRun
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
"""
res: list[TaskInstance] = []

if not exactly_one(logical_date, run_id):
return res

if not dag:
return res

if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"DagRun with run_id: {run_id} not found")
# Mark the dag run to running.
Expand All @@ -474,7 +421,6 @@ def __set_dag_run_state_to_running_or_queued(
def set_dag_run_state_to_running(
*,
dag: DAG,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -487,7 +433,6 @@ def set_dag_run_state_to_running(
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.RUNNING,
dag=dag,
logical_date=logical_date,
run_id=run_id,
commit=commit,
session=session,
Expand All @@ -498,7 +443,6 @@ def set_dag_run_state_to_running(
def set_dag_run_state_to_queued(
*,
dag: DAG,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -511,7 +455,6 @@ def set_dag_run_state_to_queued(
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.QUEUED,
dag=dag,
logical_date=logical_date,
run_id=run_id,
commit=commit,
session=session,
Expand Down
4 changes: 2 additions & 2 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ def _trigger_dag(
run_id = run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, data_interval=data_interval
)
dag_run = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, logical_date=logical_date)
dag_run = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id)

if dag_run:
raise DagRunAlreadyExists(dag_run, logical_date=logical_date, run_id=run_id)
raise DagRunAlreadyExists(dag_run)

run_conf = None
if conf:
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
queued_event_collection_schema,
queued_event_schema,
)
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.sdk.definitions.asset import Asset
from airflow.utils import timezone
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
Expand Down Expand Up @@ -181,6 +181,7 @@ def _generate_queued_event_where_clause(
return where_clause


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
Expand All @@ -203,6 +204,7 @@ def get_dag_asset_queued_event(
return queued_event_schema.dump(queued_event)


@mark_fastapi_migration_done
@security.requires_access_asset("DELETE")
@security.requires_access_dag("GET")
@provide_session
Expand Down Expand Up @@ -272,6 +274,7 @@ def delete_dag_asset_queued_events(
)


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
def get_asset_queued_events(
Expand Down
3 changes: 3 additions & 0 deletions airflow/api_connexion/endpoints/config_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.api_connexion.schemas.config_schema import Config, ConfigOption, ConfigSection, config_schema
from airflow.configuration import conf
from airflow.settings import json
from airflow.utils.api_migration import mark_fastapi_migration_done

LINE_SEP = "\n" # `\n` cannot appear in f-strings

Expand Down Expand Up @@ -65,6 +66,7 @@ def _config_to_json(config: Config) -> str:
return json.dumps(config_schema.dump(config), indent=4)


@mark_fastapi_migration_done
@security.requires_access_configuration("GET")
def get_config(*, section: str | None = None) -> Response:
"""Get current configuration."""
Expand Down Expand Up @@ -102,6 +104,7 @@ def get_config(*, section: str | None = None) -> Response:
)


@mark_fastapi_migration_done
@security.requires_access_configuration("GET")
def get_value(*, section: str, option: str) -> Response:
serializer = {
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def _fetch_dag_runs(
return session.scalars(query.offset(offset).limit(limit)).all(), total_entries


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@format_parameters(
{
Expand Down
Loading

0 comments on commit dd2e70a

Please sign in to comment.