From 71486b4033f574b41e99532cc605ed32f2bd50fe Mon Sep 17 00:00:00 2001 From: Ryota Egashira Date: Thu, 23 Jan 2025 12:47:54 -0800 Subject: [PATCH] added sync and asyc cases --- .../restli/test_restli_batch_ingestion.py | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/smoke-test/tests/restli/test_restli_batch_ingestion.py b/smoke-test/tests/restli/test_restli_batch_ingestion.py index 4b8b55db86e6b4..98292f26bfa3cf 100644 --- a/smoke-test/tests/restli/test_restli_batch_ingestion.py +++ b/smoke-test/tests/restli/test_restli_batch_ingestion.py @@ -86,6 +86,29 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass: return mcp_invalid.make_mcp() +def _create_invalid_dataset_mcps() -> List[MetadataChangeProposalWrapper]: + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)" + model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn() + bad_mcps = [ + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=models.StatusClass(removed=False), + ), + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=models.UpstreamLineageClass( + upstreams=[ + models.UpstreamClass( + dataset=model_urn, + type=models.DatasetLineageTypeClass.TRANSFORMED, + ) + ] + ), + ), + ] + return bad_mcps + + def test_restli_batch_ingestion_sync(graph_client): # Positive Test (all valid MetadataChangeProposal) mcps = _create_valid_dashboard_mcps() @@ -137,30 +160,13 @@ def test_restli_batch_ingestion_async(graph_client): assert aspect.lastModified is not None -def test_restli_batch_ingest_exception_equivalence(graph_client): +def test_restli_batch_ingestion_exception_sync(graph_client): """ - Test that batch ingest exceptions are equivalent to single ingest exceptions. + Test Batch ingestion when an exception occurs in sync mode """ - dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)" - model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn() - bad_mcps = [ - MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=models.StatusClass(removed=False), - ), - MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=models.UpstreamLineageClass( - upstreams=[ - models.UpstreamClass( - dataset=model_urn, - type=models.DatasetLineageTypeClass.TRANSFORMED, - ) - ] - ), - ), - ] + bad_mcps = _create_invalid_dataset_mcps() generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn]) + try: graph_client.emit_mcps(bad_mcps, async_flag=False) raise AssertionError("should have thrown an exception") @@ -168,3 +174,13 @@ def test_restli_batch_ingest_exception_equivalence(graph_client): if isinstance(e, AssertionError): raise e print(f"Error emitting MCPs due to {e}") + + +def test_restli_batch_ingestion_exception_async(graph_client): + """ + Test Batch ingestion when an exception occurs in async mode + """ + bad_mcps = _create_invalid_dataset_mcps() + generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn]) + ret = graph_client.emit_mcps(bad_mcps, async_flag=True) + assert ret >= 0