Skip to content

Commit

Permalink
Merge pull request #23 from CruGlobal/dbt-geography-triggers
Browse files Browse the repository at this point in the history
Refactor FivetranClient error handling and remove unused ExitCodeException class
  • Loading branch information
tony-guan-cru authored Dec 10, 2024
2 parents 687b42c + 77acc3c commit cc91793
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 74 deletions.
40 changes: 40 additions & 0 deletions dbt-trigger/dbt_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import logging
import requests

logger = logging.getLogger("primary_logger")


class DbtClient:

def __init__(self, access_token: str, account_id: str):
self.access_token = access_token
self.account_id = account_id
self.headers = {"Authorization": f"Bearer {access_token}"}
self.account_url = f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/"
self.base_url = "https://cloud.getdbt.com/api/v2/"

def _request(self, url, data=None, params=None, method="GET"):
request_details = {"headers": self.headers}
if data:
request_details["data"] = data
if params:
request_details["params"] = params

try:
response = requests.request(method, url, **request_details)
if response.ok:
return response.json()
except Exception as e:
logger.exception(f"Error in making request to {url}: {e}")
raise

def trigger_job(self, job_id):
logger.info(f"Triggering dbt job {job_id} on account {self.account_id}")

response = self._request(
f"{self.account_url}/jobs/{job_id}/run/",
data={"cause": f"Triggered by Google Cloud Function"},
method="POST",
)
return response
116 changes: 116 additions & 0 deletions dbt-trigger/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import os
import functions_framework
from dbt_client import DbtClient
import logging
import sys
import os
import json
from requests import auth, Session
import time


logger = logging.getLogger("primary_logger")
logger.propagate = False


class CloudLoggingFormatter(logging.Formatter):
"""
Produces messages compatible with google cloud logging
"""

def format(self, record: logging.LogRecord) -> str:
s = super().format(record)
return json.dumps(
{
"message": s,
"severity": record.levelname,
"timestamp": {"seconds": int(record.created), "nanos": 0},
}
)


def setup_logging():
"""
Sets up logging for the application.
"""
global logger

# Remove any existing handlers
if logger.handlers:
for handler in logger.handlers:
logger.removeHandler(handler)

handler = logging.StreamHandler(stream=sys.stdout)
formatter = CloudLoggingFormatter(fmt="%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

sys.excepthook = handle_unhandled_exception


def handle_unhandled_exception(exc_type, exc_value, exc_traceback):
"""
Handles unhandled exceptions by logging the exception details and sending an alert to the development team.
This function is intended to be used as a custom excepthook function, which is called when an unhandled exception
occurs in the application. The function logs the exception details to the primary logger, and sends an alert to
the development team using a third-party service such as Datadog or PagerDuty.
Args:
exc_type (type): The type of the exception that was raised.
exc_value (Exception): The exception object that was raised.
exc_traceback (traceback): The traceback object that was generated when the exception was raised.
"""
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return

logger.exception(
"Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback)
)


@functions_framework.http
def trigger_dbt_job(request):
"""
Triggers a dbt job for a given job id.
"""
setup_logging()

request_json = request.get_json(silent=True)

if (
request_json is None
and request.headers.get("Content-Type") == "application/octet-stream"
):
try:
request_data = request.get_data(as_text=True)
request_json = json.loads(request_data) if request_data else None
except Exception as e:
logger.exception(f"Failed to parse octet-stream data: {str(e)}")
request_json = None

if request_json and "job_id" in request_json:
job_id = request_json["job_id"]
else:
logger.exception("Failed to retrieve job_id")
raise

dbt_token = os.environ["DBT_TOKEN"]
account_id = "10206"
try:
client = DbtClient(access_token=dbt_token, account_id=account_id)
job_run_response = client.trigger_job(job_id)
run_id = job_run_response["data"]["id"]
if run_id is None:
logger.exception(f"dbt run failed to start.")
return
logger.info(f"DBT run {run_id} started successfully.")
return "Trigger dbt job completed", 200
except Exception as e:
logger.exception(
f"An error occurred when attempting to trigger dbt job: {str(e)}"
)
raise
2 changes: 2 additions & 0 deletions dbt-trigger/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
functions-framework==3.*
requests==2.32.3
75 changes: 11 additions & 64 deletions fivetran-trigger/fivetran_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,6 @@
logger = logging.getLogger("primary_logger")


class ExitCodeException(Exception):
"""
ExitCodeException is a custom exception class for raising exceptions with exit codes.
"""

def __init__(self, message, exit_code):
super().__init__(message)
self.message = message
self.exit_code = exit_code


class FivetranClient:
"""
FivetranClient is a class for interacting with the Fivetran API.
Expand All @@ -27,18 +16,8 @@ class FivetranClient:
auth (HTTPBasicAuth): The authentication object used for requests
"""

EXIT_CODE_INVALID_CREDENTIALS = 200
EXIT_CODE_BAD_REQUEST = 201
EXIT_CODE_SYNC_REFRESH_ERROR = 202
EXIT_CODE_SYNC_ALREADY_RUNNING = 203
EXIT_CODE_SYNC_INVALID_SOURCE_ID = 204
EXIT_CODE_SYNC_INVALID_POKE_INTERVAL = 205
EXIT_CODE_INVALID_INPUT = 206
EXIT_CODE_UNKNOWN_ERROR = 249

def __init__(self, auth: HTTPBasicAuth) -> None:
self.auth = auth
# logger.debug("FivetranClient initialized")

def _request(
self, endpoint: str, method: str = "GET", payload: dict = None
Expand Down Expand Up @@ -76,25 +55,7 @@ def _request(
return resp.json()
except requests.exceptions.RequestException as e:
logger.exception(f"Error: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 401:
raise ExitCodeException(
f"Authentication failed: {e.response.json().get('message')}",
self.EXIT_CODE_INVALID_CREDENTIALS,
)
elif e.response.json().get("code") == "NotFound_Integration":
raise ExitCodeException(
f"Invalid source ID: {e.response.json().get('message')}",
self.EXIT_CODE_SYNC_INVALID_SOURCE_ID,
)
elif e.response.status_code == 400:
raise ExitCodeException(
f"Bad request: {e.response.json().get('message')}",
self.EXIT_CODE_BAD_REQUEST,
)
else:
raise ExitCodeException(
f"Unknown error: {e.response.json()}", self.EXIT_CODE_UNKNOWN_ERROR
)
raise

def trigger_sync(
self,
Expand Down Expand Up @@ -133,11 +94,10 @@ def trigger_sync(
method="POST",
payload={"force": force},
)
except ExitCodeException as e:
except Exception as e:
logger.exception(f"Error triggering sync: {e}")
raise ExitCodeException(f"Error triggering sync: {e}", e.exit_code) from e
raise
else:
# logger.info("Sync triggered successfully")
if wait_for_completion:
new_success, new_failure = prev_success, prev_failure
while prev_success == new_success and prev_failure == new_failure:
Expand All @@ -153,10 +113,7 @@ def trigger_sync(
not prev_failure and new_failure
):
logger.exception(f"Sync failed at {new_failure}")
raise ExitCodeException(
f"Sync failed at {new_failure}",
self.EXIT_CODE_SYNC_REFRESH_ERROR,
)
raise
else:
logger.info("No new failure detected")

Expand All @@ -179,11 +136,9 @@ def determine_sync_status(self, connector_id: str) -> str:
endpoint=f"connectors/{connector_id}", method="GET"
)
return response.get("data", {}).get("status", {}).get("sync_state")
except ExitCodeException as e:
except Exception as e:
logger.exception(f"Error determining sync status: {e}")
raise ExitCodeException(
f"Error determining sync status: {e}", e.exit_code
) from e
raise

def get_connector_details(self, connector_id: str) -> dict:
"""
Expand All @@ -204,11 +159,9 @@ def get_connector_details(self, connector_id: str) -> dict:
endpoint=f"connectors/{connector_id}", method="GET"
)
return response.get("data", {})
except ExitCodeException as e:
except Exception as e:
logger.exception(f"Error getting connector details: {e}")
raise ExitCodeException(
f"Error getting connector details: {e}", e.exit_code
) from e
raise

def _get_latest_success_and_failure(self, connector_id: str) -> tuple:
"""
Expand Down Expand Up @@ -259,18 +212,12 @@ def update_connector(
endpoint = f"connectors/{connector_id}"
try:
self._request(endpoint, method="PATCH", payload=payload)
except ExitCodeException as e:
except Exception as e:
logger.exception(f"Error updating connector: {e}")
raise ExitCodeException(
f"Error updating connector: {e}", e.exit_code
) from e
# else:
# logger.info("Connector updated successfully")
raise
else:
logger.exception("No updates to connector were provided")
raise ExitCodeException(
"No updates to connector were provided", self.EXIT_CODE_BAD_REQUEST
)
raise

def connect(self) -> int:
"""
Expand Down
8 changes: 5 additions & 3 deletions fivetran-trigger/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ def trigger_sync(request):
connector_id = request_json["connector_id"]
else:
logger.exception("Failed to retrieve connector_id")
raise ValueError("Failed to retrieve connector_id")
raise

client = FivetranClient(basic_auth)

try:
client.update_connector(connector_id=connector_id, schedule_type="manual")
logger.info(f"Connector updated successfully, schedule_type: manual")
logger.info(
f"Connector updated successfully, schedule_type: manual, connector_id: {connector_id}"
)
client.trigger_sync(
connector_id=connector_id,
force=True,
Expand All @@ -128,4 +130,4 @@ def trigger_sync(request):
logger.exception(
f"connector_id: {connector_id} - Error triggering Fivetran sync: {str(e)}"
)
raise RuntimeError(f"Error triggering Fivetran sync: {str(e)}")
raise
2 changes: 1 addition & 1 deletion fivetran-trigger/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
functions-framework==3.*

# For fivetran api calls
requests==2.32.3
requests==2.32.3
Loading

0 comments on commit cc91793

Please sign in to comment.