Skip to content

Commit

Permalink
adding another batch_ingestion test for exception
Browse files Browse the repository at this point in the history
  • Loading branch information
ryota-cloud committed Jan 23, 2025
1 parent cb47577 commit 6eb47f8
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions smoke-test/tests/restli/test_restli_batch_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_dashboard_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.serialization_helper import pre_json_transform
Expand All @@ -12,6 +13,7 @@
ChangeAuditStampsClass,
DashboardInfoClass,
)
from datahub.metadata.urns import MlModelUrn
from tests.consistency_utils import wait_for_writes_to_sync
from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper
from tests.utils import delete_urns
Expand Down Expand Up @@ -133,3 +135,36 @@ def test_restli_batch_ingestion_async(graph_client):
assert aspect.title == "Dummy Title For Testing"
assert aspect.description == "Dummy Description For Testing"
assert aspect.lastModified is not None


def test_restli_batch_ingest_exception_equivalence(graph_client):
"""
Test that batch ingest exceptions are equivalent to single ingest exceptions.
"""
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)"
model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn()
bad_mcps = [
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.StatusClass(removed=False),
),
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.UpstreamLineageClass(
upstreams=[
models.UpstreamClass(
dataset=model_urn,
type=models.DatasetLineageTypeClass.TRANSFORMED,
)
]
),
),
]
generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])
try:
graph_client.emit_mcps(bad_mcps, async_flag=False)
raise AssertionError("should have thrown an exception")
except Exception as e:
if isinstance(e, AssertionError):
raise e
print(f"Error emitting MCPs due to {e}")

0 comments on commit 6eb47f8

Please sign in to comment.