From 9a23f2f00cd7f899034a2eeecc09408859533bfc Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 10 Oct 2023 17:32:26 -0400 Subject: [PATCH 1/5] committing to park changes and wrap up other 1.7 items --- dbt/adapters/bigquery/impl.py | 12 ++++++++++- .../relation_configs/materialized_view.py | 5 +---- .../bigquery/relation_configs/partition.py | 20 ++++++++++--------- .../partition/describe.sql | 4 ++-- .../relations/materialized_view/describe.sql | 7 +++++++ 5 files changed, 32 insertions(+), 16 deletions(-) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index cc0a6c5f3..5eafc2e2f 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -32,7 +32,7 @@ import google.auth import google.oauth2 import google.cloud.bigquery -from google.cloud.bigquery import AccessEntry, SchemaField +from google.cloud.bigquery import AccessEntry, SchemaField, Table import google.cloud.exceptions from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager @@ -555,6 +555,16 @@ def parse_partition_by(self, raw_partition_by: Any) -> Optional[PartitionConfig] def get_table_ref_from_relation(self, relation: BaseRelation): return self.connections.table_ref(relation.database, relation.schema, relation.identifier) + @available.parse(lambda *a, **k: True) + def get_table(self, relation: BigQueryRelation) -> Optional[Table]: + try: + table = self.connections.get_bq_table( + database=relation.database, schema=relation.schema, identifier=relation.identifier + ) + except google.cloud.exceptions.NotFound: + table = None + return table + def _update_column_dict(self, bq_column_dict, dbt_columns, parent=""): """ Helper function to recursively traverse the schema of a table in the diff --git a/dbt/adapters/bigquery/relation_configs/materialized_view.py b/dbt/adapters/bigquery/relation_configs/materialized_view.py index f3e3914f9..ef6a223d2 100644 --- a/dbt/adapters/bigquery/relation_configs/materialized_view.py +++ b/dbt/adapters/bigquery/relation_configs/materialized_view.py @@ -98,10 +98,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, # optional if partition_by := relation_results.get("partition_by"): - if len(partition_by) > 0: - config_dict.update( - {"partition": PartitionConfig.parse_relation_results(partition_by[0])} - ) + config_dict.update({"partition": PartitionConfig.parse_relation_results(partition_by)}) # type: ignore cluster_by: agate.Table = relation_results.get("cluster_by") # type: ignore if len(cluster_by) > 0: diff --git a/dbt/adapters/bigquery/relation_configs/partition.py b/dbt/adapters/bigquery/relation_configs/partition.py index 9c9714d7f..8dd193ad1 100644 --- a/dbt/adapters/bigquery/relation_configs/partition.py +++ b/dbt/adapters/bigquery/relation_configs/partition.py @@ -1,11 +1,11 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional -import agate from dbt.contracts.graph.nodes import ModelNode from dbt.dataclass_schema import dbtClassMixin, ValidationError import dbt.exceptions from dbt.adapters.relation_configs import RelationConfigChange +from google.cloud.bigquery.table import Table @dataclass @@ -108,21 +108,23 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: return model_node.config.extra.get("partition_by") @classmethod - def parse_relation_results(cls, describe_relation_results: agate.Row) -> Dict[str, Any]: + def parse_relation_results(cls, bq_table: Table) -> Dict[str, Any]: """ - Parse the results of a describe query into a raw config for `PartitionConfig.parse` + Parse the results of a BQ Table object into a raw config for `PartitionConfig.parse` """ + range_partitioning = bq_table.range_partitioning + time_partitioning = bq_table.time_partitioning config_dict = { - "field": describe_relation_results.get("partition_column_name"), - "data_type": describe_relation_results.get("partition_data_type"), - "granularity": describe_relation_results.get("partition_type"), + "field": time_partitioning.field, + "data_type": "", + "granularity": time_partitioning.type_, } # combine range fields into dictionary, like the model config range_dict = { - "start": describe_relation_results.get("partition_start"), - "end": describe_relation_results.get("partition_end"), - "interval": describe_relation_results.get("partition_interval"), + "start": range_partitioning.range_.start, + "end": range_partitioning.range_.end, + "interval": range_partitioning.range_.interval, } config_dict.update({"range": range_dict}) diff --git a/dbt/include/bigquery/macros/relation_components/partition/describe.sql b/dbt/include/bigquery/macros/relation_components/partition/describe.sql index e8e205801..1efdd56a1 100644 --- a/dbt/include/bigquery/macros/relation_components/partition/describe.sql +++ b/dbt/include/bigquery/macros/relation_components/partition/describe.sql @@ -37,6 +37,6 @@ {% macro bigquery__describe_partition(relation) %} - {% set _sql = bigquery__get_describe_partition_sql(relation) %} - {% do return(run_query(_sql)) %} + {% set bq_relation = adapter.connections.get_bq_table(relation.database, relation.schema, relation.identifier) %} + {% do return(bq_relation) %} {% endmacro %} diff --git a/dbt/include/bigquery/macros/relations/materialized_view/describe.sql b/dbt/include/bigquery/macros/relations/materialized_view/describe.sql index 231443cf8..64bbdaee8 100644 --- a/dbt/include/bigquery/macros/relations/materialized_view/describe.sql +++ b/dbt/include/bigquery/macros/relations/materialized_view/describe.sql @@ -1,4 +1,10 @@ {% macro bigquery__describe_materialized_view(relation) %} + {% set bq_relation = adapter.get_table(relation) %} + {% do return(bq_relation) %} +{% endmacro %} + + +{% macro bigquery__describe_materialized_view_sql(relation) %} {%- set _materialized_view_sql -%} select table_name, @@ -16,6 +22,7 @@ {% do return({ 'materialized_view': _materialized_view, + 'partition_by': bigquery__describe_partition(relation), 'cluster_by': _cluster_by, 'options': _options }) %} From 9aafbb9261446da8bdc262b855c43164c2875299 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 10 Oct 2023 23:42:59 -0400 Subject: [PATCH 2/5] update describe to use the sdk instead of sql to pick up partition information --- dbt/adapters/bigquery/impl.py | 27 +++++---- dbt/adapters/bigquery/relation.py | 12 ++-- .../bigquery/relation_configs/_base.py | 14 ++--- .../bigquery/relation_configs/cluster.py | 6 +- .../relation_configs/materialized_view.py | 57 ++++++++----------- .../bigquery/relation_configs/options.py | 42 +++++++------- .../bigquery/relation_configs/partition.py | 40 +++++++------ .../relations/materialized_view/alter.sql | 2 +- .../relations/materialized_view/describe.sql | 29 ---------- .../adapter/describe_relation/_files.py | 46 ++++++++++++--- .../test_describe_relation.py | 34 +++++------ 11 files changed, 151 insertions(+), 158 deletions(-) delete mode 100644 dbt/include/bigquery/macros/relations/materialized_view/describe.sql diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 5eafc2e2f..5578a4a27 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -32,7 +32,7 @@ import google.auth import google.oauth2 import google.cloud.bigquery -from google.cloud.bigquery import AccessEntry, SchemaField, Table +from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable import google.cloud.exceptions from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager @@ -555,16 +555,6 @@ def parse_partition_by(self, raw_partition_by: Any) -> Optional[PartitionConfig] def get_table_ref_from_relation(self, relation: BaseRelation): return self.connections.table_ref(relation.database, relation.schema, relation.identifier) - @available.parse(lambda *a, **k: True) - def get_table(self, relation: BigQueryRelation) -> Optional[Table]: - try: - table = self.connections.get_bq_table( - database=relation.database, schema=relation.schema, identifier=relation.identifier - ) - except google.cloud.exceptions.NotFound: - table = None - return table - def _update_column_dict(self, bq_column_dict, dbt_columns, parent=""): """ Helper function to recursively traverse the schema of a table in the @@ -759,17 +749,26 @@ def get_view_options(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict opts = self.get_common_options(config, node) return opts + @available.parse(lambda *a, **k: True) + def get_bq_table(self, relation: BigQueryRelation) -> Optional[BigQueryTable]: + try: + table = self.connections.get_bq_table( + relation.database, relation.schema, relation.identifier + ) + except google.cloud.exceptions.NotFound: + table = None + return table + def describe_relation(self, relation: BigQueryRelation): if relation.type == RelationType.MaterializedView: - macro = "bigquery__describe_materialized_view" + bq_table = self.get_bq_table(relation) parser = BigQueryMaterializedViewConfig else: raise dbt.exceptions.DbtRuntimeError( f"The method `BigQueryAdapter.describe_relation` is not implemented " f"for the relation type: {relation.type}" ) - relation_results = self.execute_macro(macro, kwargs={"relation": relation}) - return parser.from_relation_results(relation_results) + return parser.from_bq_table(bq_table) @available.parse_none def grant_access_to(self, entity, entity_type, role, grant_target_dict): diff --git a/dbt/adapters/bigquery/relation.py b/dbt/adapters/bigquery/relation.py index 184e133a1..65daa2808 100644 --- a/dbt/adapters/bigquery/relation.py +++ b/dbt/adapters/bigquery/relation.py @@ -1,10 +1,10 @@ from dataclasses import dataclass, field -from typing import FrozenSet, Optional +from typing import FrozenSet, Optional, TypeVar from itertools import chain, islice from dbt.context.providers import RuntimeConfigObject from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema -from dbt.adapters.relation_configs import RelationResults, RelationConfigChangeAction +from dbt.adapters.relation_configs import RelationConfigChangeAction from dbt.adapters.bigquery.relation_configs import ( BigQueryClusterConfigChange, BigQueryMaterializedViewConfig, @@ -16,7 +16,7 @@ from dbt.contracts.relation import RelationType from dbt.exceptions import CompilationError from dbt.utils import filter_null_values -from typing import TypeVar +from google.cloud.bigquery import Table as BigQueryTable Self = TypeVar("Self", bound="BigQueryRelation") @@ -74,12 +74,10 @@ def materialized_view_from_model_node( @classmethod def materialized_view_config_changeset( - cls, relation_results: RelationResults, runtime_config: RuntimeConfigObject + cls, table: BigQueryTable, runtime_config: RuntimeConfigObject ) -> Optional[BigQueryMaterializedViewConfigChangeset]: config_change_collection = BigQueryMaterializedViewConfigChangeset() - existing_materialized_view = BigQueryMaterializedViewConfig.from_relation_results( - relation_results - ) + existing_materialized_view = BigQueryMaterializedViewConfig.from_bq_table(table) new_materialized_view = cls.materialized_view_from_model_node(runtime_config.model) assert isinstance(existing_materialized_view, BigQueryMaterializedViewConfig) assert isinstance(new_materialized_view, BigQueryMaterializedViewConfig) diff --git a/dbt/adapters/bigquery/relation_configs/_base.py b/dbt/adapters/bigquery/relation_configs/_base.py index 37f9423e9..92de1a854 100644 --- a/dbt/adapters/bigquery/relation_configs/_base.py +++ b/dbt/adapters/bigquery/relation_configs/_base.py @@ -3,7 +3,9 @@ import agate from dbt.adapters.base.relation import Policy -from dbt.adapters.relation_configs import RelationConfigBase, RelationResults +from dbt.adapters.relation_configs import RelationConfigBase +from google.cloud.bigquery import Table as BigQueryTable + from dbt.adapters.bigquery.relation_configs.policies import ( BigQueryIncludePolicy, BigQueryQuotePolicy, @@ -35,16 +37,14 @@ def parse_model_node(cls, model_node: ModelNode) -> dict: ) @classmethod - def from_relation_results(cls, relation_results: RelationResults) -> "RelationConfigBase": - relation_config = cls.parse_relation_results(relation_results) + def from_bq_table(cls, table: BigQueryTable) -> "RelationConfigBase": + relation_config = cls.parse_bq_table(table) relation = cls.from_dict(relation_config) return relation @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> dict: - raise NotImplementedError( - "`parse_relation_results()` needs to be implemented on this RelationConfigBase instance" - ) + def parse_bq_table(cls, table: BigQueryTable) -> dict: + raise NotImplementedError("`parse_bq_table()` is not implemented for this relation type") @classmethod def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]: diff --git a/dbt/adapters/bigquery/relation_configs/cluster.py b/dbt/adapters/bigquery/relation_configs/cluster.py index addf84db6..ad5b7b2ed 100644 --- a/dbt/adapters/bigquery/relation_configs/cluster.py +++ b/dbt/adapters/bigquery/relation_configs/cluster.py @@ -1,9 +1,9 @@ from dataclasses import dataclass from typing import Any, Dict, FrozenSet -import agate from dbt.adapters.relation_configs import RelationConfigChange from dbt.contracts.graph.nodes import ModelNode +from google.cloud.bigquery import Table as BigQueryTable from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase @@ -40,8 +40,8 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: return config_dict @classmethod - def parse_relation_results(cls, relation_results: agate.Table) -> Dict[str, Any]: # type: ignore - config_dict = {"fields": frozenset(row.get("column_name") for row in relation_results)} + def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]: # type: ignore + config_dict = {"fields": frozenset(table.clustering_fields)} return config_dict diff --git a/dbt/adapters/bigquery/relation_configs/materialized_view.py b/dbt/adapters/bigquery/relation_configs/materialized_view.py index ef6a223d2..51351aabf 100644 --- a/dbt/adapters/bigquery/relation_configs/materialized_view.py +++ b/dbt/adapters/bigquery/relation_configs/materialized_view.py @@ -1,10 +1,9 @@ from dataclasses import dataclass from typing import Any, Dict, Optional -import agate -from dbt.adapters.relation_configs import RelationResults from dbt.contracts.graph.nodes import ModelNode from dbt.contracts.relation import ComponentName +from google.cloud.bigquery import Table as BigQueryTable from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase from dbt.adapters.bigquery.relation_configs.options import ( @@ -25,17 +24,17 @@ class BigQueryMaterializedViewConfig(BigQueryRelationConfigBase): https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_materialized_view_statement The following parameters are configurable by dbt: - - materialized_view_name: name of the materialized view - - schema_name: dataset name of the materialized view - - database_name: project name of the database + - table_id: name of the materialized view + - dataset_id: dataset name of the materialized view + - project_id: project name of the database - options: options that get set in `SET OPTIONS()` clause - partition: object containing partition information - cluster: object containing cluster information """ - materialized_view_name: str - schema_name: str - database_name: str + table_id: str + dataset_id: str + project_id: str options: BigQueryOptionsConfig partition: Optional[PartitionConfig] = None cluster: Optional[BigQueryClusterConfig] = None @@ -44,13 +43,9 @@ class BigQueryMaterializedViewConfig(BigQueryRelationConfigBase): def from_dict(cls, config_dict: Dict[str, Any]) -> "BigQueryMaterializedViewConfig": # required kwargs_dict: Dict[str, Any] = { - "materialized_view_name": cls._render_part( - ComponentName.Identifier, config_dict["materialized_view_name"] - ), - "schema_name": cls._render_part(ComponentName.Schema, config_dict["schema_name"]), - "database_name": cls._render_part( - ComponentName.Database, config_dict["database_name"] - ), + "table_id": cls._render_part(ComponentName.Identifier, config_dict["table_id"]), + "dataset_id": cls._render_part(ComponentName.Schema, config_dict["dataset_id"]), + "project_id": cls._render_part(ComponentName.Database, config_dict["project_id"]), "options": BigQueryOptionsConfig.from_dict(config_dict["options"]), } @@ -67,9 +62,9 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "BigQueryMaterializedViewConf @classmethod def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: config_dict = { - "materialized_view_name": model_node.identifier, - "schema_name": model_node.schema, - "database_name": model_node.database, + "table_id": model_node.identifier, + "dataset_id": model_node.schema, + "project_id": model_node.database, # despite this being a foreign object, there will always be options because of defaults "options": BigQueryOptionsConfig.parse_model_node(model_node), } @@ -84,27 +79,21 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: return config_dict @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: - materialized_view_config: agate.Table = relation_results.get("materialized_view") # type: ignore - materialized_view: agate.Row = cls._get_first_row(materialized_view_config) - + def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]: config_dict = { - "materialized_view_name": materialized_view.get("table_name"), - "schema_name": materialized_view.get("table_schema"), - "database_name": materialized_view.get("table_catalog"), + "table_id": table.table_id, + "dataset_id": table.dataset_id, + "project_id": table.project, # despite this being a foreign object, there will always be options because of defaults - "options": BigQueryOptionsConfig.parse_relation_results(relation_results), + "options": BigQueryOptionsConfig.parse_bq_table(table), } # optional - if partition_by := relation_results.get("partition_by"): - config_dict.update({"partition": PartitionConfig.parse_relation_results(partition_by)}) # type: ignore - - cluster_by: agate.Table = relation_results.get("cluster_by") # type: ignore - if len(cluster_by) > 0: - config_dict.update( - {"cluster": BigQueryClusterConfig.parse_relation_results(cluster_by)} - ) + if table.time_partitioning or table.range_partitioning: + config_dict.update({"partition": PartitionConfig.parse_bq_table(table)}) + + if table.clustering_fields: + config_dict.update({"cluster": BigQueryClusterConfig.parse_bq_table(table)}) return config_dict diff --git a/dbt/adapters/bigquery/relation_configs/options.py b/dbt/adapters/bigquery/relation_configs/options.py index a84d7dd19..758740c50 100644 --- a/dbt/adapters/bigquery/relation_configs/options.py +++ b/dbt/adapters/bigquery/relation_configs/options.py @@ -1,10 +1,10 @@ from dataclasses import dataclass from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional -import agate -from dbt.adapters.relation_configs import RelationConfigChange, RelationResults +from dbt.adapters.relation_configs import RelationConfigChange from dbt.contracts.graph.nodes import ModelNode +from google.cloud.bigquery import Table as BigQueryTable from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase from dbt.adapters.bigquery.utility import bool_setting, float_setting, sql_escape @@ -25,18 +25,6 @@ class BigQueryOptionsConfig(BigQueryRelationConfigBase): description: Optional[str] = None labels: Optional[Dict[str, str]] = None - @classmethod - def user_configurable_options(cls) -> List[str]: - return [ - "enable_refresh", - "refresh_interval_minutes", - "expiration_timestamp", - "max_staleness", - "kms_key_name", - "description", - "labels", - ] - def as_ddl_dict(self) -> Dict[str, Any]: """ Reformat `options_dict` so that it can be passed into the `bigquery_options()` macro. @@ -121,7 +109,15 @@ def formatted_setting(name: str) -> Any: def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: config_dict = { option: model_node.config.extra.get(option) - for option in cls.user_configurable_options() + for option in [ + "enable_refresh", + "refresh_interval_minutes", + "expiration_timestamp", + "max_staleness", + "kms_key_name", + "description", + "labels", + ] } # update dbt-specific versions of these settings @@ -135,13 +131,17 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: return config_dict @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: - options_config: agate.Table = relation_results.get("options") # type: ignore + def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]: config_dict = { - option.get("option_name"): option.get("option_value") - for option in options_config - if option.get("option_name") in cls.user_configurable_options() + "enable_refresh": table.mview_enable_refresh, + "refresh_interval_minutes": table.mview_refresh_interval.seconds / 60, + "expiration_timestamp": table.expires, + "max_staleness": None, + "description": table.description, + "labels": table.labels, } + if encryption_configuration := table.encryption_configuration: + config_dict.update({"kms_key_name": encryption_configuration.kms_key_name}) return config_dict diff --git a/dbt/adapters/bigquery/relation_configs/partition.py b/dbt/adapters/bigquery/relation_configs/partition.py index 8dd193ad1..cd57719db 100644 --- a/dbt/adapters/bigquery/relation_configs/partition.py +++ b/dbt/adapters/bigquery/relation_configs/partition.py @@ -5,7 +5,7 @@ from dbt.dataclass_schema import dbtClassMixin, ValidationError import dbt.exceptions from dbt.adapters.relation_configs import RelationConfigChange -from google.cloud.bigquery.table import Table +from google.cloud.bigquery.table import Table as BigQueryTable @dataclass @@ -108,25 +108,31 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: return model_node.config.extra.get("partition_by") @classmethod - def parse_relation_results(cls, bq_table: Table) -> Dict[str, Any]: + def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]: """ Parse the results of a BQ Table object into a raw config for `PartitionConfig.parse` """ - range_partitioning = bq_table.range_partitioning - time_partitioning = bq_table.time_partitioning - config_dict = { - "field": time_partitioning.field, - "data_type": "", - "granularity": time_partitioning.type_, - } - - # combine range fields into dictionary, like the model config - range_dict = { - "start": range_partitioning.range_.start, - "end": range_partitioning.range_.end, - "interval": range_partitioning.range_.interval, - } - config_dict.update({"range": range_dict}) + if time_partitioning := table.time_partitioning: + field_types = {field.name: field.field_type.lower() for field in table.schema} + config_dict = { + "field": time_partitioning.field, + "data_type": field_types[time_partitioning.field], + "granularity": time_partitioning.type_, + } + + elif range_partitioning := table.range_partitioning: + config_dict = { + "field": range_partitioning.field, + "data_type": "int64", + "range": { + "start": range_partitioning.range_.start, + "end": range_partitioning.range_.end, + "interval": range_partitioning.range_.interval, + }, + } + + else: + config_dict = {} return config_dict diff --git a/dbt/include/bigquery/macros/relations/materialized_view/alter.sql b/dbt/include/bigquery/macros/relations/materialized_view/alter.sql index b0381a7bf..7320addde 100644 --- a/dbt/include/bigquery/macros/relations/materialized_view/alter.sql +++ b/dbt/include/bigquery/macros/relations/materialized_view/alter.sql @@ -19,7 +19,7 @@ {% endmacro %} {% macro bigquery__get_materialized_view_configuration_changes(existing_relation, new_config) %} - {% set _existing_materialized_view = bigquery__describe_materialized_view(existing_relation) %} + {% set _existing_materialized_view = adapter.describe_relation(existing_relation) %} {% set _configuration_changes = existing_relation.materialized_view_config_changeset(_existing_materialized_view, new_config) %} {% do return(_configuration_changes) %} {% endmacro %} diff --git a/dbt/include/bigquery/macros/relations/materialized_view/describe.sql b/dbt/include/bigquery/macros/relations/materialized_view/describe.sql deleted file mode 100644 index 64bbdaee8..000000000 --- a/dbt/include/bigquery/macros/relations/materialized_view/describe.sql +++ /dev/null @@ -1,29 +0,0 @@ -{% macro bigquery__describe_materialized_view(relation) %} - {% set bq_relation = adapter.get_table(relation) %} - {% do return(bq_relation) %} -{% endmacro %} - - -{% macro bigquery__describe_materialized_view_sql(relation) %} - {%- set _materialized_view_sql -%} - select - table_name, - table_schema, - table_catalog - from {{ relation.information_schema('MATERIALIZED_VIEWS') }} - where table_name = '{{ relation.identifier }}' - and table_schema = '{{ relation.schema }}' - and table_catalog = '{{ relation.database }}' - {%- endset %} - {% set _materialized_view = run_query(_materialized_view_sql) %} - - {%- set _cluster_by = bigquery__describe_cluster(relation) -%} - {%- set _options = bigquery__describe_options(relation) -%} - - {% do return({ - 'materialized_view': _materialized_view, - 'partition_by': bigquery__describe_partition(relation), - 'cluster_by': _cluster_by, - 'options': _options - }) %} -{% endmacro %} diff --git a/tests/functional/adapter/describe_relation/_files.py b/tests/functional/adapter/describe_relation/_files.py index e5d330500..ac0203049 100644 --- a/tests/functional/adapter/describe_relation/_files.py +++ b/tests/functional/adapter/describe_relation/_files.py @@ -1,16 +1,17 @@ MY_SEED = """ id,value,record_date -1,100,2023-01-01 00:00:00 -2,200,2023-01-02 00:00:00 -3,300,2023-01-02 00:00:00 +1,100,2023-01-01 12:00:00 +2,200,2023-01-02 12:00:00 +3,300,2023-01-02 12:00:00 """.strip() + MY_BASE_TABLE = """ {{ config( materialized='table', partition_by={ "field": "record_date", - "data_type": "timestamp", + "data_type": "datetime", "granularity": "day" }, cluster_by=["id", "value"] @@ -22,12 +23,13 @@ from {{ ref('my_seed') }} """ + MY_MATERIALIZED_VIEW = """ {{ config( materialized='materialized_view', partition_by={ "field": "record_date", - "data_type": "timestamp", + "data_type": "datetime", "granularity": "day" }, cluster_by="id", @@ -40,13 +42,39 @@ """ +MY_OTHER_BASE_TABLE = """ +{{ config( + materialized='table', + partition_by={ + "field": "value", + "data_type": "int64", + "range": { + "start": 0, + "end": 500, + "interval": 50 + } + }, + cluster_by=["id", "value"] +) }} +select + id, + value, + record_date +from {{ ref('my_seed') }} +""" + + MY_OTHER_MATERIALIZED_VIEW = """ {{ config( materialized='materialized_view', partition_by={ - "field": "record_date", - "data_type": "timestamp", - "granularity": "day" + "field": "value", + "data_type": "int64", + "range": { + "start": 0, + "end": 500, + "interval": 50 + } }, cluster_by="id", enable_refresh=False, @@ -56,5 +84,5 @@ id, value, record_date -from {{ ref('my_base_table') }} +from {{ ref('my_other_base_table') }} """ diff --git a/tests/functional/adapter/describe_relation/test_describe_relation.py b/tests/functional/adapter/describe_relation/test_describe_relation.py index 4d6c77cca..adccd5126 100644 --- a/tests/functional/adapter/describe_relation/test_describe_relation.py +++ b/tests/functional/adapter/describe_relation/test_describe_relation.py @@ -5,25 +5,21 @@ from dbt.tests.util import get_connection, run_dbt from dbt.adapters.bigquery.relation_configs import BigQueryMaterializedViewConfig -from tests.functional.adapter.describe_relation._files import ( - MY_BASE_TABLE, - MY_MATERIALIZED_VIEW, - MY_OTHER_MATERIALIZED_VIEW, - MY_SEED, -) +from tests.functional.adapter.describe_relation import _files class TestDescribeRelation: @pytest.fixture(scope="class", autouse=True) def seeds(self): - return {"my_seed.csv": MY_SEED} + return {"my_seed.csv": _files.MY_SEED} @pytest.fixture(scope="class", autouse=True) def models(self): yield { - "my_base_table.sql": MY_BASE_TABLE, - "my_materialized_view.sql": MY_MATERIALIZED_VIEW, - "my_other_materialized_view.sql": MY_OTHER_MATERIALIZED_VIEW, + "my_base_table.sql": _files.MY_BASE_TABLE, + "my_materialized_view.sql": _files.MY_MATERIALIZED_VIEW, + "my_other_base_table.sql": _files.MY_OTHER_BASE_TABLE, + "my_other_materialized_view.sql": _files.MY_OTHER_MATERIALIZED_VIEW, } @pytest.fixture(scope="class") @@ -79,9 +75,12 @@ def test_describe_materialized_view(self, project, my_materialized_view): with get_connection(project.adapter): results = project.adapter.describe_relation(my_materialized_view) assert isinstance(results, BigQueryMaterializedViewConfig) - assert results.materialized_view_name == f'"{my_materialized_view.identifier}"' - assert results.schema_name == f'"{my_materialized_view.schema}"' - assert results.database_name == f'"{my_materialized_view.database}"' + assert results.table_id == f'"{my_materialized_view.identifier}"' + assert results.dataset_id == f'"{my_materialized_view.schema}"' + assert results.project_id == f'"{my_materialized_view.database}"' + assert results.partition.field == "record_date" + assert results.partition.data_type == "datetime" + assert results.partition.granularity == "day" assert results.cluster.fields == frozenset({"id"}) assert results.options.enable_refresh is True assert results.options.refresh_interval_minutes == 30 @@ -90,9 +89,12 @@ def test_describe_other_materialized_view(self, project, my_other_materialized_v with get_connection(project.adapter): results = project.adapter.describe_relation(my_other_materialized_view) assert isinstance(results, BigQueryMaterializedViewConfig) - assert results.materialized_view_name == f'"{my_other_materialized_view.identifier}"' - assert results.schema_name == f'"{my_other_materialized_view.schema}"' - assert results.database_name == f'"{my_other_materialized_view.database}"' + assert results.table_id == f'"{my_other_materialized_view.identifier}"' + assert results.dataset_id == f'"{my_other_materialized_view.schema}"' + assert results.project_id == f'"{my_other_materialized_view.database}"' + assert results.partition.field == "value" + assert results.partition.data_type == "int64" + assert results.partition.range == {"start": 0, "end": 500, "interval": 50} assert results.cluster.fields == frozenset({"id"}) assert results.options.enable_refresh is False assert results.options.refresh_interval_minutes == 30 # BQ returns it to the default From a2e9fa3afdc2a859fa48d46261b3fdec3d03052b Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 11 Oct 2023 00:16:57 -0400 Subject: [PATCH 3/5] basic tests pass --- dbt/adapters/bigquery/impl.py | 1 + dbt/adapters/bigquery/relation.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 5578a4a27..6adfaeee5 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -759,6 +759,7 @@ def get_bq_table(self, relation: BigQueryRelation) -> Optional[BigQueryTable]: table = None return table + @available.parse(lambda *a, **k: True) def describe_relation(self, relation: BigQueryRelation): if relation.type == RelationType.MaterializedView: bq_table = self.get_bq_table(relation) diff --git a/dbt/adapters/bigquery/relation.py b/dbt/adapters/bigquery/relation.py index 65daa2808..3076a2243 100644 --- a/dbt/adapters/bigquery/relation.py +++ b/dbt/adapters/bigquery/relation.py @@ -16,7 +16,6 @@ from dbt.contracts.relation import RelationType from dbt.exceptions import CompilationError from dbt.utils import filter_null_values -from google.cloud.bigquery import Table as BigQueryTable Self = TypeVar("Self", bound="BigQueryRelation") @@ -74,10 +73,11 @@ def materialized_view_from_model_node( @classmethod def materialized_view_config_changeset( - cls, table: BigQueryTable, runtime_config: RuntimeConfigObject + cls, + existing_materialized_view: BigQueryMaterializedViewConfig, + runtime_config: RuntimeConfigObject, ) -> Optional[BigQueryMaterializedViewConfigChangeset]: config_change_collection = BigQueryMaterializedViewConfigChangeset() - existing_materialized_view = BigQueryMaterializedViewConfig.from_bq_table(table) new_materialized_view = cls.materialized_view_from_model_node(runtime_config.model) assert isinstance(existing_materialized_view, BigQueryMaterializedViewConfig) assert isinstance(new_materialized_view, BigQueryMaterializedViewConfig) From b2187bd23aa9c56c4ad773207d53e555ccb40021 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 11 Oct 2023 00:39:11 -0400 Subject: [PATCH 4/5] existing change monitoring tests pass --- tests/functional/adapter/materialized_view_tests/_files.py | 6 ++++-- .../test_materialized_view_changes.py | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/functional/adapter/materialized_view_tests/_files.py b/tests/functional/adapter/materialized_view_tests/_files.py index 6cc69d2c6..8580b6b3d 100644 --- a/tests/functional/adapter/materialized_view_tests/_files.py +++ b/tests/functional/adapter/materialized_view_tests/_files.py @@ -5,12 +5,13 @@ 3,300,2023-01-02 00:00:00 """.strip() + MY_BASE_TABLE = """ {{ config( materialized='table', partition_by={ "field": "record_valid_date", - "data_type": "timestamp", + "data_type": "datetime", "granularity": "day" }, cluster_by=["id", "value"] @@ -22,12 +23,13 @@ from {{ ref('my_seed') }} """ + MY_MATERIALIZED_VIEW = """ {{ config( materialized='materialized_view', partition_by={ "field": "record_valid_date", - "data_type": "timestamp", + "data_type": "datetime", "granularity": "day" }, cluster_by=["id", "value"], diff --git a/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py b/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py index 826ec5d24..ca592613e 100644 --- a/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py +++ b/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py @@ -19,7 +19,9 @@ def check_start_state(project, materialized_view): assert isinstance(results, BigQueryMaterializedViewConfig) assert results.options.enable_refresh is True assert results.options.refresh_interval_minutes == 60 - assert results.options.max_staleness == "0-0 0 0:45:0" # ~= "INTERVAL 45 MINUTE" + assert results.partition.field == "record_valid_date" + assert results.partition.data_type == "datetime" + assert results.partition.granularity == "day" assert results.cluster.fields == frozenset({"id", "value"}) @staticmethod From 120c1cea38d0b25faf330cc91b6a57eb76c715a5 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 11 Oct 2023 00:53:38 -0400 Subject: [PATCH 5/5] partition change monitoring tests pass --- .../adapter/materialized_view_tests/_files.py | 24 ++++++++++++++ .../adapter/materialized_view_tests/_mixin.py | 32 ++++++++++++------- .../test_materialized_view_changes.py | 23 ++++++++++++- 3 files changed, 67 insertions(+), 12 deletions(-) diff --git a/tests/functional/adapter/materialized_view_tests/_files.py b/tests/functional/adapter/materialized_view_tests/_files.py index 8580b6b3d..1ee64269d 100644 --- a/tests/functional/adapter/materialized_view_tests/_files.py +++ b/tests/functional/adapter/materialized_view_tests/_files.py @@ -24,6 +24,7 @@ """ +# the whitespace to the left on partition matters here MY_MATERIALIZED_VIEW = """ {{ config( materialized='materialized_view', @@ -43,3 +44,26 @@ record_valid_date from {{ ref('my_base_table') }} """ + + +# the whitespace to the left on partition matters here +MY_OTHER_BASE_TABLE = """ +{{ config( + materialized='table', + partition_by={ + "field": "value", + "data_type": "int64", + "range": { + "start": 0, + "end": 500, + "interval": 50 + } + }, + cluster_by=["id", "value"] +) }} +select + id, + value, + record_valid_date +from {{ ref('my_seed') }} +""" diff --git a/tests/functional/adapter/materialized_view_tests/_mixin.py b/tests/functional/adapter/materialized_view_tests/_mixin.py index 5933a2dd2..5f75c7c04 100644 --- a/tests/functional/adapter/materialized_view_tests/_mixin.py +++ b/tests/functional/adapter/materialized_view_tests/_mixin.py @@ -12,11 +12,7 @@ set_model_file, ) -from tests.functional.adapter.materialized_view_tests._files import ( - MY_BASE_TABLE, - MY_MATERIALIZED_VIEW, - MY_SEED, -) +from tests.functional.adapter.materialized_view_tests import _files class BigQueryMaterializedViewMixin: @@ -35,11 +31,24 @@ def my_base_table(self, project) -> BaseRelation: type=RelationType.Table, ) + @pytest.fixture(scope="class") + def my_other_base_table(self, project) -> BaseRelation: + """ + Following the sentiment of `my_base_table` above, if we want to alter the partition + on the materialized view, we either need to update the partition on the base table, + or we need a second table with a different partition. + """ + return project.adapter.Relation.create( + identifier="my_other_base_table", + schema=project.test_schema, + database=project.database, + type=RelationType.Table, + ) + @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_base_table, my_materialized_view): # type: ignore + def setup(self, project, my_base_table, my_other_base_table, my_materialized_view): # type: ignore run_dbt(["seed"]) - run_dbt(["run", "--models", my_base_table.identifier, "--full-refresh"]) - run_dbt(["run", "--models", my_materialized_view.identifier, "--full-refresh"]) + run_dbt(["run", "--full-refresh"]) # the tests touch these files, store their contents in memory initial_model = get_model_file(project, my_materialized_view) @@ -52,15 +61,16 @@ def setup(self, project, my_base_table, my_materialized_view): # type: ignore @pytest.fixture(scope="class", autouse=True) def seeds(self): - return {"my_seed.csv": MY_SEED} + return {"my_seed.csv": _files.MY_SEED} @pytest.fixture(scope="class", autouse=True) def models(self): yield { "my_table.sql": MY_TABLE, "my_view.sql": MY_VIEW, - "my_base_table.sql": MY_BASE_TABLE, - "my_materialized_view.sql": MY_MATERIALIZED_VIEW, + "my_base_table.sql": _files.MY_BASE_TABLE, + "my_other_base_table.sql": _files.MY_OTHER_BASE_TABLE, + "my_materialized_view.sql": _files.MY_MATERIALIZED_VIEW, } @staticmethod diff --git a/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py b/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py index ca592613e..d2df9735e 100644 --- a/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py +++ b/tests/functional/adapter/materialized_view_tests/test_materialized_view_changes.py @@ -45,7 +45,28 @@ def check_state_alter_change_is_applied(project, materialized_view): @staticmethod def change_config_via_replace(project, materialized_view): initial_model = get_model_file(project, materialized_view) - new_model = initial_model.replace('cluster_by=["id", "value"]', 'cluster_by="id"') + # the whitespace to the left on partition matters here + old_partition = """ + partition_by={ + "field": "record_valid_date", + "data_type": "datetime", + "granularity": "day" + },""" + new_partition = """ + partition_by={ + "field": "value", + "data_type": "int64", + "range": { + "start": 0, + "end": 500, + "interval": 50 + } + },""" + new_model = ( + initial_model.replace('cluster_by=["id", "value"]', 'cluster_by="id"') + .replace(old_partition, new_partition) + .replace("'my_base_table'", "'my_other_base_table'") + ) set_model_file(project, materialized_view, new_model) @staticmethod