From 0816675ca41020868e4a991fe311174d41c47441 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 28 Mar 2024 14:32:27 -0700 Subject: [PATCH 1/7] TableLastModifiedMetadataBatch capability --- dbt/adapters/redshift/impl.py | 1 + dev-requirements.txt | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index a77601895..18faee48c 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -58,6 +58,7 @@ class RedshiftAdapter(SQLAdapter): { Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full), Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full), } ) diff --git a/dev-requirements.txt b/dev-requirements.txt index 6d29276cf..ffa079f72 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,8 +1,8 @@ # install latest changes in dbt-core + dbt-postgres -git+https://github.com/dbt-labs/dbt-adapters.git -git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness +git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git -git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@batch-metadata-freshness#subdirectory=core git+https://github.com/dbt-labs/dbt-postgres.git # if version 1.x or greater -> pin to major version From b4b5876a0aff7c1a16ff1bc63197ab5a02f337ee Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 28 Mar 2024 16:46:52 -0700 Subject: [PATCH 2/7] pin ddtrace --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index ffa079f72..08d978ea5 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -10,7 +10,7 @@ git+https://github.com/dbt-labs/dbt-postgres.git black~=23.12 bumpversion~=0.6.0 click~=8.1 -ddtrace~=2.3 +ddtrace==2.3.0 flake8~=6.1 flaky~=3.7 freezegun~=1.3 From 9906c69a3b3f02263b73cccbe9a20b242605ab82 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 2 Apr 2024 16:04:17 -0700 Subject: [PATCH 3/7] open connection before calculate_freshness_from_metadata_batch --- dbt/adapters/redshift/impl.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index 18faee48c..ea38d38c7 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -1,12 +1,14 @@ import os from dataclasses import dataclass from dbt_common.contracts.constraints import ConstraintType -from typing import Optional, Set, Any, Dict, Type +from typing import Optional, Set, Any, Dict, Type, List, Tuple from collections import namedtuple from dbt.adapters.base import PythonJobHelper -from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport +from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport, FreshnessResponse from dbt.adapters.base.meta import available +from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support +from dbt.adapters.contracts.macros import MacroResolverProtocol from dbt.adapters.sql import SQLAdapter from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.events.logging import AdapterLogger @@ -184,3 +186,12 @@ def generate_python_submission_response(self, submission_result: Any) -> Adapter def debug_query(self): """Override for DebugTask method""" self.execute("select 1 as id") + + def calculate_freshness_from_metadata_batch( + self, + sources: List[BaseRelation], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]: + conn = self.connections.get_if_exists() + self.connections.open(conn) + return super().calculate_freshness_from_metadata_batch(sources, macro_resolver) From e3f0931a8562912425529d13f7f2ec754d585efa Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 3 Apr 2024 17:05:42 -0700 Subject: [PATCH 4/7] remove calculate_freshness_from_metadata_batch override --- dbt/adapters/redshift/impl.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index ea38d38c7..18faee48c 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -1,14 +1,12 @@ import os from dataclasses import dataclass from dbt_common.contracts.constraints import ConstraintType -from typing import Optional, Set, Any, Dict, Type, List, Tuple +from typing import Optional, Set, Any, Dict, Type from collections import namedtuple from dbt.adapters.base import PythonJobHelper -from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport, FreshnessResponse +from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport from dbt.adapters.base.meta import available -from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support -from dbt.adapters.contracts.macros import MacroResolverProtocol from dbt.adapters.sql import SQLAdapter from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.events.logging import AdapterLogger @@ -186,12 +184,3 @@ def generate_python_submission_response(self, submission_result: Any) -> Adapter def debug_query(self): """Override for DebugTask method""" self.execute("select 1 as id") - - def calculate_freshness_from_metadata_batch( - self, - sources: List[BaseRelation], - macro_resolver: Optional[MacroResolverProtocol] = None, - ) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]: - conn = self.connections.get_if_exists() - self.connections.open(conn) - return super().calculate_freshness_from_metadata_batch(sources, macro_resolver) From 02bbbbcb0f777b2015cce91c7f16b2757c94a74e Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 4 Apr 2024 17:14:45 -0700 Subject: [PATCH 5/7] changelog entry --- .changes/unreleased/Features-20240404-171441.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240404-171441.yaml diff --git a/.changes/unreleased/Features-20240404-171441.yaml b/.changes/unreleased/Features-20240404-171441.yaml new file mode 100644 index 000000000..f1ac623c0 --- /dev/null +++ b/.changes/unreleased/Features-20240404-171441.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Support TableLastModifiedMetadataBatch capability +time: 2024-04-04T17:14:41.313087-07:00 +custom: + Author: michelleark + Issue: "755" From efa6b1b0b4661eab5bf96a06d4262faee1f2fe35 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 5 Apr 2024 13:43:59 -0700 Subject: [PATCH 6/7] TestGetLastRelationModifiedBatch --- .../test_get_relation_last_modified.py | 128 ++++++++++++++++-- 1 file changed, 118 insertions(+), 10 deletions(-) diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py index 6a77d22ae..c31e9ac61 100644 --- a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py +++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py @@ -1,12 +1,24 @@ import os +import pytest +from unittest import mock +from dbt.adapters.redshift.impl import RedshiftAdapter +from dbt.adapters.capability import Capability, CapabilityDict +from dbt.cli.main import dbtRunner from dbt.tests.util import run_dbt -import pytest from tests.functional.adapter.sources_freshness_tests import files -class TestGetLastRelationModified: +class SetupGetLastRelationModified: + @pytest.fixture(scope="class", autouse=True) + def set_env_vars(self, project): + os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema + yield + del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + + +class TestGetLastRelationModified(SetupGetLastRelationModified): @pytest.fixture(scope="class") def seeds(self): return { @@ -18,14 +30,6 @@ def seeds(self): def models(self): return {"schema.yml": files.SCHEMA_YML} - @pytest.fixture(scope="class", autouse=True) - def setup(self, project): - # we need the schema name for the sources section - os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema - run_dbt(["seed"]) - yield - del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] - @pytest.mark.parametrize( "source,status,expect_pass", [ @@ -34,9 +38,113 @@ def setup(self, project): ], ) def test_get_last_relation_modified(self, project, source, status, expect_pass): + run_dbt(["seed"]) + results = run_dbt( ["source", "freshness", "--select", f"source:{source}"], expect_pass=expect_pass ) assert len(results) == 1 result = results[0] assert result.status == status + + +freshness_metadata_schema_batch_yml = """ +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_table + - name: test_table2 + - name: test_table_with_loaded_at_field + loaded_at_field: my_loaded_at_field +""" + + +class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified): + @pytest.fixture(scope="class") + def custom_schema(self, project, set_env_vars): + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + ) + project.adapter.drop_schema(relation) + project.adapter.create_schema(relation) + + yield relation.schema + + with project.adapter.connection_named("__test"): + project.adapter.drop_schema(relation) + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": freshness_metadata_schema_batch_yml} + + def get_freshness_result_for_table(self, table_name, results): + for result in results: + if result.node.name == table_name: + return result + return None + + def test_get_last_relation_modified_batch(self, project, custom_schema): + project.run_sql( + f"create table {custom_schema}.test_table as (select 1 as id, 'test' as name);" + ) + project.run_sql( + f"create table {custom_schema}.test_table2 as (select 1 as id, 'test' as name);" + ) + project.run_sql( + f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);" + ) + + runner = dbtRunner() + freshness_results_batch = runner.invoke(["source", "freshness"]).result + + assert len(freshness_results_batch) == 3 + test_table_batch_result = self.get_freshness_result_for_table( + "test_table", freshness_results_batch + ) + test_table2_batch_result = self.get_freshness_result_for_table( + "test_table2", freshness_results_batch + ) + test_table_with_loaded_at_field_batch_result = self.get_freshness_result_for_table( + "test_table_with_loaded_at_field", freshness_results_batch + ) + + # Remove TableLastModifiedMetadataBatch and run freshness on same input without batch strategy + capabilities_no_batch = CapabilityDict( + { + capability: support + for capability, support in RedshiftAdapter.capabilities().items() + if capability != Capability.TableLastModifiedMetadataBatch + } + ) + with mock.patch.object( + RedshiftAdapter, "capabilities", return_value=capabilities_no_batch + ): + freshness_results = runner.invoke(["source", "freshness"]).result + + assert len(freshness_results) == 3 + test_table_result = self.get_freshness_result_for_table("test_table", freshness_results) + test_table2_result = self.get_freshness_result_for_table("test_table2", freshness_results) + test_table_with_loaded_at_field_result = self.get_freshness_result_for_table( + "test_table_with_loaded_at_field", freshness_results + ) + + # assert results between batch vs non-batch freshness strategy are equivalent + assert test_table_result.status == test_table_batch_result.status + assert test_table_result.max_loaded_at == test_table_batch_result.max_loaded_at + + assert test_table2_result.status == test_table2_batch_result.status + assert test_table2_result.max_loaded_at == test_table2_batch_result.max_loaded_at + + assert ( + test_table_with_loaded_at_field_batch_result.status + == test_table_with_loaded_at_field_result.status + ) + assert ( + test_table_with_loaded_at_field_batch_result.max_loaded_at + == test_table_with_loaded_at_field_result.max_loaded_at + ) From 80c9aeb663a9e3d85839f7b067b8dbc1c8cfef40 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 12 Apr 2024 12:51:14 -0700 Subject: [PATCH 7/7] restore dev-requirements.txt --- dev-requirements.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index cb374f92d..85edead99 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,8 +1,8 @@ # install latest changes in dbt-core + dbt-postgres -git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness -git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-adapters.git +git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git -git+https://github.com/dbt-labs/dbt-core.git@batch-metadata-freshness#subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core git+https://github.com/dbt-labs/dbt-postgres.git # if version 1.x or greater -> pin to major version