Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest): cleanup structured properties validation #12115

Merged
merged 2 commits into from
Dec 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import logging
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import Generator, List, Optional
from typing import List, Optional

import yaml
from pydantic import validator
from ruamel.yaml import YAML

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.metadata.schema_classes import (
PropertyValueClass,
Expand All @@ -24,23 +24,10 @@
class StructuredPropertiesConfig:
"""Configuration class to hold the graph client"""

_graph: Optional[DataHubGraph] = None

@classmethod
@contextmanager
def use_graph(cls, graph: DataHubGraph) -> Generator[None, None, None]:
"""Context manager to temporarily set a custom graph"""
previous_graph = cls._graph
cls._graph = graph
try:
yield
finally:
cls._graph = previous_graph

@classmethod
def get_graph(cls) -> DataHubGraph:
def get_graph_required(cls) -> DataHubGraph:
"""Get the current graph, falling back to default if none set"""
return cls._graph if cls._graph is not None else get_default_graph()
return get_graph_context() or get_default_graph()


class AllowedTypes(Enum):
Expand Down Expand Up @@ -79,7 +66,7 @@ class TypeQualifierAllowedTypes(ConfigModel):
@validator("allowed_types", each_item=True)
def validate_allowed_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph()
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
Expand All @@ -106,7 +93,7 @@ class StructuredProperties(ConfigModel):
@validator("entity_types", each_item=True)
def validate_entity_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph()
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
Expand Down Expand Up @@ -136,63 +123,64 @@ def urn_must_be_present(cls, v, values):

@staticmethod
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
emitter: DataHubGraph = graph if graph else get_default_graph()
with StructuredPropertiesConfig.use_graph(emitter):
print("Using graph")
with set_graph_context(graph):
graph = StructuredPropertiesConfig.get_graph_required()

with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
)

if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warning(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warn(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
emitter.emit_mcp(mcp)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
)
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")
logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
with StructuredPropertiesConfig.use_graph(graph):
with set_graph_context(graph):
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
Expand Down
Loading