Skip to content

Commit

Permalink
refactor(ingest): cleanup structured properties validation (#12115)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Dec 14, 2024
1 parent f9ca305 commit 05ba403
Showing 1 changed file with 56 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import logging
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import Generator, List, Optional
from typing import List, Optional

import yaml
from pydantic import validator
from ruamel.yaml import YAML

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.metadata.schema_classes import (
PropertyValueClass,
Expand All @@ -24,23 +24,10 @@
class StructuredPropertiesConfig:
"""Configuration class to hold the graph client"""

_graph: Optional[DataHubGraph] = None

@classmethod
@contextmanager
def use_graph(cls, graph: DataHubGraph) -> Generator[None, None, None]:
"""Context manager to temporarily set a custom graph"""
previous_graph = cls._graph
cls._graph = graph
try:
yield
finally:
cls._graph = previous_graph

@classmethod
def get_graph(cls) -> DataHubGraph:
def get_graph_required(cls) -> DataHubGraph:
"""Get the current graph, falling back to default if none set"""
return cls._graph if cls._graph is not None else get_default_graph()
return get_graph_context() or get_default_graph()


class AllowedTypes(Enum):
Expand Down Expand Up @@ -79,7 +66,7 @@ class TypeQualifierAllowedTypes(ConfigModel):
@validator("allowed_types", each_item=True)
def validate_allowed_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph()
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
Expand All @@ -106,7 +93,7 @@ class StructuredProperties(ConfigModel):
@validator("entity_types", each_item=True)
def validate_entity_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph()
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
Expand Down Expand Up @@ -136,63 +123,64 @@ def urn_must_be_present(cls, v, values):

@staticmethod
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
emitter: DataHubGraph = graph if graph else get_default_graph()
with StructuredPropertiesConfig.use_graph(emitter):
print("Using graph")
with set_graph_context(graph):
graph = StructuredPropertiesConfig.get_graph_required()

with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
)

if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warning(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warn(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
emitter.emit_mcp(mcp)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
)
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")
logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
with StructuredPropertiesConfig.use_graph(graph):
with set_graph_context(graph):
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
Expand Down

0 comments on commit 05ba403

Please sign in to comment.