Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(persist_docs): write dbt model and project metadata to properties #449

Merged
merged 11 commits into from
Oct 13, 2023
46 changes: 35 additions & 11 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@
get_catalog_id,
get_catalog_type,
get_chunks,
is_valid_table_parameter_key,
stringify_table_parameter_value,
)
from dbt.adapters.base import ConstraintSupport, available
from dbt.adapters.base.impl import AdapterConfig
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.sql import SQLAdapter
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import CompiledNode, ConstraintType
from dbt.exceptions import DbtRuntimeError
Expand Down Expand Up @@ -810,11 +813,12 @@ def persist_docs_to_glue(
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

# By default, there is no need to update Glue Table
need_udpate_table = False
need_to_update_table = False
# Get Table from Glue
table = glue_client.get_table(CatalogId=catalog_id, DatabaseName=relation.schema, Name=relation.name)["Table"]
# Prepare new version of Glue Table picking up significant fields
updated_table = self._get_table_input(table)
table_input = self._get_table_input(table)
table_parameters = table_input["Parameters"]
# Update table description
if persist_relation_docs:
# Prepare dbt description
Expand All @@ -825,16 +829,35 @@ def persist_docs_to_glue(
glue_table_comment = table["Parameters"].get("comment", "")
# Update description if it's different
if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
updated_table["Description"] = clean_table_description
updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"])
updated_table_parameters["comment"] = clean_table_description
updated_table["Parameters"] = updated_table_parameters
need_udpate_table = True
table_input["Description"] = clean_table_description
table_parameters["comment"] = clean_table_description
need_to_update_table = True

# Get dbt model meta if available
meta: Dict[str, Any] = model.get("config", {}).get("meta", {})
# Add some of dbt model config fields as table meta
meta["unique_id"] = model.get("unique_id")
meta["materialized"] = model.get("config", {}).get("materialized")
# Get dbt runtime config to be able to get dbt project metadata
runtime_config: RuntimeConfig = self.config
# Add dbt project metadata to table meta
meta["dbt_project_name"] = runtime_config.project_name
meta["dbt_project_version"] = runtime_config.version
# Prepare meta values for table properties and check if update is required
for meta_key, meta_value_raw in meta.items():
if is_valid_table_parameter_key(meta_key):
meta_value = stringify_table_parameter_value(meta_value_raw)
# Check that meta value is already attached to Glue table
current_meta_value: Optional[str] = table_parameters.get(meta_key)
if current_meta_value is None or current_meta_value != meta_value:
# Update Glue table parameter only if needed
table_parameters[meta_key] = meta_value
need_to_update_table = True

# Update column comments
if persist_column_docs:
# Process every column
for col_obj in updated_table["StorageDescriptor"]["Columns"]:
for col_obj in table_input["StorageDescriptor"]["Columns"]:
# Get column description from dbt
col_name = col_obj["Name"]
if col_name in model["columns"]:
Expand All @@ -846,15 +869,16 @@ def persist_docs_to_glue(
# Update column description if it's different
if glue_col_comment != clean_col_comment:
col_obj["Comment"] = clean_col_comment
need_udpate_table = True
need_to_update_table = True

# Update Glue Table only if table/column description is modified.
# It prevents redundant schema version creating after incremental runs.
if need_udpate_table:
if need_to_update_table:
table_input["Parameters"] = table_parameters
glue_client.update_table(
CatalogId=catalog_id,
DatabaseName=relation.schema,
TableInput=updated_table,
TableInput=table_input,
SkipArchive=skip_archive_table_version,
)

Expand Down
17 changes: 16 additions & 1 deletion dbt/adapters/athena/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import re
from enum import Enum
from typing import Generator, List, Optional, TypeVar
from typing import Any, Generator, List, Optional, TypeVar

from mypy_boto3_athena.type_defs import DataCatalogTypeDef

Expand All @@ -9,6 +11,19 @@ def clean_sql_comment(comment: str) -> str:
return " ".join(line for line in split_and_strip if line)


def stringify_table_parameter_value(value: Any) -> str:
"""Convert any variable to string for Glue Table property."""
if isinstance(value, (dict, list)):
value_str: str = json.dumps(value)
roslovets marked this conversation as resolved.
Show resolved Hide resolved
else:
value_str = str(value)
return value_str[:512000]


def is_valid_table_parameter_key(key: str) -> bool:
return len(key) <= 255 and bool(re.match(r"^[\u0020-\uD7FF\uE000-\uFFFD\t]*$", key))
roslovets marked this conversation as resolved.
Show resolved Hide resolved


def get_catalog_id(catalog: Optional[DataCatalogTypeDef]) -> Optional[str]:
return catalog["Parameters"]["catalog-id"] if catalog and catalog["Type"] == AthenaCatalogType.GLUE.value else None

Expand Down
23 changes: 22 additions & 1 deletion tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from dbt.adapters.athena.utils import clean_sql_comment, get_chunks
from dbt.adapters.athena.utils import (
clean_sql_comment,
get_chunks,
is_valid_table_parameter_key,
stringify_table_parameter_value,
)


def test_clean_comment():
Expand All @@ -14,6 +19,22 @@ def test_clean_comment():
)


def test_stringify_table_parameter_value():
assert stringify_table_parameter_value(True) == "True"
assert stringify_table_parameter_value(123) == "123"
assert stringify_table_parameter_value("dbt-athena") == "dbt-athena"
assert stringify_table_parameter_value(["a", "b", 3]) == '["a", "b", 3]'
assert stringify_table_parameter_value({"a": 1, "b": "c"}) == '{"a": 1, "b": "c"}'
assert len(stringify_table_parameter_value("a" * 512001)) == 512000


def test_is_valid_table_parameter_key():
assert is_valid_table_parameter_key("valid_key") is True
assert is_valid_table_parameter_key("Valid Key 123*!") is True
assert is_valid_table_parameter_key("invalid \n key") is False
assert is_valid_table_parameter_key("long_key" * 100) is False


def test_get_chunks_empty():
assert len(list(get_chunks([], 5))) == 0

Expand Down