Skip to content

Commit

Permalink
Allow get_entity_subset_from_asset_graph_subsetto stil return EntityS…
Browse files Browse the repository at this point in the history
…ubsets for serialized AssetGraphSubsets when the underlying partitions definition has gotten larger

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Jan 31, 2025
1 parent 74f86c8 commit 14de39c
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ def get_entity_subset_from_asset_graph_subset(
self.asset_graph.has(key), f"Asset graph does not contain {key.to_user_string()}"
)

serializable_subset = asset_graph_subset.get_asset_subset(key, self.asset_graph)
check.invariant(
serializable_subset.is_compatible_with_partitions_def(
self._get_partitions_def(key),
),
f"Partitions definition for {key.to_user_string()} is not compatible with the passed in AssetGraphSubset",
serializable_subset = asset_graph_subset.get_asset_subset(
key, self.asset_graph
).with_asset_graph_partitions_def(
self._get_partitions_def(key),
self._temporal_context.effective_dt,
self._instance,
)

return EntitySubset(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Generic, Optional, Union

import dagster._check as check
from dagster._check.functions import CheckError
from dagster._core.definitions.asset_key import T_EntityKey
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition import (
AllPartitionsSubset,
PartitionsDefinition,
PartitionsSubset,
)
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsSubset
from dagster._core.definitions.time_window_partitions import (
TimeWindowPartitionsDefinition,
TimeWindowPartitionsSubset,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._serdes.serdes import DataclassSerializer, whitelist_for_serdes

EntitySubsetValue = Union[bool, PartitionsSubset]
Expand Down Expand Up @@ -66,6 +72,61 @@ def is_empty(self) -> bool:
else:
return not self.bool_value

def with_asset_graph_partitions_def(
self,
partitions_def: Optional[PartitionsDefinition],
current_time: Optional[datetime],
dynamic_partitions_store: Optional[DynamicPartitionsStore],
) -> "SerializableEntitySubset":
"""Used to transform a persisted SerializableEntitySubset (e.g. generated from backfill data)
into an up-to-date one based on the latest partitions information in the asset graph. Raises
an exception if the partitions in the asset graph are now totally incompatible (say, a
partitioned asset is now unpartitioned, or the subset references keys that no longer exist)
but adjusts the SerializableEntitySubset to reflect the latest version of the
PartitionsDefinition if it differs but is still valid (for example, the range of the
partitions have been extended and the subset is still valid.
"""
if self.is_partitioned:
check.invariant(
partitions_def is not None,
f"{self.key.to_user_string()} was partitioned when originally stored, but is no longer partitioned.",
)
if isinstance(self.value, AllPartitionsSubset):
check.invariant(
self.value.partitions_def == partitions_def,
f"Partitions definition for {self.key.to_user_string()} is no longer compatible with an AllPartitionsSubset",
)
return self
elif isinstance(self.value, TimeWindowPartitionsSubset):
current_partitions_def = self.value.partitions_def
if (
not isinstance(self.value, TimeWindowPartitionsSubset)
or not isinstance(partitions_def, TimeWindowPartitionsDefinition)
or current_partitions_def.timezone != partitions_def.timezone
or current_partitions_def.fmt != partitions_def.fmt
or current_partitions_def.cron_schedule != current_partitions_def.cron_schedule
):
raise CheckError(
f"Stored partitions definition for {self.key.to_user_string()} is no longer compatible with the latest partitions definition",
)
missing_subset = self.value - partitions_def.subset_with_all_partitions()
if not missing_subset.is_empty:
raise CheckError(
f"Stored partitions definition for {self.key.to_user_string()} includes partitions {missing_subset} that are no longer present",
)

return SerializableEntitySubset(
self.key, self.value.with_partitions_def(partitions_def)
)
else:
return self
else:
check.invariant(
partitions_def is None,
f"{self.key.to_user_string()} was un-partitioned when originally stored, but is now partitioned.",
)
return self

def is_compatible_with_partitions_def(
self, partitions_def: Optional[PartitionsDefinition]
) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
from dagster import AssetDep, Definitions, IdentityPartitionMapping, asset
import pytest
from dagster import (
AssetDep,
DailyPartitionsDefinition,
Definitions,
IdentityPartitionMapping,
TimeWindow,
asset,
)
from dagster._check.functions import CheckError
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition_mapping import LastPartitionMapping, StaticPartitionMapping
from dagster._core.definitions.time_window_partitions import (
PersistedTimeWindow,
TimeWindowPartitionsSubset,
)
from dagster._core.instance import DagsterInstance
from dagster._time import create_datetime

Expand Down Expand Up @@ -67,6 +81,68 @@ def down_asset(): ...
assert required_but_nonexistent_subset.is_empty


def test_partitions_definition_validity_changes():
daily_partitions_def = DailyPartitionsDefinition(start_date="2022-01-01")

@asset(partitions_def=daily_partitions_def)
def asset0(): ...

defs = Definitions([asset0])

# partition subset that is still valid because it is a subset of the current one
old_partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")
old_partitions_subset = TimeWindowPartitionsSubset(
partitions_def=old_partitions_def,
num_partitions=None,
included_time_windows=[
PersistedTimeWindow.from_public_time_window(
TimeWindow(start=create_datetime(2023, 1, 1), end=create_datetime(2023, 3, 1)),
"UTC",
)
],
)
old_asset_graph_subset = AssetGraphSubset(
partitions_subsets_by_asset_key={asset0.key: old_partitions_subset}
)

with DagsterInstance.ephemeral() as instance:
asset_graph_view = AssetGraphView.for_test(defs, instance)

entity_subset = asset_graph_view.get_entity_subset_from_asset_graph_subset(
old_asset_graph_subset,
asset0.key,
)
assert (
entity_subset.get_internal_subset_value().included_time_windows
== old_partitions_subset.included_time_windows
)
assert entity_subset.get_internal_subset_value().partitions_def == daily_partitions_def

invalid_partitions_def = DailyPartitionsDefinition(start_date="2021-01-01")
invalid_partitions_subset = TimeWindowPartitionsSubset(
partitions_def=invalid_partitions_def,
num_partitions=None,
included_time_windows=[
PersistedTimeWindow.from_public_time_window(
TimeWindow(start=create_datetime(2021, 1, 1), end=create_datetime(2021, 3, 1)),
"UTC",
)
],
)
invalid_asset_graph_subset = AssetGraphSubset(
partitions_subsets_by_asset_key={asset0.key: invalid_partitions_subset}
)

with pytest.raises(
CheckError,
match="that are no longer present",
):
asset_graph_view.get_entity_subset_from_asset_graph_subset(
invalid_asset_graph_subset,
asset0.key,
)


def test_subset_traversal_static_partitions() -> None:
number_keys = {"1", "2", "3"}
letter_keys = {"a", "b", "c"}
Expand Down

0 comments on commit 14de39c

Please sign in to comment.