Skip to content

Commit

Permalink
Rename latest bundle version to bundle version (#45719)
Browse files Browse the repository at this point in the history
Why? Because it might not always be the latest version of that bundle. It's the bundle version that was seen when this dag was last processed, which could be stale.

The thing is, for the current dag (as represented by this model), there is only ever one bundle version -- the bundle version that was seen when the dag was parsed. So I think that by simply calling it "bundle version", we are less likely to cause confusion.
  • Loading branch information
dstandish authored Jan 16, 2025
1 parent c71bd78 commit d3fc6c4
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 52 deletions.
2 changes: 1 addition & 1 deletion airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def update_dags(
dm.timetable_description = dag.timetable.description
dm.asset_expression = dag.timetable.asset_condition.as_expression()
dm.bundle_name = self.bundle_name
dm.latest_bundle_version = self.bundle_version
dm.bundle_version = self.bundle_version

last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
if last_automated_run is None:
Expand Down
4 changes: 2 additions & 2 deletions airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def upgrade():
)
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True))
batch_op.add_column(sa.Column("latest_bundle_version", sa.String(length=200), nullable=True))
batch_op.add_column(sa.Column("bundle_version", sa.String(length=200), nullable=True))
batch_op.create_foreign_key(
batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"]
)
Expand All @@ -63,7 +63,7 @@ def upgrade():
def downgrade():
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
batch_op.drop_column("latest_bundle_version")
batch_op.drop_column("bundle_version")
batch_op.drop_column("bundle_name")
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_column("bundle_version")
Expand Down
16 changes: 7 additions & 9 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,7 @@ def _create_orm_dagrun(
data_interval=data_interval,
triggered_by=triggered_by,
backfill_id=backfill_id,
bundle_version=session.scalar(
select(DagModel.latest_bundle_version).where(DagModel.dag_id == dag.dag_id)
),
bundle_version=session.scalar(select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id)),
)
# Load defaults into the following two fields to ensure result can be serialized detached
run.log_template_id = int(session.scalar(select(func.max(LogTemplate.__table__.c.id))))
Expand Down Expand Up @@ -779,9 +777,9 @@ def get_bundle_name(self, session=NEW_SESSION) -> str | None:
return session.scalar(select(DagModel.bundle_name).where(DagModel.dag_id == self.dag_id))

@provide_session
def get_latest_bundle_version(self, session=NEW_SESSION) -> str | None:
"""Return the latest version of the bundle this DAG is in."""
return session.scalar(select(DagModel.latest_bundle_version).where(DagModel.dag_id == self.dag_id))
def get_bundle_version(self, session=NEW_SESSION) -> str | None:
"""Return the bundle version that was seen when this dag was processed."""
return session.scalar(select(DagModel.bundle_version).where(DagModel.dag_id == self.dag_id))

@methodtools.lru_cache(maxsize=None)
@classmethod
Expand Down Expand Up @@ -1868,7 +1866,7 @@ def sync_to_db(self, session=NEW_SESSION):
"""
# TODO: AIP-66 should this be in the model?
bundle_name = self.get_bundle_name(session=session)
bundle_version = self.get_latest_bundle_version(session=session)
bundle_version = self.get_bundle_version(session=session)
self.bulk_write_to_db(bundle_name, bundle_version, [self], session=session)

def get_default_view(self):
Expand Down Expand Up @@ -2045,8 +2043,8 @@ class DagModel(Base):
# associated zip.
fileloc = Column(String(2000))
bundle_name = Column(StringID(), ForeignKey("dag_bundle.name"), nullable=True)
# The version of the bundle the last time the DAG was parsed
latest_bundle_version = Column(String(200), nullable=True)
# The version of the bundle the last time the DAG was processed
bundle_version = Column(String(200), nullable=True)
# String representing the owners
owners = Column(String(2000))
# Display name of the dag
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
32350c3c7d1dd29eca4205458eab946ece6628f7f53d30c4e0a8f1ee914f1372
a877009126cad78bdd6336ac298e42d76dbb29dc88d5ecb9e5344f95dfe9c2b7
68 changes: 34 additions & 34 deletions docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion tests/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,4 +537,4 @@ def test_bundle_name_and_version_are_stored(self, testing_dag_bundle, session):
update_dag_parsing_results_in_db("testing", "1.0", [self.dag_to_lazy_serdag(dag)], {}, set(), session)
orm_dag = session.get(DagModel, "mydag")
assert orm_dag.bundle_name == "testing"
assert orm_dag.latest_bundle_version == "1.0"
assert orm_dag.bundle_version == "1.0"
8 changes: 4 additions & 4 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2104,17 +2104,17 @@ def test_get_bundle_name(self, testing_dag_bundle):
DAG.bulk_write_to_db("testing", None, [dag])
assert dag.get_bundle_name() == "testing"

def test_get_latest_bundle_version(self, testing_dag_bundle):
def test_get_bundle_version(self, testing_dag_bundle):
dag = DAG("dag")

# until we've sycned, it'll be None
assert dag.get_latest_bundle_version() is None
assert dag.get_bundle_version() is None

# Now, it can be none or a str
DAG.bulk_write_to_db("testing", None, [dag])
assert dag.get_latest_bundle_version() is None
assert dag.get_bundle_version() is None
DAG.bulk_write_to_db("testing", "abc", [dag])
assert dag.get_latest_bundle_version() == "abc"
assert dag.get_bundle_version() == "abc"


class TestDagModel:
Expand Down

0 comments on commit d3fc6c4

Please sign in to comment.