From 47a9c8a4a4ecc4da34bd210d56331e97f9fe8e7e Mon Sep 17 00:00:00 2001 From: shourya116 <105162686+shourya116@users.noreply.github.com> Date: Tue, 12 Dec 2023 11:53:46 +0530 Subject: [PATCH] Added Datascan Profiling (#35696) --- .../google/cloud/operators/dataplex.py | 531 +++++++++++++++++- .../google/cloud/sensors/dataplex.py | 118 ++++ .../google/cloud/triggers/dataplex.py | 82 +++ .../operators/cloud/dataplex.rst | 96 ++++ .../google/cloud/operators/test_dataplex.py | 126 +++++ .../google/cloud/sensors/test_dataplex.py | 80 +++ .../cloud/dataplex/example_dataplex_dp.py | 341 +++++++++++ 7 files changed, 1373 insertions(+), 1 deletion(-) create mode 100644 tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py index b75024fbf4ec8..66be3e4d9c8c2 100644 --- a/airflow/providers/google/cloud/operators/dataplex.py +++ b/airflow/providers/google/cloud/operators/dataplex.py @@ -22,7 +22,10 @@ from typing import TYPE_CHECKING, Any, Sequence from airflow.exceptions import AirflowException -from airflow.providers.google.cloud.triggers.dataplex import DataplexDataQualityJobTrigger +from airflow.providers.google.cloud.triggers.dataplex import ( + DataplexDataProfileJobTrigger, + DataplexDataQualityJobTrigger, +) if TYPE_CHECKING: from google.protobuf.field_mask_pb2 import FieldMask @@ -1204,6 +1207,532 @@ def execute_complete(self, context, event=None) -> None: return job +class DataplexCreateOrUpdateDataProfileScanOperator(GoogleCloudBaseOperator): + """ + Creates a DataScan Data Profile resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param body: Required. The Request body contains an instance of DataScan. + :param data_scan_id: Required. Data Profile scan identifier. + :param update_mask: Mask of fields to update. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ + :return: Dataplex data profile id + """ + + template_fields = ("project_id", "data_scan_id", "body", "impersonation_chain") + template_fields_renderers = {"body": "json"} + + def __init__( + self, + project_id: str, + region: str, + data_scan_id: str, + body: dict[str, Any] | DataScan, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + update_mask: dict | FieldMask | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.data_scan_id = data_scan_id + self.body = body + self.update_mask = update_mask + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context): + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + + self.log.info("Creating Dataplex Data Profile scan %s", self.data_scan_id) + try: + operation = hook.create_data_scan( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + body=self.body, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Dataplex Data Profile scan %s created successfully!", self.data_scan_id) + except AlreadyExists: + self.log.info("Dataplex Data Profile scan already exists: %s", {self.data_scan_id}) + + operation = hook.update_data_scan( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + body=self.body, + update_mask=self.update_mask, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Dataplex Data Profile scan %s updated successfully!", self.data_scan_id) + except GoogleAPICallError as e: + raise AirflowException(f"Error creating Data Profile scan {self.data_scan_id}", e) + + return self.data_scan_id + + +class DataplexGetDataProfileScanOperator(GoogleCloudBaseOperator): + """ + Gets a DataScan DataProfile resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param data_scan_id: Required. Data Profile scan identifier. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. 
+ If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + + :return: Dataplex data profile + """ + + template_fields = ("project_id", "data_scan_id", "impersonation_chain") + + def __init__( + self, + project_id: str, + region: str, + data_scan_id: str, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.data_scan_id = data_scan_id + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context): + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + + self.log.info("Retrieving the details of Dataplex Data Profile scan %s", self.data_scan_id) + data_profile_scan = hook.get_data_scan( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + return DataScan.to_dict(data_profile_scan) + + +class DataplexDeleteDataProfileScanOperator(GoogleCloudBaseOperator): + """ + Deletes a DataScan DataProfile resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param data_scan_id: Required. Data Profile scan identifier. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ :return: None + """ + + template_fields = ("project_id", "data_scan_id", "impersonation_chain") + + def __init__( + self, + project_id: str, + region: str, + data_scan_id: str, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.data_scan_id = data_scan_id + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> None: + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + + self.log.info("Deleting Dataplex Data Profile Scan: %s", self.data_scan_id) + + operation = hook.delete_data_scan( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Dataplex Data Profile scan %s deleted successfully!", self.data_scan_id) + + +class DataplexRunDataProfileScanOperator(GoogleCloudBaseOperator): + """ + Runs an on-demand execution of a DataScan Data Profile Scan. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param data_scan_id: Required. Data Profile scan identifier. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :param asynchronous: Flag informing that the Dataplex job should be run asynchronously. + This is useful for submitting long-running jobs and + waiting on them asynchronously using the DataplexDataProfileJobStatusSensor + :param result_timeout: Value in seconds for which operator will wait for the Data Profile scan result + when the flag `asynchronous = False`. + Throws exception if there is no result found after specified amount of seconds. + :param polling_interval_seconds: time in seconds between polling for job completion. + The value is considered only when running in deferrable mode. Must be greater than 0. 
+ :param deferrable: Run operator in the deferrable mode. + + :return: Dataplex Data Profile scan job id. + """ + + template_fields = ("project_id", "data_scan_id", "impersonation_chain") + + def __init__( + self, + project_id: str, + region: str, + data_scan_id: str, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + asynchronous: bool = False, + result_timeout: float = 60.0 * 10, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + polling_interval_seconds: int = 10, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.data_scan_id = data_scan_id + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + self.asynchronous = asynchronous + self.result_timeout = result_timeout + self.deferrable = deferrable + self.polling_interval_seconds = polling_interval_seconds + + def execute(self, context: Context) -> dict: + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + + result = hook.run_data_scan( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + job_id = result.job.name.split("/")[-1] + + if self.deferrable: + if self.asynchronous: + raise AirflowException( + "Both asynchronous and deferrable parameters were passed. Please, provide only one." + ) + self.defer( + trigger=DataplexDataProfileJobTrigger( + job_id=job_id, + data_scan_id=self.data_scan_id, + project_id=self.project_id, + region=self.region, + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + polling_interval_seconds=self.polling_interval_seconds, + ), + method_name="execute_complete", + ) + if not self.asynchronous: + job = hook.wait_for_data_scan_job( + job_id=job_id, + data_scan_id=self.data_scan_id, + project_id=self.project_id, + region=self.region, + result_timeout=self.result_timeout, + ) + + if job.state == DataScanJob.State.FAILED: + raise AirflowException(f"Data Profile job failed: {job_id}") + if job.state == DataScanJob.State.SUCCEEDED: + self.log.info("Data Profile job executed successfully.") + else: + self.log.info("Data Profile job execution returned status: %s", job.status) + + return job_id + + def execute_complete(self, context, event=None) -> None: + """ + Callback for when the trigger fires - returns immediately. + + Relies on trigger to throw an exception, otherwise it assumes execution was + successful. + """ + job_state = event["job_state"] + job_id = event["job_id"] + if job_state == DataScanJob.State.FAILED: + raise AirflowException(f"Job failed:\n{job_id}") + if job_state == DataScanJob.State.CANCELLED: + raise AirflowException(f"Job was cancelled:\n{job_id}") + if job_state == DataScanJob.State.SUCCEEDED: + self.log.info("Data Profile job executed successfully.") + return job_id + + +class DataplexGetDataProfileScanResultOperator(GoogleCloudBaseOperator): + """ + Gets a DataScan Data Profile Job resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. 
The ID of the Google Cloud region that the lake belongs to. + :param data_scan_id: Required. Data Profile scan identifier. + :param job_id: Optional. Data Profile scan job identifier. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :param wait_for_results: Flag indicating whether to wait for the result of a job execution + or to return the job in its current state. + :param result_timeout: Value in seconds for which operator will wait for the Data Profile scan result + when the flag `wait_for_results = True`. + Throws exception if there is no result found after specified amount of seconds. + + :return: Dict representing DataScanJob. + When the job completes with a successful status, information about the Data Profile result + is available. 
+ """ + + template_fields = ("project_id", "data_scan_id", "impersonation_chain") + + def __init__( + self, + project_id: str, + region: str, + data_scan_id: str, + job_id: str | None = None, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + wait_for_results: bool = True, + result_timeout: float = 60.0 * 10, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.data_scan_id = data_scan_id + self.job_id = job_id + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + self.wait_for_results = wait_for_results + self.result_timeout = result_timeout + + def execute(self, context: Context) -> dict: + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + # fetch the last job + if not self.job_id: + jobs = hook.list_data_scan_jobs( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + job_ids = [DataScanJob.to_dict(job) for job in jobs] + if not job_ids: + raise AirflowException("There are no jobs, you should create one before.") + job_id = job_ids[0]["name"] + self.job_id = job_id.split("/")[-1] + + if self.wait_for_results: + job = hook.wait_for_data_scan_job( + job_id=self.job_id, + data_scan_id=self.data_scan_id, + project_id=self.project_id, + region=self.region, + result_timeout=self.result_timeout, + ) + else: + job = hook.get_data_scan_job( + project_id=self.project_id, + region=self.region, + job_id=self.job_id, + data_scan_id=self.data_scan_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if job.state == DataScanJob.State.SUCCEEDED: + self.log.info("Data Profile job executed successfully") + else: + self.log.info("Data Profile job execution returned status: %s", job.state) + + result = DataScanJob.to_dict(job) + result["state"] = DataScanJob.State(result["state"]).name + + return result + + def execute_complete(self, context, event=None) -> None: + """ + Callback for when the trigger fires - returns immediately. + + Relies on trigger to throw an exception, otherwise it assumes execution was + successful. + """ + job_state = event["job_state"] + job_id = event["job_id"] + job = event["job"] + if job_state == DataScanJob.State.FAILED: + raise AirflowException(f"Job failed:\n{job_id}") + if job_state == DataScanJob.State.CANCELLED: + raise AirflowException(f"Job was cancelled:\n{job_id}") + if job_state == DataScanJob.State.SUCCEEDED: + self.log.info("Data Profile job executed successfully") + else: + self.log.info("Data Profile job execution returned status: %s", job_state) + + return job + + class DataplexCreateZoneOperator(GoogleCloudBaseOperator): """ Creates a Zone resource within a Lake. 
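A minimal sketch of how the Data Profile operators added above chain together in a DAG, assuming placeholder project, region, scan id and BigQuery table values; the body uses the dict form of DataScan that DataplexCreateOrUpdateDataProfileScanOperator also accepts (the create/run/get-result flow mirrors the system test further down in this patch):

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataplex import (
    DataplexCreateOrUpdateDataProfileScanOperator,
    DataplexGetDataProfileScanResultOperator,
    DataplexRunDataProfileScanOperator,
)

with DAG("data_profile_sketch", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # Create (or update) the scan, run it synchronously, then fetch the latest job result.
    # Project, region, scan id and the BigQuery resource below are placeholders.
    create_scan = DataplexCreateOrUpdateDataProfileScanOperator(
        task_id="create_scan",
        project_id="my-project",
        region="us-central1",
        data_scan_id="my-profile-scan",
        body={
            "data": {
                "resource": "//bigquery.googleapis.com/projects/my-project/datasets/my_dataset/tables/my_table"
            },
            "data_profile_spec": {},
        },
    )
    run_scan = DataplexRunDataProfileScanOperator(
        task_id="run_scan",
        project_id="my-project",
        region="us-central1",
        data_scan_id="my-profile-scan",
    )
    get_result = DataplexGetDataProfileScanResultOperator(
        task_id="get_result",
        project_id="my-project",
        region="us-central1",
        data_scan_id="my-profile-scan",
    )
    create_scan >> run_scan >> get_result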
diff --git a/airflow/providers/google/cloud/sensors/dataplex.py b/airflow/providers/google/cloud/sensors/dataplex.py index ee0ffc7410d9e..05d25316bb2bd 100644 --- a/airflow/providers/google/cloud/sensors/dataplex.py +++ b/airflow/providers/google/cloud/sensors/dataplex.py @@ -259,3 +259,121 @@ def poke(self, context: Context) -> bool: raise AirflowSkipException(message) raise AirflowDataQualityScanException(message) return job_status == DataScanJob.State.SUCCEEDED + + +class DataplexDataProfileJobStatusSensor(BaseSensorOperator): + """ + Check the status of the Dataplex DataProfile job. + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param region: Required. The ID of the Google Cloud region that the task belongs to. + :param data_scan_id: Required. Data Profile scan identifier. + :param job_id: Required. Job ID. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :param result_timeout: Value in seconds for which operator will wait for the Data Profile scan result. + Throws exception if there is no result found after specified amount of seconds. + + :return: Boolean indicating if the job run has reached the ``DataScanJob.State.SUCCEEDED``.
+ """ + + template_fields = ["job_id"] + + def __init__( + self, + project_id: str, + region: str, + data_scan_id: str, + job_id: str, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + result_timeout: float = 60.0 * 10, + start_sensor_time: float | None = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.data_scan_id = data_scan_id + self.job_id = job_id + self.api_version = api_version + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + self.result_timeout = result_timeout + self.start_sensor_time = start_sensor_time + + def _duration(self): + if not self.start_sensor_time: + self.start_sensor_time = time.monotonic() + return time.monotonic() - self.start_sensor_time + + def poke(self, context: Context) -> bool: + self.log.info("Waiting for job %s to be %s", self.job_id, DataScanJob.State.SUCCEEDED) + if self.result_timeout: + duration = self._duration() + if duration > self.result_timeout: + # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 + message = ( + f"Timeout: Data Profile scan {self.job_id} is not ready after {self.result_timeout}s" + ) + if self.soft_fail: + raise AirflowSkipException(message) + raise AirflowDataQualityScanResultTimeoutException(message) + + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + + try: + job = hook.get_data_scan_job( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + job_id=self.job_id, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + except GoogleAPICallError as e: + # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 + message = f"Error occurred when trying to retrieve Data Profile scan job: {self.data_scan_id}" + if self.soft_fail: + raise AirflowSkipException(message, e) + raise AirflowException(message, e) + + job_status = job.state + self.log.info( + "Current status of the Dataplex Data Profile scan job %s => %s", self.job_id, job_status + ) + if job_status == DataScanJob.State.FAILED: + # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 + message = f"Data Profile scan job failed: {self.job_id}" + if self.soft_fail: + raise AirflowSkipException(message) + raise AirflowException(message) + if job_status == DataScanJob.State.CANCELLED: + # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 + message = f"Data Profile scan job cancelled: {self.job_id}" + if self.soft_fail: + raise AirflowSkipException(message) + raise AirflowException(message) + return job_status == DataScanJob.State.SUCCEEDED diff --git a/airflow/providers/google/cloud/triggers/dataplex.py b/airflow/providers/google/cloud/triggers/dataplex.py index 5f7907cad0519..ae03023926a2d 100644 --- a/airflow/providers/google/cloud/triggers/dataplex.py +++ b/airflow/providers/google/cloud/triggers/dataplex.py @@ -107,3 +107,85 @@ async def run(self) -> AsyncIterator[TriggerEvent]: def _convert_to_dict(self, job: DataScanJob) -> dict: """Returns a representation of a DataScanJob instance as a dict.""" return DataScanJob.to_dict(job) + + +class DataplexDataProfileJobTrigger(BaseTrigger): + """ + 
DataplexDataProfileJobTrigger runs on the trigger worker and waits for the job to be `SUCCEEDED` state. + + :param job_id: Optional. The ID of a Dataplex job. + :param data_scan_id: Required. DataScan identifier. + :param project_id: Google Cloud Project where the job is running. + :param region: The ID of the Google Cloud region that the job belongs to. + :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :param polling_interval_seconds: polling period in seconds to check for the status. + """ + + def __init__( + self, + job_id: str | None, + data_scan_id: str, + project_id: str | None, + region: str, + gcp_conn_id: str = "google_cloud_default", + polling_interval_seconds: int = 10, + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ): + super().__init__(**kwargs) + self.job_id = job_id + self.data_scan_id = data_scan_id + self.project_id = project_id + self.region = region + self.gcp_conn_id = gcp_conn_id + self.polling_interval_seconds = polling_interval_seconds + self.impersonation_chain = impersonation_chain + + def serialize(self): + return ( + "airflow.providers.google.cloud.triggers.dataplex.DataplexDataProfileJobTrigger", + { + "job_id": self.job_id, + "data_scan_id": self.data_scan_id, + "project_id": self.project_id, + "region": self.region, + "gcp_conn_id": self.gcp_conn_id, + "impersonation_chain": self.impersonation_chain, + "polling_interval_seconds": self.polling_interval_seconds, + }, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: + hook = DataplexAsyncHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + while True: + job = await hook.get_data_scan_job( + project_id=self.project_id, + region=self.region, + job_id=self.job_id, + data_scan_id=self.data_scan_id, + ) + state = job.state + if state in (DataScanJob.State.FAILED, DataScanJob.State.SUCCEEDED, DataScanJob.State.CANCELLED): + break + self.log.info( + "Current state is: %s, sleeping for %s seconds.", + DataScanJob.State(state).name, + self.polling_interval_seconds, + ) + await asyncio.sleep(self.polling_interval_seconds) + yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": self._convert_to_dict(job)}) + + def _convert_to_dict(self, job: DataScanJob) -> dict: + """Returns a representation of a DataScanJob instance as a dict.""" + return DataScanJob.to_dict(job) diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst index 8c0da7d68610d..1b8cc56dcf51d 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst @@ -321,3 +321,99 @@ To delete a asset you can use: :dedent: 4 :start-after: [START howto_dataplex_delete_asset_operator] :end-before: [END howto_dataplex_delete_asset_operator] + +Create or update a Data Profile scan 
+------------------------------------ + +Before you create a Dataplex Data Profile scan you need to define its body. +For more information about the available fields to pass when creating a Data Profile scan, visit `Dataplex create data profile API. `__ + +A simple Data Profile scan configuration can look as followed: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 0 + :start-after: [START howto_dataplex_data_profile_configuration] + :end-before: [END howto_dataplex_data_profile_configuration] + +With this configuration we can create or update the Data Profile scan: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateOrUpdateDataProfileScanOperator` + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_create_data_profile_operator] + :end-before: [END howto_dataplex_create_data_profile_operator] + +Get a Data Profile scan +----------------------- + +To get a Data Profile scan you can use: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataProfileScanOperator` + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_get_data_profile_operator] + :end-before: [END howto_dataplex_get_data_profile_operator] + + + +Delete a Data Profile scan +-------------------------- + +To delete a Data Profile scan you can use: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteDataProfileScanOperator` + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_delete_data_profile_operator] + :end-before: [END howto_dataplex_delete_data_profile_operator] + +Run a Data Profile scan +----------------------- + +You can run Dataplex Data Profile scan in asynchronous modes to later check its status using sensor: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexRunDataProfileScanOperator` + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_run_data_profile_operator] + :end-before: [END howto_dataplex_run_data_profile_operator] + +To check that running Dataplex Data Profile scan succeeded you can use: + +:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexDataProfileJobStatusSensor`. + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_data_scan_job_state_sensor] + :end-before: [END howto_dataplex_data_scan_job_state_sensor] + +Also for this action you can use operator in the deferrable mode: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_run_data_profile_def_operator] + :end-before: [END howto_dataplex_run_data_profile_def_operator] + +Get a Data Profile scan job +--------------------------- + +To get a Data Profile scan job you can use: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataProfileScanResultOperator` + +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_get_data_profile_job_operator] + :end-before: [END howto_dataplex_get_data_profile_job_operator] diff --git a/tests/providers/google/cloud/operators/test_dataplex.py b/tests/providers/google/cloud/operators/test_dataplex.py index 2cddabb4a76b2..67c9b8ca10f9f 100644 --- a/tests/providers/google/cloud/operators/test_dataplex.py +++ b/tests/providers/google/cloud/operators/test_dataplex.py @@ -25,17 +25,21 @@ from airflow.providers.google.cloud.operators.dataplex import ( DataplexCreateAssetOperator, DataplexCreateLakeOperator, + DataplexCreateOrUpdateDataProfileScanOperator, DataplexCreateOrUpdateDataQualityScanOperator, DataplexCreateTaskOperator, DataplexCreateZoneOperator, DataplexDeleteAssetOperator, + DataplexDeleteDataProfileScanOperator, DataplexDeleteDataQualityScanOperator, DataplexDeleteLakeOperator, DataplexDeleteTaskOperator, DataplexDeleteZoneOperator, + DataplexGetDataProfileScanResultOperator, DataplexGetDataQualityScanResultOperator, DataplexGetTaskOperator, DataplexListTasksOperator, + DataplexRunDataProfileScanOperator, DataplexRunDataQualityScanOperator, ) from airflow.providers.google.cloud.triggers.dataplex import DataplexDataQualityJobTrigger @@ -329,6 +333,35 @@ def test_execute_deferrable(self, mock_data_scan_job, hook_mock): assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME +class TestDataplexRunDataProfileScanOperator: + @mock.patch(HOOK_STR) + @mock.patch(DATASCANJOB_STR) + def test_execute(self, mock_data_scan_job, hook_mock): + op = DataplexRunDataProfileScanOperator( + task_id="execute_data_scan", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.run_data_scan.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + class TestDataplexGetDataQualityScanResultOperator: @mock.patch(HOOK_STR) @mock.patch(DATASCANJOB_STR) @@ -392,6 +425,39 @@ def test_execute_deferrable(self, mock_data_scan_job, hook_mock): assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME +class TestDataplexGetDataProfileScanResultOperator: + @mock.patch(HOOK_STR) + @mock.patch(DATASCANJOB_STR) + def test_execute(self, mock_data_scan_job, hook_mock): + op = DataplexGetDataProfileScanResultOperator( + task_id="get_data_scan_result", + project_id=PROJECT_ID, + region=REGION, + job_id=JOB_ID, + data_scan_id=DATA_SCAN_ID, + api_version=API_VERSION, + wait_for_results=False, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.get_data_scan_job.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + job_id=JOB_ID, + data_scan_id=DATA_SCAN_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + class TestDataplexCreateAssetOperator: @mock.patch(HOOK_STR) @mock.patch(ASSET_STR) @@ -557,6 +623,35 @@ def test_execute(self, hook_mock): ) +class 
TestDataplexDeleteDataProfileScanOperator: + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock): + op = DataplexDeleteDataProfileScanOperator( + task_id=TASK_ID, + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.delete_data_scan.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + class TestDataplexCreateDataQualityScanOperator: @mock.patch(HOOK_STR) def test_execute(self, hook_mock): @@ -586,3 +681,34 @@ def test_execute(self, hook_mock): timeout=None, metadata=(), ) + + +class TestDataplexCreateDataProfileScanOperator: + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock): + op = DataplexCreateOrUpdateDataProfileScanOperator( + task_id=TASK_ID, + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + body={}, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.create_data_scan.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + body={}, + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/tests/providers/google/cloud/sensors/test_dataplex.py b/tests/providers/google/cloud/sensors/test_dataplex.py index 20a4de4ff0279..4d5e72da71f29 100644 --- a/tests/providers/google/cloud/sensors/test_dataplex.py +++ b/tests/providers/google/cloud/sensors/test_dataplex.py @@ -25,6 +25,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.google.cloud.hooks.dataplex import AirflowDataQualityScanResultTimeoutException from airflow.providers.google.cloud.sensors.dataplex import ( + DataplexDataProfileJobStatusSensor, DataplexDataQualityJobStatusSensor, DataplexTaskStateSensor, TaskState, @@ -191,3 +192,82 @@ def test_start_sensor_time_timeout(self, mock_duration): with pytest.raises(AirflowDataQualityScanResultTimeoutException): sensor.poke(context={}) + + +class TestDataplexDataProfileJobStatusSensor: + def run_job(self, state: int): + job = mock.Mock() + job.state = state + return job + + @mock.patch(DATAPLEX_HOOK) + def test_done(self, mock_hook): + job = self.run_job(DataScanJob.State.SUCCEEDED) + mock_hook.return_value.get_data_scan_job.return_value = job + + sensor = DataplexDataProfileJobStatusSensor( + task_id=TASK_ID, + project_id=PROJECT_ID, + job_id=TEST_JOB_ID, + data_scan_id=TEST_DATA_SCAN_ID, + region=REGION, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT, + ) + result = sensor.poke(context={}) + + mock_hook.return_value.get_data_scan_job.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + job_id=TEST_JOB_ID, + data_scan_id=TEST_DATA_SCAN_ID, + timeout=TIMEOUT, + retry=DEFAULT, + metadata=(), + ) + + assert result + + def test_start_sensor_time(self): + sensor = DataplexDataProfileJobStatusSensor( + task_id=TASK_ID, + project_id=PROJECT_ID, + job_id=TEST_JOB_ID, + data_scan_id=TEST_DATA_SCAN_ID, + region=REGION, + 
api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT, + ) + + assert sensor.start_sensor_time is None + + duration_1 = sensor._duration() + duration_2 = sensor._duration() + + assert sensor.start_sensor_time + assert 0 < duration_1 < duration_2 + + @mock.patch.object(DataplexDataProfileJobStatusSensor, "_duration") + def test_start_sensor_time_timeout(self, mock_duration): + result_timeout = 100 + mock_duration.return_value = result_timeout + 1 + + sensor = DataplexDataProfileJobStatusSensor( + task_id=TASK_ID, + project_id=PROJECT_ID, + job_id=TEST_JOB_ID, + data_scan_id=TEST_DATA_SCAN_ID, + region=REGION, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT, + result_timeout=result_timeout, + ) + + with pytest.raises(AirflowDataQualityScanResultTimeoutException): + sensor.poke(context={}) diff --git a/tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py b/tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py new file mode 100644 index 0000000000000..aaf6669464fca --- /dev/null +++ b/tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py @@ -0,0 +1,341 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that shows how to use Dataplex Scan Data. 
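It creates a BigQuery dataset with example tables and a Dataplex lake, zone and asset, then creates, updates, runs (synchronously, asynchronously and in deferrable mode) and queries a Data Profile scan before deleting every resource it created.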
+""" +from __future__ import annotations + +import os +from datetime import datetime + +from google.cloud import dataplex_v1 +from google.cloud.dataplex_v1 import DataProfileSpec +from google.protobuf.field_mask_pb2 import FieldMask + +from airflow.models.baseoperator import chain +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateEmptyTableOperator, + BigQueryDeleteDatasetOperator, + BigQueryInsertJobOperator, +) +from airflow.providers.google.cloud.operators.dataplex import ( + DataplexCreateAssetOperator, + DataplexCreateLakeOperator, + DataplexCreateOrUpdateDataProfileScanOperator, + DataplexCreateZoneOperator, + DataplexDeleteAssetOperator, + DataplexDeleteDataProfileScanOperator, + DataplexDeleteLakeOperator, + DataplexDeleteZoneOperator, + DataplexGetDataProfileScanOperator, + DataplexGetDataProfileScanResultOperator, + DataplexRunDataProfileScanOperator, +) +from airflow.providers.google.cloud.sensors.dataplex import DataplexDataProfileJobStatusSensor +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") + +DAG_ID = "example_dataplex_data_profile" + +LAKE_ID = f"test-lake-{ENV_ID}" +REGION = "us-central1" + +DATASET_NAME = f"dataset_bq_{ENV_ID}" + +TABLE_1 = "table0" +TABLE_2 = "table1" + +SCHEMA = [ + {"name": "value", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "dt", "type": "STRING", "mode": "NULLABLE"}, +] + +DATASET = DATASET_NAME +INSERT_DATE = datetime.now().strftime("%Y-%m-%d") +INSERT_ROWS_QUERY = f"INSERT {DATASET}.{TABLE_1} VALUES (1, 'test test2', '{INSERT_DATE}');" +LOCATION = "us" + +TRIGGER_SPEC_TYPE = "ON_DEMAND" + +ZONE_ID = "test-zone-id" +DATA_SCAN_ID = "test-data-scan-id" + +EXAMPLE_LAKE_BODY = { + "display_name": "test_display_name", + "labels": [], + "description": "test_description", + "metastore": {"service": ""}, +} + +# [START howto_dataplex_zone_configuration] +EXAMPLE_ZONE = { + "type_": "RAW", + "resource_spec": {"location_type": "SINGLE_REGION"}, +} +# [END howto_dataplex_zone_configuration] + +ASSET_ID = "test-asset-id" + +# [START howto_dataplex_asset_configuration] +EXAMPLE_ASSET = { + "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}", "type_": "BIGQUERY_DATASET"}, + "discovery_spec": {"enabled": True}, +} +# [END howto_dataplex_asset_configuration] + +# [START howto_dataplex_data_profile_configuration] +EXAMPLE_DATA_SCAN = dataplex_v1.DataScan() +EXAMPLE_DATA_SCAN.data.entity = ( + f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}" +) +EXAMPLE_DATA_SCAN.data.resource = ( + f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}" +) +EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({}) +# [END howto_dataplex_data_profile_configuration] +UPDATE_MASK = FieldMask(paths=["data_profile_spec"]) +ENTITY = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}" +EXAMPLE_DATA_SCAN_UPDATE = { + "data": { + "entity": ENTITY, + "resource": f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}", + }, + "data_profile_spec": {}, +} + + +with DAG( + DAG_ID, + start_date=datetime(2021, 1, 1), + schedule="@once", + tags=["example", "dataplex", "data_profile"], +) as dag: + create_dataset = 
BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + create_table_1 = BigQueryCreateEmptyTableOperator( + task_id="create_table_1", + dataset_id=DATASET_NAME, + table_id=TABLE_1, + schema_fields=SCHEMA, + location=LOCATION, + ) + create_table_2 = BigQueryCreateEmptyTableOperator( + task_id="create_table_2", + dataset_id=DATASET_NAME, + table_id=TABLE_2, + schema_fields=SCHEMA, + location=LOCATION, + ) + insert_query_job = BigQueryInsertJobOperator( + task_id="insert_query_job", + configuration={ + "query": { + "query": INSERT_ROWS_QUERY, + "useLegacySql": False, + } + }, + ) + create_lake = DataplexCreateLakeOperator( + task_id="create_lake", project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID + ) + # [START howto_dataplex_create_zone_operator] + create_zone = DataplexCreateZoneOperator( + task_id="create_zone", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + body=EXAMPLE_ZONE, + zone_id=ZONE_ID, + ) + # [END howto_dataplex_create_zone_operator] + # [START howto_dataplex_create_asset_operator] + create_asset = DataplexCreateAssetOperator( + task_id="create_asset", + project_id=PROJECT_ID, + region=REGION, + body=EXAMPLE_ASSET, + lake_id=LAKE_ID, + zone_id=ZONE_ID, + asset_id=ASSET_ID, + ) + # [END howto_dataplex_create_asset_operator] + # [START howto_dataplex_create_data_profile_operator] + create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator( + task_id="create_data_scan", + project_id=PROJECT_ID, + region=REGION, + body=EXAMPLE_DATA_SCAN, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_create_data_profile_operator] + update_data_scan = DataplexCreateOrUpdateDataProfileScanOperator( + task_id="update_data_scan", + project_id=PROJECT_ID, + region=REGION, + update_mask=UPDATE_MASK, + body=EXAMPLE_DATA_SCAN_UPDATE, + data_scan_id=DATA_SCAN_ID, + ) + # [START howto_dataplex_get_data_profile_operator] + get_data_scan = DataplexGetDataProfileScanOperator( + task_id="get_data_scan", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_get_data_profile_operator] + run_data_scan_sync = DataplexRunDataProfileScanOperator( + task_id="run_data_scan_sync", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + get_data_scan_job_result = DataplexGetDataProfileScanResultOperator( + task_id="get_data_scan_job_result", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # [START howto_dataplex_run_data_profile_operator] + run_data_scan_async = DataplexRunDataProfileScanOperator( + task_id="run_data_scan_async", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + asynchronous=True, + ) + # [END howto_dataplex_run_data_profile_operator] + # [START howto_dataplex_data_scan_job_state_sensor] + get_data_scan_job_status = DataplexDataProfileJobStatusSensor( + task_id="get_data_scan_job_status", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}", + ) + # [END howto_dataplex_data_scan_job_state_sensor] + # [START howto_dataplex_get_data_profile_job_operator] + get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator( + task_id="get_data_scan_job_result_2", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_get_data_profile_job_operator] + # [START howto_dataplex_run_data_profile_def_operator] + run_data_scan_def = DataplexRunDataProfileScanOperator( + 
task_id="run_data_scan_def", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + deferrable=True, + ) + # [END howto_dataplex_run_data_profile_def_operator] + run_data_scan_async_2 = DataplexRunDataProfileScanOperator( + task_id="run_data_scan_async_2", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + asynchronous=True, + ) + # [START howto_dataplex_delete_asset_operator] + delete_asset = DataplexDeleteAssetOperator( + task_id="delete_asset", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + zone_id=ZONE_ID, + asset_id=ASSET_ID, + ) + # [END howto_dataplex_delete_asset_operator] + delete_asset.trigger_rule = TriggerRule.ALL_DONE + # [START howto_dataplex_delete_zone_operator] + delete_zone = DataplexDeleteZoneOperator( + task_id="delete_zone", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + zone_id=ZONE_ID, + ) + # [END howto_dataplex_delete_zone_operator] + delete_zone.trigger_rule = TriggerRule.ALL_DONE + # [START howto_dataplex_delete_data_profile_operator] + delete_data_scan = DataplexDeleteDataProfileScanOperator( + task_id="delete_data_scan", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_delete_data_profile_operator] + delete_data_scan.trigger_rule = TriggerRule.ALL_DONE + delete_lake = DataplexDeleteLakeOperator( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + task_id="delete_lake", + trigger_rule=TriggerRule.ALL_DONE, + ) + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", + dataset_id=DATASET_NAME, + project_id=PROJECT_ID, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + chain( + # TEST SETUP + create_dataset, + [create_table_1, create_table_2], + insert_query_job, + create_lake, + create_zone, + create_asset, + # TEST BODY + create_data_scan, + update_data_scan, + get_data_scan, + run_data_scan_sync, + get_data_scan_job_result, + run_data_scan_async, + get_data_scan_job_status, + get_data_scan_job_result_2, + run_data_scan_def, + run_data_scan_async_2, + # get_data_scan_job_result_def, + # TEST TEARDOWN + delete_asset, + delete_zone, + delete_data_scan, + [delete_lake, delete_dataset], + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
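A hedged usage note: DataplexGetDataProfileScanResultOperator pushes DataScanJob.to_dict(job) to XCom with the "state" key already converted to its enum name, so a downstream task added inside the example DAG above could validate the outcome. The checking task below is an illustration only, not part of the patch:

from airflow.operators.python import PythonOperator


def check_profile_result(ti):
    # Pull the dict returned by DataplexGetDataProfileScanResultOperator; after the
    # operator's conversion, result["state"] holds the DataScanJob.State name,
    # e.g. "SUCCEEDED".
    result = ti.xcom_pull(task_ids="get_data_scan_job_result")
    if result["state"] != "SUCCEEDED":
        raise ValueError(f"Data Profile scan job finished in state {result['state']}")


# Would be declared inside the example DAG, downstream of get_data_scan_job_result:
check_result = PythonOperator(task_id="check_profile_result", python_callable=check_profile_result)
get_data_scan_job_result >> check_result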