Skip to content

Commit

Permalink
added sync and asyc cases
Browse files Browse the repository at this point in the history
  • Loading branch information
ryota-cloud committed Jan 23, 2025
1 parent 6eb47f8 commit 71486b4
Showing 1 changed file with 37 additions and 21 deletions.
58 changes: 37 additions & 21 deletions smoke-test/tests/restli/test_restli_batch_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,29 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass:
return mcp_invalid.make_mcp()


def _create_invalid_dataset_mcps() -> List[MetadataChangeProposalWrapper]:
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)"
model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn()
bad_mcps = [
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.StatusClass(removed=False),
),
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.UpstreamLineageClass(
upstreams=[
models.UpstreamClass(
dataset=model_urn,
type=models.DatasetLineageTypeClass.TRANSFORMED,
)
]
),
),
]
return bad_mcps


def test_restli_batch_ingestion_sync(graph_client):
# Positive Test (all valid MetadataChangeProposal)
mcps = _create_valid_dashboard_mcps()
Expand Down Expand Up @@ -137,34 +160,27 @@ def test_restli_batch_ingestion_async(graph_client):
assert aspect.lastModified is not None


def test_restli_batch_ingest_exception_equivalence(graph_client):
def test_restli_batch_ingestion_exception_sync(graph_client):
"""
Test that batch ingest exceptions are equivalent to single ingest exceptions.
Test Batch ingestion when an exception occurs in sync mode
"""
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)"
model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn()
bad_mcps = [
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.StatusClass(removed=False),
),
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.UpstreamLineageClass(
upstreams=[
models.UpstreamClass(
dataset=model_urn,
type=models.DatasetLineageTypeClass.TRANSFORMED,
)
]
),
),
]
bad_mcps = _create_invalid_dataset_mcps()
generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])

try:
graph_client.emit_mcps(bad_mcps, async_flag=False)
raise AssertionError("should have thrown an exception")
except Exception as e:
if isinstance(e, AssertionError):
raise e
print(f"Error emitting MCPs due to {e}")


def test_restli_batch_ingestion_exception_async(graph_client):
"""
Test Batch ingestion when an exception occurs in async mode
"""
bad_mcps = _create_invalid_dataset_mcps()
generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])
ret = graph_client.emit_mcps(bad_mcps, async_flag=True)
assert ret >= 0

0 comments on commit 71486b4

Please sign in to comment.