import logging

import pandas as pd

import crud
import schema


class EPACSVLoader:
    logger = logging.getLogger(__name__)
    csv_file: str = "data/annual_conc_by_monitor_2023.csv"
    cols = [
        "State Name",
        "County Name",
        "City Name",
        "Completeness Indicator",
        "Site Num",
        "Latitude",
        "Longitude",
        "Parameter Code",
        "Datum",
        "Parameter Name",
        "Sample Duration",
        "Pollutant Standard",
        "Metric Used",
        "Method Name",
        "Year",
        "Units of Measure",
        "Observation Count",
        "Observation Percent",
        "Arithmetic Mean",
        "Arithmetic Standard Dev",
        "99th Percentile",
        "90th Percentile",
    ]

    def load_and_deserialize(self, session) -> None:
        """Parse the EPA annual CSV and load geos, monitoring sites, and measurements."""
        df = self._parse_csv(self.csv_file)
        self._load_geos(session, df)
        self._load_monitoring_sites(session, df)
        self._load_measurements(session, df)

    def _parse_csv(self, csv_file: str) -> pd.DataFrame:
        """Read the CSV, keep only complete rows, and normalize missing values to None."""
        try:
            df = pd.read_csv(csv_file, header=0, usecols=self.cols)
            self.logger.debug("Sample row:\n%s", df.iloc[0])
            # TODO find a more performant way to filter the data than loading it all into memory up front
            # Keep only rows flagged as complete
            df.drop(df[(df["Completeness Indicator"] != "Y")].index, inplace=True)
            # Drop the Completeness column - no longer needed
            df.drop(columns=["Completeness Indicator"], inplace=True)
            # Convert blank or NaN fields to None
            df = df.astype(object)
            df = df.where(pd.notnull(df), None)
            return df
        except Exception:
            self.logger.exception(f"Failed to parse {csv_file}")
            raise
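
    # One possible way to address the TODO above (an untested sketch, not part of the
    # current implementation; the `_parse_csv_chunked` name is just illustrative):
    # stream the CSV with pandas' chunksize option so only the complete rows are
    # accumulated in memory, e.g.
    #
    #     def _parse_csv_chunked(self, csv_file: str, chunksize: int = 100_000) -> pd.DataFrame:
    #         chunks = []
    #         for chunk in pd.read_csv(csv_file, header=0, usecols=self.cols, chunksize=chunksize):
    #             chunk = chunk[chunk["Completeness Indicator"] == "Y"]
    #             chunks.append(chunk.drop(columns=["Completeness Indicator"]))
    #         df = pd.concat(chunks, ignore_index=True)
    #         df = df.astype(object)
    #         return df.where(pd.notnull(df), None)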

    def _load_geos(self, session, df: pd.DataFrame) -> list[schema.GeoBase]:
        """Create the unique geographic areas (state/county/city) found in the dataframe."""
        # Disregard columns we don't care about for this operation
        geos_df = df[["State Name", "County Name", "City Name"]]
        # Grab unique geos
        geos_df = geos_df.drop_duplicates()
        geos: set[schema.GeoBase] = set()
        try:
            for _, row in geos_df.iterrows():
                geos.add(schema.GeoBase(
                    state_name=row["State Name"],
                    county=row["County Name"],
                    city=row["City Name"],
                ))
            return crud.create_geos(session, geos)
        except Exception:
            self.logger.exception("Failed to load geos from dataframe")
            raise

    def _load_monitoring_sites(self, session, df: pd.DataFrame) -> list[schema.MonitoringSiteBase]:
        """Create the unique monitoring sites, linking each to its previously loaded geo."""
        # Disregard columns we don't care about for this operation
        sites_df = df[["State Name", "County Name", "City Name", "Site Num", "Latitude", "Longitude"]]
        # Grab unique sites
        sites_df = sites_df.drop_duplicates()
        try:
            monitoring_sites: set[schema.MonitoringSiteBase] = set()
            for _, row in sites_df.iterrows():
                geo = crud.get_geo_by_fields(session, state_name=row["State Name"], county=row["County Name"], city=row["City Name"])
                if geo:
                    monitoring_sites.add(schema.MonitoringSiteBase(
                        geo_id=geo.id,
                        site_num=row["Site Num"],
                        latitude=row["Latitude"],
                        longitude=row["Longitude"],
                    ))
            return crud.create_monitoring_sites(session, monitoring_sites)
        except Exception:
            self.logger.exception("Failed to load monitoring sites from dataframe")
            raise

    def _load_measurements(self, session, df: pd.DataFrame) -> list[schema.MeasurementBase]:
        """Create one measurement record per row, linked to its monitoring site."""
        try:
            measurements: list[schema.MeasurementBase] = []
            for _, row in df.iterrows():
                site = crud.get_monitoring_site_by_fields(session, site_num=row["Site Num"], latitude=row["Latitude"], longitude=row["Longitude"])
                if site:
                    measurements.append(schema.MeasurementBase(
                        monitoring_site_id=site.id,
                        parameter_code=row["Parameter Code"],
                        datum=row["Datum"],
                        parameter_name=row["Parameter Name"],
                        sample_duration=row["Sample Duration"],
                        pollutant_standard=row["Pollutant Standard"],
                        metric_used=row["Metric Used"],
                        method_name=row["Method Name"],
                        year=row["Year"],
                        units_of_measure=row["Units of Measure"],
                        observation_count=row["Observation Count"],
                        observation_percent=row["Observation Percent"],
                        arithmetic_mean=row["Arithmetic Mean"],
                        arithmetic_standard_dev=row["Arithmetic Standard Dev"],
                        percentile_99=row["99th Percentile"],
                        percentile_90=row["90th Percentile"],
                    ))
            return crud.create_measurements(session, measurements)
        except Exception:
            self.logger.exception("Failed to load measurements from dataframe")
            raise
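

# Minimal usage sketch (illustrative only): this module does not define how sessions
# are created, so `make_session` below is a hypothetical factory standing in for the
# project's actual session setup. With a SQLAlchemy-style session it might look like:
#
#     loader = EPACSVLoader()
#     with make_session() as session:  # make_session is a placeholder, not defined here
#         loader.load_and_deserialize(session)
#         session.commit()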