Add support for checking table-last-modified by metadata (issue 938).

Reviewer notes on this patch:
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy; verify context lines against the target tree before applying.
- Post-image blob ids on the `index` lines were not recomputed after the
  review changes below.
- impl.py: documented calculate_freshness_from_metadata.
- _partition.py: config_dict is annotated Dict[str, Any], matching the
  function's declared return type.
- test_get_relation_last_modified.py: the setup fixture now removes the
  environment variable in a `finally` clause so a failed seed cannot leak it.

diff --git a/.changes/unreleased/Features-20231218-155409.yaml b/.changes/unreleased/Features-20231218-155409.yaml
new file mode 100644
index 000000000..bc965b06f
--- /dev/null
+++ b/.changes/unreleased/Features-20231218-155409.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add support for checking table-last-modified by metadata
+time: 2023-12-18T15:54:09.69635-05:00
+custom:
+  Author: mikealfare
+  Issue: "938"
diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index 3e394ad26..eb2aec175 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from datetime import datetime
 import json
 import threading
 from multiprocessing.context import SpawnContext
@@ -20,9 +21,12 @@
     SchemaSearchMap,
     available,
 )
+from dbt.adapters.base.impl import FreshnessResponse
 from dbt.adapters.cache import _make_ref_key_dict  # type: ignore
+from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
 import dbt_common.clients.agate_helper
 from dbt.adapters.contracts.connection import AdapterResponse
+from dbt.adapters.contracts.macros import MacroResolverProtocol
 from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint  # type: ignore
 from dbt_common.dataclass_schema import dbtClassMixin
 from dbt.adapters.events.logging import AdapterLogger
@@ -36,6 +40,7 @@
 import google.cloud.bigquery
 from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
 import google.cloud.exceptions
+import pytz
 
 from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
 from dbt.adapters.bigquery.column import get_nested_column_data_types
@@ -118,6 +123,12 @@ class BigQueryAdapter(BaseAdapter):
         ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
     }
 
+    _capabilities: CapabilityDict = CapabilityDict(
+        {
+            Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
+        }
+    )
+
     def __init__(self, config, mp_context: SpawnContext) -> None:
         super().__init__(config, mp_context)
         self.connections: BigQueryConnectionManager = self.connections
@@ -709,6 +720,36 @@ def _get_catalog_schemas(self, relation_config: Iterable[RelationConfig]) -> Sch
         )
         return result
 
+    def calculate_freshness_from_metadata(
+        self,
+        source: BaseRelation,
+        macro_resolver: Optional[MacroResolverProtocol] = None,
+    ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
+        """Compute source freshness from the table's last-modified metadata.
+
+        Fetches the table with ``client.get_table`` on the current thread's
+        connection and compares ``table.modified`` with the current UTC time.
+
+        :param source: relation whose freshness is being checked
+        :param macro_resolver: not used by this implementation
+        :return: ``(None, freshness)``; ``freshness["age"]`` is the number of
+            seconds between ``table.modified`` and the snapshot time
+        """
+        conn = self.connections.get_thread_connection()
+        client: google.cloud.bigquery.Client = conn.handle
+
+        table_ref = self.get_table_ref_from_relation(source)
+        table = client.get_table(table_ref)
+        snapshot = datetime.now(tz=pytz.UTC)
+
+        freshness = FreshnessResponse(
+            max_loaded_at=table.modified,
+            snapshotted_at=snapshot,
+            age=(snapshot - table.modified).total_seconds(),
+        )
+
+        return None, freshness
+
     @available.parse(lambda *a, **k: {})
     def get_common_options(
         self, config: Dict[str, Any], node: Dict[str, Any], temporary: bool = False
diff --git a/dbt/adapters/bigquery/relation_configs/_materialized_view.py b/dbt/adapters/bigquery/relation_configs/_materialized_view.py
index fd0c191c3..81ca6b3de 100644
--- a/dbt/adapters/bigquery/relation_configs/_materialized_view.py
+++ b/dbt/adapters/bigquery/relation_configs/_materialized_view.py
@@ -75,10 +75,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
         }
 
         # optional
-        if "partition_by" in relation_config.config:
+        if relation_config.config and "partition_by" in relation_config.config:
             config_dict.update({"partition": PartitionConfig.parse_model_node(relation_config)})
 
-        if "cluster_by" in relation_config.config:
+        if relation_config.config and "cluster_by" in relation_config.config:
             config_dict.update(
                 {"cluster": BigQueryClusterConfig.parse_relation_config(relation_config)}
             )
diff --git a/dbt/adapters/bigquery/relation_configs/_partition.py b/dbt/adapters/bigquery/relation_configs/_partition.py
index 8fe8bf5d6..97e695fc0 100644
--- a/dbt/adapters/bigquery/relation_configs/_partition.py
+++ b/dbt/adapters/bigquery/relation_configs/_partition.py
@@ -111,7 +111,7 @@ def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]:
         This doesn't currently collect `time_ingestion_partitioning` and `copy_partitions`
         because this was built for materialized views, which do not support those settings.
         """
-        config_dict = relation_config.config.extra.get("partition_by")  # type: ignore
+        config_dict: Dict[str, Any] = relation_config.config.extra.get("partition_by")  # type: ignore
         if "time_ingestion_partitioning" in config_dict:
             del config_dict["time_ingestion_partitioning"]
         if "copy_partitions" in config_dict:
diff --git a/tests/boundary/test_bigquery_sdk.py b/tests/boundary/test_bigquery_sdk.py
new file mode 100644
index 000000000..b8e6c9995
--- /dev/null
+++ b/tests/boundary/test_bigquery_sdk.py
@@ -0,0 +1,18 @@
+import pytest
+
+from dbt.tests.util import get_connection
+from google.cloud.bigquery import Client, DatasetReference, TableReference
+from google.api_core.exceptions import NotFound
+
+
+@pytest.mark.parametrize("table_name", ["this_table_does_not_exist"])
+def test_get_table_does_not_exist(project, table_name):
+    """
+    TODO: replace dbt project methods with direct connection instantiation
+    """
+    with get_connection(project.adapter) as conn:
+        client: Client = conn.handle
+        dataset_ref = DatasetReference(project.database, project.test_schema)
+        table_ref = TableReference(dataset_ref, table_name)
+        with pytest.raises(NotFound):
+            client.get_table(table_ref)
diff --git a/tests/functional/adapter/sources_freshness_tests/files.py b/tests/functional/adapter/sources_freshness_tests/files.py
new file mode 100644
index 000000000..eaca96648
--- /dev/null
+++ b/tests/functional/adapter/sources_freshness_tests/files.py
@@ -0,0 +1,23 @@
+SCHEMA_YML = """version: 2
+sources:
+  - name: test_source
+    freshness:
+      warn_after: {count: 10, period: hour}
+      error_after: {count: 1, period: day}
+    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
+    tables:
+      - name: test_source
+"""
+
+SEED_TEST_SOURCE_CSV = """
+id,name
+1,Martin
+2,Jeter
+3,Ruth
+4,Gehrig
+5,DiMaggio
+6,Torre
+7,Mantle
+8,Berra
+9,Maris
+""".strip()
diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py
new file mode 100644
index 000000000..08e263edb
--- /dev/null
+++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py
@@ -0,0 +1,33 @@
+import os
+import pytest
+
+from dbt.tests.util import run_dbt
+
+from tests.functional.adapter.sources_freshness_tests import files
+
+
+class TestGetLastRelationModified:
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {"test_source.csv": files.SEED_TEST_SOURCE_CSV}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"schema.yml": files.SCHEMA_YML}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def setup(self, project):
+        # we need the schema name for the sources section
+        os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
+        try:
+            run_dbt(["seed"])
+            yield
+        finally:
+            # remove the variable even when seeding fails, so it cannot leak into other tests
+            del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
+
+    def test_get_last_relation_modified(self, project):
+        results = run_dbt(["source", "freshness"])
+        assert len(results) == 1
+        result = results[0]
+        assert result.status == "pass"