diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index a756b27a38e848..704479d958bc12 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -1039,12 +1039,12 @@ jobs: cypress_matrix=$(printf "{\"test_strategy\":\"cypress\",\"batch\":\"0\",\"batch_count\":\"$cypress_batch_count\"}"; for ((i=1;i> "$GITHUB_OUTPUT" diff --git a/docker/datahub-frontend/Dockerfile b/docker/datahub-frontend/Dockerfile index 16e6477c37ce69..43659cb4362985 100644 --- a/docker/datahub-frontend/Dockerfile +++ b/docker/datahub-frontend/Dockerfile @@ -15,7 +15,7 @@ RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then # Upgrade Alpine and base packages # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762 -ENV JMX_VERSION=0.18.0 +ENV JMX_VERSION=0.20.0 RUN apk --no-cache --update-cache --available upgrade \ && apk --no-cache add curl sqlite libc6-compat snappy \ && apk --no-cache add openjdk17-jre-headless --repository=${ALPINE_REPO_URL}/edge/community \ diff --git a/docker/datahub-gms/Dockerfile b/docker/datahub-gms/Dockerfile index 5462d4f70002c1..232802a6bad8b8 100644 --- a/docker/datahub-gms/Dockerfile +++ b/docker/datahub-gms/Dockerfile @@ -25,7 +25,7 @@ RUN go install github.com/jwilder/dockerize@$DOCKERIZE_VERSION FROM alpine:3.21 AS base -ENV JMX_VERSION=0.18.0 +ENV JMX_VERSION=0.20.0 # Re-declaring args from above to make them available in this stage (will inherit default values) ARG ALPINE_REPO_URL diff --git a/docker/datahub-mae-consumer/Dockerfile b/docker/datahub-mae-consumer/Dockerfile index 4ddec56311fb96..828b505b8b5a80 100644 --- a/docker/datahub-mae-consumer/Dockerfile +++ b/docker/datahub-mae-consumer/Dockerfile @@ -34,7 +34,7 @@ ARG MAVEN_CENTRAL_REPO_URL RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then sed -i "s#http.*://dl-cdn.alpinelinux.org/alpine#${ALPINE_REPO_URL}#g" /etc/apk/repositories ; fi # Upgrade Alpine and base packages -ENV JMX_VERSION=0.18.0 +ENV JMX_VERSION=0.20.0 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762 RUN apk --no-cache --update-cache --available upgrade \ && apk --no-cache add curl bash coreutils sqlite libc6-compat snappy \ diff --git a/docker/datahub-mce-consumer/Dockerfile b/docker/datahub-mce-consumer/Dockerfile index 42e40cd5942144..b5a73481518dc1 100644 --- a/docker/datahub-mce-consumer/Dockerfile +++ b/docker/datahub-mce-consumer/Dockerfile @@ -34,7 +34,7 @@ ARG MAVEN_CENTRAL_REPO_URL RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then sed -i "s#http.*://dl-cdn.alpinelinux.org/alpine#${ALPINE_REPO_URL}#g" /etc/apk/repositories ; fi # Upgrade Alpine and base packages -ENV JMX_VERSION=0.18.0 +ENV JMX_VERSION=0.20.0 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762 RUN apk --no-cache --update-cache --available upgrade \ && apk --no-cache add curl bash sqlite libc6-compat snappy \ diff --git a/docker/datahub-upgrade/Dockerfile b/docker/datahub-upgrade/Dockerfile index d63ceb83dc5295..bd4b560b8347cf 100644 --- a/docker/datahub-upgrade/Dockerfile +++ b/docker/datahub-upgrade/Dockerfile @@ -33,7 +33,7 @@ ARG MAVEN_CENTRAL_REPO_URL # Optionally set corporate mirror for apk RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then sed -i "s#http.*://dl-cdn.alpinelinux.org/alpine#${ALPINE_REPO_URL}#g" /etc/apk/repositories 
; fi -ENV JMX_VERSION=0.18.0 +ENV JMX_VERSION=0.20.0 # Upgrade Alpine and base packages # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762 diff --git a/docs/actions/events/entity-change-event.md b/docs/actions/events/entity-change-event.md index 4a7264c20bcc62..ff992f90480307 100644 --- a/docs/actions/events/entity-change-event.md +++ b/docs/actions/events/entity-change-event.md @@ -417,3 +417,182 @@ This event is emitted when a new entity has been hard-deleted on DataHub. } } ``` + +## Action Request Events (Proposals) + +Action Request events represent proposals for changes to entities that may require approval before being applied. These events have entityType "actionRequest" and use the `LIFECYCLE` category with `CREATE` operation. + +### Domain Association Request Event + +This event is emitted when a domain association is proposed for an entity on DataHub. + +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:abc-123", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "domains": "[\"urn:li:domain:marketing\"]", + "actionRequestType": "DOMAIN_ASSOCIATION", + "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)", + "resourceType": "dataset" + } +} +``` + +### Owner Association Request Event + +This event is emitted when an owner association is proposed for an entity on DataHub. + +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:def-456", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "owners": "[{\"type\":\"TECHNICAL_OWNER\",\"typeUrn\":\"urn:li:ownershipType:technical_owner\",\"ownerUrn\":\"urn:li:corpuser:jdoe\"}]", + "actionRequestType": "OWNER_ASSOCIATION", + "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)", + "resourceType": "dataset" + } +} +``` + +### Tag Association Request Event + +This event is emitted when a tag association is proposed for an entity on DataHub. + +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:ghi-789", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "actionRequestType": "TAG_ASSOCIATION", + "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)", + "tagUrn": "urn:li:tag:pii", + "resourceType": "dataset" + } +} +``` + +### Create Glossary Term Request Event + +This event is emitted when a new glossary term creation is proposed on DataHub. + +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:jkl-101", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "parentNodeUrn": "urn:li:glossaryNode:123", + "glossaryEntityName": "ExampleTerm", + "actionRequestType": "CREATE_GLOSSARY_TERM", + "resourceType": "glossaryTerm" + } +} +``` + +### Term Association Request Event + +This event is emitted when a glossary term association is proposed for an entity on DataHub. 
+ +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:mno-102", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "glossaryTermUrn": "urn:li:glossaryTerm:123", + "actionRequestType": "TERM_ASSOCIATION", + "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)", + "resourceType": "dataset" + } +} +``` + +### Update Description Request Event + +This event is emitted when an update to an entity's description is proposed on DataHub. + +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:pqr-103", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "description": "Example description for a dataset.", + "actionRequestType": "UPDATE_DESCRIPTION", + "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)", + "resourceType": "dataset" + } +} +``` + +### Structured Property Association Request Event + +This event is emitted when a structured property association is proposed for an entity on DataHub. + +#### Sample Event +```json +{ + "entityType": "actionRequest", + "entityUrn": "urn:li:actionRequest:stu-104", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1234567890 + }, + "version": 0, + "parameters": { + "structuredProperties": "[{\"propertyUrn\":\"urn:li:structuredProperty:123\",\"values\":[\"value1\",\"value2\"]}]", + "actionRequestType": "STRUCTURED_PROPERTY_ASSOCIATION", + "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)", + "resourceType": "dataset" + } +} +``` diff --git a/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md b/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md index 7a58363cea6589..68ba9af38dd2ea 100644 --- a/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md +++ b/docs/managed-datahub/operator-guide/setting-up-remote-ingestion-executor.md @@ -102,9 +102,9 @@ In order to update the executor, ie. to deploy a new container version, you'll n ### Deploying on Kubernetes -The Helm chart [datahub-executor-worker](https://github.com/acryldata/datahub-executor-helm/tree/main/charts/datahub-executor-worker) can be used to deploy on a Kubernetes cluster. These instructions also apply for deploying to Amazon Elastic Kubernetes Service (EKS) or Google Kubernetes Engine (GKE). +The Helm chart [datahub-executor-worker](https://executor-helm.acryl.io/index.yaml) can be used to deploy on a Kubernetes cluster. These instructions also apply for deploying to Amazon Elastic Kubernetes Service (EKS) or Google Kubernetes Engine (GKE). -1. **Download Chart**: Download the [latest release](https://github.com/acryldata/datahub-executor-helm/releases) of the chart +1. **Download Chart**: Download the [latest release](https://executor-helm.acryl.io/index.yaml) of the chart 2. 
**Unpack the release archive**: ``` tar zxvf v0.0.4.tar.gz --strip-components=2 diff --git a/metadata-ingestion/docs/sources/mode/mode.md b/metadata-ingestion/docs/sources/mode/mode.md deleted file mode 100644 index a61c429a6ab1c1..00000000000000 --- a/metadata-ingestion/docs/sources/mode/mode.md +++ /dev/null @@ -1 +0,0 @@ -See Mode's [Authentication documentation](https://mode.com/developer/api-reference/authentication/) on how to generate `token` and `password`. diff --git a/metadata-ingestion/docs/sources/mode/mode_pre.md b/metadata-ingestion/docs/sources/mode/mode_pre.md new file mode 100644 index 00000000000000..a1fc264781175b --- /dev/null +++ b/metadata-ingestion/docs/sources/mode/mode_pre.md @@ -0,0 +1,21 @@ +### Authentication + +See Mode's [Authentication documentation](https://mode.com/developer/api-reference/authentication/) on how to generate an API `token` and `password`. + +Mode does not support true "service accounts", so you must use a user account for authentication. +Depending on your requirements, you may want to create a dedicated user account for use with DataHub ingestion. + +### Permissions + +DataHub ingestion requires the user to have the following permissions: + +- Have at least the "Member" role. +- For each Connection, have at least "View" access. + + To check Connection permissions, navigate to "Workspace Settings" → "Manage Connections". For each connection in the list, click on the connection → "Permissions". If the default workspace access is "View" or "Query", you're all set for that connection. If it's "Restricted", you'll need to individually grant your ingestion user View access. + +- For each Space, have at least "View" access. + + To check Collection permissions, navigate to the "My Collections" page as an Admin user. For each collection with Workspace Access set to "Restricted", the ingestion user must be manually granted "Viewer" access in the "Manage Access" dialog. Collections with "All Members can View/Edit" do not need to be manually granted access. + +Note that if the ingestion user has "Admin" access, then it will automatically have "View" access for all connections and collections.
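For reference, a minimal ingestion pipeline that exercises the authentication and permission setup described in the new `mode_pre.md` might look like the sketch below. It is an illustrative example, not part of this PR: the `connect_uri`, `token`, `password`, and `workspace` config keys are assumed from the Mode source configuration (only `workspace` is visible in this diff), and all credential values are placeholders.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Sketch of a Mode ingestion recipe, expressed via the programmatic Pipeline API.
# The token/password pair is the API credential generated per Mode's Authentication
# docs; the user behind it needs the "Member" role and "View" access noted above.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "mode",
            "config": {
                "connect_uri": "https://app.mode.com",  # assumed default Mode API endpoint
                "token": "MODE_API_TOKEN",        # placeholder
                "password": "MODE_API_PASSWORD",  # placeholder
                "workspace": "my-workspace",      # placeholder workspace name
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```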
diff --git a/metadata-ingestion/examples/ai/dh_ai_client.py b/metadata-ingestion/examples/ai/dh_ai_client.py index 96adb260da4c45..b51c8dd050cf1b 100644 --- a/metadata-ingestion/examples/ai/dh_ai_client.py +++ b/metadata-ingestion/examples/ai/dh_ai_client.py @@ -32,7 +32,7 @@ class DatahubAIClient: def __init__( self, - token: str, + token: Optional[str] = None, server_url: str = "http://localhost:8080", platform: str = "mlflow", ) -> None: diff --git a/metadata-ingestion/examples/ai/dh_ai_client_sample.py b/metadata-ingestion/examples/ai/dh_ai_client_sample.py index 291cfb2ff1d619..9a8ff578314175 100644 --- a/metadata-ingestion/examples/ai/dh_ai_client_sample.py +++ b/metadata-ingestion/examples/ai/dh_ai_client_sample.py @@ -8,7 +8,7 @@ if __name__ == "__main__": # Example usage parser = argparse.ArgumentParser() - parser.add_argument("--token", required=True, help="DataHub access token") + parser.add_argument("--token", required=False, help="DataHub access token") parser.add_argument( "--server_url", required=False, diff --git a/metadata-ingestion/examples/library/create_ermodelrelationship.py b/metadata-ingestion/examples/library/create_ermodelrelationship.py new file mode 100644 index 00000000000000..9456a7024d333a --- /dev/null +++ b/metadata-ingestion/examples/library/create_ermodelrelationship.py @@ -0,0 +1,134 @@ +import time + +from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import ( + AuditStampClass, + ERModelRelationshipCardinalityClass, + ERModelRelationshipKeyClass, + ERModelRelationshipPropertiesClass, + NumberTypeClass, + OtherSchemaClass, + RelationshipFieldMappingClass, + SchemaFieldClass, + SchemaFieldDataTypeClass, + SchemaMetadataClass, + StringTypeClass, +) + +# Configuration +GMS_ENDPOINT = "http://localhost:8080" +PLATFORM = "mysql" +ENV = "PROD" + +e = DatahubRestEmitter(gms_server=GMS_ENDPOINT, extra_headers={}) + + +def get_schema_field( + name: str, dtype: str, type: SchemaFieldDataTypeClass +) -> SchemaFieldClass: + """Creates a schema field for MySQL columns.""" + + field = SchemaFieldClass( + fieldPath=name, + type=type, + nativeDataType=dtype, + description=name, + lastModified=AuditStampClass( + time=int(time.time() * 1000), + actor="urn:li:corpuser:ingestion", + ), + ) + if name == "id": + field.isPartitioningKey = True + return field + + +# Define Employee Table +dataset_employee = make_dataset_urn(PLATFORM, "Employee", ENV) +employee_fields = [ + get_schema_field("id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())), + get_schema_field( + "name", "varchar", SchemaFieldDataTypeClass(type=StringTypeClass()) + ), + get_schema_field("age", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())), + get_schema_field( + "company_id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass()) + ), +] + +e.emit_mcp( + MetadataChangeProposalWrapper( + entityUrn=dataset_employee, + aspect=SchemaMetadataClass( + schemaName="Employee", + platform=make_data_platform_urn(PLATFORM), + fields=employee_fields, + version=0, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + ), + ) +) + +# Define Company Table +dataset_company = make_dataset_urn(PLATFORM, "Company", ENV) +company_fields = [ + get_schema_field("id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())), + get_schema_field( + "name", "varchar", SchemaFieldDataTypeClass(type=StringTypeClass()) + ), 
+] + +e.emit_mcp( + MetadataChangeProposalWrapper( + entityUrn=dataset_company, + aspect=SchemaMetadataClass( + schemaName="Company", + platform=make_data_platform_urn(PLATFORM), + fields=company_fields, + version=0, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + ), + ) +) + +# Establish Relationship (Foreign Key: Employee.company_id → Company.id) +relationship_key = ERModelRelationshipKeyClass(id="employee_to_company") +relationship_properties = ERModelRelationshipPropertiesClass( + name="Employee to Company Relationship", + source=dataset_employee, + destination=dataset_company, + relationshipFieldMappings=[ + RelationshipFieldMappingClass(sourceField="company_id", destinationField="id") + ], + cardinality=ERModelRelationshipCardinalityClass.N_ONE, + customProperties={"constraint": "Foreign Key", "index": "company_id"}, +) + +relationship_urn = f"urn:li:erModelRelationship:{relationship_key.id}" + +e.emit_mcp( + MetadataChangeProposalWrapper( + entityType="erModelRelationship", + changeType="UPSERT", + entityKeyAspect=relationship_key, + aspectName=relationship_key.ASPECT_NAME, + aspect=relationship_key, + ) +) + +e.emit_mcp( + MetadataChangeProposalWrapper( + entityUrn=relationship_urn, + entityType="erModelRelationship", + changeType="UPSERT", + aspectName=relationship_properties.ASPECT_NAME, + aspect=relationship_properties, + ) +) + +print("relationship_urn", relationship_urn) +print("Employee and Company tables created with ERModelRelationship linking them.") diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 628c7156dadd5c..9a421fc92f2e45 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -301,6 +301,8 @@ data_lake_profiling = { "pydeequ>=1.1.0", "pyspark~=3.5.0", + # cachetools is used by the profiling config + *cachetools_lib, } delta_lake = { @@ -485,9 +487,9 @@ | classification_lib | {"db-dtypes"} # Pandas extension data types | cachetools_lib, - "s3": {*s3_base, *data_lake_profiling, *cachetools_lib}, + "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, - "abs": {*abs_base, *data_lake_profiling, *cachetools_lib}, + "abs": {*abs_base, *data_lake_profiling}, "sagemaker": aws_common, "salesforce": {"simple-salesforce", *cachetools_lib}, "snowflake": snowflake_common | usage_common | sqlglot_lib, diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 214b14a2b6c100..7c8eb2cd31236c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -113,6 +113,7 @@ ) from datahub.utilities.delta import delta_type_to_hive_type from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column +from datahub.utilities.lossy_collections import LossyList logger = logging.getLogger(__name__) @@ -220,7 +221,7 @@ def platform_validator(cls, v: str) -> str: class GlueSourceReport(StaleEntityRemovalSourceReport): catalog_id: Optional[str] = None tables_scanned = 0 - filtered: List[str] = dataclass_field(default_factory=list) + filtered: LossyList[str] = dataclass_field(default_factory=LossyList) databases: EntityFilterReport = EntityFilterReport.field(type="database") num_job_script_location_missing: int = 0 @@ -746,7 +747,7 @@ def get_all_databases_and_tables( for tables in self.get_tables_from_database(database): all_tables.append(tables) except Exception as e: - self.report.failure( + self.report.warning( 
message="Failed to get tables from database", context=database["Name"], exc=e, diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py index edb9b7b8bd5264..6102e6d61a8bd1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py @@ -13,6 +13,7 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.emitter.mce_builder import make_group_urn, make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -51,6 +52,7 @@ OriginTypeClass, StatusClass, ) +from datahub.utilities.lossy_collections import LossyList logger = logging.getLogger(__name__) @@ -132,11 +134,7 @@ class AzureADConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): description="regex patterns for groups to include in ingestion.", ) - # If enabled, report will contain names of filtered users and groups. - filtered_tracking: bool = Field( - default=True, - description="If enabled, report will contain names of filtered users and groups.", - ) + _remove_filtered_tracking = pydantic_removed_field("filtered_tracking") # Optional: Whether to mask sensitive information from workunit ID's. On by default. mask_group_id: bool = Field( @@ -156,14 +154,10 @@ class AzureADConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): @dataclass class AzureADSourceReport(StaleEntityRemovalSourceReport): - filtered: List[str] = field(default_factory=list) - filtered_tracking: bool = field(default=True, repr=False) - filtered_count: int = field(default=0) + filtered: LossyList[str] = field(default_factory=LossyList) def report_filtered(self, name: str) -> None: - self.filtered_count += 1 - if self.filtered_tracking: - self.filtered.append(name) + self.filtered.append(name) # Source that extracts Azure AD users, groups and group memberships using Microsoft Graph REST API @@ -266,9 +260,7 @@ def create(cls, config_dict, ctx): def __init__(self, config: AzureADConfig, ctx: PipelineContext): super().__init__(config, ctx) self.config = config - self.report = AzureADSourceReport( - filtered_tracking=self.config.filtered_tracking - ) + self.report = AzureADSourceReport() session = requests.Session() retries = Retry( total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504] diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index bf0a33e423446a..13d92e8e41a966 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -24,6 +24,7 @@ import datahub.emitter.mce_builder as builder from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import DatasetLineageProviderConfigBase +from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import ( ContainerKey, @@ -155,10 +156,7 @@ class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase): workspace: str = Field( description="The Mode workspace name. Find it in Settings > Workspace > Details." 
) - default_schema: str = Field( - default="public", - description="Default schema to use when schema is not provided in an SQL query", - ) + _default_schema = pydantic_removed_field("default_schema") space_pattern: AllowDenyPattern = Field( default=AllowDenyPattern( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 5f732e2621656f..7e3220daceca3c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -249,6 +249,12 @@ class SnowflakeV2Config( description="If enabled along with `extract_tags`, extracts snowflake's key-value tags as DataHub structured properties instead of DataHub tags.", ) + structured_properties_template_cache_invalidation_interval: int = Field( + hidden_from_docs=True, + default=60, + description="Interval in seconds to invalidate the structured properties template cache.", + ) + include_external_url: bool = Field( default=True, description="Whether to populate Snowsight url for Snowflake Objects", diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 40bcfb514efd23..7092041f8da95c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -159,6 +159,17 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str: and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE') order by table_schema, table_name""" + @staticmethod + def get_all_tags(): + return """ + SELECT tag_database as "TAG_DATABASE", + tag_schema AS "TAG_SCHEMA", + tag_name AS "TAG_NAME", + FROM snowflake.account_usage.tag_references + GROUP BY TAG_DATABASE , TAG_SCHEMA, tag_name + ORDER BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME ASC; + """ + @staticmethod def get_all_tags_on_object_with_propagation( db_name: str, quoted_identifier: str, domain: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index b24471f8666afa..691448e1d09768 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -114,6 +114,7 @@ class SnowflakeV2Report( num_tables_with_known_upstreams: int = 0 num_upstream_lineage_edge_parsing_failed: int = 0 num_secure_views_missing_definition: int = 0 + num_structured_property_templates_created: int = 0 data_dictionary_cache: Optional["SnowflakeDataDictionary"] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index 173024aec0cf38..c31b7dfe5b9b4e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -285,6 +285,23 @@ def get_secure_view_definitions(self) -> Dict[str, Dict[str, Dict[str, str]]]: return secure_view_definitions + def get_all_tags(self) -> List[SnowflakeTag]: + cur = self.connection.query( + SnowflakeQuery.get_all_tags(), + ) + + tags = [ + SnowflakeTag( + database=tag["TAG_DATABASE"], + 
schema=tag["TAG_SCHEMA"], + name=tag["TAG_NAME"], + value="", + ) + for tag in cur + ] + + return tags + @serialized_lru_cache(maxsize=1) def get_tables_for_database( self, db_name: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 04bc51f1ebd3f5..134f0b343ccfe6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -1,10 +1,10 @@ import itertools import logging +import time from typing import Dict, Iterable, List, Optional, Union from datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter.mce_builder import ( - get_sys_time, make_data_platform_urn, make_dataset_urn_with_platform_instance, make_schema_field_urn, @@ -74,7 +74,6 @@ PROFILING, ) from datahub.metadata.com.linkedin.pegasus2avro.common import ( - AuditStamp, GlobalTags, Status, SubTypes, @@ -101,15 +100,8 @@ StringType, TimeType, ) -from datahub.metadata.com.linkedin.pegasus2avro.structured import ( - StructuredPropertyDefinition, -) from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties from datahub.metadata.urns import ( - ContainerUrn, - DatasetUrn, - DataTypeUrn, - EntityTypeUrn, SchemaFieldUrn, StructuredPropertyUrn, ) @@ -191,7 +183,7 @@ def __init__( self.domain_registry: Optional[DomainRegistry] = domain_registry self.classification_handler = ClassificationHandler(self.config, self.report) self.tag_extractor = SnowflakeTagExtractor( - config, self.data_dictionary, self.report + config, self.data_dictionary, self.report, identifiers ) self.profiler: Optional[SnowflakeProfiler] = profiler self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = ( @@ -217,6 +209,16 @@ def snowflake_identifier(self, identifier: str) -> str: return self.identifiers.snowflake_identifier(identifier) def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + if self.config.extract_tags_as_structured_properties: + logger.info("Creating structured property templates for tags") + yield from self.tag_extractor.create_structured_property_templates() + # We have to wait until cache invalidates to make sure the structured property template is available + logger.info( + f"Waiting for {self.config.structured_properties_template_cache_invalidation_interval} seconds for structured properties cache to invalidate" + ) + time.sleep( + self.config.structured_properties_template_cache_invalidation_interval + ) self.databases = [] for database in self.get_databases() or []: self.report.report_entity_scanned(database.name, "database") @@ -698,6 +700,7 @@ def _process_view( def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: use_sp = self.config.extract_tags_as_structured_properties + identifier = ( self.snowflake_identifier(tag.structured_property_identifier()) if use_sp @@ -708,10 +711,11 @@ def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: return self.report.report_tag_processed(identifier) + if use_sp: - yield from self.gen_tag_as_structured_property_workunits(tag) - else: - yield from self.gen_tag_workunits(tag) + return + + yield from self.gen_tag_workunits(tag) def _format_tags_as_structured_properties( self, tags: List[SnowflakeTag] @@ -732,6 +736,7 @@ def gen_dataset_workunits( if table.tags: for tag in table.tags: yield from self._process_tag(tag) + for column_name in table.column_tags: for 
tag in table.column_tags[column_name]: yield from self._process_tag(tag) @@ -903,29 +908,6 @@ def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: entityUrn=tag_urn, aspect=tag_properties_aspect ).as_workunit() - def gen_tag_as_structured_property_workunits( - self, tag: SnowflakeTag - ) -> Iterable[MetadataWorkUnit]: - identifier = self.snowflake_identifier(tag.structured_property_identifier()) - urn = StructuredPropertyUrn(identifier).urn() - aspect = StructuredPropertyDefinition( - qualifiedName=identifier, - displayName=tag.name, - valueType=DataTypeUrn("datahub.string").urn(), - entityTypes=[ - EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(), - EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(), - EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(), - ], - lastModified=AuditStamp( - time=get_sys_time(), actor="urn:li:corpuser:datahub" - ), - ) - yield MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=aspect, - ).as_workunit() - def gen_column_tags_as_structured_properties( self, dataset_urn: str, table: Union[SnowflakeTable, SnowflakeView] ) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py index 597e7bee4d4cc0..08d319c7fe25d2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py @@ -1,6 +1,9 @@ import logging -from typing import Dict, List, Optional +from typing import Dict, Iterable, List, Optional +from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.snowflake_config import ( SnowflakeV2Config, @@ -12,7 +15,22 @@ SnowflakeTag, _SnowflakeTagCache, ) -from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin +from datahub.ingestion.source.snowflake.snowflake_utils import ( + SnowflakeCommonMixin, + SnowflakeIdentifierBuilder, +) +from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp +from datahub.metadata.com.linkedin.pegasus2avro.structured import ( + StructuredPropertyDefinition, +) +from datahub.metadata.urns import ( + ContainerUrn, + DatasetUrn, + DataTypeUrn, + EntityTypeUrn, + SchemaFieldUrn, + StructuredPropertyUrn, +) logger: logging.Logger = logging.getLogger(__name__) @@ -23,11 +41,12 @@ def __init__( config: SnowflakeV2Config, data_dictionary: SnowflakeDataDictionary, report: SnowflakeV2Report, + snowflake_identifiers: SnowflakeIdentifierBuilder, ) -> None: self.config = config self.data_dictionary = data_dictionary self.report = report - + self.snowflake_identifiers = snowflake_identifiers self.tag_cache: Dict[str, _SnowflakeTagCache] = {} def _get_tags_on_object_without_propagation( @@ -59,6 +78,41 @@ def _get_tags_on_object_without_propagation( raise ValueError(f"Unknown domain {domain}") return tags + def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]: + for tag in self.data_dictionary.get_all_tags(): + if not self.config.structured_property_pattern.allowed( + tag.tag_identifier() + ): + continue + if self.config.extract_tags_as_structured_properties: + self.report.num_structured_property_templates_created += 1 + yield from 
self.gen_tag_as_structured_property_workunits(tag) + + def gen_tag_as_structured_property_workunits( + self, tag: SnowflakeTag + ) -> Iterable[MetadataWorkUnit]: + identifier = self.snowflake_identifiers.snowflake_identifier( + tag.structured_property_identifier() + ) + urn = StructuredPropertyUrn(identifier).urn() + aspect = StructuredPropertyDefinition( + qualifiedName=identifier, + displayName=tag.name, + valueType=DataTypeUrn("datahub.string").urn(), + entityTypes=[ + EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(), + EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(), + EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(), + ], + lastModified=AuditStamp( + time=get_sys_time(), actor="urn:li:corpuser:datahub" + ), + ) + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=aspect, + ).as_workunit() + def _get_tags_on_object_with_propagation( self, domain: str, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py index 9cd485f6d9a4b1..3d2a8af3a54999 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py @@ -7,7 +7,12 @@ make_data_platform_urn, make_dataplatform_instance_urn, ) +from datahub.emitter.mcp_builder import ( + DatabaseKey, + SchemaKey, +) from datahub.metadata.schema_classes import ( + ContainerClass, DataFlowInfoClass, DataJobInfoClass, DataJobInputOutputClass, @@ -171,11 +176,7 @@ def urn(self) -> str: flow_id=self.entity.flow.formatted_name, job_id=self.entity.formatted_name, cluster=self.entity.flow.cluster, - platform_instance=( - self.entity.flow.platform_instance - if self.entity.flow.platform_instance - else None - ), + platform_instance=self.entity.flow.platform_instance, ) def add_property( @@ -222,6 +223,26 @@ def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClas ) return None + @property + def as_container_aspect(self) -> ContainerClass: + key_args = dict( + platform=self.entity.flow.orchestrator, + instance=self.entity.flow.platform_instance, + env=self.entity.flow.env, + database=self.entity.flow.db, + ) + container_key = ( + SchemaKey( + schema=self.entity.schema, + **key_args, + ) + if isinstance(self.entity, StoredProcedure) + else DatabaseKey( + **key_args, + ) + ) + return ContainerClass(container=container_key.as_urn()) + @dataclass class MSSQLDataFlow: @@ -244,9 +265,7 @@ def urn(self) -> str: orchestrator=self.entity.orchestrator, flow_id=self.entity.formatted_name, cluster=self.entity.cluster, - platform_instance=( - self.entity.platform_instance if self.entity.platform_instance else None - ), + platform_instance=self.entity.platform_instance, ) @property @@ -267,3 +286,13 @@ def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClas ), ) return None + + @property + def as_container_aspect(self) -> ContainerClass: + databaseKey = DatabaseKey( + platform=self.entity.orchestrator, + instance=self.entity.platform_instance, + env=self.entity.env, + database=self.entity.db, + ) + return ContainerClass(container=databaseKey.as_urn()) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py index 2d0c4dc9f6f6f9..6c1f15a40075c6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py @@ -108,6 +108,10 @@ class SQLServerConfig(BasicSQLAlchemyConfig): default=True, description="Enable lineage extraction for stored procedures", ) + include_containers_for_pipelines: bool = Field( + default=False, + description="Enable the container aspects ingestion for both pipelines and tasks. Note that this feature requires the corresponding model support in the backend, which was introduced in version 0.15.0.1.", + ) @pydantic.validator("uri_args") def passwords_match(cls, v, values, **kwargs): @@ -641,6 +645,12 @@ def construct_job_workunits( aspect=data_platform_instance_aspect, ).as_workunit() + if self.config.include_containers_for_pipelines: + yield MetadataChangeProposalWrapper( + entityUrn=data_job.urn, + aspect=data_job.as_container_aspect, + ).as_workunit() + if include_lineage: yield MetadataChangeProposalWrapper( entityUrn=data_job.urn, @@ -683,6 +693,13 @@ def construct_flow_workunits( entityUrn=data_flow.urn, aspect=data_platform_instance_aspect, ).as_workunit() + + if self.config.include_containers_for_pipelines: + yield MetadataChangeProposalWrapper( + entityUrn=data_flow.urn, + aspect=data_flow.as_container_aspect, + ).as_workunit() + # TODO: Add SubType when it appear def get_inspectors(self) -> Iterable[Inspector]: diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index 276f4ccd54a4a1..85767c619a74b4 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -55,11 +55,19 @@ async def get_client_version_stats(): async with session.get(pypi_url) as resp: response_json = await resp.json() try: - releases = response_json.get("releases", []) - sorted_releases = sorted(releases.keys(), key=lambda x: Version(x)) - latest_cli_release_string = [ - x for x in sorted_releases if "rc" not in x - ][-1] + releases = response_json.get("releases", {}) + filtered_releases = { + version: release_files + for version, release_files in releases.items() + if not all( + release_file.get("yanked") for release_file in release_files + ) + and "rc" not in version + } + sorted_releases = sorted( + filtered_releases.keys(), key=lambda x: Version(x) + ) + latest_cli_release_string = sorted_releases[-1] latest_cli_release = Version(latest_cli_release_string) current_version_info = releases.get(current_version_string) current_version_date = None diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 7b4f5abe1cd462..7a296a597468ec 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -629,7 +629,27 @@ def default_query_results( # noqa: C901 ), ]: return [] - + elif query == snowflake_query.SnowflakeQuery.get_all_tags(): + return [ + *[ + { + "TAG_DATABASE": "TEST_DB", + "TAG_SCHEMA": "TEST_SCHEMA", + "TAG_NAME": f"my_tag_{ix}", + } + for ix in range(3) + ], + { + "TAG_DATABASE": "TEST_DB", + "TAG_SCHEMA": "TEST_SCHEMA", + "TAG_NAME": "security", + }, + { + "TAG_DATABASE": "OTHER_DB", + "TAG_SCHEMA": "OTHER_SCHEMA", + "TAG_NAME": "my_other_tag", + }, + ] elif ( query == snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation( diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index d2e20e784282ee..340b771b76e310 100644 --- 
a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -219,6 +219,7 @@ def test_snowflake_tags_as_structured_properties( include_column_lineage=False, include_usage_stats=False, include_operational_stats=False, + structured_properties_template_cache_invalidation_interval=0, ), ), sink=DynamicTypedConfig( diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py index d4f6e92c93c1e0..86ffdf33f585cb 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py @@ -115,6 +115,7 @@ def test_snowflake_structured_property_pattern_deny(): match_fully_qualified_names=True, schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), extract_tags_as_structured_properties=True, + structured_properties_template_cache_invalidation_interval=0, tag_pattern=AllowDenyPattern( deny=["TEST_DB.TEST_SCHEMA.my_tag_2:my_value_2"] ), @@ -142,7 +143,7 @@ def test_snowflake_structured_property_pattern_deny(): source_report = pipeline.source.get_report() assert isinstance(source_report, SnowflakeV2Report) assert source_report.tags_scanned == 5 - assert source_report._processed_tags == { + assert sorted(list(source_report._processed_tags)) == [ "snowflake.other_db.other_schema.my_other_tag", "snowflake.test_db.test_schema.security", - } + ] diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json index 0b478e13b86d2a..73f69273836346 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json @@ -112,11 +112,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80", + "job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000", + "date_created": "2025-01-31 08:02:41.167000", + "date_modified": "2025-01-31 08:02:41.360000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -2279,8 +2279,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000" + "date_created": "2025-01-31 08:02:40.980000", + "date_modified": "2025-01-31 08:02:40.980000" }, "name": "DemoData.Foo.Proc.With.SpecialChar", "type": { @@ -2329,8 +2329,8 @@ "depending_on_procedure": "{}", "code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP 
BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n", "input parameters": "[]", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000" + "date_created": "2025-01-31 08:02:40.987000", + "date_modified": "2025-01-31 08:02:40.987000" }, "name": "DemoData.Foo.NewProc", "type": { @@ -4969,7 +4969,7 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 1735588784503, + "time": 1738310563767, "actor": "urn:li:corpuser:_ingestion" } } @@ -5092,7 +5092,7 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 1735588784511, + "time": 1738310563770, "actor": "urn:li:corpuser:_ingestion" } } diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json index c1748ff13ac93c..2789ccd3cd5a72 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json @@ -112,11 +112,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80", + "job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000", + "date_created": "2025-01-31 08:02:41.167000", + "date_modified": "2025-01-31 08:02:41.360000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -2279,8 +2279,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000" + "date_created": "2025-01-31 08:02:40.980000", + "date_modified": "2025-01-31 08:02:40.980000" }, "name": "DemoData.Foo.Proc.With.SpecialChar", "type": { @@ -2694,7 +2694,7 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 1735588789629, + "time": 1738310565884, "actor": "urn:li:corpuser:_ingestion" } } diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json index cbfc374decab2f..a9f52e4c97f012 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json @@ -128,6 +128,47 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(mssql,my-instance.Weekly Demo Data Backup,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(mssql,my-instance.Weekly Demo Data Backup,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": 
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)" + }, + { + "id": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a", + "urn": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)", @@ -136,11 +177,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80", + "job_id": "5a260993-c4ce-4bb3-a273-eaf6ef6e0382", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000", + "date_created": "2025-01-28 15:27:31.437000", + "date_modified": "2025-01-28 15:27:31.593000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -175,6 +216,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)", @@ -193,6 +250,31 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)" + }, + { + "id": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a", + "urn": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:5726a09b23f60be6f661206c879a3683", @@ -2516,6 +2598,47 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)" + }, + { + "id": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a", + "urn": 
"urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)", @@ -2529,8 +2652,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000" + "date_created": "2025-01-28 15:27:31.257000", + "date_modified": "2025-01-28 15:27:31.257000" }, "name": "DemoData.Foo.Proc.With.SpecialChar", "type": { @@ -2561,6 +2684,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:6fbadfb496ee98718da210cc2fca1680" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)", @@ -2584,6 +2723,35 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)" + }, + { + "id": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a", + "urn": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a" + }, + { + "id": "urn:li:container:6fbadfb496ee98718da210cc2fca1680", + "urn": "urn:li:container:6fbadfb496ee98718da210cc2fca1680" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),NewProc)", @@ -2596,8 +2764,8 @@ "depending_on_procedure": "{}", "code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n", "input parameters": "[]", - "date_created": "2024-12-30 19:59:24.690000", - "date_modified": "2024-12-30 19:59:24.690000" + "date_created": "2025-01-28 15:27:31.263000", + "date_modified": "2025-01-28 15:27:31.263000" }, "name": "DemoData.Foo.NewProc", "type": { @@ 
-2628,6 +2796,22 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),NewProc)",
+    "changeType": "UPSERT",
+    "aspectName": "container",
+    "aspect": {
+        "json": {
+            "container": "urn:li:container:6fbadfb496ee98718da210cc2fca1680"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1615443388097,
+        "runId": "mssql-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataJob",
     "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),NewProc)",
@@ -2651,6 +2835,35 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,my-instance.DemoData.Foo.stored_procedures,PROD),NewProc)",
+    "changeType": "UPSERT",
+    "aspectName": "browsePathsV2",
+    "aspect": {
+        "json": {
+            "path": [
+                {
+                    "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)",
+                    "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mssql,my-instance)"
+                },
+                {
+                    "id": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a",
+                    "urn": "urn:li:container:db8117ee3cc6397c503e7824ae3e0f6a"
+                },
+                {
+                    "id": "urn:li:container:6fbadfb496ee98718da210cc2fca1680",
+                    "urn": "urn:li:container:6fbadfb496ee98718da210cc2fca1680"
+                }
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1615443388097,
+        "runId": "mssql-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "container",
     "entityUrn": "urn:li:container:5631370915311469374ef3cb5f0ebbf0",
@@ -3046,7 +3259,7 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 1735588787786,
+                "time": 1738078055642,
                 "actor": "urn:li:corpuser:_ingestion"
             }
         }
diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json
index 70f5784d6d0b18..189e673270b007 100644
--- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json
+++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json
@@ -112,11 +112,11 @@
     "aspect": {
         "json": {
             "customProperties": {
-                "job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80",
+                "job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36",
                 "job_name": "Weekly Demo Data Backup",
                 "description": "No description available.",
-                "date_created": "2024-12-30 19:59:24.690000",
-                "date_modified": "2024-12-30 19:59:24.690000",
+                "date_created": "2025-01-31 08:02:41.167000",
+                "date_modified": "2025-01-31 08:02:41.360000",
                 "step_id": "1",
                 "step_name": "Set database to read only",
                 "subsystem": "TSQL",
@@ -2279,8 +2279,8 @@
                 "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
                 "input parameters": "['@ID']",
                 "parameter @ID": "{'type': 'int'}",
-                "date_created": "2024-12-30 19:59:24.690000",
-                "date_modified": "2024-12-30 19:59:24.690000"
+                "date_created": "2025-01-31 08:02:40.980000",
+                "date_modified": "2025-01-31 08:02:40.980000"
             },
             "name": "DemoData.Foo.Proc.With.SpecialChar",
             "type": {
@@ -2329,8 +2329,8 @@
                 "depending_on_procedure": "{}",
                 "code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
                 "input parameters": "[]",
-                "date_created": "2024-12-30 19:59:24.690000",
-                "date_modified": "2024-12-30 19:59:24.690000"
+                "date_created": "2025-01-31 08:02:40.987000",
+                "date_modified": "2025-01-31 08:02:40.987000"
             },
             "name": "DemoData.Foo.NewProc",
             "type": {
@@ -5019,7 +5019,7 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 1735588791954,
+                "time": 1738310566860,
                 "actor": "urn:li:corpuser:_ingestion"
             }
         }
@@ -5166,7 +5166,7 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 1735588791966,
+                "time": 1738310566866,
                 "actor": "urn:li:corpuser:_ingestion"
             }
         }
diff --git a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml
index e003ec39cd5282..7067d04c61097c 100644
--- a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml
+++ b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml
@@ -8,6 +8,7 @@ source:
     database: DemoData
     host_port: localhost:21433
    platform_instance: my-instance
+    include_containers_for_pipelines: true
     # use_odbc: True
     # uri_args:
     #   driver: "ODBC Driver 17 for SQL Server"
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java b/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java
index 9ce8a1b91e66c7..8b045084c2c5c2 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java
@@ -6,10 +6,10 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.data.template.RecordTemplate;
-import com.linkedin.metadata.systemmetadata.TraceStorageStatus;
-import com.linkedin.metadata.systemmetadata.TraceWriteStatus;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.util.Pair;
+import io.datahubproject.openapi.v1.models.TraceStorageStatus;
+import io.datahubproject.openapi.v1.models.TraceWriteStatus;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java
index 51e30bd6f6658d..47eddba4322466 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java
@@ -11,9 +11,6 @@
 import com.linkedin.metadata.run.AspectRowSummary;
 import com.linkedin.metadata.systemmetadata.SystemMetadataService;
 import com.linkedin.metadata.systemmetadata.TraceService;
-import com.linkedin.metadata.systemmetadata.TraceStatus;
-import com.linkedin.metadata.systemmetadata.TraceStorageStatus;
-import com.linkedin.metadata.systemmetadata.TraceWriteStatus;
 import com.linkedin.mxe.FailedMetadataChangeProposal;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.util.Pair;
@@ -21,6 +18,9 @@
 import io.datahubproject.metadata.context.TraceContext;
 import io.datahubproject.metadata.context.TraceIdGenerator;
 import io.datahubproject.metadata.exception.TraceException;
+import io.datahubproject.openapi.v1.models.TraceStatus;
+import io.datahubproject.openapi.v1.models.TraceStorageStatus;
+import io.datahubproject.openapi.v1.models.TraceWriteStatus;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java
index c5217475362295..06e40a4e142f9e 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java
@@ -12,11 +12,11 @@
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.template.RecordTemplate;
 import com.linkedin.data.template.StringMap;
-import com.linkedin.metadata.systemmetadata.TraceStorageStatus;
-import com.linkedin.metadata.systemmetadata.TraceWriteStatus;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.util.Pair;
 import io.datahubproject.metadata.context.TraceContext;
+import io.datahubproject.openapi.v1.models.TraceStorageStatus;
+import io.datahubproject.openapi.v1.models.TraceWriteStatus;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java b/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java
index abe65d48b3410a..1ec575e5567a97 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java
@@ -25,9 +25,6 @@
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.run.AspectRowSummary;
 import com.linkedin.metadata.systemmetadata.SystemMetadataService;
-import com.linkedin.metadata.systemmetadata.TraceStatus;
-import com.linkedin.metadata.systemmetadata.TraceStorageStatus;
-import com.linkedin.metadata.systemmetadata.TraceWriteStatus;
 import com.linkedin.mxe.FailedMetadataChangeProposal;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.SystemMetadata;
@@ -35,6 +32,9 @@
 import io.datahubproject.metadata.context.OperationContext;
 import io.datahubproject.metadata.context.TraceContext;
 import io.datahubproject.metadata.context.TraceIdGenerator;
+import io.datahubproject.openapi.v1.models.TraceStatus;
+import io.datahubproject.openapi.v1.models.TraceStorageStatus;
+import io.datahubproject.openapi.v1.models.TraceWriteStatus;
 import io.datahubproject.test.metadata.context.TestOperationContexts;
 import java.time.Instant;
 import java.util.Collections;
diff --git a/metadata-service/openapi-servlet/models/build.gradle b/metadata-service/openapi-servlet/models/build.gradle
index dbc51ca17e3388..d75e656e5ecd6c 100644
--- a/metadata-service/openapi-servlet/models/build.gradle
+++ b/metadata-service/openapi-servlet/models/build.gradle
@@ -6,7 +6,6 @@ dependencies {
   implementation project(':entity-registry')
   implementation project(':metadata-operation-context')
   implementation project(':metadata-auth:auth-api')
-  implementation project(':metadata-service:services')
 
   implementation externalDependency.jacksonDataBind
   implementation externalDependency.httpClient
diff --git a/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceResponseV1.java b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceResponseV1.java
index 5fc721c2812d41..5c3c14ee57d4cb 100644
--- a/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceResponseV1.java
+++ b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceResponseV1.java
@@ -2,7 +2,6 @@
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.linkedin.common.urn.Urn;
-import com.linkedin.metadata.systemmetadata.TraceStatus;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import lombok.AllArgsConstructor;
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceStatus.java b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceStatus.java
similarity index 88%
rename from metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceStatus.java
rename to metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceStatus.java
index 35bdecee459e1e..720c7c2a150f11 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceStatus.java
+++ b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceStatus.java
@@ -1,4 +1,4 @@
-package com.linkedin.metadata.systemmetadata;
+package io.datahubproject.openapi.v1.models;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import lombok.AllArgsConstructor;
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceStorageStatus.java b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceStorageStatus.java
similarity index 97%
rename from metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceStorageStatus.java
rename to metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceStorageStatus.java
index 0def8785a6b823..b3fb486b006cf8 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceStorageStatus.java
+++ b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceStorageStatus.java
@@ -1,4 +1,4 @@
-package com.linkedin.metadata.systemmetadata;
+package io.datahubproject.openapi.v1.models;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.datahubproject.metadata.exception.TraceException;
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceWriteStatus.java b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceWriteStatus.java
similarity index 90%
rename from metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceWriteStatus.java
rename to metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceWriteStatus.java
index 78bccd6bf1ccf7..7fcbe03f1daf79 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceWriteStatus.java
+++ b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v1/models/TraceWriteStatus.java
@@ -1,4 +1,4 @@
-package com.linkedin.metadata.systemmetadata;
+package io.datahubproject.openapi.v1.models;
 
 public enum TraceWriteStatus {
   // error occurred during processing
diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/v1/TraceControllerTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/v1/TraceControllerTest.java
index ca26ba42832009..d4f387e618eb17 100644
--- a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/v1/TraceControllerTest.java
+++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/v1/TraceControllerTest.java
@@ -18,15 +18,15 @@
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.systemmetadata.TraceService;
-import com.linkedin.metadata.systemmetadata.TraceStatus;
-import com.linkedin.metadata.systemmetadata.TraceStorageStatus;
-import com.linkedin.metadata.systemmetadata.TraceWriteStatus;
 import io.datahubproject.metadata.context.ObjectMapperContext;
 import io.datahubproject.metadata.context.OperationContext;
 import io.datahubproject.metadata.context.TraceContext;
 import io.datahubproject.openapi.config.TracingInterceptor;
 import io.datahubproject.openapi.v1.models.TraceRequestV1;
 import io.datahubproject.openapi.v1.models.TraceResponseV1;
+import io.datahubproject.openapi.v1.models.TraceStatus;
+import io.datahubproject.openapi.v1.models.TraceStorageStatus;
+import io.datahubproject.openapi.v1.models.TraceWriteStatus;
 import io.datahubproject.test.metadata.context.TestOperationContexts;
 import java.util.List;
 import java.util.Map;
diff --git a/metadata-service/services/build.gradle b/metadata-service/services/build.gradle
index 0e84580cc04da5..19815385825f17 100644
--- a/metadata-service/services/build.gradle
+++ b/metadata-service/services/build.gradle
@@ -16,6 +16,7 @@ dependencies {
   implementation project(':metadata-events:mxe-avro')
   implementation project(':metadata-events:mxe-registration')
   implementation project(':metadata-events:mxe-utils-avro')
+  api project(':metadata-service:openapi-servlet:models')
 
   api project(path: ':metadata-models', configuration: 'dataTemplate')
   api project(':metadata-models')
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceService.java
index 1b6f32da1162fb..1b92200d0e1fd9 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/TraceService.java
@@ -2,6 +2,7 @@
 
 import com.linkedin.common.urn.Urn;
 import io.datahubproject.metadata.context.OperationContext;
+import io.datahubproject.openapi.v1.models.TraceStatus;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;