Skip to content

Commit

Permalink
Update dag reserialize command (apache#43949)
Browse files Browse the repository at this point in the history
* Update dag reserialize command

Now that we have versioning, users must be sure they want to use the
dags reserialize command, as it deletes dag history.

I updated the command to ensure users know it will delete DAG history
and answer yes for it to continue. The DagVersion is deleted and since
it has foreignkey to SerializedDagModel and DagCode, those will also
get deleted.

Also updated the _reserialize function at DB upgrade so that it doesn't
delete the serializedDag since that won't be necessary

Updated the test to use session fixture instead of create_session

* add --clear-history to dag reserialize command

* Add news fragment item

* Remove clear-history

* fix test

* fixup! fix test

* Update newsfragments/43949.significant.rst
  • Loading branch information
ephraimbuddy authored Nov 26, 2024
1 parent f8a61cb commit dc86801
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 44 deletions.
16 changes: 4 additions & 12 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,13 +967,6 @@ def string_lower_type(val):
help="The maximum number of triggers that a Triggerer will run at one time.",
)

# reserialize
ARG_CLEAR_ONLY = Arg(
("--clear-only",),
action="store_true",
help="If passed, serialized DAGs will be cleared but not reserialized.",
)

ARG_DAG_LIST_COLUMNS = Arg(
("--columns",),
type=string_list_type,
Expand Down Expand Up @@ -1243,15 +1236,14 @@ class GroupCommand(NamedTuple):
),
ActionCommand(
name="reserialize",
help="Reserialize all DAGs by parsing the DagBag files",
help="Reserialize DAGs by parsing the DagBag files",
description=(
"Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
"from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
"version of Airflow that you are running."
"Reserialize DAGs in the metadata DB. This can be "
"particularly useful if your serialized DAGs become out of sync with the Airflow "
"version you are using."
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"),
args=(
ARG_CLEAR_ONLY,
ARG_SUBDIR,
ARG_VERBOSE,
),
Expand Down
9 changes: 3 additions & 6 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from typing import TYPE_CHECKING

import re2
from sqlalchemy import delete, select
from sqlalchemy import select

from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
Expand Down Expand Up @@ -537,8 +537,5 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
@provide_session
def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
"""Serialize a DAG instance."""
session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False))

if not args.clear_only:
dagbag = DagBag(process_subdir(args.subdir))
dagbag.sync_to_db(session=session)
dagbag = DagBag(process_subdir(args.subdir))
dagbag.sync_to_db(session=session)
3 changes: 0 additions & 3 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import attrs
from sqlalchemy import (
Table,
delete,
exc,
func,
inspect,
Expand Down Expand Up @@ -924,9 +923,7 @@ def check_and_run_migrations():

def _reserialize_dags(*, session: Session) -> None:
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel

session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False))
dagbag = DagBag(collect_dags=False)
dagbag.collect_dags(only_if_updated=False)
dagbag.sync_to_db(session=session)
Expand Down
5 changes: 5 additions & 0 deletions newsfragments/43949.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The ``--clear-only`` option of ``airflow dags reserialize`` command is now removed.

The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them.
This option has been removed as it is no longer needed. We have implemented DAG versioning and can
no longer delete serialized dag without going through ``airflow db-clean`` command. This command is now only for reserializing DAGs.
43 changes: 20 additions & 23 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from airflow.models import DagBag, DagModel, DagRun
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import _run_inline_trigger
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.triggers.base import TriggerEvent
Expand Down Expand Up @@ -79,34 +80,31 @@ def teardown_class(cls) -> None:
def setup_method(self):
clear_db_runs() # clean-up all dag run before start each test

def test_reserialize(self):
def test_reserialize(self, session):
# Assert that there are serialized Dags
with create_session() as session:
serialized_dags_before_command = session.query(SerializedDagModel).all()
serialized_dags_before_command = session.query(SerializedDagModel).all()
assert len(serialized_dags_before_command) # There are serialized DAGs to delete

# Run clear of serialized dags
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"]))
# Assert no serialized Dags
with create_session() as session:
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert not len(serialized_dags_after_clear)

# Serialize manually
# delete all versioning
session.query(DagVersion).delete()

serialized_dags_before_command = session.query(SerializedDagModel).all()
assert not len(serialized_dags_before_command) # There are no more serialized dag
dag_version_before_command = session.query(DagVersion).all()
assert not len(dag_version_before_command)
# Serialize the dags
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"]))
# Assert serialized Dags
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_clear)
dag_version_after_command = session.query(DagVersion).all()
assert len(dag_version_after_command)

# Check serialized DAGs are back
with create_session() as session:
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back

def test_reserialize_should_support_subdir_argument(self):
def test_reserialize_should_support_subdir_argument(self, session):
# Run clear of serialized dags
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"]))
session.query(DagVersion).delete()

# Assert no serialized Dags
with create_session() as session:
serialized_dags_after_clear = session.query(SerializedDagModel).all()
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_clear) == 0

# Serialize manually
Expand All @@ -120,8 +118,7 @@ def test_reserialize_should_support_subdir_argument(self):
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path]))

# Check serialized DAG are back
with create_session() as session:
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back

def test_show_dag_dependencies_print(self):
Expand Down

0 comments on commit dc86801

Please sign in to comment.