Skip to content

Commit

Permalink
feat(ingest(cli): add domain helper, add progressbar (#12436)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 23, 2025
1 parent 3471857 commit 8eda51e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
4 changes: 3 additions & 1 deletion docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ A group of commands to interact with containers in DataHub.
e.g. You can use this to apply a tag to all datasets recursively in this container.
```shell
datahub container tag --container-urn "urn:li:container:0e9e46bd6d5cf645f33d5a8f0254bc2d" --tag-urn "urn:li:tag:tag1"

datahub container domain --container-urn "urn:li:container:3f2effd1fbe154a4d60b597263a41e41" --domain-urn "urn:li:domain:ajsajo-b832-4ab3-8881-7ed5e991a44c"
datahub container owner --container-urn "urn:li:container:3f2effd1fbe154a4d60b597263a41e41" --owner-urn "urn:li:corpGroup:eng@example.com"
datahub container term --container-urn "urn:li:container:3f2effd1fbe154a4d60b597263a41e41" --term-urn "urn:li:term:PII"
```

### check
Expand Down
41 changes: 30 additions & 11 deletions metadata-ingestion/src/datahub/cli/container_cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging
from typing import List
from typing import Any, List

import click
import progressbar

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import (
DomainsClass,
GlossaryTermAssociationClass,
OwnerClass,
OwnershipTypeClass,
Expand All @@ -27,12 +30,12 @@ def apply_association_to_container(
association_type: str,
) -> None:
"""
Common function to add either tags, terms, or owners to child datasets (for now).
Common function to add either tags, terms, domains, or owners to child datasets (for now).
Args:
container_urn: The URN of the container
association_urn: The URN of the tag, term, or user to apply
association_type: One of 'tag', 'term', or 'owner'
association_type: One of 'tag', 'term', 'domain' or 'owner'
"""
urns: List[str] = []
graph = get_default_graph()
Expand All @@ -43,10 +46,10 @@ def apply_association_to_container(
)
)

all_patches: List[Any] = []
for urn in urns:
logger.info(f"Adding {association_type} {association_urn} to {urn}")
builder = DatasetPatchBuilder(urn)

patches: List[Any] = []
if association_type == "tag":
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
elif association_type == "term":
Expand All @@ -60,9 +63,17 @@ def apply_association_to_container(
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
).build()

for mcp in patches:
graph.emit(mcp)
elif association_type == "domain":
patches = [
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DomainsClass(domains=[association_urn]),
)
]
all_patches.extend(patches)
mcps_iter = progressbar.progressbar(all_patches, redirect_stdout=True)
for mcp in mcps_iter:
graph.emit(mcp)


@container.command()
Expand All @@ -83,7 +94,15 @@ def term(container_urn: str, term_urn: str) -> None:

@container.command()
@click.option("--container-urn", required=True, type=str)
@click.option("--owner-id", required=True, type=str)
def owner(container_urn: str, owner_id: str) -> None:
@click.option("--owner-urn", required=True, type=str)
def owner(container_urn: str, owner_urn: str) -> None:
"""Add patch to add a owner to all datasets in a container"""
apply_association_to_container(container_urn, owner_id, "owner")
apply_association_to_container(container_urn, owner_urn, "owner")


@container.command()
@click.option("--container-urn", required=True, type=str)
@click.option("--domain-urn", required=True, type=str)
def domain(container_urn: str, domain_urn: str) -> None:
    """Add patch to add a domain to all datasets in a container"""
    # Thin CLI wrapper: all of the work happens in the shared association
    # helper; this command only fixes the association type to "domain".
    association_kind = "domain"
    apply_association_to_container(container_urn, domain_urn, association_kind)

0 comments on commit 8eda51e

Please sign in to comment.