diff --git a/dbt-trigger/dbt_client.py b/dbt-trigger/dbt_client.py index f7c0532..08a386d 100644 --- a/dbt-trigger/dbt_client.py +++ b/dbt-trigger/dbt_client.py @@ -37,74 +37,4 @@ def trigger_job(self, job_id): data={"cause": f"Triggered by Google Cloud Function"}, method="POST", ) - # logger.info(f"Succesfully triggered job {job_id}") - # logger.info(f"Response from the API: \n{response}") return response - - def determine_run_status(self, run_id): - run_details = self.get_run_details(run_id) - run_id = run_details["data"]["id"] - if run_details["data"]["is_complete"]: - if run_details["data"]["is_error"]: - logger.error(f"The run {run_id} failed.") - return 20 - elif run_details["data"]["is_cancelled"]: - logger.error(f"The run {run_id} was cancelled.") - return 30 - else: - logger.info(f"The run {run_id} was successful") - return 10 - else: - logger.info(f"The run {run_id} is not yet completed.") - return 110 - - def connect(self): - try: - response = requests.get(self.account_url, headers=self.headers) - if response.status_code == 200: - logger.info("Successfully connected to DBT") - return 0 - else: - logger.exception("Could not connect to DBT") - return 1 - except Exception as e: - logger.exception(f"Could not connect to DBT due to {e}") - return 1 - - def get_run_details(self, run_id): - # logger.info(f"Checking run details for run {run_id}.") - response = self._request( - url=f"{self.account_url}/runs/{run_id}/", - params={"include_related": "['run_steps','debug_logs']"}, - ) - # logger.info(f"Response from the API: \n{response}") - return response - - def get_artifact_details(self, run_id): - logger.info(f"Grabbing artifact details for run {run_id}") - return self._request(f"{self.account_url}/runs/{run_id}/artifacts/") - - def download_artifact(self, run_id, artifact_name, destination_folder): - get_artifact_details_url = ( - f"{self.account_url}/runs/{run_id}/artifacts/{artifact_name}" - ) - artifact_file_name = os.path.basename(artifact_name) - artifact_folder = os.path.dirname(artifact_name) - - destination_fullpath = os.path.join(destination_folder, artifact_folder) - os.makedirs(destination_fullpath, exist_ok=True) - - filename = os.path.join(destination_fullpath, artifact_file_name) - CHUNK_SIZE = 16 * 1024 * 1024 - - try: - with requests.get( - get_artifact_details_url, headers=self.headers, stream=True - ) as r: - r.raise_for_status() - with open(filename, "wb") as f: - for chunk in r.iter_content(chunk_size=CHUNK_SIZE): - f.write(chunk) - logger.info(f"Successfully downloaded file {get_artifact_details_url}") - except Exception as e: - raise diff --git a/dbt-trigger/main.py b/dbt-trigger/main.py index 8dc523e..8d2760f 100644 --- a/dbt-trigger/main.py +++ b/dbt-trigger/main.py @@ -94,9 +94,6 @@ def trigger_dbt_job(request): if request_json and "job_id" in request_json: job_id = request_json["job_id"] - if "wait_for_completion" in request_json: - wait_for_completion_val = request_json["wait_for_completion"] - wait_for_completion = wait_for_completion_val.lower() == "true" else: logger.exception("Failed to retrieve job_id") raise @@ -111,46 +108,6 @@ def trigger_dbt_job(request): logger.exception(f"dbt run failed to start.") return logger.info(f"DBT run {run_id} started successfully.") - if wait_for_completion: - logger.info(f"Checking run details for run {run_id}.") - is_complete = False - exit_code = 110 - while not is_complete: - run_details = client.get_run_details(run_id) - is_complete = run_details["data"]["is_complete"] - if not is_complete: - logger.info( - f"The run {run_id} is not yet completed. Waiting for 30 seconds..." - ) - time.sleep(30) - else: - exit_code = client.determine_run_status(run_id) - if exit_code != 10: - failed_steps = [ - step - for step in run_details["data"]["run_steps"] - if step["status"] != 10 - ] - - error_details = { - "run_id": run_details["data"]["id"], - "status": run_details["data"]["status"], - "status_message": run_details["data"]["status_message"], - "failed_steps": [ - {"step_name": step["name"]} for step in failed_steps - ], - } - - error_message = ( - f"DBT Run Failure Summary:\n" - f"Run ID: {error_details['run_id']}\n" - f"Status: {error_details['status']}\n" - f"Message: {error_details['status_message']}\n" - f"Failed Steps: {', '.join(step['step_name'] for step in error_details['failed_steps'])}" - ) - - # logger.exception(error_message) - raise RuntimeError(error_message) return "Trigger dbt job completed", 200 except Exception as e: logger.exception( diff --git a/poc-terraform/main.tf b/poc-terraform/main.tf index f3dcd93..a1b6619 100644 --- a/poc-terraform/main.tf +++ b/poc-terraform/main.tf @@ -1,5 +1,5 @@ module "fivetran_trigger" { - source = "git@github.com:CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4" + source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4" name = "fivetran-trigger" description = "A set of triggers to kick off Fivetran connection syncs for various systems" @@ -110,41 +110,41 @@ module "fivetran_trigger" { argument = { "connector_id" = "unclothed_cheddar" } - }, + } } secrets = ["API_KEY", "API_SECRET"] secret_managers = [ + "user:luis.rodriguez@cru.org", + "user:matt.drees@cru.org", "group:dps-gcp-role-data-engineers@cru.org", ] - - project_id = module.project.project_id - region = local.region + project_id = local.project_id } module "dbt-triggers" { - source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.13.0" + source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4" name = "dbt-trigger" description = "A set of triggers to kick off dbt jobs" - secrets = ["DBT_TOKEN"] time_zone = "UTC" schedule = { doc_src_freshness : { cron : "0 0 1 1 *", argument = { - job_id = "18120" - wait_for_completion = true + "job_id" = "18120" } }, utilities : { cron : "0 0 1 1 *", argument = { - job_id = "23366" - wait_for_completion = false + "job_id" = "23366" } } } + + secrets = ["DBT_TOKEN"] + secret_managers = [ "user:luis.rodriguez@cru.org", "user:matt.drees@cru.org",