fixed url list generator; (another) major cleanup of code
BrianOBlanton committed Nov 14, 2023
1 parent eb40e80 commit c536e28
Showing 9 changed files with 149 additions and 155 deletions.
192 changes: 100 additions & 92 deletions adda/adda.py

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions adda/adda_visualization_plots.py
@@ -45,7 +45,6 @@ def plot_model(adc_plot_grid=None, df_surface=None, df_stations=None, df_land_co
Results:
A plot in the USA East Coast region
"""
#coastline=np.loadtxt('/projects/sequence_analysis/vol1/prediction_work/ADCIRCSupportTools-v2/test_data/coarse_us_coast.dat')
coastline=np.loadtxt(os.path.join(os.path.dirname(__file__), "misc", "coarse_us_coast.dat"))
#N=16
#base_cmap='tab20c' # Or simply use None tab20c is also okay
@@ -55,37 +54,34 @@ def plot_model(adc_plot_grid=None, df_surface=None, df_stations=None, df_land_co
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10,10), dpi=144) #, sharex=True)
# Set up surface
if df_surface is not None:
#print('plot_model: Found a extrapolated surface data set')
x = adc_plot_grid['LON']
y = adc_plot_grid['LAT']
v = df_surface['VAL'].values
v = v.reshape(-1,len(x)) # Reshapes to LAT column-major format
mesh = ax.pcolormesh(x, y, v, shading='nearest', cmap=cmap, vmin=vmin, vmax=vmax)
# Merge control points
if df_stations is not None:
#print('plot_model: Found a station data set')
stations_X=df_stations['LON'].values
stations_Y=df_stations['LAT'].values
stations_V=df_stations['VAL'].values
ax.scatter(stations_X, stations_Y, s=60, marker='o',
c=stations_V, cmap=cmap, edgecolor='black',
vmin=vmin, vmax=vmax)
if df_land_control is not None:
#print('plot_model: Found a land_control data set')
land_X=df_land_control['LON'].values
land_Y=df_land_control['LAT'].values
land_V=df_land_control['VAL'].values
ax.scatter(land_X, land_Y, s=30, marker='o',
c=land_V, cmap=cmap, edgecolor='black',
vmin=vmin, vmax=vmax)
if df_water_control is not None:
#print('plot_model: Found a water_control data set')
water_X=df_water_control['LON'].values
water_Y=df_water_control['LAT'].values
water_V=df_water_control['VAL'].values
ax.scatter(water_X, water_Y, s=30, marker='x',
c='black', edgecolor='black',
vmin=vmin, vmax=vmax)
ax.scatter(water_X, water_Y, s=30, marker='x',c='black')
#,edgecolor='black',
#vmin=vmin, vmax=vmax)

ax.plot(coastline[:,0],coastline[:,1],color='black',linewidth=.25)
ax.axis('equal')
ax.set_xlim(xlim)
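For context, the hunk above keeps the pcolormesh-plus-scatter pattern intact: the extrapolated surface arrives as a flat VAL column that is reshaped onto the LON/LAT plot grid, and the control points are drawn over it on the same color scale. A minimal, self-contained sketch of that pattern (grid sizes and values below are invented for illustration):

    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt

    # Hypothetical regular plot grid and a flattened surface, mirroring adc_plot_grid/df_surface
    lon = np.linspace(-80.0, -70.0, 50)
    lat = np.linspace(30.0, 40.0, 40)
    df_surface = pd.DataFrame({'VAL': np.random.rand(len(lat) * len(lon))})

    fig, ax = plt.subplots(figsize=(8, 8))
    v = df_surface['VAL'].values.reshape(-1, len(lon))   # rows follow LAT, columns follow LON
    mesh = ax.pcolormesh(lon, lat, v, shading='nearest', vmin=0, vmax=1)

    # Hypothetical station control points, colored on the same scale as the surface
    df_stations = pd.DataFrame({'LON': [-75.0, -72.5], 'LAT': [35.0, 38.0], 'VAL': [0.2, 0.8]})
    ax.scatter(df_stations['LON'], df_stations['LAT'], c=df_stations['VAL'],
               s=60, marker='o', edgecolor='black', vmin=0, vmax=1)
    fig.colorbar(mesh, ax=ax)
    plt.show()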
20 changes: 9 additions & 11 deletions harvester/fetch_station_data.py
@@ -205,7 +205,6 @@ def interpolate_and_resample(self, dx, n_pad=0, sample_mins=15, int_limit=3)->pd
utilities.log.warn(f'Error Value: Failed interpolation {ex}: Probable empty station data')
raise
return df_normal_smooth
#sys.exit(1)

def old_interpolate_and_resample(self, dx, sample_mins=15)->pd.DataFrame:
"""
@@ -269,9 +268,9 @@ def aggregate_station_data(self, perform_interpolation=True)->pd.DataFrame:
try:
df_data = pd.concat(aggregateData, axis=1)
idx = df_data.index
utilities.log.info('Check for time duplicates')
utilities.log.debug('Check for time duplicates')
if idx.duplicated().any():
utilities.log.info("Duplicated data times found . will keep first value(s) only")
utilities.log.debug("Duplicated data times found . will keep first value(s) only")
df_data = df_data.loc[~df_data.index.duplicated(keep='first')]
if len(idx) != len(df_data.index):
utilities.log.warning(f'had duplicate times {len(idx)} {len(df_data.index)}')
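The duplicate-timestamp guard in aggregate_station_data is the standard pandas idiom; in isolation, with toy data, it reads:

    import pandas as pd

    df_data = pd.DataFrame({'WL': [0.10, 0.12, 0.30]},
                           index=pd.to_datetime(['2023-11-14 00:00',
                                                 '2023-11-14 00:00',
                                                 '2023-11-14 01:00']))
    if df_data.index.duplicated().any():
        # Keep the first value seen at each repeated timestamp, drop the rest
        df_data = df_data.loc[~df_data.index.duplicated(keep='first')]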
@@ -503,7 +502,8 @@ def __init__(self, station_id_list, periods=None, product='water_level',
utilities.log.error('No valid fort file was found: Abort')
sys.exit(1)
self.available_stations_tuple=available_stations
utilities.log.info(f'List of ADCIRC stations {self.available_stations_tuple}')
#utilities.log.debug(f'List of ADCIRC stations {self.available_stations_tuple}')

periods = self._remove_empty_url_pointers(periods)
if not keep_earliest_url:
remperiod,periods=periods[0],periods[1:]
@@ -522,7 +522,7 @@ def _fetch_adcirc_nodes_from_fort63_input_file(self, station_df) -> list():
Returns: list of tuples (stationid,nodeid). Superfluous stationids are ignored
"""
utilities.log.debug('Attempt to find ADCIRC fort_63 stations/Nodes')
utilities.log.debug('Getting ADCIRC fort_63 station nodes')
try:
idx=list()
utilities.log.debug('Fetch stations fort63 style ...')
@@ -548,7 +548,7 @@ def _fetch_adcirc_nodes_from_fort61_input_file(self, stations, periods) -> list(
Returns: list of tuples (stationid,nodeid). Superfluous stationids are ignored
"""
utilities.log.info('Attempt to find ADCIRC stations')
utilities.log.debug('Attempt to find ADCIRC stations')
full_idx=list()
for url61 in periods:
utilities.log.info('Fetch stations: {} '.format(url61))
@@ -1014,7 +1014,6 @@ def __init__(self, station_id_list, periods, config, product='river_water_level'
utilities.log.error('Contrails No such product key. Input {}, Available {}'.format(product, self.products.keys()))
raise
##sys.exit(1)
print(self._product)
utilities.log.info(f'CONTRAILS Fetching product {self._product}')
self._systemkey=config['systemkey']
self._domain=config['domain']
@@ -1141,7 +1140,7 @@ def fetch_single_product(self, station, time_range) -> pd.DataFrame:
datalist=list()
periods = self.return_list_of_daily_timeranges(time_range)
for tstart,tend in periods:
utilities.log.debug('Iterate: start time is {}, end time is {}, station is {}'.format(tstart,tend,station))
utilities.log.debug('Start time is {}, end time is {}, station is {}'.format(tstart,tend,station))
indict = {'method': METHOD, 'class': self.CLASSDICT[self._product],
'system_key': self._systemkey ,'site_id': station,
'tz': GLOBAL_TIMEZONE,
@@ -1303,7 +1302,7 @@ def __init__(self, station_id_list, periods, product='water_level', interval=Non
self._data_unit=map_product_to_harvester_units(product)
try:
self._product=self.products[product] # self.products[product] # product
utilities.log.info(f'NOAA-WEB Fetching product {self._product}')
utilities.log.info(f'NOAA Fetching product {self._product}')
except KeyError as e:
utilities.log.error(f'NOAA/NOS-WEB No such product key. Input {product}, Available {self.products.keys()}: {e}')
raise
@@ -1406,7 +1405,7 @@ def fetch_single_product(self, station, time_range) -> pd.DataFrame:
datalist=list()
periods = self.return_list_of_daily_timeranges(time_range)
for tstart,tend in periods:
utilities.log.debug('Iterate: start time is {}, end time is {}, station is {}'.format(tstart,tend,station))
utilities.log.debug('Start time is {}, end time is {}, station is {}'.format(tstart,tend,station))
indict = {'product': self._product,
'station': station,
'datum':'MSL',
@@ -1539,7 +1538,6 @@ def fetch_single_metadata(self, station) -> pd.DataFrame:
# sys.exit(1)
# super().__init__(station_id_list, periods, resample_mins=resample_mins)
#
#TODO metric con versions
# def fetch_single_product(self, buoy, time_range) -> pd.DataFrame:
# """
# For a single NDBC site_id, process the tuple from the input period.
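Both fetch_single_product loops above walk daily (tstart, tend) tuples produced by return_list_of_daily_timeranges, which is not shown in this diff. As a rough illustration of that kind of splitting (exact boundary handling in the real helper may differ):

    import pandas as pd

    def daily_timeranges(time_range):
        """Illustrative only: split (start, end) into day-sized (tstart, tend) tuples."""
        start, end = pd.Timestamp(time_range[0]), pd.Timestamp(time_range[1])
        edges = list(pd.date_range(start, end, freq='D'))
        if edges[-1] < end:
            edges.append(end)
        return list(zip(edges[:-1], edges[1:]))

    for tstart, tend in daily_timeranges(('2023-11-10 00:00', '2023-11-12 12:00')):
        print(tstart, tend)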
18 changes: 8 additions & 10 deletions harvester/get_adcirc_stations.py
@@ -138,12 +138,11 @@ def extract_adcirc_grid_coords(urls_63)->pd.DataFrame:
Returns:
adc_coords: Dictionary with the keys ['LON','LAT']
"""
print(urls_63)
if not isinstance(urls_63, list):
url=urls_63
else:
url=first_available_netCDF4(urls_63) # Assumes all urls are for the same grid
utilities.log.info(f"Loading fort.63.log {url}")
utilities.log.info(f"Opening fort.63 {url}")
nc = nc4.Dataset(url)
gridx = nc.variables['x']
gridy = nc.variables['y']
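extract_adcirc_grid_coords only needs the node coordinate arrays, so the operation reduces to opening one fort.63 netCDF file and reading its 'x' and 'y' variables into the LON/LAT dictionary named in the docstring. A minimal sketch (the URL below is a placeholder, not a real endpoint):

    import netCDF4 as nc4

    url = 'https://example.com/thredds/dodsC/some_run/fort.63.nc'   # placeholder URL
    nc = nc4.Dataset(url)
    adc_coords = {'LON': nc.variables['x'][:], 'LAT': nc.variables['y'][:]}
    nc.close()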
@@ -196,7 +195,7 @@ def __init__(self, source='TDS',product='water_level',
self.station_list=fetch_adcirc_data.get_adcirc_stations_fort63_style(station_list_file)
else:
self.station_list=fetch_adcirc_data.get_adcirc_stations_fort61_style(station_list_file)
utilities.log.info(f'Fetched station list from {self.station_list}')
utilities.log.debug(f'Station list is {self.station_list}')

# Specify the desired products
self.product = product.lower()
@@ -212,8 +211,7 @@ def __init__(self, source='TDS',product='water_level',
Tmin=None # These will be populated with the min/max times as a str with format %Y%m%d%H%M
Tmax=None

utilities.log.info(f'SOURCE to fetch from {self.source}')
utilities.log.info(f'PRODUCT to fetch is {self.product}')
utilities.log.info(f'Retrieving {self.product} from {self.source}.')

def remove_knockout_stations(self, df_station) -> pd.DataFrame:
"""
@@ -291,7 +289,7 @@ def fetch_station_product(self, urls, return_sample_min=0, fort63_style=False, v
self.sitename = fetch_adcirc_data.strip_sitename_from_url(urls, fill='NoSite')
self.stormnumber = fetch_adcirc_data.strip_storm_number_from_url(urls, fill='NoNumber')
self.timemark=fetch_adcirc_data.strip_time_from_url(urls)

try:
if self.source.upper()=='TDS':
adc_stations=self.station_list
@@ -339,15 +337,15 @@ def main(args):
stoptime = tnow.strftime('%Y-%m-%d %H:%M:%S')
else:
stoptime=args.timeout
print(f'Stoptime and ndays {stoptime}. {args.ndays}')
#print(f'Stoptime and ndays {stoptime}. {args.ndays}')

variable_name = args.variable_name
utilities.log.info(f'variable_name set to {variable_name}')

# Generate a list of URLs consistent with the desired criteria

print('Demonstrate URL generation')
print(args.instance_name)
#print('Demonstrate URL generation')
#print(args.instance_name)

# genurls can differentiate between a yaml/url-based approach and a url template given the status of args.url
genrpl = genurls.generate_urls_from_times(url=args.url, timeout=stoptime, ndays=args.ndays, grid_name=args.gridname, instance_name=args.instance_name, config_name=config_name)
@@ -359,7 +357,7 @@ def main(args):
urls = genrpl.build_url_list_from_template_url_and_offset( ensemble=args.ensemble_name)
gridname = fetch_adcirc_data.grab_gridname_from_url(urls)

print(f' URLs {urls}')
#print(f' URLs {urls}')

# Invoke the harvester related class
if args.fort63_style:
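The URL-list fix named in the commit title is not visible in the rendered hunks; in main() above, the list comes from genurls.generate_urls_from_times and build_url_list_from_template_url_and_offset. As a loose illustration of the general idea, stepping back ndays from a stop time and substituting one timestamp per model cycle into a template URL, one might write (the template, the 6-hour cycle spacing, and the time format are assumptions for this sketch, not the project's actual settings):

    from datetime import datetime, timedelta

    def build_url_list(template, stoptime, ndays, cycle_hours=6):
        """Illustrative only: one URL per model cycle, ending at stoptime."""
        stop = datetime.strptime(stoptime, '%Y-%m-%d %H:%M:%S')
        n_cycles = int(abs(ndays) * 24 / cycle_hours)
        times = [stop - timedelta(hours=cycle_hours * i) for i in range(n_cycles + 1)]
        return [template.format(t.strftime('%Y%m%d%H')) for t in sorted(times)]

    urls = build_url_list('https://example.com/thredds/dodsC/{}/fort.63.nc',
                          '2023-11-14 12:00:00', ndays=-2)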
4 changes: 2 additions & 2 deletions harvester/get_observations_stations.py
@@ -92,7 +92,7 @@ def __init__(self, source='NOAA',product='water_level',
if self.source=='NDBC_HISTORIC':
self.station_list = fetch_data.get_ndbc_buoys(station_list_file)

utilities.log.info(f'Fetched station list from {self.station_list}')
utilities.log.debug(f'Station list is {self.station_list}')

# Specify the desired products
self.product = product.lower()
@@ -209,7 +209,7 @@ def fetch_station_product(self, time_range, return_sample_min=0, interval=None):
"""
starttime = time_range[0]
endtime=time_range[1]
utilities.log.debug(f'Attempt a product fetch for the time range {starttime}-{endtime}')
utilities.log.info(f'Retrieving data for the time range {starttime}-{endtime}')
template = "An exception of type {0} occurred."

interval=interval
33 changes: 16 additions & 17 deletions processing/compute_error_field.py
@@ -75,7 +75,7 @@ def combine_data_to_dict(in_adc,in_obs,in_err, product='WL')->dict:
obs.set_index(['TIME','SRC'], inplace=True)
err.set_index(['TIME','SRC'], inplace=True)
df_merged = pd.concat([adc,obs,err])
utilities.log.info('Combined data sets into multi-index form')
utilities.log.debug('Combined data sets into multi-index form')
data_dict = generate_merged_dict_data(df_merged,product='WL')
return data_dict
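combine_data_to_dict stacks the ADCIRC, observation, and error frames on a shared (TIME, SRC) MultiIndex before flattening them into a per-station dict. With toy single-station frames, the concat step looks like this:

    import pandas as pd

    def tag(df, src):
        out = df.copy()
        out['SRC'] = src
        return out.set_index(['TIME', 'SRC'])

    times = pd.to_datetime(['2023-11-14 00:00', '2023-11-14 01:00'])
    adc = tag(pd.DataFrame({'TIME': times, 'STA_1': [0.50, 0.60]}), 'ADC')
    obs = tag(pd.DataFrame({'TIME': times, 'STA_1': [0.40, 0.70]}), 'OBS')
    err = tag(pd.DataFrame({'TIME': times, 'STA_1': [0.10, -0.10]}), 'ERR')

    # Rows keyed by (TIME, SRC); one column per station, ready to pivot into a dict
    df_merged = pd.concat([adc, obs, err])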

@@ -93,7 +93,7 @@ def generate_merged_dict_data(df_merged, product='WL')->dict:
Returns:
df_merged_dict: A Dict of the merged 3 data source DataFrame suitable for JSON storage
"""
utilities.log.info('Begin processing DICT data format')
utilities.log.debug('Begin processing DICT data format')
variables = ['ADC','OBS','ERR']
df = df_merged
df.reset_index(inplace=True) # Remove SRC from the multiindex
@@ -111,7 +111,7 @@ def generate_merged_dict_data(df_merged, product='WL')->dict:
dictdata[station].update({variable: {'TIME': cols, product:val}})
else:
dictdata[station]={variable: {'TIME': cols, product:val}}
utilities.log.info('Constructed DICT time series data')
utilities.log.debug('Constructed DICT time series data')
return dictdata

class compute_error_field(object):
@@ -164,9 +164,9 @@ def __init__(self, obs, adc, meta, n_hours_per_period=12, n_hours_per_tide=12.42
self.n_pad=n_pad

self.z_thresh=zthresh
utilities.log.info('Averaging: Hours per period {}'.format(n_hours_per_period))
utilities.log.info('Tidal: Hours per tidal period {}, num of padding hours {}'.format(self.n_hours_per_tide, self.n_pad))
utilities.log.info('Exclusions: If chosen use Z-thresh of {}'.format(self.z_thresh))
utilities.log.debug('Averaging: Hours per period {}'.format(n_hours_per_period))
utilities.log.debug('Tidal: Hours per tidal period {}, num of padding hours {}'.format(self.n_hours_per_tide, self.n_pad))
utilities.log.debug('Exclusions: If chosen use Z-thresh of {}'.format(self.z_thresh))

def _intersection_stations(self):
"""
@@ -175,7 +175,7 @@ def _intersection_stations(self):
common_stations = self.obs.columns & self.adc.columns
self.obs = self.obs[common_stations]
self.adc = self.adc[common_stations]
utilities.log.info('OBS and ADC DataFrames reduced (inplace) to common stations of len {}'.format(len(common_stations)))
utilities.log.debug('OBS and ADC DataFrames reduced (inplace) to common stations of len {}'.format(len(common_stations)))

def _intersection_times(self):
"""
@@ -187,7 +187,7 @@ def _intersection_times(self):
adc = self.adc.loc[common_times]
self.obs=obs
self.adc=adc
utilities.log.info('OBS and ADC DataFrames reduced (inplace) to common times of len {}'.format(len(common_times)))
utilities.log.debug('OBS and ADC DataFrames reduced (inplace) to common times of len {}'.format(len(common_times)))
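Both _intersection_stations and _intersection_times above reduce the OBS and ADC frames to their shared labels. Index.intersection does the same job without the & set-operation on pandas Index objects, which newer pandas releases deprecate; a sketch:

    import pandas as pd

    obs = pd.DataFrame({'A': [1.0], 'B': [2.0], 'C': [3.0]})
    adc = pd.DataFrame({'B': [2.1], 'C': [2.9], 'D': [4.0]})

    common_stations = obs.columns.intersection(adc.columns)   # Index(['B', 'C'])
    obs = obs[common_stations]
    adc = adc[common_stations]

    # The time intersection works the same way on the row index:
    # common_times = obs.index.intersection(adc.index)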

#
# The tidal transform step used here is a bit tricky. We begin with the input hourly data (starttime,endtime) from the adc/obs sources.
@@ -230,14 +230,14 @@ def _tidal_transform_data(self):
num_rows_keep = num_periods * 12
self.adc = self.adc.tail(num_rows_keep)
self.obs = self.obs.tail(num_rows_keep)
print(len(self.adc))
utilities.log.info('Tidal transform: Num retained periods {}, num rows kept {}, adc data index {}'.format(num_periods, num_rows_keep, self.adc.index))
utilities.log.debug('Tidal transform: Num retained periods {}, num rows kept {}'.format(num_periods, num_rows_keep))
utilities.log.debug('Tidal transform: adc data index {}'.format(self.adc.index))
# Now only keep a complete set of 12.42 ur full-tidal periods whos last time <=timeout
ttimestart, ttimeend = diurnal_range[0],diurnal_range[-1]
utilities.log.info('inplace tidal_transform_data: n_pad {}, n_hours_per_period {}, n_hours_per_tide {}'.format(n_pad, n_hours_per_period, n_hours_per_tide))
utilities.log.info('inplace tidal_transform_data: timein {}, timeout {}, trans_time_start, trans_time_end {}'.format(timein, timeout, ttimestart, ttimeend))
utilities.log.debug('inplace tidal_transform_data: n_pad {}, n_hours_per_period {}, n_hours_per_tide {}'.format(n_pad, n_hours_per_period, n_hours_per_tide))
utilities.log.debug('inplace tidal_transform_data: timein {}, timeout {}, trans_time_start, trans_time_end {}'.format(timein, timeout, ttimestart, ttimeend))
else:
utilities.log.info('inplace tidal_transform_data: Not enough data to perform a tidal transform. Skip')
utilities.log.debug('inplace tidal_transform_data: Not enough data to perform a tidal transform. Skip')

# MIGHT need something special for Hurricanes ?
def _apply_time_bounds(self, time_range):
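As the comment above notes, the transform works in whole 12.42-hour (M2) tidal periods and keeps only the most recent complete ones before averaging. A simplified standalone sketch of that trimming, assuming an hourly record (the real method also pads the window with n_pad hours, so its exact row arithmetic differs):

    import numpy as np
    import pandas as pd

    n_hours_per_tide = 12.42                     # M2 lunar semidiurnal period, in hours
    idx = pd.date_range('2023-11-10', periods=100, freq='h')   # hypothetical hourly record
    adc = pd.DataFrame({'STA_1': np.random.rand(len(idx))}, index=idx)

    num_periods = int(len(adc) // n_hours_per_tide)             # complete tidal periods available
    num_rows_keep = int(round(num_periods * n_hours_per_tide))  # hourly rows covering them
    adc = adc.tail(num_rows_keep)                # keep only the most recent complete periods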
@@ -264,11 +264,10 @@ def _apply_time_bounds(self, time_range):
except ValueError:
utilities.log.error('_apply_time_bounds Input time range is wrong. Must be %Y-%m-%d %H:%M:%S or a hurricane advisory number. Got {}: Abort'.format(time_range))
sys.exit(1)
print(f'bounds {bound_lo} and {bound_hi}')
self.adc=self.adc.loc[ (self.adc.index >= bound_lo) & (self.adc.index <= bound_hi) ]
self.obs=self.obs.loc[ (self.obs.index >= bound_lo) & (self.obs.index <= bound_hi) ]
self._intersection_times() # Should not be needed here but just in case
utilities.log.info('New adc time lo {}, New adc time hi {}'.format( min(self.adc.index).strftime(dformat), max(self.adc.index.strftime(dformat))))
utilities.log.debug('New adc time lo {}, New adc time hi {}'.format( min(self.adc.index).strftime(dformat), max(self.adc.index.strftime(dformat))))

# Statistical stuff

@@ -287,7 +286,7 @@ def _remove_station_outliers(self):
z = np.abs(stats.zscore(self.df_final['mean'].dropna(axis=0)))> self.z_thresh
droplist = self.df_final.dropna(axis=0)[z].index.tolist()
self.df_final.drop(droplist,axis=0,inplace=True)
utilities.log.info('Requested check for station error outlier status. {} stations removed using a zthresh of {}.'.format(len(droplist),self.z_thresh))
utilities.log.debug('Requested check for station error outlier status. {} stations removed using a zthresh of {}.'.format(len(droplist),self.z_thresh))

def _compute_and_average_errors(self):
"""
@@ -320,7 +319,7 @@ def _compute_and_average_errors(self):
self.df_final.index.name='STATION'
# Reset diff format back
self.diff.set_index('TIME',inplace=True)
utilities.log.info('Completed Cycle averaging. Num of periods {}'.format(num_periods))
utilities.log.debug('Completed Cycle averaging. Num of periods {}'.format(num_periods))

##
## Simple test based on data previously stored into files