-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_extraction.py
158 lines (123 loc) · 5.16 KB
/
data_extraction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import requests
from sqlalchemy import inspect
from sqlalchemy import text
import pandas as pd
import tabula
import boto3
import requests
class DataExtractor:
    """Extract data from RDS databases, PDFs, JSON HTTP APIs and S3 buckets.

    Methods
    -------
    read_rds_table(table_name, engine)
        Read a whole database table into a DataFrame.
    retrieve_pdf_df(link)
        Extract every table from every page of a PDF into one DataFrame.
    retrieve_json(url, header, timeout=DEFAULT_TIMEOUT)
        GET a JSON endpoint and return the decoded JSON (None on failure).
    retrieve_stores_data(store_number, header, base_url=STORE_DETAILS_URL,
                         timeout=DEFAULT_TIMEOUT)
        Fetch details for stores 0 .. store_number - 1 into a DataFrame.
    extract_from_s3(aws_bucket, s3_key, local_path=None)
        Read a CSV object from an S3 bucket into a DataFrame.
    """

    # Default endpoint for per-store details; "/<store index>" is appended.
    STORE_DETAILS_URL = (
        "https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details"
    )
    # Seconds to wait for an HTTP response. Without a timeout requests.get
    # can block indefinitely on an unresponsive server.
    DEFAULT_TIMEOUT = 30

    def read_rds_table(self, table_name, engine):
        """Read an entire database table into a DataFrame.

        Parameters
        ----------
        table_name : str
            Name of the table to read.
        engine : Engine
            SQLAlchemy engine connected to the source database.

        Returns
        -------
        pandas.DataFrame
            All rows of ``table_name``.
        """
        return pd.read_sql_table(table_name, engine)

    def retrieve_pdf_df(self, link):
        """Extract every table found on every page of a PDF into one DataFrame.

        Parameters
        ----------
        link : str
            URL or local path of the PDF file.

        Returns
        -------
        pandas.DataFrame
            All extracted tables stacked vertically with a fresh 0..n-1
            index; an empty DataFrame if no tables were found.
        """
        # tabula returns a list with one DataFrame per detected table.
        tables = tabula.read_pdf(link, pages="all", stream=False)
        return self._concat_frames(tables)

    def retrieve_json(self, url, header, timeout=DEFAULT_TIMEOUT):
        """GET a JSON API endpoint and return the decoded response body.

        Parameters
        ----------
        url : str
            The API endpoint URL.
        header : dict
            HTTP headers to send with the request (e.g. an API key).
        timeout : float or None, optional
            Seconds to wait for the server; ``None`` waits forever.

        Returns
        -------
        object or None
            The decoded JSON (typically a dict or list), or ``None`` if the
            request or JSON decoding failed. Failures are printed, not raised.
        """
        return self._get_json(url, header, timeout)

    def retrieve_stores_data(self, store_number, header,
                             base_url=STORE_DETAILS_URL,
                             timeout=DEFAULT_TIMEOUT):
        """Retrieve the detail record of every store into a DataFrame.

        Stores are fetched one request at a time from
        ``{base_url}/{index}`` for ``index`` in ``0 .. store_number - 1``.
        Stores whose request or decoding fails are reported and skipped.

        Parameters
        ----------
        store_number : int
            Number of stores to retrieve (the total store count).
        header : dict
            HTTP headers to send with each request.
        base_url : str, optional
            Endpoint prefix; the store index is appended after a ``/``.
        timeout : float or None, optional
            Per-request timeout in seconds; ``None`` waits forever.

        Returns
        -------
        pandas.DataFrame
            One row per successfully retrieved store.
        """
        prefix = base_url.rstrip("/")
        store_data = []
        for store_index in range(store_number):
            data = self._get_json(f"{prefix}/{store_index}", header, timeout)
            if data is not None:
                store_data.append(data)
        return pd.DataFrame(store_data)

    def extract_from_s3(self, aws_bucket, s3_key, local_path=None):
        """Read a CSV object stored in S3 into a DataFrame.

        Parameters
        ----------
        aws_bucket : str
            Name of the S3 bucket.
        s3_key : str
            Key of the CSV object inside the bucket.
        local_path : str, optional
            Not used by this method; kept so existing callers still work.

        Returns
        -------
        pandas.DataFrame or None
            The parsed CSV, or ``None`` if the response metadata reports a
            status other than 200.
        """
        s3 = boto3.client("s3")
        response = s3.get_object(Bucket=aws_bucket, Key=s3_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        # NOTE(review): botocore usually raises ClientError for error
        # responses, so this branch is likely a defensive fallback.
        if status != 200:
            print(f"Unsuccessful S3 get_object response. Status - {status}")
            return None
        print(f"Successful S3 get_object response. Status - {status}")
        return pd.read_csv(response.get("Body"))

    @staticmethod
    def _get_json(url, header, timeout):
        """GET ``url`` and return its decoded JSON, or None after printing the error."""
        try:
            response = requests.get(url, headers=header, timeout=timeout)
            # Raises HTTPError for 4xx/5xx status codes.
            response.raise_for_status()
        except requests.exceptions.HTTPError as http_err:
            print(f"HTTP error occurred: {http_err}")
        except requests.exceptions.ConnectionError as conn_err:
            print(f"Connection error occurred: {conn_err}")
        except requests.exceptions.Timeout as timeout_err:
            print(f"Timeout error occurred: {timeout_err}")
        except requests.exceptions.RequestException as req_err:
            print(f"An error occurred: {req_err}")
        else:
            # Decode in its own try so a malformed body is reported as a
            # JSON failure rather than by the generic RequestException branch.
            try:
                return response.json()
            except ValueError as json_err:
                print(f"JSON decoding failed: {json_err}")
        return None

    @staticmethod
    def _concat_frames(frames):
        """Stack DataFrames vertically with a fresh index; empty input gives an empty DataFrame."""
        frames = list(frames)
        # pd.concat raises ValueError("No objects to concatenate") on an
        # empty sequence, e.g. a PDF in which no tables were detected.
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, ignore_index=True)