Skip to content

Commit

Permalink
Implementation of metadata-based freshness (#1060)
Browse files Browse the repository at this point in the history
* changelog

* turn on metadata-based source freshness capability

* add boundary test to confirm get_table raises an error properly

* add metadata-based source freshness for a relation

* remove unnecessary test setup

* remove unnecessary fixture from test

* update from main
  • Loading branch information
mikealfare authored Feb 14, 2024
1 parent ea3abee commit 34eadae
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231218-155409.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for checking table-last-modified by metadata
time: 2023-12-18T15:54:09.69635-05:00
custom:
Author: mikealfare
Issue: "938"
31 changes: 31 additions & 0 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from datetime import datetime
import json
import threading
from multiprocessing.context import SpawnContext
Expand All @@ -20,9 +21,12 @@
SchemaSearchMap,
available,
)
from dbt.adapters.base.impl import FreshnessResponse
from dbt.adapters.cache import _make_ref_key_dict # type: ignore
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
import dbt_common.clients.agate_helper
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.contracts.macros import MacroResolverProtocol
from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint # type: ignore
from dbt_common.dataclass_schema import dbtClassMixin
from dbt.adapters.events.logging import AdapterLogger
Expand All @@ -36,6 +40,7 @@
import google.cloud.bigquery
from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
import google.cloud.exceptions
import pytz

from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
from dbt.adapters.bigquery.column import get_nested_column_data_types
Expand Down Expand Up @@ -118,6 +123,12 @@ class BigQueryAdapter(BaseAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
}

_capabilities: CapabilityDict = CapabilityDict(
{
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
}
)

def __init__(self, config, mp_context: SpawnContext) -> None:
super().__init__(config, mp_context)
self.connections: BigQueryConnectionManager = self.connections
Expand Down Expand Up @@ -709,6 +720,26 @@ def _get_catalog_schemas(self, relation_config: Iterable[RelationConfig]) -> Sch
)
return result

def calculate_freshness_from_metadata(
    self,
    source: BaseRelation,
    macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
    """Compute source freshness from BigQuery table metadata.

    Instead of running a `loaded_at_field` query, this reads the table's
    last-modified timestamp straight from the BigQuery API (backing the
    TableLastModifiedMetadata capability declared on this adapter).

    :param source: relation identifying the source table to inspect
    :param macro_resolver: accepted for interface compatibility; unused here
    :return: (None, freshness) — no SQL is executed, so there is no
        AdapterResponse to report
    :raises google.cloud.exceptions.NotFound: if the table does not exist
    """
    connection = self.connections.get_thread_connection()
    client: google.cloud.bigquery.Client = connection.handle

    # client.get_table raises NotFound for a missing table; callers rely on that.
    table = client.get_table(self.get_table_ref_from_relation(source))

    now = datetime.now(tz=pytz.UTC)
    last_modified = table.modified

    return None, FreshnessResponse(
        max_loaded_at=last_modified,
        snapshotted_at=now,
        age=(now - last_modified).total_seconds(),
    )

@available.parse(lambda *a, **k: {})
def get_common_options(
self, config: Dict[str, Any], node: Dict[str, Any], temporary: bool = False
Expand Down
4 changes: 2 additions & 2 deletions dbt/adapters/bigquery/relation_configs/_materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
}

# optional
if "partition_by" in relation_config.config:
if relation_config.config and "partition_by" in relation_config.config:
config_dict.update({"partition": PartitionConfig.parse_model_node(relation_config)})

if "cluster_by" in relation_config.config:
if relation_config.config and "cluster_by" in relation_config.config:
config_dict.update(
{"cluster": BigQueryClusterConfig.parse_relation_config(relation_config)}
)
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/bigquery/relation_configs/_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]:
This doesn't currently collect `time_ingestion_partitioning` and `copy_partitions`
because this was built for materialized views, which do not support those settings.
"""
config_dict = relation_config.config.extra.get("partition_by") # type: ignore
config_dict: Dict[str, str] = relation_config.config.extra.get("partition_by") # type: ignore
if "time_ingestion_partitioning" in config_dict:
del config_dict["time_ingestion_partitioning"]
if "copy_partitions" in config_dict:
Expand Down
18 changes: 18 additions & 0 deletions tests/boundary/test_bigquery_sdk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pytest

from dbt.tests.util import get_connection
from google.cloud.bigquery import Client, DatasetReference, TableReference
from google.api_core.exceptions import NotFound


@pytest.mark.parametrize("table_name", ["this_table_does_not_exist"])
def test_get_table_does_not_exist(project, table_name):
    """Boundary check: the BigQuery SDK raises NotFound for a missing table.

    TODO: replace dbt project methods with direct connection instantiation
    """
    with get_connection(project.adapter) as conn:
        client: Client = conn.handle
        dataset = DatasetReference(project.database, project.test_schema)
        missing_table = TableReference(dataset, table_name)
        with pytest.raises(NotFound):
            client.get_table(missing_table)
23 changes: 23 additions & 0 deletions tests/functional/adapter/sources_freshness_tests/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Source definition with both warn and error freshness thresholds; the schema
# is resolved at runtime from an env var that the test fixture sets.
# NOTE(review): this literal appears to have lost its YAML indentation in
# transit — verify the nesting against the repository before relying on it.
SCHEMA_YML = """version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_source
"""

# Seed data for the source table whose last-modified metadata is checked;
# leading newline is stripped so the CSV header is the first line.
SEED_TEST_SOURCE_CSV = """
id,name
1,Martin
2,Jeter
3,Ruth
4,Gehrig
5,DiMaggio
6,Torre
7,Mantle
8,Berra
9,Maris
""".strip()
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
import pytest

from dbt.tests.util import run_dbt

from tests.functional.adapter.sources_freshness_tests import files


class TestGetLastRelationModified:
    """Functional test: `dbt source freshness` succeeds via table metadata."""

    @pytest.fixture(scope="class")
    def seeds(self):
        return {"test_source.csv": files.SEED_TEST_SOURCE_CSV}

    @pytest.fixture(scope="class")
    def models(self):
        return {"schema.yml": files.SCHEMA_YML}

    @pytest.fixture(scope="class", autouse=True)
    def setup(self, project):
        # The sources YAML reads the schema name from this env var, since the
        # test schema is only known at runtime.
        os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
        run_dbt(["seed"])
        yield
        del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

    def test_get_last_relation_modified(self, project):
        freshness_results = run_dbt(["source", "freshness"])
        assert len(freshness_results) == 1
        assert freshness_results[0].status == "pass"

0 comments on commit 34eadae

Please sign in to comment.