From 09a5600904dfbb7d113cccb1b6bc031c64e14cb5 Mon Sep 17 00:00:00 2001
From: Markham Lee
Date: Sat, 9 Mar 2024 09:42:10 -0800
Subject: [PATCH] Refactoring, updated unit tests

---
 etl_pipelines/finnhub/README.md            |  9 +++++
 etl_pipelines/finnhub/finnhub_utilities.py | 11 ------
 etl_pipelines/finnhub/main.py              | 24 ++++++++++--
 .../finnhub/stock_prices_payload.json      | 21 +++++-----
 etl_pipelines/finnhub/test.py              | 39 ++++++-------------
 5 files changed, 52 insertions(+), 52 deletions(-)

diff --git a/etl_pipelines/finnhub/README.md b/etl_pipelines/finnhub/README.md
index 755e800..3563b27 100644
--- a/etl_pipelines/finnhub/README.md
+++ b/etl_pipelines/finnhub/README.md
@@ -18,6 +18,15 @@ The "stock_prices_payload.json" file is used to validate the payload that comes
 
 You can read more about the Finnhub Stock Price API [here](https://finnhub.io/docs/api/quote)
 
+#### Data Quality & Testing
+* The jsonschema library is used to validate that the returned data payload contains the expected fields and that the data in those fields is of the correct type. This is critical because InfluxDB enforces strict type checking: once a field has been written with a particular data type, e.g., a float, subsequent writes of a different data type, e.g., an integer, will be rejected.
+* The test.py file runs a series of unit tests covering both primary and secondary functionality:
+    * The end-to-end ETL workflow: if any part of the process fails, the entire test fails
+    * Validating that the appropriate error messages and alerts are sent in response to a bad or failed API call
+    * Testing the data validation process, i.e., comparing a provided "bad" data payload against the ETL's JSON schema, then checking that the appropriate error messages and alerts are generated
+* Alerts are sent via Slack for any pipeline failure, whether it's a failed API call, failed data validation, a DB write issue, etc.
+
+
 #### Deployment
 To deploy/run/use the container you will need to sign up for the Finnhub API and get an API key; the free tier allows 60 API calls per minute, which should be sufficient for most personal use cases. Additionally, note that the container uses the finnhub Python library, so you'll need to install it into your Python virtual environment if you plan to use this outside of a Docker container. Finally, if you want to receive broken pipeline/pipeline error alerts, you'll need to sign up for the Slack API and then configure a channel and a webhook for receiving messages.
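For context on the validation step described in the README hunk above, the snippet below is a minimal sketch of checking a quote payload against stock_prices_payload.json with the jsonschema library. It is not the repository's etl_utilities.validate_json implementation (which this patch doesn't show); the helper name validate_payload and the 0/1 return convention are assumptions chosen to mirror what main.py expects.

```python
# Minimal sketch only -- illustrates the jsonschema check described in the
# README; the real validation lives in etl_utilities.validate_json, which
# is not shown in this patch.
import json

from jsonschema import validate
from jsonschema.exceptions import ValidationError


def validate_payload(data: dict,
                     schema_path: str = 'stock_prices_payload.json') -> int:
    """Return 0 if the payload matches the schema, 1 if it does not."""
    with open(schema_path) as file:
        schema = json.load(file)

    try:
        # raises ValidationError on missing fields or wrong data types
        validate(instance=data, schema=schema)
        return 0

    except ValidationError as error:
        print(f'Payload failed schema validation: {error.message}')
        return 1


# quote payload with every price field present as a number -> returns 0
good = {"c": 378.85, "d": 2.7, "dp": -1.006, "h": 384.3,
        "l": 377.44, "o": 383.76, "pc": 382.7, "t": 170129160}

# wrong type for "c" and missing required fields -> returns 1
bad = {"c": "378.85", "d": 2.7}

print(validate_payload(good), validate_payload(bad))
```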
diff --git a/etl_pipelines/finnhub/finnhub_utilities.py b/etl_pipelines/finnhub/finnhub_utilities.py index e151f86..a20d88d 100644 --- a/etl_pipelines/finnhub/finnhub_utilities.py +++ b/etl_pipelines/finnhub/finnhub_utilities.py @@ -6,9 +6,7 @@ # information from the API in the future import os -import json import finnhub -from jsonschema import validate class FinnHubUtilities(): @@ -22,21 +20,12 @@ def __init__(self) -> None: def get_stock_data(symbol: str, FINNHUB_KEY=os.environ.get('FINNHUB_SECRET')) -> dict: - # FINNHUB_KEY = os.environ.get('FINNHUB_SECRET') - - # import data schema for validation - with open('stock_prices_payload.json') as file: - SCHEMA = json.load(file) - # create client client = finnhub.Client(FINNHUB_KEY) # get data data = client.quote(symbol) - # validate data - validate(instance=data, schema=SCHEMA) - return data @staticmethod diff --git a/etl_pipelines/finnhub/main.py b/etl_pipelines/finnhub/main.py index 2859da8..2981381 100644 --- a/etl_pipelines/finnhub/main.py +++ b/etl_pipelines/finnhub/main.py @@ -5,6 +5,7 @@ import os import sys +import json from finnhub_utilities import FinnHubUtilities parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -39,6 +40,15 @@ def get_prices(symbol: str, *args): return response +def validate_json_payload(data: dict) -> int: + + with open('stock_prices_payload.json') as file: + SCHEMA = json.load(file) + + # validate the data + return etl_utilities.validate_json(data, SCHEMA) + + def parse_data(data: dict) -> dict: return finn_util.parse_stock_data(data) @@ -84,11 +94,17 @@ def main(): stock_data = get_prices(os.environ['STOCK_SYMBOL']) - # parse data into a json payload - stock_payload = parse_data(stock_data) + # validate the data + if validate_json_payload(stock_data) == 0: + + # parse data into a json payload + stock_payload = parse_data(stock_data) + + # write data to InfluxDB + write_data(stock_payload) - # write data to InfluxDB - write_data(stock_payload) + else: + sys.exit() if __name__ == '__main__': diff --git a/etl_pipelines/finnhub/stock_prices_payload.json b/etl_pipelines/finnhub/stock_prices_payload.json index 62c41d4..e52b6da 100644 --- a/etl_pipelines/finnhub/stock_prices_payload.json +++ b/etl_pipelines/finnhub/stock_prices_payload.json @@ -1,11 +1,14 @@ - { -"c": 378.85, -"d": 2.7, -"dp": -1.006, -"h": 384.3, -"l": 377.44, -"o": 383.76, -"pc": 382.7, -"t": 170129160 + "type": "object", + "properties": { + "c": {"type": "number"}, + "d": {"type": "number"}, + "dp": {"type": "number"}, + "h": {"type": "number"}, + "l": {"type": "number"}, + "o": {"type": "number"}, + "pc": {"type": "number"}, + "t": {"type": "number"} + }, + "required": ["pc", "o", "l", "dp"] } \ No newline at end of file diff --git a/etl_pipelines/finnhub/test.py b/etl_pipelines/finnhub/test.py index a921790..f0cd13f 100644 --- a/etl_pipelines/finnhub/test.py +++ b/etl_pipelines/finnhub/test.py @@ -28,10 +28,8 @@ def test_finnhub_api_(self): data = main.get_prices(self.STOCK_SYMBOL) - # get length of json object - the get prices method already - # validates the json format, so this a double check, still, - # if that step failed, then so would this one. - value_count = len(data) + # validate the returned payload + validation_status = main.validate_json_payload(data) # now we check that data parsing works properly parsed_data = main.parse_data(data) @@ -41,22 +39,11 @@ def test_finnhub_api_(self): # back if the write fails and a Slack alert is triggered. 
         response = main.write_data(parsed_data)
 
-        self.assertEqual(value_count, 8, 'json data is the wrong shape')
+        self.assertEqual(validation_status, 0,
+                         'json data is missing fields and/or has errors')
         self.assertEqual(parsed_length, 4, "Parsed data is the wrong shape")
         self.assertEqual(response, None, "InfluxDB write unsuccessful")
 
-    # validate proper response if an invalid symbol is sent via the main.py
-    # price method
-    def test_finnhub_api_bad_symbol(self):
-
-        data = main.get_prices('cheese')
-
-        value = data['d']  # comes back as 'None' if the symbol is wrong
-        print(value)
-
-        self.assertEqual(value, None,
-                         'Incorrect response to invalid stock symbol')
-
     # Check the response of the API call if the wrong key is passed
     # expected response is a 200 code from a successful Slack alert being
     # sent. I.e. you already know the bad key won't work, so what you want to
@@ -67,27 +54,23 @@ def test_finnhub_api_bad_key(self):
 
         self.assertEqual(data, 200, 'Bad API Key')
 
-    # strict type casting and checking is used to ensure that all the numbers
-    # are floats. Here we send bad data composed of integers and strings to
-    # see if it a) fails as expected b) triggers a Slack alert.
-    # We check via sending bad data to InfluxDB as opposed to checking types
-    # within the json as InfluxDB has strct type checking, so what matters is
-    # if Influx perceives the data type as wrong. E.g., you need to cast fields
-    # to floats, in case data comes back as "2" instead of 2.0
+    # Validate the json schema by purposely sending a bad data payload
+    # to be compared against the data schema for this ETL.
     def test_db_write_bad_data(self):
 
         data = {
             "previous_close": 'Thursday',
-            "open": float(544.33),
+            "open": "Lateralus",
             "last_price": int(5),
             "change": float(0.33)
         }
 
-        response = main.write_data(data)
+        status, slack_response = main.validate_json_payload(data)
 
-        self.assertEqual(response, 200, 'DB type check failed,\
-            wrong data type written to DB!')
+        self.assertEqual(status, 1,
+                         "Data validation succeeded, it should've failed")
+        self.assertEqual(slack_response, 200, "Slack alert failed to send")
 
 
 if __name__ == '__main__':
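Both the bad-key and bad-data tests above treat a 200 response as proof that a Slack alert was delivered. As a rough, hypothetical sketch of that alert path (the project's actual alert helper lives in etl_utilities and is not part of this patch; the helper and environment-variable names below are assumptions), a post to a standard Slack incoming webhook looks like this:

```python
# Hypothetical illustration of the pipeline-failure alert the tests assert
# on (a 200 status from a successful webhook post). The webhook env var
# and helper names are assumptions, not the project's actual code.
import os

import requests


def send_pipeline_alert(message: str) -> int:
    """Post a failure message to a Slack incoming webhook and return the
    HTTP status code; 200 means the alert was delivered."""
    webhook_url = os.environ['ALERT_WEBHOOK_URL']  # assumed variable name
    payload = {'text': f'Finnhub ETL pipeline failure: {message}'}

    response = requests.post(webhook_url, json=payload, timeout=10)
    return response.status_code


# e.g., called when schema validation fails, before the pipeline exits:
# slack_response = send_pipeline_alert('payload failed JSON schema validation')
# a test can then assert slack_response == 200
```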