Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADAP-940: Add change monitoring for partitioning clause #962

18 changes: 14 additions & 4 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import google.auth
import google.oauth2
import google.cloud.bigquery
from google.cloud.bigquery import AccessEntry, SchemaField
from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
import google.cloud.exceptions

from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
Expand Down Expand Up @@ -749,17 +749,27 @@ def get_view_options(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict
opts = self.get_common_options(config, node)
return opts

@available.parse(lambda *a, **k: True)
def get_bq_table(self, relation: BigQueryRelation) -> Optional[BigQueryTable]:
try:
table = self.connections.get_bq_table(
relation.database, relation.schema, relation.identifier
)
except google.cloud.exceptions.NotFound:
table = None
return table

@available.parse(lambda *a, **k: True)
def describe_relation(self, relation: BigQueryRelation):
if relation.type == RelationType.MaterializedView:
macro = "bigquery__describe_materialized_view"
bq_table = self.get_bq_table(relation)
parser = BigQueryMaterializedViewConfig
else:
raise dbt.exceptions.DbtRuntimeError(
f"The method `BigQueryAdapter.describe_relation` is not implemented "
f"for the relation type: {relation.type}"
)
relation_results = self.execute_macro(macro, kwargs={"relation": relation})
return parser.from_relation_results(relation_results)
return parser.from_bq_table(bq_table)

@available.parse_none
def grant_access_to(self, entity, entity_type, role, grant_target_dict):
Expand Down
12 changes: 5 additions & 7 deletions dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dataclasses import dataclass, field
from typing import FrozenSet, Optional
from typing import FrozenSet, Optional, TypeVar

from itertools import chain, islice
from dbt.context.providers import RuntimeConfigObject
from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
from dbt.adapters.relation_configs import RelationResults, RelationConfigChangeAction
from dbt.adapters.relation_configs import RelationConfigChangeAction
from dbt.adapters.bigquery.relation_configs import (
BigQueryClusterConfigChange,
BigQueryMaterializedViewConfig,
Expand All @@ -16,7 +16,6 @@
from dbt.contracts.relation import RelationType
from dbt.exceptions import CompilationError
from dbt.utils import filter_null_values
from typing import TypeVar


Self = TypeVar("Self", bound="BigQueryRelation")
Expand Down Expand Up @@ -74,12 +73,11 @@ def materialized_view_from_model_node(

@classmethod
def materialized_view_config_changeset(
cls, relation_results: RelationResults, runtime_config: RuntimeConfigObject
cls,
existing_materialized_view: BigQueryMaterializedViewConfig,
runtime_config: RuntimeConfigObject,
) -> Optional[BigQueryMaterializedViewConfigChangeset]:
config_change_collection = BigQueryMaterializedViewConfigChangeset()
existing_materialized_view = BigQueryMaterializedViewConfig.from_relation_results(
relation_results
)
new_materialized_view = cls.materialized_view_from_model_node(runtime_config.model)
assert isinstance(existing_materialized_view, BigQueryMaterializedViewConfig)
assert isinstance(new_materialized_view, BigQueryMaterializedViewConfig)
Expand Down
14 changes: 7 additions & 7 deletions dbt/adapters/bigquery/relation_configs/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import agate
from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import RelationConfigBase, RelationResults
from dbt.adapters.relation_configs import RelationConfigBase
from google.cloud.bigquery import Table as BigQueryTable

from dbt.adapters.bigquery.relation_configs.policies import (
BigQueryIncludePolicy,
BigQueryQuotePolicy,
Expand Down Expand Up @@ -35,16 +37,14 @@ def parse_model_node(cls, model_node: ModelNode) -> dict:
)

@classmethod
def from_relation_results(cls, relation_results: RelationResults) -> "RelationConfigBase":
relation_config = cls.parse_relation_results(relation_results)
def from_bq_table(cls, table: BigQueryTable) -> "RelationConfigBase":
relation_config = cls.parse_bq_table(table)
relation = cls.from_dict(relation_config)
return relation

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
raise NotImplementedError(
"`parse_relation_results()` needs to be implemented on this RelationConfigBase instance"
)
def parse_bq_table(cls, table: BigQueryTable) -> dict:
raise NotImplementedError("`parse_bq_table()` is not implemented for this relation type")

@classmethod
def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
Expand Down
6 changes: 3 additions & 3 deletions dbt/adapters/bigquery/relation_configs/cluster.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from dataclasses import dataclass
from typing import Any, Dict, FrozenSet

import agate
from dbt.adapters.relation_configs import RelationConfigChange
from dbt.contracts.graph.nodes import ModelNode
from google.cloud.bigquery import Table as BigQueryTable

from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase

Expand Down Expand Up @@ -40,8 +40,8 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
return config_dict

@classmethod
def parse_relation_results(cls, relation_results: agate.Table) -> Dict[str, Any]: # type: ignore
config_dict = {"fields": frozenset(row.get("column_name") for row in relation_results)}
def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]: # type: ignore
config_dict = {"fields": frozenset(table.clustering_fields)}
return config_dict


Expand Down
60 changes: 23 additions & 37 deletions dbt/adapters/bigquery/relation_configs/materialized_view.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

import agate
from dbt.adapters.relation_configs import RelationResults
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName
from google.cloud.bigquery import Table as BigQueryTable

from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase
from dbt.adapters.bigquery.relation_configs.options import (
Expand All @@ -25,17 +24,17 @@ class BigQueryMaterializedViewConfig(BigQueryRelationConfigBase):
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_materialized_view_statement

The following parameters are configurable by dbt:
- materialized_view_name: name of the materialized view
- schema_name: dataset name of the materialized view
- database_name: project name of the database
- table_id: name of the materialized view
- dataset_id: dataset name of the materialized view
- project_id: project name of the database
- options: options that get set in `SET OPTIONS()` clause
- partition: object containing partition information
- cluster: object containing cluster information
"""

materialized_view_name: str
schema_name: str
database_name: str
table_id: str
dataset_id: str
project_id: str
options: BigQueryOptionsConfig
partition: Optional[PartitionConfig] = None
cluster: Optional[BigQueryClusterConfig] = None
Expand All @@ -44,13 +43,9 @@ class BigQueryMaterializedViewConfig(BigQueryRelationConfigBase):
def from_dict(cls, config_dict: Dict[str, Any]) -> "BigQueryMaterializedViewConfig":
# required
kwargs_dict: Dict[str, Any] = {
"materialized_view_name": cls._render_part(
ComponentName.Identifier, config_dict["materialized_view_name"]
),
"schema_name": cls._render_part(ComponentName.Schema, config_dict["schema_name"]),
"database_name": cls._render_part(
ComponentName.Database, config_dict["database_name"]
),
"table_id": cls._render_part(ComponentName.Identifier, config_dict["table_id"]),
"dataset_id": cls._render_part(ComponentName.Schema, config_dict["dataset_id"]),
"project_id": cls._render_part(ComponentName.Database, config_dict["project_id"]),
"options": BigQueryOptionsConfig.from_dict(config_dict["options"]),
}

Expand All @@ -67,9 +62,9 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "BigQueryMaterializedViewConf
@classmethod
def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
config_dict = {
"materialized_view_name": model_node.identifier,
"schema_name": model_node.schema,
"database_name": model_node.database,
"table_id": model_node.identifier,
"dataset_id": model_node.schema,
"project_id": model_node.database,
# despite this being a foreign object, there will always be options because of defaults
"options": BigQueryOptionsConfig.parse_model_node(model_node),
}
Expand All @@ -84,30 +79,21 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
materialized_view_config: agate.Table = relation_results.get("materialized_view") # type: ignore
materialized_view: agate.Row = cls._get_first_row(materialized_view_config)

def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]:
config_dict = {
"materialized_view_name": materialized_view.get("table_name"),
"schema_name": materialized_view.get("table_schema"),
"database_name": materialized_view.get("table_catalog"),
"table_id": table.table_id,
"dataset_id": table.dataset_id,
"project_id": table.project,
# despite this being a foreign object, there will always be options because of defaults
"options": BigQueryOptionsConfig.parse_relation_results(relation_results),
"options": BigQueryOptionsConfig.parse_bq_table(table),
}

# optional
if partition_by := relation_results.get("partition_by"):
if len(partition_by) > 0:
config_dict.update(
{"partition": PartitionConfig.parse_relation_results(partition_by[0])}
)

cluster_by: agate.Table = relation_results.get("cluster_by") # type: ignore
if len(cluster_by) > 0:
config_dict.update(
{"cluster": BigQueryClusterConfig.parse_relation_results(cluster_by)}
)
if table.time_partitioning or table.range_partitioning:
config_dict.update({"partition": PartitionConfig.parse_bq_table(table)})

if table.clustering_fields:
config_dict.update({"cluster": BigQueryClusterConfig.parse_bq_table(table)})

return config_dict

Expand Down
42 changes: 21 additions & 21 deletions dbt/adapters/bigquery/relation_configs/options.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional

import agate
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.relation_configs import RelationConfigChange
from dbt.contracts.graph.nodes import ModelNode
from google.cloud.bigquery import Table as BigQueryTable

from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase
from dbt.adapters.bigquery.utility import bool_setting, float_setting, sql_escape
Expand All @@ -25,18 +25,6 @@ class BigQueryOptionsConfig(BigQueryRelationConfigBase):
description: Optional[str] = None
labels: Optional[Dict[str, str]] = None

@classmethod
def user_configurable_options(cls) -> List[str]:
return [
"enable_refresh",
"refresh_interval_minutes",
"expiration_timestamp",
"max_staleness",
"kms_key_name",
"description",
"labels",
]

def as_ddl_dict(self) -> Dict[str, Any]:
"""
Reformat `options_dict` so that it can be passed into the `bigquery_options()` macro.
Expand Down Expand Up @@ -121,7 +109,15 @@ def formatted_setting(name: str) -> Any:
def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
config_dict = {
option: model_node.config.extra.get(option)
for option in cls.user_configurable_options()
for option in [
"enable_refresh",
"refresh_interval_minutes",
"expiration_timestamp",
"max_staleness",
"kms_key_name",
"description",
"labels",
]
}

# update dbt-specific versions of these settings
Expand All @@ -135,13 +131,17 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
options_config: agate.Table = relation_results.get("options") # type: ignore
def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]:
config_dict = {
option.get("option_name"): option.get("option_value")
for option in options_config
if option.get("option_name") in cls.user_configurable_options()
"enable_refresh": table.mview_enable_refresh,
"refresh_interval_minutes": table.mview_refresh_interval.seconds / 60,
"expiration_timestamp": table.expires,
"max_staleness": None,
"description": table.description,
"labels": table.labels,
}
if encryption_configuration := table.encryption_configuration:
config_dict.update({"kms_key_name": encryption_configuration.kms_key_name})
return config_dict


Expand Down
40 changes: 24 additions & 16 deletions dbt/adapters/bigquery/relation_configs/partition.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import agate
from dbt.contracts.graph.nodes import ModelNode
from dbt.dataclass_schema import dbtClassMixin, ValidationError
import dbt.exceptions
from dbt.adapters.relation_configs import RelationConfigChange
from google.cloud.bigquery.table import Table as BigQueryTable


@dataclass
Expand Down Expand Up @@ -108,23 +108,31 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
return model_node.config.extra.get("partition_by")

@classmethod
def parse_relation_results(cls, describe_relation_results: agate.Row) -> Dict[str, Any]:
def parse_bq_table(cls, table: BigQueryTable) -> Dict[str, Any]:
"""
Parse the results of a describe query into a raw config for `PartitionConfig.parse`
Parse the results of a BQ Table object into a raw config for `PartitionConfig.parse`
"""
config_dict = {
"field": describe_relation_results.get("partition_column_name"),
"data_type": describe_relation_results.get("partition_data_type"),
"granularity": describe_relation_results.get("partition_type"),
}

# combine range fields into dictionary, like the model config
range_dict = {
"start": describe_relation_results.get("partition_start"),
"end": describe_relation_results.get("partition_end"),
"interval": describe_relation_results.get("partition_interval"),
}
config_dict.update({"range": range_dict})
if time_partitioning := table.time_partitioning:
field_types = {field.name: field.field_type.lower() for field in table.schema}
config_dict = {
"field": time_partitioning.field,
"data_type": field_types[time_partitioning.field],
"granularity": time_partitioning.type_,
}

elif range_partitioning := table.range_partitioning:
config_dict = {
"field": range_partitioning.field,
"data_type": "int64",
"range": {
"start": range_partitioning.range_.start,
"end": range_partitioning.range_.end,
"interval": range_partitioning.range_.interval,
},
}

else:
config_dict = {}

return config_dict

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@


{% macro bigquery__describe_partition(relation) %}
{% set _sql = bigquery__get_describe_partition_sql(relation) %}
{% do return(run_query(_sql)) %}
{% set bq_relation = adapter.connections.get_bq_table(relation.database, relation.schema, relation.identifier) %}
{% do return(bq_relation) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
{% endmacro %}

{% macro bigquery__get_materialized_view_configuration_changes(existing_relation, new_config) %}
{% set _existing_materialized_view = bigquery__describe_materialized_view(existing_relation) %}
{% set _existing_materialized_view = adapter.describe_relation(existing_relation) %}
{% set _configuration_changes = existing_relation.materialized_view_config_changeset(_existing_materialized_view, new_config) %}
{% do return(_configuration_changes) %}
{% endmacro %}
Loading