Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch #127

Merged
merged 29 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2beba0b
extend RelationConfig and MaterializationConfig
MichelleArk Mar 1, 2024
151b131
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 11, 2024
727093c
first pass
MichelleArk Mar 11, 2024
ffaf808
accept information schema in calculate_freshness_from_metadata_batch
MichelleArk Mar 11, 2024
b66644a
implement calculate_freshness_from_metadata in terms of calculate_fre…
MichelleArk Mar 11, 2024
54670e0
add TableLastModifiedMetadataBatch capability
MichelleArk Mar 11, 2024
8046a3e
return batched freshness results keyed by (schema, identifier)
MichelleArk Mar 13, 2024
1dbf3e9
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 13, 2024
2c28576
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 14, 2024
a9cffcd
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 22, 2024
9af8c85
handle queries across information_schema in calculate_freshness_from_…
MichelleArk Mar 25, 2024
82d8217
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 25, 2024
ed81529
refactor _create_freshness_response
MichelleArk Mar 25, 2024
c44779a
make schema_identifier_to_source type-safe
MichelleArk Mar 25, 2024
62be8df
update TableLastModifiedMetadataBatch description
MichelleArk Mar 25, 2024
9a6c829
changelog entry
MichelleArk Mar 25, 2024
17701ad
refactor to _get_catalog_relations_by_info_schema, _parse_freshness_row
MichelleArk Mar 28, 2024
511cf14
sanitize raw_relation for freshness batch calculation
MichelleArk Apr 2, 2024
01f27a5
ensure a connection is open if possible prior to executing macro
MichelleArk Apr 4, 2024
05c7149
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 4, 2024
154e066
fix agate typing
MichelleArk Apr 4, 2024
8c707cc
lazy load agate_helper
MichelleArk Apr 4, 2024
8a1deac
add needs_conn to BaseAdapter.execute_macro
MichelleArk Apr 4, 2024
f3dcac2
docstring
MichelleArk Apr 4, 2024
ebee880
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 5, 2024
02988c8
Merge branch 'main' into batch-metadata-freshness
colin-rogers-dbt Apr 11, 2024
0bbf7ed
cleanup adapter_responses parsing in calculate_freshness_from_metadat…
MichelleArk Apr 11, 2024
a43f04c
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 12, 2024
7cf501e
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240325-180611.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch
time: 2024-03-25T18:06:11.816163-04:00
custom:
Author: michelleark
Issue: "138"
99 changes: 77 additions & 22 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ def _get_cache_schemas(self, relation_configs: Iterable[RelationConfig]) -> Set[
populate.
"""
return {
self.Relation.create_from(quoting=self.config, relation_config=relation_config).without_identifier()
self.Relation.create_from(
quoting=self.config, relation_config=relation_config
).without_identifier()
for relation_config in relation_configs
}

Expand Down Expand Up @@ -1098,6 +1100,12 @@ def execute_macro(

macro_function = CallableMacroGenerator(macro, macro_context)

# A connection may or may not be required for a given macro execution
connection = self.connections.get_if_exists()
# If a connection exists, ensure it is open prior to executing a macro
if connection:
self.connections.open(connection)

with self.connections.exception_handler(f"macro {macro_name}"):
result = macro_function(**kwargs)
return result
Expand Down Expand Up @@ -1285,46 +1293,93 @@ def calculate_freshness(
}
return adapter_response, freshness

def calculate_freshness_from_metadata_batch(
self,
sources: List[BaseRelation],
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]:
# Track schema, identifiers of sources for lookup from batch query
schema_identifier_to_source = {
(
source.path.get_lowered_part(ComponentName.Schema),
source.path.get_lowered_part(ComponentName.Identifier),
): source
for source in sources
}

# Group metadata sources by information schema -- one query per information schema will be necessary
sources_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = self._get_catalog_relations_by_info_schema(sources)

freshness_responses: Dict[BaseRelation, FreshnessResponse] = {}
adapter_responses: List[Optional[AdapterResponse]] = []
for (
information_schema,
sources_for_information_schema,
) in sources_by_info_schema.items():
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs={
"information_schema": information_schema,
"relations": sources_for_information_schema,
},
macro_resolver=macro_resolver,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
adapter_responses.append(adapter_response)

for row in table:
raw_relation, freshness_response = self._parse_freshness_row(row, table)
source_relation_for_result = schema_identifier_to_source[raw_relation]
freshness_responses[source_relation_for_result] = freshness_response

return adapter_responses, freshness_responses

def calculate_freshness_from_metadata(
self,
source: BaseRelation,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs: Dict[str, Any] = {
"information_schema": source.information_schema_only(),
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs=kwargs,
adapter_responses, freshness_responses = self.calculate_freshness_from_metadata_batch(
sources=[source],
macro_resolver=macro_resolver,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

try:
row = table[0]
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)
adapter_response = adapter_responses[0] if adapter_responses else None
return adapter_response, list(freshness_responses.values())[0]

if last_modified_val is None:
def _create_freshness_response(
self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]
) -> FreshnessResponse:
if last_modified is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")
max_loaded_at = _utc(last_modified, None, "last_modified")

snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness
return freshness

def _parse_freshness_row(self, row: agate.Row, table: agate.Table) -> Tuple[Any, FreshnessResponse]:
try:
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
identifier = get_column_value_uncased("identifier", row)
schema = get_column_value_uncased("schema", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

freshness_response = self._create_freshness_response(
last_modified_val,
snapshotted_at_val
)
raw_relation = schema.lower().strip(), identifier.lower().strip()
return raw_relation, freshness_response

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
Expand Down
3 changes: 3 additions & 0 deletions dbt/adapters/capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Capability(str, Enum):
TableLastModifiedMetadata = "TableLastModifiedMetadata"
"""Indicates support for determining the time of the last table modification by querying database metadata."""

TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch"
"""Indicates support for performantly determining the time of the last table modification by querying database metadata in batch."""


class Support(str, Enum):
Unknown = "Unknown"
Expand Down