Skip to content

Commit

Permalink
Allow get_entity_subset_from_asset_graph_subsetto stil return EntityS…
Browse files Browse the repository at this point in the history
…ubsets for serialized AssetGraphSubsets when the underlying partitions definition has gotten larger

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Feb 3, 2025
1 parent 23aeed2 commit 68e2d51
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)

from dagster import _check as check
from dagster._check.functions import CheckError
from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
Expand All @@ -27,6 +28,7 @@
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
TimeWindowPartitionsSubset,
get_time_partitions_def,
)
from dagster._core.loader import LoadingContext
Expand Down Expand Up @@ -198,18 +200,72 @@ def get_entity_subset_from_asset_graph_subset(
self.asset_graph.has(key), f"Asset graph does not contain {key.to_user_string()}"
)

serializable_subset = asset_graph_subset.get_asset_subset(key, self.asset_graph)
check.invariant(
serializable_subset.is_compatible_with_partitions_def(
self._get_partitions_def(key),
),
f"Partitions definition for {key.to_user_string()} is not compatible with the passed in AssetGraphSubset",
serializable_subset = self._with_current_partitions_def(
asset_graph_subset.get_asset_subset(key, self.asset_graph)
)

return EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(serializable_subset.value)
)

def _with_current_partitions_def(
self, serializable_subset: SerializableEntitySubset
) -> SerializableEntitySubset:
"""Used to transform a persisted SerializableEntitySubset (e.g. generated from backfill data)
into an up-to-date one based on the latest partitions information in the asset graph. Raises
an exception if the partitions in the asset graph are now totally incompatible (say, a
partitioned asset is now unpartitioned, or the subset references keys that no longer exist)
but adjusts the SerializableEntitySubset to reflect the latest version of the
PartitionsDefinition if it differs but is still valid (for example, the range of the
partitions have been extended and the subset is still valid.
"""
current_partitions_def = self._get_partitions_def(serializable_subset.key)
key = serializable_subset.key
value = serializable_subset.value
if serializable_subset.is_partitioned:
check.invariant(
current_partitions_def is not None,
f"{key.to_user_string()} was partitioned when originally stored, but is no longer partitioned.",
)
if isinstance(value, AllPartitionsSubset):
check.invariant(
value.partitions_def == current_partitions_def,
f"Partitions definition for {key.to_user_string()} has an AllPartitionsSubset and no longer matches the stored partitions definition",
)
return serializable_subset
elif isinstance(value, TimeWindowPartitionsSubset):
serialized_partitions_def = value.partitions_def
if (
not isinstance(current_partitions_def, TimeWindowPartitionsDefinition)
or serialized_partitions_def.timezone != current_partitions_def.timezone
or serialized_partitions_def.fmt != current_partitions_def.fmt
or serialized_partitions_def.cron_schedule
!= current_partitions_def.cron_schedule
):
raise CheckError(
f"Stored partitions definition for {key.to_user_string()} is no longer compatible with the latest partitions definition",
)
missing_subset = value - current_partitions_def.subset_with_all_partitions(
self._temporal_context.effective_dt,
self._instance,
)
if not missing_subset.is_empty:
raise CheckError(
f"Stored partitions definition for {key.to_user_string()} includes partitions {missing_subset} that are no longer present",
)

return SerializableEntitySubset(
key, value.with_partitions_def(current_partitions_def)
)
else:
return serializable_subset
else:
check.invariant(
current_partitions_def is None,
f"{key.to_user_string()} was un-partitioned when originally stored, but is now partitioned.",
)
return serializable_subset

def iterate_asset_subsets(
self, asset_graph_subset: AssetGraphSubset
) -> Iterable[EntitySubset[AssetKey]]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
from dagster import AssetDep, Definitions, IdentityPartitionMapping, asset
from typing import cast

import pytest
from dagster import (
AssetDep,
DailyPartitionsDefinition,
Definitions,
IdentityPartitionMapping,
TimeWindow,
asset,
)
from dagster._check.functions import CheckError
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition_mapping import LastPartitionMapping, StaticPartitionMapping
from dagster._core.definitions.time_window_partitions import (
PersistedTimeWindow,
TimeWindowPartitionsSubset,
)
from dagster._core.instance import DagsterInstance
from dagster._time import create_datetime

Expand Down Expand Up @@ -67,6 +83,67 @@ def down_asset(): ...
assert required_but_nonexistent_subset.is_empty


def test_partitions_definition_validity_changes():
daily_partitions_def = DailyPartitionsDefinition(start_date="2022-01-01")

@asset(partitions_def=daily_partitions_def)
def asset0(): ...

defs = Definitions([asset0])

# partition subset that is still valid because it is a subset of the current one
old_partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")
old_partitions_subset = TimeWindowPartitionsSubset(
partitions_def=old_partitions_def,
num_partitions=None,
included_time_windows=[
PersistedTimeWindow.from_public_time_window(
TimeWindow(start=create_datetime(2023, 1, 1), end=create_datetime(2023, 3, 1)),
"UTC",
)
],
)
old_asset_graph_subset = AssetGraphSubset(
partitions_subsets_by_asset_key={asset0.key: old_partitions_subset}
)

with DagsterInstance.ephemeral() as instance:
asset_graph_view = AssetGraphView.for_test(defs, instance)

entity_subset = asset_graph_view.get_entity_subset_from_asset_graph_subset(
old_asset_graph_subset,
asset0.key,
)

subset = cast(TimeWindowPartitionsSubset, entity_subset.get_internal_subset_value())
assert subset.included_time_windows == old_partitions_subset.included_time_windows
assert subset.partitions_def == daily_partitions_def

invalid_partitions_def = DailyPartitionsDefinition(start_date="2021-01-01")
invalid_partitions_subset = TimeWindowPartitionsSubset(
partitions_def=invalid_partitions_def,
num_partitions=None,
included_time_windows=[
PersistedTimeWindow.from_public_time_window(
TimeWindow(start=create_datetime(2021, 1, 1), end=create_datetime(2021, 3, 1)),
"UTC",
)
],
)
invalid_asset_graph_subset = AssetGraphSubset(
partitions_subsets_by_asset_key={asset0.key: invalid_partitions_subset}
)

with pytest.raises(
CheckError,
match="that are no longer present",
):
asset_graph_view.get_entity_subset_from_asset_graph_subset(
invalid_asset_graph_subset,
asset0.key,
)


def test_subset_traversal_static_partitions() -> None:
number_keys = {"1", "2", "3"}
letter_keys = {"a", "b", "c"}
Expand Down

0 comments on commit 68e2d51

Please sign in to comment.