Skip to content

Commit

Permalink
Refactoring/streamlining ETL scripts, adding unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkhamLee committed Mar 8, 2024
1 parent 3b20714 commit e426f53
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 25 deletions.
35 changes: 18 additions & 17 deletions etl_pipelines/openweather_current/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import os
import sys
import json
from jsonschema import validate

parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
Expand Down Expand Up @@ -40,24 +39,17 @@ def get_weather_data():
# create URL
url = utilities.build_url_weather(WEATHER_KEY, ENDPOINT)

# get data
data = utilities.get_weather_data(url)
return utilities.get_weather_data(url)


def validate_data(data: dict) -> dict:
    """Validate a raw weather payload against the stored JSON schema.

    The schema is read from ``current_weather.json``, resolved relative
    to the current working directory, and the check itself is delegated
    to ``etl_utilities.validate_json``.

    Args:
        data: Payload returned by ``get_weather_data``.

    Returns:
        Whatever ``etl_utilities.validate_json`` returns.
        NOTE(review): ``main`` compares this result to ``0`` and the unit
        tests unpack a ``(code, response)`` pair from ``validate_json``
        for a bad payload, so the ``-> dict`` annotation looks stale --
        confirm against the helper before tightening it.
    """
    # load validation data
    with open('current_weather.json') as file:
        schema = json.load(file)

    # Single validation path: the shared helper owns both the schema
    # check and its result, so nothing is returned before it runs.
    return etl_utilities.validate_json(data, schema)


def parse_data(data: dict) -> dict:
Expand Down Expand Up @@ -90,23 +82,32 @@ def write_data(data: dict):
try:
influx.write_influx_data(client, payload, data, BUCKET)
logger.info('Weather data written to InfluxDB')
return 0

except Exception as e:
message = (f'InfluxDB write for openweather current failed with error: {e}') # noqa: E501
logger.debug(message)
response = etl_utilities.send_slack_webhook(WEBHOOK_URL, message)
logger.debug(f'Slack pipeline failure alert sent with code: {response}') # noqa: E501
logger.debug(f'Pipeline failure alert sent via Slack with code: {response}') # noqa: E501
return 1, response


def main():
    """Run the pipeline once: fetch, validate, parse, write to InfluxDB.

    The payload is validated before it is parsed, and the process exits
    without writing anything when validation does not return ``0``.
    """

    # get current weather
    data = get_weather_data()

    # Validate first so an unexpected payload never reaches the parser.
    if validate_data(data) != 0:
        # NOTE(review): sys.exit() with no argument exits with status 0;
        # confirm whether the scheduler should see a non-zero code here.
        sys.exit()

    # parse weather data
    parsed_data = utilities.weather_parser(data)

    # write data to InfluxDB (exactly once per run)
    write_data(parsed_data)


if __name__ == "__main__":
Expand Down
113 changes: 113 additions & 0 deletions etl_pipelines/openweather_current/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# (C) Markham Lee 2023 - 2024
# https://github.com/MarkhamLee/productivity-music-stocks-weather-IoT-dashboard
# Test script for the OpenWeather current weather ETL
# Before running this make sure all of your environmental variables,
# connection strings, etc., are setup properly, chances are, those
# will be the source of most of your errors, as opposed to the code itself.
import os
import sys
import json
import unittest
import main
import tracemalloc
tracemalloc.start()

parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)

from etl_library.logging_util import logger # noqa: E402
from openweather_library.weather_utilities import WeatherUtilities # noqa: 402
from etl_library.general_utilities import EtlUtilities # noqa: E402


class OpenWeatherCurrentTesting(unittest.TestCase):
    """Integration tests for the OpenWeather current weather pipeline.

    These tests call the pipeline's real entry points (API request,
    InfluxDB write, Slack alert), so the environment variables and
    connection strings the pipeline reads must be configured first.
    """

    # Deliberately malformed payload shared by the failure-path tests.
    BAD_PAYLOAD = {
        "weather": 5,
        "temp": "Gojo",
        "feels like": "dancing",
        "low": 6.61
    }

    @classmethod
    def setUpClass(cls):

        cls.utilities = WeatherUtilities()
        cls.etl_utilities = EtlUtilities()

        cls.logger = logger
        cls.logger.info('Testing started...')

    # End to end test, we validate that the API call works and we're
    # able to write successfully to InfluxDB.
    def test_openweather_api(self):

        self.logger.info('Performing end to end tests')

        data = main.get_weather_data()

        # A failed request comes back as a (1, response) tuple rather
        # than None, so check the type and do it before parsing, so the
        # failure is reported against the step that actually broke.
        self.assertIsInstance(data, dict, 'API call failed')

        # now we check that data parsing works properly
        parsed_data = main.parse_data(data)
        self.assertEqual(len(parsed_data), 10,
                         "Parsed data is the wrong shape")

        # Finally, we write the data to the DB
        status = main.write_data(parsed_data)
        self.assertEqual(status, 0, "InfluxDB write unsuccessful")

    # test sending a bad data payload to InfluxDB that "should" fail type
    # checking. Also testing the triggering of a pipeline failure alert
    # sent via Slack.
    def test_bad_data_write(self):

        # copy so the shared payload cannot be altered by the call
        data = dict(self.BAD_PAYLOAD)

        code, response = main.write_data(data)

        self.assertEqual(code, 1,
                         "Data write was successful, should've failed")
        self.assertEqual(response, 200, "Slack alert was sent unsuccessfully")

    # Test using a bad key for the API request, which will also catch any API
    # error/code other than 200. Expected behavior is that the API request
    # fails, errors are caught and a Slack alert is sent.
    def test_bad_api_keys(self):

        # build URL
        BAD_KEY = 'kasdkasdfasa'
        ENDPOINT = 'weather?'
        url = self.utilities.build_url_weather(BAD_KEY, ENDPOINT)

        code, response = self.utilities.get_weather_data(url)

        self.assertEqual(code, 1,
                         "API call was successful, it should've failed")
        self.assertEqual(response, 200, "Slack alert was sent unsuccessfully")

    # Testing data validation, the validator should report a failure and
    # send a Slack alert when an improper json payload is provided.
    def test_data_validation(self):

        # define "bad" data payload
        bad_data = dict(self.BAD_PAYLOAD)

        # load data schema
        with open('current_weather.json') as file:
            SCHEMA = json.load(file)

        # test data validation
        code, response = self.etl_utilities.validate_json(bad_data, SCHEMA)

        self.assertEqual(code, 1,
                         "Data validation passed, it should've failed")
        self.assertEqual(response, 200, "Slack alert was sent unsuccessfully")


if __name__ == '__main__':
unittest.main()
20 changes: 12 additions & 8 deletions etl_pipelines/openweather_library/weather_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,20 @@ def weather_parser(response: dict) -> dict:

def get_weather_data(self, url: str) -> dict:
    """Request weather data from the given URL and decode the JSON body.

    Args:
        url: Fully built request URL.

    Returns:
        The decoded JSON body on success. If the request fails, the
        server answers with an error status, or the body is not valid
        JSON, a Slack alert is sent and the tuple ``(1, alert_status)``
        is returned instead, where ``alert_status`` is the value
        returned by ``send_slack_webhook``.
    """
    try:
        # One request only; the timeout keeps a stalled connection from
        # hanging the pipeline indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        payload = response.json()

    # RequestException covers HTTP error statuses as well as connection
    # failures and timeouts; ValueError covers an undecodable body.
    except (requests.exceptions.RequestException, ValueError) as e:
        webhook_url = os.environ['ALERT_WEBHOOK']
        # NOTE(review): the exception text may include the request URL;
        # confirm that is acceptable in the log and the Slack alert.
        message = (f'weather data retrieval failed with error: {e}')
        logger.debug(message)
        alert_status = self.etl_utilities.send_slack_webhook(webhook_url,
                                                             message)
        return 1, alert_status

    logger.info('weather data retrieved')
    return payload

0 comments on commit e426f53

Please sign in to comment.