Skip to content

Commit

Permalink
refactor _create_freshness_response
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Mar 25, 2024
1 parent 82d8217 commit ed81529
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,25 +1330,11 @@ def calculate_freshness_from_metadata_batch(
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

if last_modified_val is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")

age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
freshness_response = self._create_freshness_response(last_modified_val, snapshotted_at_val)
source_relation_for_result = schema_identifier_to_source[
(schema.lower(), identifier.lower())
]
freshness_responses[source_relation_for_result] = freshness
freshness_responses[source_relation_for_result] = freshness_response

return adapter_response, freshness_responses

Expand All @@ -1359,11 +1345,27 @@ def calculate_freshness_from_metadata(
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
adapter_response, freshness_responses = self.calculate_freshness_from_metadata_batch(
sources=[source],
information_schema=source.information_schema_only(),
macro_resolver=macro_resolver,
)
return adapter_response, list(freshness_responses.values())[0]

def _create_freshness_response(self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]) -> FreshnessResponse:
if last_modified is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified, None, "last_modified")

snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
runs. The hook can assume it has a connection available.
Expand Down

0 comments on commit ed81529

Please sign in to comment.