Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add scaffolding for sdk v2 #12554

Merged
merged 13 commits into from
Feb 6, 2025
5 changes: 4 additions & 1 deletion metadata-ingestion/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ warn_unused_configs = yes
disallow_untyped_defs = no

# try to be a bit more strict in certain areas of the codebase
[mypy-datahub]
# Only for datahub's __init__.py - allow implicit reexport
implicit_reexport = yes
[mypy-datahub.*]
ignore_missing_imports = no
implicit_reexport = no
Expand Down Expand Up @@ -54,7 +57,7 @@ addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:
markers =
slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow')
integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelisation in CI. Batch 0 is the default batch.
integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch.
integration_batch_1: mark tests to run in batch 1 of integration tests
integration_batch_2: mark tests to run in batch 2 of integration tests
testpaths =
Expand Down
35 changes: 35 additions & 0 deletions metadata-ingestion/src/datahub/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from datahub.configuration.common import MetaError

# TODO: Move all other error types to this file.


class SdkUsageError(MetaError):
pass


class AlreadyExistsError(SdkUsageError):
pass


class ItemNotFoundError(SdkUsageError):
pass


class MultipleItemsFoundError(SdkUsageError):
pass


class SchemaFieldKeyError(SdkUsageError, KeyError):
pass


class IngestionAttributionWarning(Warning):
pass


class MultipleSubtypesWarning(Warning):
pass


class ExperimentalWarning(Warning):
pass
33 changes: 33 additions & 0 deletions metadata-ingestion/src/datahub/sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import warnings

import datahub.metadata.schema_classes as models
from datahub.errors import ExperimentalWarning, SdkUsageError
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.metadata.urns import (
ChartUrn,
ContainerUrn,
CorpGroupUrn,
CorpUserUrn,
DashboardUrn,
DataPlatformInstanceUrn,
DataPlatformUrn,
DatasetUrn,
DomainUrn,
GlossaryTermUrn,
SchemaFieldUrn,
TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient

warnings.warn(
"The new datahub SDK (e.g. datahub.sdk.*) is experimental. "
"Our typical backwards-compatibility and stability guarantees do not apply to this code. "
"When it's promoted to stable, the import path will change "
"from `from datahub.sdk import ...` to `from datahub import ...`.",
ExperimentalWarning,
stacklevel=2,
)
del warnings
del ExperimentalWarning
15 changes: 15 additions & 0 deletions metadata-ingestion/src/datahub/sdk/_all_entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Dict, List, Type

from datahub.sdk._entity import Entity
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset

# TODO: Is there a better way to declare this?
ENTITY_CLASSES_LIST: List[Type[Entity]] = [
Container,
Dataset,
]

ENTITY_CLASSES: Dict[str, Type[Entity]] = {
cls.get_urn_type().ENTITY_TYPE: cls for cls in ENTITY_CLASSES_LIST
}
48 changes: 48 additions & 0 deletions metadata-ingestion/src/datahub/sdk/_attribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

import contextlib
from typing import Iterator

from datahub.utilities.str_enum import StrEnum


class KnownAttribution(StrEnum):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we consider custom attributions beyond the listed ones here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we will - this is not the final form. attribution will likely be expanded upon a bunch, but this works well enough for an initial experimental impl

INGESTION = "INGESTION"
INGESTION_ALTERNATE = "INGESTION_ALTERNATE"

UI = "UI"
SDK = "SDK"

PROPAGATION = "PROPAGATION"

def is_ingestion(self) -> bool:
return self in (
KnownAttribution.INGESTION,
KnownAttribution.INGESTION_ALTERNATE,
)


_default_attribution = KnownAttribution.SDK


def get_default_attribution() -> KnownAttribution:
return _default_attribution


def set_default_attribution(attribution: KnownAttribution) -> None:
global _default_attribution
_default_attribution = attribution


@contextlib.contextmanager
def change_default_attribution(attribution: KnownAttribution) -> Iterator[None]:
old_attribution = get_default_attribution()
try:
set_default_attribution(attribution)
yield
finally:
set_default_attribution(old_attribution)


def is_ingestion_attribution() -> bool:
return get_default_attribution().is_ingestion()
89 changes: 89 additions & 0 deletions metadata-ingestion/src/datahub/sdk/_entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import abc
from typing import List, Optional, Type, Union

from typing_extensions import Self

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import Aspect as AspectTypeVar
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.errors import SdkUsageError
from datahub.metadata.urns import Urn
from datahub.utilities.urns._urn_base import _SpecificUrn


class Entity:
__slots__ = ("_urn", "_prev_aspects", "_aspects")

def __init__(self, /, urn: Urn):
# This method is not meant for direct usage.
if type(self) is Entity:
raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.")

Check warning on line 20 in metadata-ingestion/src/datahub/sdk/_entity.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/sdk/_entity.py#L20

Added line #L20 was not covered by tests

assert isinstance(urn, self.get_urn_type())
self._urn: _SpecificUrn = urn

# prev_aspects is None means this was created from scratch
self._prev_aspects: Optional[models.AspectBag] = None
self._aspects: models.AspectBag = {}

@classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
# If an init method from a subclass adds required fields, it also needs to override this method.
# An alternative approach would call cls.__new__() to bypass the init method, but it's a bit
# too hacky for my taste.
entity = cls(urn=urn)
return entity._init_from_graph(current_aspects)

Check warning on line 35 in metadata-ingestion/src/datahub/sdk/_entity.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/sdk/_entity.py#L34-L35

Added lines #L34 - L35 were not covered by tests

def _init_from_graph(self, current_aspects: models.AspectBag) -> Self:
self._prev_aspects = current_aspects
aspect: models._Aspect
for aspect_name, aspect in (current_aspects or {}).items(): # type: ignore
aspect_copy = type(aspect).from_obj(aspect.to_obj())
self._aspects[aspect_name] = aspect_copy # type: ignore
return self

@classmethod
@abc.abstractmethod
def get_urn_type(cls) -> Type[_SpecificUrn]: ...

@property
def urn(self) -> _SpecificUrn:
return self._urn

def _get_aspect(
self,
aspect_type: Type[AspectTypeVar],
/,
) -> Optional[AspectTypeVar]:
return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore

def _set_aspect(self, value: AspectTypeVar, /) -> None:
self._aspects[value.ASPECT_NAME] = value # type: ignore

def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar:
# Similar semantics to dict.setdefault.
if existing_aspect := self._get_aspect(type(default_aspect)):
return existing_aspect
self._set_aspect(default_aspect)
return default_aspect

def _as_mcps(
self,
change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT,
) -> List[MetadataChangeProposalWrapper]:
urn_str = str(self.urn)

mcps = []
for aspect in self._aspects.values():
assert isinstance(aspect, models._Aspect)
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=urn_str,
aspect=aspect,
changeType=change_type,
)
)
return mcps

def __repr__(self) -> str:
return f"{self.__class__.__name__}('{self.urn}')"
Loading
Loading