Skip to content

Commit

Permalink
Refactoring, updated unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkhamLee committed Mar 9, 2024
1 parent 5788078 commit 09a5600
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 52 deletions.
9 changes: 9 additions & 0 deletions etl_pipelines/finnhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ The "stock_prices_payload.json" file is used to validate the payload that comes

You can read more about the Finnhub Stock Price API [here](https://finnhub.io/docs/api/quote)

#### Data Quality & Testing
* The `jsonschema` library's `validate` function is used to ensure that the returned data payload is correct, i.e., that the right fields are present and that the data in those fields is of the right type. This is critical because InfluxDB has tight type checking: once a field has been written with a particular type, e.g., a float, subsequent writes of a different type, e.g., an integer, will be rejected.
* The test.py file will run a series of unit tests, checking both primary and secondary functionality:
* The end-to-end ETL workflow: if any part of the process fails, the entire test fails.
* Validating that the appropriate error messages and alerts are sent in response to a bad or failed API call
* Testing the data validation process, i.e., comparing a provided "bad" data payload with the ETL's JSON schema, and then checking that the appropriate error messages and alerts are generated.
* Slack alerts are sent for any "pipeline failure issue", whether it's a failed API call, a data validation failure, a DB write issue, etc.


#### Deployment
To deploy/run/use the container you will need to sign up for the Finnhub API and get an API key; you're able to hit the API 60/second with the free version, which should be sufficient for most personal use cases. Additionally, please note that the container uses the finnhub Python library, so you'll need to install it into your Python virtual environment if you plan to use this outside of a Docker container. Finally, if you want to receive broken pipeline/pipeline error alerts, you'll need to sign up for the Slack API, and then configure a channel and a webhook for receiving messages.

Expand Down
11 changes: 0 additions & 11 deletions etl_pipelines/finnhub/finnhub_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
# information from the API in the future

import os
import json
import finnhub
from jsonschema import validate


class FinnHubUtilities():
Expand All @@ -22,21 +20,12 @@ def __init__(self) -> None:
def get_stock_data(symbol: str,
FINNHUB_KEY=os.environ.get('FINNHUB_SECRET')) -> dict:

# FINNHUB_KEY = os.environ.get('FINNHUB_SECRET')

# import data schema for validation
with open('stock_prices_payload.json') as file:
SCHEMA = json.load(file)

# create client
client = finnhub.Client(FINNHUB_KEY)

# get data
data = client.quote(symbol)

# validate data
validate(instance=data, schema=SCHEMA)

return data

@staticmethod
Expand Down
24 changes: 20 additions & 4 deletions etl_pipelines/finnhub/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import os
import sys
import json
from finnhub_utilities import FinnHubUtilities

parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
Expand Down Expand Up @@ -39,6 +40,15 @@ def get_prices(symbol: str, *args):
return response


def validate_json_payload(data: dict) -> int:
    """Validate a raw payload against this pipeline's JSON schema.

    The schema is read from ``stock_prices_payload.json`` located next to
    this module, then ``data`` and the schema are handed to
    ``etl_utilities.validate_json`` and its result is returned unchanged.

    Args:
        data: The payload to check (e.g. the response of ``get_prices``).

    Returns:
        Whatever ``etl_utilities.validate_json`` returns. Callers in this
        repo treat ``0`` as "payload is valid".
        NOTE(review): the unit test for bad payloads unpacks two values
        (status, Slack response) from the failure path, so the ``int``
        annotation may be too narrow - confirm against ``validate_json``.
    """

    # Resolve the schema relative to this file rather than the current
    # working directory, so validation works no matter where the process
    # was launched from.
    schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               'stock_prices_payload.json')

    with open(schema_path, encoding='utf-8') as file:
        SCHEMA = json.load(file)

    # validate the data
    return etl_utilities.validate_json(data, SCHEMA)


def parse_data(data: dict) -> dict:
    """Run a payload through the Finnhub utilities parser.

    Thin wrapper that forwards ``data`` to ``finn_util.parse_stock_data``
    and returns that call's result as-is.
    """

    parsed_payload = finn_util.parse_stock_data(data)
    return parsed_payload
Expand Down Expand Up @@ -84,11 +94,17 @@ def main():

stock_data = get_prices(os.environ['STOCK_SYMBOL'])

# parse data into a json payload
stock_payload = parse_data(stock_data)
# validate the data
if validate_json_payload(stock_data) == 0:

# parse data into a json payload
stock_payload = parse_data(stock_data)

# write data to InfluxDB
write_data(stock_payload)

# write data to InfluxDB
write_data(stock_payload)
else:
sys.exit()


if __name__ == '__main__':
Expand Down
21 changes: 12 additions & 9 deletions etl_pipelines/finnhub/stock_prices_payload.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@

{
"c": 378.85,
"d": 2.7,
"dp": -1.006,
"h": 384.3,
"l": 377.44,
"o": 383.76,
"pc": 382.7,
"t": 170129160
"type": "object",
"properties": {
"c": {"type": "number"},
"d": {"type": "number"},
"dp": {"type": "number"},
"h": {"type": "number"},
"l": {"type": "number"},
"o": {"type": "number"},
"pc": {"type": "number"},
"t": {"type": "number"}
},
"required": ["pc", "o", "l", "dp"]
}
39 changes: 11 additions & 28 deletions etl_pipelines/finnhub/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ def test_finnhub_api_(self):

data = main.get_prices(self.STOCK_SYMBOL)

# get length of json object - the get prices method already
# validates the json format, so this a double check, still,
# if that step failed, then so would this one.
value_count = len(data)
# validate the returned payload
validation_status = main.validate_json_payload(data)

# now we check that data parsing works properly
parsed_data = main.parse_data(data)
Expand All @@ -41,22 +39,11 @@ def test_finnhub_api_(self):
# back if the write fails and a Slack alert is triggered.
response = main.write_data(parsed_data)

self.assertEqual(value_count, 8, 'json data is the wrong shape')
self.assertEqual(validation_status, 0,
'json data is missing fields and/or has errors')
self.assertEqual(parsed_length, 4, "Parsed data is the wrong shape")
self.assertEqual(response, None, "InfluxDB write unsuccessful")

# validate proper response if an invalid symbol is sent via the main.py
# price method
def test_finnhub_api_bad_symbol(self):

data = main.get_prices('cheese')

value = data['d'] # comes back as 'None' if the symbol is wrong
print(value)

self.assertEqual(value, None,
'Incorrect response to invalid stock symbol')

# Check the response of the API call if the wrong key is passed
# expected response is a 200 code from a successful Slack alert being
# sent. I.e. you already know the bad key won't work, so what you want to
Expand All @@ -67,27 +54,23 @@ def test_finnhub_api_bad_key(self):

self.assertEqual(data, 200, 'Bad API Key')

# strict type casting and checking is used to ensure that all the numbers
# are floats. Here we send bad data composed of integers and strings to
# see if it a) fails as expected b) triggers a Slack alert.
# We check via sending bad data to InfluxDB as opposed to checking types
# within the json as InfluxDB has strct type checking, so what matters is
# if Influx perceives the data type as wrong. E.g., you need to cast fields
# to floats, in case data comes back as "2" instead of 2.0
# Validate the json schema by purposely sending a bad data payload
# to be compared against the data schema for this ETL.
def test_db_write_bad_data(self):

data = {

"previous_close": 'Thursday',
"open": float(544.33),
"open": "Lateralus",
"last_price": int(5),
"change": float(0.33)
}

response = main.write_data(data)
status, slack_response = main.validate_json_payload(data)

self.assertEqual(response, 200, 'DB type check failed,\
wrong data type written to DB!')
self.assertEqual(status, 1,
"Data validation suceeded, it should've failed")
self.assertEqual(slack_response, 200, "Slack alert failed to send")


if __name__ == '__main__':
Expand Down

0 comments on commit 09a5600

Please sign in to comment.