Fix ORM vs migration files inconsistencies (apache#44221)
* Fix ORM vs migration files inconsistencies

There have been some inconsistencies between the ORM models and the migration files,
but they did not cause any test failures. This change fixes the inconsistencies and
also makes the schema-sync test fail when new ones appear (a sketch of that check
follows this list).

* fix for mysql and postgres

* fixup! fix for mysql and postgres

* fix for sqlite

* fixup! fix for sqlite

* fixup! fixup! fix for sqlite

* use TIMESTAMP from db_types

* skip_archive should not delete the _xcom_archive table, since it was created by a migration

* fix conflicts

* fixup! fix conflicts

* drop _xcom_archive table if it exists

* use sql for dropping xcom_archive table

* fix conflicts

* remove added migration file and make it work in one file
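
The consistency check mentioned above boils down to an Alembic autogenerate comparison between a freshly migrated database and the ORM metadata. A minimal sketch of the idea, not the project's test verbatim (the engine URL is illustrative, and the ignore list is reduced to the _xcom_archive case this commit adds):

    # Minimal sketch: detect ORM-vs-migration drift with Alembic's autogenerate diff.
    from alembic.autogenerate import compare_metadata
    from alembic.migration import MigrationContext
    from sqlalchemy import create_engine

    import airflow.models
    from airflow.models.base import Base

    airflow.models.import_all_models()  # register every model on Base.metadata

    engine = create_engine("postgresql://airflow:airflow@localhost/airflow")  # illustrative URL
    with engine.connect() as conn:
        diff = compare_metadata(MigrationContext.configure(conn), Base.metadata)

    # Tables created by migrations but intentionally absent from the ORM
    # (e.g. _xcom_archive) are filtered out before asserting.
    diff = [d for d in diff if not (d[0] == "remove_table" and d[1].name == "_xcom_archive")]
    assert not diff, f"ORM and migration files are out of sync: {diff}"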
ephraimbuddy authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent 38f9ceb commit 89b8b84
Showing 16 changed files with 88 additions and 52 deletions.
@@ -59,8 +59,8 @@ def downgrade():
if conn.dialect.name == "mssql":
with op.batch_alter_table("log") as batch_op:
batch_op.drop_index("idx_log_event")
batch_op.alter_column("event", type_=sa.String(30), nullable=False)
batch_op.alter_column("event", type_=sa.String(30))
batch_op.create_index("idx_log_event", ["event"])
else:
with op.batch_alter_table("log") as batch_op:
batch_op.alter_column("event", type_=sa.String(30), nullable=False)
batch_op.alter_column("event", type_=sa.String(30))
@@ -45,6 +45,13 @@ def upgrade():
with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.drop_constraint("task_instance_note_user_fkey", type_="foreignkey")

if op.get_bind().dialect.name == "mysql":
with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.drop_index("dag_run_note_user_fkey")

with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.drop_index("task_instance_note_user_fkey")


def downgrade():
"""Unapply Drop ab_user.id foreign key."""
@@ -53,3 +60,10 @@ def downgrade():

with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", ["user_id"], ["id"])

if op.get_bind().dialect.name == "mysql":
with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.create_index("task_instance_note_user_fkey", ["user_id"], unique=False)

with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.create_index("dag_run_note_user_fkey", ["user_id"], unique=False)
@@ -31,9 +31,10 @@

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
revision = "1cdc775ca98f"
down_revision = "a2c32e6c7729"
@@ -44,14 +45,24 @@

def upgrade():
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("execution_date", new_column_name="logical_date", existing_type=sa.TIMESTAMP)
batch_op.alter_column(
"execution_date",
new_column_name="logical_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")


def downgrade():
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("logical_date", new_column_name="execution_date", existing_type=sa.TIMESTAMP)
batch_op.alter_column(
"logical_date",
new_column_name="execution_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.create_unique_constraint(
"dag_run_dag_id_execution_date_key",
@@ -46,10 +46,10 @@ def upgrade():
op.create_table(
"backfill",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("dag_id", sa.String(length=250), nullable=True),
sa.Column("dag_id", sa.String(length=250), nullable=False),
sa.Column("from_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
sa.Column("to_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=True),
sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=False),
sa.Column("is_paused", sa.Boolean(), nullable=True),
sa.Column("max_active_runs", sa.Integer(), nullable=False),
sa.Column("created_at", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
@@ -65,6 +65,10 @@
sa.Column("sort_ordinal", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("backfill_dag_run_pkey")),
sa.UniqueConstraint("backfill_id", "dag_run_id", name="ix_bdr_backfill_id_dag_run_id"),
sa.ForeignKeyConstraint(
["backfill_id"], ["backfill.id"], name="bdr_backfill_fkey", ondelete="cascade"
),
sa.ForeignKeyConstraint(["dag_run_id"], ["dag_run.id"], name="bdr_dag_run_fkey", ondelete="set null"),
)


@@ -61,9 +61,7 @@ def upgrade():
# Add 'name' column. Set it to nullable for now.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
batch_op.add_column(
sa.Column("group", _STRING_COLUMN_TYPE, default=str, server_default="", nullable=False)
)
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default="", nullable=False))
# Fill name from uri column.
with Session(bind=op.get_bind()) as session:
session.execute(sa.text("update dataset set name=uri"))
@@ -42,7 +42,7 @@
def upgrade():
"""Apply Add exception_reason and logical_date to BackfillDagRun."""
with op.batch_alter_table("backfill", schema=None) as batch_op:
batch_op.add_column(sa.Column("reprocess_behavior", sa.String(length=250), nullable=True))
batch_op.add_column(sa.Column("reprocess_behavior", sa.String(length=250), nullable=False))
with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("exception_reason", sa.String(length=250), nullable=True))
batch_op.add_column(sa.Column("logical_date", UtcDateTime(timezone=True), nullable=False))
23 changes: 5 additions & 18 deletions airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
@@ -210,7 +210,7 @@ def upgrade():
columns=["dag_id"],
unique=False,
)

with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op:
batch_op.create_foreign_key(
constraint_name="dsaar_asset_alias_fkey",
referent_table="asset_alias",
@@ -284,14 +284,8 @@ def upgrade():
columns=["dag_id"],
unique=False,
)

batch_op.create_foreign_key(
constraint_name="toar_asset_fkey",
referent_table="asset",
local_cols=["asset_id"],
remote_cols=["id"],
ondelete="CASCADE",
)
with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op:
batch_op.create_foreign_key("toar_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
batch_op.create_foreign_key(
constraint_name="toar_dag_id_fkey",
referent_table="dag",
@@ -320,14 +314,8 @@ def upgrade():
columns=["target_dag_id"],
unique=False,
)

batch_op.create_foreign_key(
constraint_name="adrq_asset_fkey",
referent_table="asset",
local_cols=["asset_id"],
remote_cols=["id"],
ondelete="CASCADE",
)
with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op:
batch_op.create_foreign_key("adrq_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
batch_op.create_foreign_key(
constraint_name="adrq_dag_fkey",
referent_table="dag",
@@ -564,7 +552,6 @@ def downgrade():

with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op:
batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey")
batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey")

@@ -167,6 +167,32 @@ def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine:
return sa.String(36)


def create_foreign_keys():
for fk in ti_fk_constraints:
if fk["table"] in ["task_instance_history", "task_map"]:
continue
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
for fk in ti_fk_constraints:
if fk["table"] not in ["task_instance_history", "task_map"]:
continue
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
onupdate="CASCADE",
)


def upgrade():
"""Add UUID primary key to task instance table."""
conn = op.get_bind()
@@ -232,15 +258,7 @@ def upgrade():
batch_op.create_primary_key("task_instance_pkey", ["id"])

# Create foreign key constraints
for fk in ti_fk_constraints:
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
create_foreign_keys()


def downgrade():
@@ -270,12 +288,4 @@ def downgrade():
batch_op.create_primary_key("task_instance_pkey", ti_fk_cols)

# Re-add foreign key constraints
for fk in ti_fk_constraints:
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
create_foreign_keys()
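
For context on the create_foreign_keys() helper introduced above: it re-creates the task_instance foreign keys in two passes so that task_map and task_instance_history additionally get onupdate="CASCADE", while the other referencing tables keep only ondelete="CASCADE". The list it iterates is made of dicts keyed by "table" and "fk"; an illustrative sketch of that shape (the names below are examples, not the migration's exact values):

    # Illustrative shape of the data create_foreign_keys() iterates over.
    # Table and constraint names are examples, not the migration's exact list.
    ti_table = "task_instance"
    ti_fk_cols = ["dag_id", "task_id", "run_id", "map_index"]
    ti_fk_constraints = [
        {"table": "task_reschedule", "fk": "task_reschedule_ti_fkey"},
        {"table": "rendered_task_instance_fields", "fk": "rtif_ti_fkey"},
        {"table": "task_map", "fk": "task_map_task_instance_fkey"},  # also gets onupdate="CASCADE"
        {"table": "task_instance_history", "fk": "task_instance_history_ti_fkey"},  # also gets onupdate="CASCADE"
    ]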
@@ -180,3 +180,5 @@ def downgrade():

with op.batch_alter_table("xcom", schema=None) as batch_op:
batch_op.drop_column("value_old")

op.execute(sa.text("DROP TABLE IF EXISTS _xcom_archive"))
1 change: 0 additions & 1 deletion airflow/models/__init__.py
@@ -63,7 +63,6 @@ def import_all_models():
import airflow.models.serialized_dag
import airflow.models.taskinstancehistory
import airflow.models.tasklog
import airflow.providers.fab.auth_manager.models


def __getattr__(name):
2 changes: 1 addition & 1 deletion airflow/models/asset.py
@@ -119,7 +119,7 @@ class AssetAliasModel(Base):
),
"mysql",
),
default=str,
default="",
nullable=False,
)

2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
aa9e2e5b2a52af1e92bc876727ad5e8958e291315096fc5249a9afa2c21a5d06
b42b04b6cc47650cb9e7a37258a6e8e99bdca2677253715505b8ad287192bf72
4 changes: 3 additions & 1 deletion scripts/in_container/run_generate_migration.sh
@@ -20,5 +20,7 @@

cd "${AIRFLOW_SOURCES}" || exit 1
cd "airflow" || exit 1
airflow db reset
airflow db reset -y
airflow db downgrade -n 2.10.3 -y
airflow db migrate -r heads
alembic revision --autogenerate -m "${@}"
2 changes: 2 additions & 0 deletions tests/utils/test_db.py
@@ -96,6 +96,8 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
lambda t: (t[0] == "remove_table" and t[1].name == "sqlite_sequence"),
# fab version table
lambda t: (t[0] == "remove_table" and t[1].name == "alembic_version_fab"),
# Ignore _xcom_archive table
lambda t: (t[0] == "remove_table" and t[1].name == "_xcom_archive"),
]

for ignore in ignores:
4 changes: 3 additions & 1 deletion tests/utils/test_db_cleanup.py
@@ -277,11 +277,13 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, e

@pytest.mark.parametrize(
"skip_archive, expected_archives",
[pytest.param(True, 0, id="skip_archive"), pytest.param(False, 1, id="do_archive")],
[pytest.param(True, 1, id="skip_archive"), pytest.param(False, 2, id="do_archive")],
)
def test__skip_archive(self, skip_archive, expected_archives):
"""
Verify that running cleanup_table with drops the archives when requested.
Archived tables from DB migration should be kept when skip_archive is True.
"""
base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
num_tis = 10
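
The updated expectations reflect the new skip_archive behaviour: cleanup now drops only the archive tables it created itself, so the migration-created _xcom_archive table survives and the test sees one archive table instead of zero. A minimal sketch of invoking the cleanup, assuming a configured Airflow metadata database (the cutoff timestamp and table list are illustrative):

    # Minimal sketch: purge old task instances without keeping cleanup archives.
    import pendulum

    from airflow.utils.db_cleanup import run_cleanup

    run_cleanup(
        clean_before_timestamp=pendulum.datetime(2022, 6, 1, tz="UTC"),  # illustrative cutoff
        table_names=["task_instance"],
        skip_archive=True,  # cleanup's own archive tables are dropped afterwards
        confirm=False,
    )
    # The _xcom_archive table (created by a migration, not by cleanup) is left untouched.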
5 changes: 5 additions & 0 deletions tests_common/test_utils/db.py
@@ -73,7 +73,12 @@ def initial_db_init():
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_auth_manager import get_auth_manager

from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS

db.resetdb()
if AIRFLOW_V_3_0_PLUS:
db.downgrade(to_revision="5f2621c13b39")
db.upgradedb(to_revision="head")
_bootstrap_dagbag()
# minimal app to add roles
flask_app = Flask(__name__)
Expand Down
