Skip to content

Commit

Permalink
Remove unused wait_for_completion logic and related run status checks…
Browse files Browse the repository at this point in the history
… in dbt job trigger
  • Loading branch information
tony-guan-cru committed Dec 10, 2024
1 parent 86f4258 commit 77acc3c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 124 deletions.
70 changes: 0 additions & 70 deletions dbt-trigger/dbt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,74 +37,4 @@ def trigger_job(self, job_id):
data={"cause": f"Triggered by Google Cloud Function"},
method="POST",
)
# logger.info(f"Succesfully triggered job {job_id}")
# logger.info(f"Response from the API: \n{response}")
return response

def determine_run_status(self, run_id):
run_details = self.get_run_details(run_id)
run_id = run_details["data"]["id"]
if run_details["data"]["is_complete"]:
if run_details["data"]["is_error"]:
logger.error(f"The run {run_id} failed.")
return 20
elif run_details["data"]["is_cancelled"]:
logger.error(f"The run {run_id} was cancelled.")
return 30
else:
logger.info(f"The run {run_id} was successful")
return 10
else:
logger.info(f"The run {run_id} is not yet completed.")
return 110

def connect(self):
try:
response = requests.get(self.account_url, headers=self.headers)
if response.status_code == 200:
logger.info("Successfully connected to DBT")
return 0
else:
logger.exception("Could not connect to DBT")
return 1
except Exception as e:
logger.exception(f"Could not connect to DBT due to {e}")
return 1

def get_run_details(self, run_id):
# logger.info(f"Checking run details for run {run_id}.")
response = self._request(
url=f"{self.account_url}/runs/{run_id}/",
params={"include_related": "['run_steps','debug_logs']"},
)
# logger.info(f"Response from the API: \n{response}")
return response

def get_artifact_details(self, run_id):
logger.info(f"Grabbing artifact details for run {run_id}")
return self._request(f"{self.account_url}/runs/{run_id}/artifacts/")

def download_artifact(self, run_id, artifact_name, destination_folder):
get_artifact_details_url = (
f"{self.account_url}/runs/{run_id}/artifacts/{artifact_name}"
)
artifact_file_name = os.path.basename(artifact_name)
artifact_folder = os.path.dirname(artifact_name)

destination_fullpath = os.path.join(destination_folder, artifact_folder)
os.makedirs(destination_fullpath, exist_ok=True)

filename = os.path.join(destination_fullpath, artifact_file_name)
CHUNK_SIZE = 16 * 1024 * 1024

try:
with requests.get(
get_artifact_details_url, headers=self.headers, stream=True
) as r:
r.raise_for_status()
with open(filename, "wb") as f:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
f.write(chunk)
logger.info(f"Successfully downloaded file {get_artifact_details_url}")
except Exception as e:
raise
43 changes: 0 additions & 43 deletions dbt-trigger/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ def trigger_dbt_job(request):

if request_json and "job_id" in request_json:
job_id = request_json["job_id"]
if "wait_for_completion" in request_json:
wait_for_completion_val = request_json["wait_for_completion"]
wait_for_completion = wait_for_completion_val.lower() == "true"
else:
logger.exception("Failed to retrieve job_id")
raise
Expand All @@ -111,46 +108,6 @@ def trigger_dbt_job(request):
logger.exception(f"dbt run failed to start.")
return
logger.info(f"DBT run {run_id} started successfully.")
if wait_for_completion:
logger.info(f"Checking run details for run {run_id}.")
is_complete = False
exit_code = 110
while not is_complete:
run_details = client.get_run_details(run_id)
is_complete = run_details["data"]["is_complete"]
if not is_complete:
logger.info(
f"The run {run_id} is not yet completed. Waiting for 30 seconds..."
)
time.sleep(30)
else:
exit_code = client.determine_run_status(run_id)
if exit_code != 10:
failed_steps = [
step
for step in run_details["data"]["run_steps"]
if step["status"] != 10
]

error_details = {
"run_id": run_details["data"]["id"],
"status": run_details["data"]["status"],
"status_message": run_details["data"]["status_message"],
"failed_steps": [
{"step_name": step["name"]} for step in failed_steps
],
}

error_message = (
f"DBT Run Failure Summary:\n"
f"Run ID: {error_details['run_id']}\n"
f"Status: {error_details['status']}\n"
f"Message: {error_details['status_message']}\n"
f"Failed Steps: {', '.join(step['step_name'] for step in error_details['failed_steps'])}"
)

# logger.exception(error_message)
raise RuntimeError(error_message)
return "Trigger dbt job completed", 200
except Exception as e:
logger.exception(
Expand Down
22 changes: 11 additions & 11 deletions poc-terraform/main.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module "fivetran_trigger" {
source = "git@github.com:CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4"
source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4"

name = "fivetran-trigger"
description = "A set of triggers to kick off Fivetran connection syncs for various systems"
Expand Down Expand Up @@ -110,41 +110,41 @@ module "fivetran_trigger" {
argument = {
"connector_id" = "unclothed_cheddar"
}
},
}
}

secrets = ["API_KEY", "API_SECRET"]

secret_managers = [
"user:[email protected]",
"user:[email protected]",
"group:[email protected]",
]

project_id = module.project.project_id
region = local.region
project_id = local.project_id
}

module "dbt-triggers" {
source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.13.0"
source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4"
name = "dbt-trigger"
description = "A set of triggers to kick off dbt jobs"
secrets = ["DBT_TOKEN"]
time_zone = "UTC"
schedule = {
doc_src_freshness : {
cron : "0 0 1 1 *",
argument = {
job_id = "18120"
wait_for_completion = true
"job_id" = "18120"
}
},
utilities : {
cron : "0 0 1 1 *",
argument = {
job_id = "23366"
wait_for_completion = false
"job_id" = "23366"
}
}
}

secrets = ["DBT_TOKEN"]

secret_managers = [
"user:[email protected]",
"user:[email protected]",
Expand Down

0 comments on commit 77acc3c

Please sign in to comment.