Fix ORM vs migration files inconsistencies #44221

Merged · 14 commits · Dec 1, 2024
@@ -59,8 +59,8 @@ def downgrade():
if conn.dialect.name == "mssql":
with op.batch_alter_table("log") as batch_op:
batch_op.drop_index("idx_log_event")
batch_op.alter_column("event", type_=sa.String(30), nullable=False)
batch_op.alter_column("event", type_=sa.String(30))
batch_op.create_index("idx_log_event", ["event"])
else:
with op.batch_alter_table("log") as batch_op:
batch_op.alter_column("event", type_=sa.String(30), nullable=False)
batch_op.alter_column("event", type_=sa.String(30))
@@ -45,6 +45,13 @@ def upgrade():
with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.drop_constraint("task_instance_note_user_fkey", type_="foreignkey")

if op.get_bind().dialect.name == "mysql":
with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.drop_index("dag_run_note_user_fkey")

with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.drop_index("task_instance_note_user_fkey")


def downgrade():
"""Unapply Drop ab_user.id foreign key."""
@@ -53,3 +60,10 @@ def downgrade():

with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", ["user_id"], ["id"])

if op.get_bind().dialect.name == "mysql":
with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.create_index("task_instance_note_user_fkey", ["user_id"], unique=False)

with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.create_index("dag_run_note_user_fkey", ["user_id"], unique=False)
@@ -31,9 +31,10 @@

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
revision = "1cdc775ca98f"
down_revision = "a2c32e6c7729"
@@ -44,14 +45,24 @@

def upgrade():
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("execution_date", new_column_name="logical_date", existing_type=sa.TIMESTAMP)
batch_op.alter_column(
"execution_date",
new_column_name="logical_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")


def downgrade():
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("logical_date", new_column_name="execution_date", existing_type=sa.TIMESTAMP)
batch_op.alter_column(
"logical_date",
new_column_name="execution_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.create_unique_constraint(
"dag_run_dag_id_execution_date_key",
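Using the dialect-aware TIMESTAMP from airflow.migrations.db_types plus existing_nullable=False matters mostly on MySQL, where renaming a column re-emits the full column definition, so an underspecified existing_type can silently lose NOT NULL or timestamp precision. A quick post-migration check, assuming an engine pointed at the migrated database:

import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/airflow")  # assumed URL

for col in sa.inspect(engine).get_columns("dag_run"):
    if col["name"] == "logical_date":
        # expected: a timezone-aware timestamp type and nullable=False
        print(col["type"], col["nullable"])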
@@ -46,10 +46,10 @@ def upgrade():
op.create_table(
"backfill",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("dag_id", sa.String(length=250), nullable=True),
sa.Column("dag_id", sa.String(length=250), nullable=False),
sa.Column("from_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
sa.Column("to_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=True),
sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=False),
sa.Column("is_paused", sa.Boolean(), nullable=True),
sa.Column("max_active_runs", sa.Integer(), nullable=False),
sa.Column("created_at", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
@@ -65,6 +65,10 @@ def upgrade():
sa.Column("sort_ordinal", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("backfill_dag_run_pkey")),
sa.UniqueConstraint("backfill_id", "dag_run_id", name="ix_bdr_backfill_id_dag_run_id"),
sa.ForeignKeyConstraint(
["backfill_id"], ["backfill.id"], name="bdr_backfill_fkey", ondelete="cascade"
),
sa.ForeignKeyConstraint(["dag_run_id"], ["dag_run.id"], name="bdr_dag_run_fkey", ondelete="set null"),
)


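The added ForeignKeyConstraint entries give backfill_dag_run the named, cascading constraints the model side expects, so deleting a backfill no longer leaves orphaned rows behind. A simplified ORM-side sketch (illustrative class and table name, not the real model) of how the same constraints read in declarative form:

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class BackfillDagRunSketch(Base):  # illustrative stand-in, not Airflow's BackfillDagRun
    __tablename__ = "backfill_dag_run_sketch"
    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    backfill_id = sa.Column(
        sa.Integer,
        sa.ForeignKey("backfill.id", name="bdr_backfill_fkey", ondelete="cascade"),
        nullable=False,
    )
    dag_run_id = sa.Column(
        sa.Integer,
        sa.ForeignKey("dag_run.id", name="bdr_dag_run_fkey", ondelete="set null"),
    )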
@@ -61,9 +61,7 @@ def upgrade():
# Add 'name' column. Set it to nullable for now.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
batch_op.add_column(
sa.Column("group", _STRING_COLUMN_TYPE, default=str, server_default="", nullable=False)
)
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default="", nullable=False))
# Fill name from uri column.
with Session(bind=op.get_bind()) as session:
session.execute(sa.text("update dataset set name=uri"))
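default=str and default="" both yield an empty string at insert time, since calling str() with no argument returns ""; the literal form simply keeps the migration textually aligned with the model column (the same swap appears in airflow/models/asset.py further down), and the separate server_default is dropped along the way. A self-contained sketch, assuming an in-memory SQLite database:

import sqlalchemy as sa

metadata = sa.MetaData()
demo = sa.Table(
    "demo",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("group", sa.String(1500), default="", nullable=False),
)

engine = sa.create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(demo.insert().values(id=1))  # "group" omitted on purpose
    print(repr(conn.execute(sa.select(demo.c.group)).scalar_one()))  # -> ''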
@@ -42,7 +42,7 @@
def upgrade():
"""Apply Add exception_reason and logical_date to BackfillDagRun."""
with op.batch_alter_table("backfill", schema=None) as batch_op:
batch_op.add_column(sa.Column("reprocess_behavior", sa.String(length=250), nullable=True))
batch_op.add_column(sa.Column("reprocess_behavior", sa.String(length=250), nullable=False))
with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("exception_reason", sa.String(length=250), nullable=True))
batch_op.add_column(sa.Column("logical_date", UtcDateTime(timezone=True), nullable=False))
@@ -210,7 +210,7 @@ def upgrade():
columns=["dag_id"],
unique=False,
)

with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op:
batch_op.create_foreign_key(
constraint_name="dsaar_asset_alias_fkey",
referent_table="asset_alias",
@@ -284,14 +284,8 @@ def upgrade():
columns=["dag_id"],
unique=False,
)

batch_op.create_foreign_key(
constraint_name="toar_asset_fkey",
referent_table="asset",
local_cols=["asset_id"],
remote_cols=["id"],
ondelete="CASCADE",
)
with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op:
batch_op.create_foreign_key("toar_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
batch_op.create_foreign_key(
constraint_name="toar_dag_id_fkey",
referent_table="dag",
@@ -320,14 +314,8 @@ def upgrade():
columns=["target_dag_id"],
unique=False,
)

batch_op.create_foreign_key(
constraint_name="adrq_asset_fkey",
referent_table="asset",
local_cols=["asset_id"],
remote_cols=["id"],
ondelete="CASCADE",
)
with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op:
batch_op.create_foreign_key("adrq_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
batch_op.create_foreign_key(
constraint_name="adrq_dag_fkey",
referent_table="dag",
@@ -564,7 +552,6 @@ def downgrade():

with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op:
batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey")
batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey")

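The verbose keyword-style create_foreign_key calls are folded into the positional shorthand inside their own batch_alter_table blocks; both spellings resolve to the same Alembic signature, create_foreign_key(constraint_name, referent_table, local_cols, remote_cols, **kw). A sketch of the equivalence, assuming it runs inside a migration's upgrade() and the referenced tables exist:

from alembic import op

with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op:
    # positional shorthand ...
    batch_op.create_foreign_key("toar_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
    # ... is the same call as the keyword form the old code used:
    # batch_op.create_foreign_key(
    #     constraint_name="toar_asset_fkey",
    #     referent_table="asset",
    #     local_cols=["asset_id"],
    #     remote_cols=["id"],
    #     ondelete="CASCADE",
    # )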
@@ -167,6 +167,32 @@ def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine:
return sa.String(36)


def create_foreign_keys():
for fk in ti_fk_constraints:
if fk["table"] in ["task_instance_history", "task_map"]:
continue
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
for fk in ti_fk_constraints:
if fk["table"] not in ["task_instance_history", "task_map"]:
continue
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
onupdate="CASCADE",
)


def upgrade():
"""Add UUID primary key to task instance table."""
conn = op.get_bind()
@@ -232,15 +258,7 @@ def upgrade():
batch_op.create_primary_key("task_instance_pkey", ["id"])

# Create foreign key constraints
for fk in ti_fk_constraints:
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
create_foreign_keys()


def downgrade():
@@ -270,12 +288,4 @@ def downgrade():
batch_op.create_primary_key("task_instance_pkey", ti_fk_cols)

# Re-add foreign key constraints
for fk in ti_fk_constraints:
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
create_foreign_keys()
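The new create_foreign_keys() helper lets upgrade() and downgrade() rebuild exactly the same constraints, and it runs two passes so task_instance_history and task_map additionally get onupdate="CASCADE" on the task-instance composite key. It relies on module-level names defined near the top of this migration; a hypothetical sketch of their shape (the real file lists more tables):

# hypothetical values for illustration only; the actual lists live earlier in this migration
ti_table = "task_instance"
ti_fk_cols = ["dag_id", "task_id", "run_id", "map_index"]
ti_fk_constraints = [
    {"table": "task_map", "fk": "task_map_task_instance_fkey"},
    {"table": "task_instance_history", "fk": "task_instance_history_ti_fkey"},
    {"table": "xcom", "fk": "xcom_task_instance_fkey"},
]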
@@ -180,3 +180,5 @@ def downgrade():

with op.batch_alter_table("xcom", schema=None) as batch_op:
batch_op.drop_column("value_old")

op.execute(sa.text("DROP TABLE IF EXISTS _xcom_archive"))
1 change: 0 additions & 1 deletion airflow/models/__init__.py
@@ -63,7 +63,6 @@ def import_all_models():
import airflow.models.serialized_dag
import airflow.models.taskinstancehistory
import airflow.models.tasklog
import airflow.providers.fab.auth_manager.models


def __getattr__(name):
2 changes: 1 addition & 1 deletion airflow/models/asset.py
@@ -119,7 +119,7 @@ class AssetAliasModel(Base):
),
"mysql",
),
default=str,
default="",
nullable=False,
)

2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
aa9e2e5b2a52af1e92bc876727ad5e8958e291315096fc5249a9afa2c21a5d06
b42b04b6cc47650cb9e7a37258a6e8e99bdca2677253715505b8ad287192bf72
4 changes: 3 additions & 1 deletion scripts/in_container/run_generate_migration.sh
@@ -20,5 +20,7 @@

cd "${AIRFLOW_SOURCES}" || exit 1
cd "airflow" || exit 1
airflow db reset
airflow db reset -y
airflow db downgrade -n 2.10.3 -y
airflow db migrate -r heads
alembic revision --autogenerate -m "${@}"
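The reworked script resets the metadata database, downgrades it to the 2.10.3 revision and migrates back to heads before autogenerating, so the candidate revision is diffed against a schema that has actually been through the upgrade path. A rough Python equivalent of the final two steps, assuming an alembic.ini that resolves to the Airflow migrations directory:

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumed to point at airflow's migration environment

command.upgrade(cfg, "heads")  # roughly what `airflow db migrate -r heads` drives
command.revision(cfg, message="fix ORM vs migration inconsistencies", autogenerate=True)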
2 changes: 2 additions & 0 deletions tests/utils/test_db.py
@@ -96,6 +96,8 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
lambda t: (t[0] == "remove_table" and t[1].name == "sqlite_sequence"),
# fab version table
lambda t: (t[0] == "remove_table" and t[1].name == "alembic_version_fab"),
# Ignore _xcom_archive table
lambda t: (t[0] == "remove_table" and t[1].name == "_xcom_archive"),
]

for ignore in ignores:
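test_database_schema_and_sqlalchemy_model_are_in_sync runs Alembic's autogenerate comparison and asserts the diff is empty once known exceptions are filtered; each diff entry is a tuple like ("remove_table", Table(...)), which is why the new filter matches on t[1].name == "_xcom_archive". A condensed sketch of that comparison, with the engine URL assumed to point at a migrated Airflow database:

import sqlalchemy as sa
from alembic.autogenerate import compare_metadata
from alembic.migration import MigrationContext

from airflow.models.base import Base  # ORM metadata to compare against

engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/airflow")  # assumed URL
with engine.connect() as conn:
    diffs = compare_metadata(MigrationContext.configure(conn), Base.metadata)

unexpected = [
    d for d in diffs
    if not (d[0] == "remove_table" and d[1].name == "_xcom_archive")
]
assert not unexpected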
4 changes: 3 additions & 1 deletion tests/utils/test_db_cleanup.py
@@ -277,11 +277,13 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, e

@pytest.mark.parametrize(
"skip_archive, expected_archives",
[pytest.param(True, 0, id="skip_archive"), pytest.param(False, 1, id="do_archive")],
[pytest.param(True, 1, id="skip_archive"), pytest.param(False, 2, id="do_archive")],
)
def test__skip_archive(self, skip_archive, expected_archives):
"""
Verify that running cleanup_table drops the archives when requested.

Archived tables from DB migration should be kept when skip_archive is True.
"""
base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
num_tis = 10
5 changes: 5 additions & 0 deletions tests_common/test_utils/db.py
@@ -73,7 +73,12 @@ def initial_db_init():
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_auth_manager import get_auth_manager

from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS

db.resetdb()
if AIRFLOW_V_3_0_PLUS:
db.downgrade(to_revision="5f2621c13b39")
db.upgradedb(to_revision="head")
_bootstrap_dagbag()
# minimal app to add roles
flask_app = Flask(__name__)