diff --git a/.pylintrc b/.pylintrc index 655cbbd..52e941d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -9,12 +9,14 @@ [MASTER] init-hook='import sys; sys.path.append("./")' max-line-length=150 -disable=broad-except +disable=broad-except,broad-exception-raised max-args=16 max-locals=25 min-public-methods=0 +max-public-methods=27 fail-under=9.5 extension-pkg-allow-list=pydantic max-branches=25 max-statements=62 -max-positional-arguments=20 \ No newline at end of file +max-positional-arguments=20 +max-attributes=12 diff --git a/src/common/generate_urls_from_times.py b/src/common/generate_urls_from_times.py deleted file mode 100644 index 14ebd6a..0000000 --- a/src/common/generate_urls_from_times.py +++ /dev/null @@ -1,636 +0,0 @@ -#!/usr/bin/env python - -# SPDX-FileCopyrightText: 2022 Renaissance Computing Institute. All rights reserved. -# SPDX-FileCopyrightText: 2023 Renaissance Computing Institute. All rights reserved. -# -# SPDX-License-Identifier: GPL-3.0-or-later -# SPDX-License-Identifier: LicenseRef-RENCI -# SPDX-License-Identifier: MIT - -# -# This helper set of functions takes times,offsets,urls as possible inputs and returns a list of URLs that may be passed -# to the fetch_adcirc_data methods. If the Caller wants to process a LIST of Urls, just call this repeatedly -# then aggregate and check duplicates -# - -""" - Time series extraction - - Authors: Jeffrey L. Tilson -""" - -import os,sys -import numpy as np -import pandas as pd -import datetime as dt - -import src.common.utilities as utilities -from argparse import ArgumentParser - -# pylint: skip-file - -# create a logger -logger = utilities.logger - - -def is_hurricane(test_val)->bool: - """ - Determine of the input test val is a Date, an Int or something else - Parameters: - test_val: For a valid time enter a str with dformat %Y-%m-%d %H:%M:%S or %Y%m%d%H - For a valid hurricane enter an int - """ - is_hurricane=False - try: - test = dt.datetime.strptime(test_val,'%Y-%m-%d %H:%M:%S') # If fails then not a datetime - except (ValueError,TypeError): - try: - test = dt.datetime.strptime(test_val,'%Y%m%d%H') - except: - try: - outid = int(test_val) - is_hurricane=True - except ValueError: - logger.exception('test indicates not a hurricane nor a casting. Perhaps a format issue ?. Got %s: Abort', test_val) - raise - #sys.exit(1) - return is_hurricane - -def generate_six_hour_time_steps_from_range(time_range)->list: - """ - Given the input time tuple, return the inclusive set of times that occur on - the daily 6 hour mark. So on output we would have 00Z,06Z,12Z,18Z times only - - Parameters: - time_range: Tuple (datetime,datetime) of the start and end times (datetime objects) - - Returns: - list_of_times: list of (str) times in the format: %Y%m%d%H - - """ - if is_hurricane(time_range[0]): - logger.debug('Determined input time_range URL is a Hurricane') - list_of_times = generate_six_hour_time_advisories_from_range(time_range) - else: - list_of_times = generate_six_hour_time_castings_from_range(time_range) - return list_of_times - -def generate_six_hour_time_castings_from_range(time_range)->list: - """ - A non-hurricane - Advisory. We need to distinguish between the two. 
Note, we can be promiscuous here - with the URLs, since urls that do not exist will get trapped by Harvester - Parameters: - time_range: tuple (datetime,datetime) - Returns: - list_of_times: list of times/advisories in the a string format to build new urls - """ - keep_hours=[0,6,12,18] - starttime = dt.datetime.strptime(time_range[0],'%Y-%m-%d %H:%M:%S') - stoptime = dt.datetime.strptime(time_range[1],'%Y-%m-%d %H:%M:%S') - pdtime=pd.date_range(start=starttime, end=stoptime,freq='h') # Doesnt land on 00,06,12,18 - list_of_times=list() - for time in pdtime: - if time.hour in keep_hours: - list_of_times.append(time.strftime('%Y%m%d%H')) - # Keep input entry as well ? - list_of_times.append(stoptime.strftime('%Y%m%d%H')) - list_of_times.sort() - return list_of_times - -def generate_six_hour_time_advisories_from_range(advisory_range)->list: - """ - Advisory range has no specific time meaning other than generally being every 6 hours - So simply accept the range as fact. The INPUT advisory number is NOT retained in the - generated list - - Save Advisories in a leading zero format: "{:02d}".format(adv) - - Parameters: - advisory_range: tuple (int,int) - Returns: - list_of_advisories: list of times/advisories in the a string format to build new urls - includes the input time_step.advisory in the final list - """ - # How many 6 hour periods can we identify? We need to choose a startpoint. Use the highest time and look back - startadv=int(advisory_range[0]) - stopadv=int(advisory_range[1]) - if startadv > stopadv: - startadv,stopadv = stopadv, startadv - list_of_advisories=list() - for inc in range(startadv, stopadv): - list_of_advisories.append("{:02d}".format(inc)) - list_of_advisories=[i for i in list_of_advisories if int(i) > 0] - # Should we retain the input value ? - list_of_advisories.append("{:02d}".format(stopadv)) - # A last ditch sort to to be sure - list_of_advisories.sort() - return list_of_advisories - -# Generates a proper list-time/advisories depending if its a Hurricane or not -def generate_six_hour_time_steps_from_offset(time_value, offset)->list: - """ - For an arbitrary URL, we could have a conventional now/forecast OR a Hurricane - Advisory. We need to distinguish between the two. Note, we can be promiscuous here - with the URLs, since urls that do not exist will get trapped by Harvester - Parameters: - time_val: (datetime) Either the time or advisory value from a asgs url - offset: (int) Number of DAYS to look back/forward from strvalue - if strvalue is an Advisory then we look back a number of STEPS - corresponding to 6 hour intervals based on offset - Returns: - timelist: list of times/advisories in the a string format to build new urls - """ - if is_hurricane(time_value): - logger.debug('Determined input URL is a Hurricane') - list_of_times = generate_six_hour_time_advisories_from_offset(time_value,offset) - else: - list_of_times = generate_six_hour_time_castings_from_offset(time_value,offset) - return list_of_times - -def generate_six_hour_time_castings_from_offset(time_value,offset)->list: - """ - Start with the strtime and build a list of 6hour steps for up to offset days - We expect the input time to a stoptime and the offsets to be < 0. 
But, though - the overall code has not been tested for it, we simply reorder the times - as necc and proceed - - Parameters: - time_value: (datetime) start time - offset: (int) Number of DAYS to look back/forward from strvalue - - Returns: - timelist: list of times in a string format to build new urls - """ - keep_hours=[0,6,12,18] - stoptime = dt.datetime.strptime(time_value,'%Y-%m-%d %H:%M:%S') - starttime = stoptime + dt.timedelta(days=offset) - - if starttime > stoptime: - logger.warning('Stoptime < starttime. Supplied offset was %s days: Reordering', offset) - starttime,stoptime = stoptime,starttime - - return generate_six_hour_time_steps_from_range( (starttime.strftime('%Y-%m-%d %H:%M:%S'), stoptime.strftime('%Y-%m-%d %H:%M:%S')) ) - -def generate_six_hour_time_advisories_from_offset(strtime,offset)->list: - """ - Start with the strtime and build a list of 6hour steps for up to offset days - We expect the input time to bve an Advisory number (int) We also anticipate offsets to be < 0. - since offset >0 would apply to Hurricane advisories not performed (but you could do it) - - Here we assume each index is a 6 hour time step. So we just need to decide how many to look back for. - Harvester will quietly ignore urls that do not exist - - Save Advisories in a leading zero format: "{:02d}".format(adv) - - Parameters: - strvale: (str) time - offset: (int) Number of DAYS to look back/forward from strvalue - - Returns: - timelist: list of advisories in the a string format to build new urls - """ - list_of_advisories=list() - stop_advisory = int(strtime) - num_6hour_look_asides = int(24*offset/6) - range_values= [0,num_6hour_look_asides] - range_values.sort() # sorts ascending order - for inc in range(*range_values): - list_of_advisories.append("{:02d}".format(stop_advisory+inc)) - list_of_advisories=[i for i in list_of_advisories if int(i) >= 0] - # Keep the input value ? - list_of_advisories.append("{:02d}".format(stop_advisory)) - # A last ditch sort to to be sure - list_of_advisories.sort() - return list_of_advisories - - -def grab_years_from_time_list(list_of_times)->list: - """ - Process the input time list to extract a list of Years (str) - Note: This could be a list of Advisories as well. If so, - simply return the advisory number, though it will probably not be used - - Parameters: - list_of_times: List of (str) time in the format %Y%m%d%H - Returns: - list of years values - - """ - list_of_years=list() - for time in list_of_times: - try: - value=dt.datetime.strptime(time,'%Y%m%d%H').year - except TypeError: - value=time - list_of_years.append(value) - return list_of_years - - -def generate_list_of_instances(list_of_times, in_gridname, in_instance): - """ - This function matches every entry in the list_of_times with an associated instance. - The structure of this code is such that, in the future, we may have scenarios where - the value of the instance may change for a given year. - - Currently, though, we will simply build a list of identical instances. - The value of the selected instance may be passed in by the caller - - Parameters: - list_of_times: list (str)(%Y%m%d%H) ordered set of instances from which to build new URLs - in_gridname: current gridname from a representative INPUT url - in_gridname: current instance from a representative INPUT url - - Returns: - instance_list: ordered list of instances to use for building a set of new urls. 
- """ - num_entries = len(list_of_times) - gridname = in_gridname # Get default values - instance = in_instance - - instance_list = num_entries*[instance] - return instance_list - - -# Expect this to be part of a looped list of times from which appending will be applied -def construct_url_from_yaml( config, intime, instance, ensemble, gridname, hurricane_yaml_year=None, hurricane_yaml_source=None ): - """ - Given a single time (%Y%m%d%H) or advisory, the gridname, instance, and ensemble values - use the entries in config to build a proper URL - If applying to Hurricanes, we need to also applyld_url_list_from_yaml_and_timest the values for hurricane_yaml_year, and - hurricane_yaml_source - """ - # hurricane_yaml_source is a special case scenario - if is_hurricane(intime): - logger.debug('Request for YAML build of Hurricane URL. subdir is %s', hurricane_yaml_source) - intime = str(intime) - subdir = hurricane_yaml_year # This is certainly NOT generalized - source=hurricane_yaml_source - else: - subdir = dt.datetime.strptime(intime,'%Y%m%d%H').year - source='nam' - cfg = config['ADCIRC'] - url = cfg["baseurl"] + \ - cfg["dodsCpart"] % (subdir, - source, intime, - cfg["AdcircGrid"] % (gridname), - cfg["Machine"], - cfg["Instance"] % (instance), - cfg["Ensemble"] % (ensemble), - cfg["fortNumber"] - ) - return url - - -def construct_starttime_from_offset(stoptime,ndays): - """ - Construct an appropriate starttime given the stoptime and offset. - NOTE if this is a Hurricane advisory, we return an appropriate - advisory assuming each advisory is 6 hoursa in duration. No - negative advisories are returned - - Parameters: - stoptime (str) (%Y-%m-%d %H:%M:%S) - ndays: (int) number of 24 hours days to look back/forward - - """ - if is_hurricane(stoptime): - num_6hour_look_asides = int(24*ndays/6) - stopadv=int(stoptime) - startadv=stopadv+num_6hour_look_asides # We normally assume offset is negative but that is not enforced - return startadv - else: - tstop = dt.datetime.strptime(stoptime,'%Y-%m-%d %H:%M:%S') - tstart = tstop + dt.timedelta(days=ndays) - starttime = tstart.strftime('%Y-%m-%d %H:%M:%S') - return starttime - - raise 'Fell out the bottom of construct_starttime_from_offset: Abort' - ##sys.exit(1) - -class generate_urls_from_times(object): - """ - Class that attempts to create a list of valid TDS Urls based on the input time conditions and possibly a YAML file - that contains the URL structure - This is NOT expected to be highly generalized and is intended for the ADDA/AST pipelines - - We hardwire the concept that hurricane data files timestep every 6 hours. It is okay to have one or two - "wrong" urls in the list as Harvester should be pretty resilient. - - If the caller elects to define output URLs based on times/ndays, then a YAML decribing the desired structure is required. - If the caller elects to also supply a URL, then the output URL structure will be gleened from it. - regardless of instance - - Lastly, the final product data series may extend before or after the the stop/start times. As an example, - If grabbing a nowcast, the data series may begine 6 hours before the indicated url time. - - Pass in a URL and the instance, gridname, are scraped from it - If YAML you must specify these terms. - - Possible scenarios: (options such as instance/ensemble can be applied to all) - 1) Input timein/timeout and the config_name YML (nominal name is url_framework.yml). This will - generate a set of URLs between the two time ranges. 
This can work for a Hurricane BUT - timein/timeout must be ADVISORY values - 2) Input timeout and offset and the config_name YML (nominal name is url_framework.yml). This will - generate a set of URLs between the two time ranges. This can work for a Hurricane BUT - timeout must be ADVISORY values - 3) Input URL and offset only. This will scrape the time/advisory from the URL and offset it in 6 hour steps - generate a set of URLs between the two time/advisory ranges. This can work for a Hurricanes - - Parameters: - url: (str) A single URL from which more URLs may be built - ndays: (int) Number of look back/ahead days from the stoptime value - starttime: (str) Selected time to begin the building of the list (YYYY-mm-dd HH:MM:SS) - stoptime: (str) Selected time to end the building of the list (YYYY-mm-dd HH:MM:SS) - config_name: (str) path/filename to yaml file that contains the INSTANCE mappings - hurricane_yaml_source=None: (str) This is a special case. If you want to build Hurricane URLs from a YAML, - then you will need to specify the subdir name directly, eg 'al09'. This will replace the default value of nam. - hurricane_yaml_year: (str) is part of the Hurricane special case. No way to dig out the year directory name without the user specifying it - only needed for YAML based hurricane construction. Eg .../thredds/dodsC/2021/al09/11/ec95d/... - """ - - def __init__(self, url=None, timein=None, timeout=None, ndays=None, grid_name = None, instance_name=None, config_name=None, hurricane_yaml_year=None, hurricane_yaml_source=None): - # The Hurricane special terms are only usedY if you are requesting to build from a YAML AND the caller wants Hurricane data - # If a URL passed in, then gridname and instance can be gotten from it. ensembles values are expected to be changed by the user - - self.config_name = config_name - if url is not None: - words=url.split('/') - self.ensemble=words[-2] - self.instance_name=words[-3] - self.grid_name=words[-5] - try: - stoptime=dt.datetime.strptime(words[-6],'%Y%m%d%H').strftime('%Y-%m-%d %H:%M:%S') # Can be overridden by args.stoptime - except ValueError: # Must be a hurricane - stoptime=words[-6] - self.url=url - # If No url, then build URLs from a YAML. This requires the caller to specify gridname, instance, and ensemble - else: - self.instance_name = instance_name # This is for potentially mapping new instances to urls - self.grid_name = grid_name - if self.instance_name is None: - logger.error('Must specify an instance value if building URLs based on a YAML. None specified: Abort') - raise - ##sys.exit(1) - if self.grid_name is None: - logger.error('Must specify a grid_name if building URLs based on a YAML. None specified: Abort') - raise - ##sys.exit(1) - self.hurricane_yaml_source=hurricane_yaml_source - self.hurricane_yaml_year=hurricane_yaml_year # Cannot span multiple years using Hurricane-YAML construction - - # timeout MUST be supplied somehow - if timeout is None and stoptime is None: - logger.error('timeout is not set and no URL provided: Abort') - raise - ##sys.exit(1) - if timeout is not None: - stoptime=timeout - - # Find timein - if timein is None: - if ndays is None: - logger.error('No timein or ndays specified.') - raise - ##sys.exit(1) - else: - starttime = construct_starttime_from_offset(stoptime,ndays) # Will return an advisory value if appropriate - else: - starttime = timein - self.starttime=starttime - self.stoptime=stoptime - self.ndays=ndays - logger.debug('Current time (or advisory) range is %s to %s. 
Specified ndays is %s', self.starttime, self.stoptime, self.ndays) - if url is not None: - logger.debug('Current estimated ensemble: %s, instance: %s and gridname: %s', self.ensemble, self.instance_name, self.grid_name) - - def build_url_list_from_template_url_and_times(self, ensemble='nowcast')-> list: - """ - We seek to build a set of compatible URLs spanning the input time range based on the - structure of the input URL. We expect the caller to provide a proper ensemble value - for the new URLs. - We expect no changes in the grid name. Only change in the ensemble and times are expected - - Parameters: - ensemble: (str) Caller specified ensemble. This way one could input a namforecast url but request nowcasts, eg. - - Returns: - urls: list(str). List of valid URLs for processing - """ - url = self.url - time_range=(self.starttime,self.stoptime) # This could also be an advisory range - list_of_times = generate_six_hour_time_steps_from_range(time_range) - list_of_instances = generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) - urls = list() - for time,instance in zip(list_of_times,list_of_instances): - words=url.split('/') - words[-2]=ensemble - words[-3]=self.instance_name - words[-6]=str(time) # Need to ensure because we could have an advisory come in - newurl='/'.join(words) - if newurl not in urls: - urls.append(newurl) - logger.debug('Constructed %s urls of ensemble %s', urls, ensemble) - return urls - - def build_url_list_from_template_url_and_offset(self, ensemble='nowcast')->list: - """ - We seek to build a set of compatible URLs starting from the URL embedded time - and walking back/forward offset days while using the provided ensemble value. - Eg, you might send in a forecast and want back a list of nowcasts for the same grid - structure of the input URL. We expect the caller to provide a proper ensemble value - for the new URLs. - We expect no changes in the grid name. Only change in the ensemble and times are expected - - Parameters: - ensemble: (str)( def of "nowcast") The desired ensemble word for the resultant urls - - Returns: - urls: list(str). List of valid URLs for processing - """ - url = self.url - time_value=self.stoptime # Could also be an advisory - offset = self.ndays - if offset > 0: - logger.warning('Offset >0 specified: Behavior is not tested') - #timein = url.split('/')[-6] # Maybe need to check for a Hurricane Advisory also - list_of_times = generate_six_hour_time_steps_from_offset(time_value,offset) - list_of_instances = generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) - urls = list() - for time,instance in zip(list_of_times,list_of_instances): - words=url.split('/') - words[-2]=ensemble - words[-3]=self.instance_name - words[-6]=str(time) # Need this in case its an advisory value - newurl='/'.join(words) - if newurl not in urls: - urls.append(newurl) - logger.debug('Constructed %s urls of ensemble %s', urls, ensemble) - return urls - -# Approach Used by ADDA - def build_url_list_from_yaml_and_times(self, ensemble='nowcast')->list: - """ - We seek to build a set of compatible URLs spanning the input time range based on the - structure of asgs urls in the config_name. The structure of the output URLs will be based on the - entries in the associated yaml file. Since, no url will be provided, we must ask the caller to provide - the gridname, ensemble, and instance. We expect the caller to provide a proper Instance value - for the new URLs. - We REQUIRE the grid name. 
Only change in the ensemble and times are expected - - Uses the following class variables: - time_range: (tuple) (datetime,datetime). Time range inclusive (could also be hurricane advisories) - instance: (str) if set the used for all urls. If not, attempt to find it in the yaml - gridname: (str) name for the grid - - Parameters: - ensemble: (str) ensemble name (dafaults to nowcast) - - Returns: - urls: list(str). List of valid URLs for processing - """ - if self.config_name is None: - logger.error('self.config_name is None. Cannot use the YAML generators: Abort') - raise - ##sys.exit(1) - try: - config = utilities.load_config(self.config_name) - except FileNotFoundError: # OSError: - logger.exception('No URL structural config yml file found: %s: Abort', self.config_name) - raise - ##sys.exit(1) - - time_range=(self.starttime,self.stoptime) # Could also be a range of advisories - list_of_times = generate_six_hour_time_steps_from_range(time_range) - list_of_instances = generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) - urls = list() - logger.debug('list_of_times: %s', list_of_times) - logger.debug('list_of_instances: %s', list_of_instances) - for time,instance in zip(list_of_times,list_of_instances): - url = construct_url_from_yaml( config, time, self.instance_name, ensemble, self.grid_name, hurricane_yaml_year=self.hurricane_yaml_year, hurricane_yaml_source=self.hurricane_yaml_source ) - if url not in urls: - urls.append(url) - logger.debug('Constructed %s urls of ensemble %s based on the YML', urls, ensemble) - return urls - -# Approach Used by ADDA - def build_url_list_from_yaml_and_offset(self, ensemble='nowcast')->list: - """ - We seek to build a set of compatible URLs spanning the input time range based on the - structure of asgs urls in the config_name. The structure of the output URLs will be based on the - entries in the associated yaml file. Since, no url will be provided, we must ask the caller to provide - the gridname, ensemble, and instance. We expect the caller to provide a proper Instance value - for the new URLs. - We REQUIRE the grid name. Only change in the ensemble and times are expected - - Uses the following class variables: - offset: (int). Offset in days - instance: (str) if set then used for all urls - gridname: (str) name for the grid - ensemble: (str) ensemble name (dafaults to nowcast) - - Parameters: - ensemble: (str) ensemble name (dafaults to nowcast) - - Returns: - urls: list(str). List of valid URLs for processing - """ - if self.config_name is None: - raise 'self.config_name is None. Cannot use the YAML generators: Abort' - ##sys.exit(1) - try: - config = utilities.load_config(self.config_name) - except OSError: - logger.exception('No URL structural config yml file found. 
%s: Abort', self.config_name) - raise - ##sys.exit(1) - - time_value=self.stoptime # Could also be an advisory - offset = self.ndays - if offset > 0: - logger.warning('Offset >0 specified: Behavior is not tested') - - list_of_times = generate_six_hour_time_steps_from_offset(time_value,offset) - list_of_instances = generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) - urls = list() - for time,instance in zip(list_of_times,list_of_instances): - url = construct_url_from_yaml( config, time, self.instance_name, ensemble, self.grid_name, hurricane_yaml_year=self.hurricane_yaml_year, hurricane_yaml_source=self.hurricane_yaml_source ) - if url not in urls: - urls.append(url) - logger.warning('Constructed %s urls of ensemble %s based on the YML and offset', urls, ensemble) - return urls - -def main(args): - """ - A simple main method to demonstrate the use of this class - """ - - config_name=args.config_name if args.config_name is not None else os.path.join(os.path.dirname(__file__), '../config', 'url_framework.yml') - - # Set up IO env - logger.debug("Product Level Working in %s.", os.getcwd()) - - if args.instance_name is not None: - logger.debug('Ignoring args.instance_name for the testing sequence') - - # - # Need to specify precedence in the arguments provided for testing main - # - - if args.url is not None: - logger.debug('Selecting a template-url generation method') - if args.timein is not None: - logger.debug('Selecting a specific time-range procedure') - rpl = generate_urls_from_times(url=args.url,timein=args.timein, timeout=args.timeout, ndays=None, grid_name=None, instance_name=None, config_name=None) - new_urls = rpl.build_url_list_from_template_url_and_times(ensemble=args.ensemble) - else: - logger.debug('Selecting time+ndays procedure') - rpl = generate_urls_from_times(url=args.url,timein=None, timeout=args.timeout, ndays=args.ndays, grid_name=None, instance_name=None, config_name=None) - new_urls = rpl.build_url_list_from_template_url_and_offset(ensemble=args.ensemble) - else: - logger.debug('Selecting a YAML generation method') - if args.grid_name is None or args.instance_name is None or config_name is None: - raise 'YAML-based procedurs requires gridname, instance_name and config_name' - ##sys.exit(1) - if args.hurricane_yaml_year is not None and args.hurricane_yaml_source is not None: - logger.debug('Detected values required for building YAML-based Hurricane urls') - if args.timein is not None: - logger.debug('Selecting a specific time-range procedure') - rpl = generate_urls_from_times(timein=args.timein, timeout=args.timeout, ndays=None, grid_name=args.grid_name, - instance_name=args.instance_name, config_name=args.config_name, hurricane_yaml_year=args.hurricane_yaml_year,hurricane_yaml_source=args.hurricane_yaml_source) - new_urls = rpl.build_url_list_from_yaml_and_times(ensemble=args.ensemble) - else: - logger.debug('Selecting time+ndays procedure') - rpl = generate_urls_from_times(timein=None, timeout=args.timeout, ndays=args.ndays, grid_name=args.grid_name, - instance_name=args.instance_name, config_name=args.config_name, hurricane_yaml_year=args.hurricane_yaml_year,hurricane_yaml_source=args.hurricane_yaml_source) - new_urls = rpl.build_url_list_from_yaml_and_times(ensemble=args.ensemble) - - logger.debug('New urls: %s', new_urls) - - -if __name__ == '__main__': - parser = ArgumentParser() - parser.add_argument('--url', default=None, action='store', dest='url', help='Input URL that may be used to build new output urls', type=str) - 
parser.add_argument('--ndays', default=None, action='store', dest='ndays',help='Day lag (usually < 0)', type=int) - parser.add_argument('--timeout', default=None, action='store', dest='timeout', help='YYYY-mm-dd HH:MM:SS. Latest day of analysis', type=str) - parser.add_argument('--timein', default=None, action='store', dest='timein', help='YYYY-mm-dd HH:MM:SS .Start day of analysis. ', type=str) - parser.add_argument('--config_name', action='store', dest='config_name', default=None, - help='String: yml config which contains URL structural information') - parser.add_argument('--instance_name', action='store', dest='instance_name', default=None, - help='String: Choose instance name. Required if using a YAML-based URL construction') - parser.add_argument('--grid_name', action='store', dest='grid_name', default=None, - help='String: Choose grid_name. Required if using a YAML-based URL construction') - parser.add_argument('--ensemble', action='store', dest='ensemble', default='nowcast', - help='String: Specify ensemble name ') - parser.add_argument('--hurricane_yaml_year', action='store', dest='hurricane_yaml_year', default=None, - help='String: Needed only for Hurricane/YML procedures') - parser.add_argument('--hurricane_yaml_source', action='store', dest='hurricane_yaml_source', default=None, - help='String: Needed only for Hurricane/YML procedures') - args = parser.parse_args() - - # log the input args - logger.debug('input args: %s', args) - - sys.exit(main(args)) - -# cat ../config/local_instance.yml diff --git a/src/common/geopoints.py b/src/common/geopoints.py index 788f887..5de3030 100644 --- a/src/common/geopoints.py +++ b/src/common/geopoints.py @@ -15,7 +15,7 @@ import pandas as pd from src.common.logger import LoggingUtil -import src.common.geopoints_url as gu +from src.common.geopoints_url import GeoPointsURL class GeoPoint: @@ -38,8 +38,11 @@ def __init__(self, _logger=None): # get the log level and directory from the environment. 
log_level, log_path = LoggingUtil.prep_for_logging() + # set the app name + app_name = "APSViz.UI-data.GeoPoint" + # create a logger - self.logger = LoggingUtil.init_logging("APSViz.UI-data.GeoPoint", level=log_level, line_format='medium', log_file_path=log_path) + self.logger = LoggingUtil.init_logging(app_name, level=log_level, line_format='medium', log_file_path=log_path) def get_geo_point_data(self, **kwargs) -> str: """ @@ -69,8 +72,10 @@ def get_geo_point_data(self, **kwargs) -> str: args = argsNT(float(kwargs['lon']), float(kwargs['lat']), kwargs['variable_name'], int(kwargs['kmax']), kwargs['alt_urlsource'], tds_svr, bool(kwargs['keep_headers']), kwargs['ensemble'], int(kwargs['ndays'])) + gu = GeoPointsURL(_logger=self.logger) + # call the function, check the return - df_nc = gu.main(args) + df_nc = gu.run(args) # if there was a valid response if df_nc is not None: @@ -85,7 +90,7 @@ def get_geo_point_data(self, **kwargs) -> str: tds_svr, bool(kwargs['keep_headers']), None, int(kwargs['ndays'])) # call the function, check the return - df_fc = gu.main(args) + df_fc = gu.run(args) # if there was a valid response if df_fc is not None: diff --git a/src/common/geopoints_url.py b/src/common/geopoints_url.py index b6034dd..fb6f541 100644 --- a/src/common/geopoints_url.py +++ b/src/common/geopoints_url.py @@ -3,11 +3,16 @@ Copyright (c) 2022,2023,2024 Renaissance Computing Institute -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to +deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +IN THE SOFTWARE. 
Time series extraction @@ -15,183 +20,245 @@ """ import sys -import pandas as pd import time as tm -import src.common.utilities as utilities -import src.common.generate_urls_from_times as genurls -from argparse import ArgumentParser - -# pylint: skip-file - -# create a logger -logger = utilities.logger +import pandas as pd -# Define some basic mappings for URL to variables names. Can override using CI variables -var_mapper = {'fort': 'zeta', 'swan': 'swan_HS'} +from src.common.geopoints_urls_from_times import GenerateURLsFromTimes +from src.common.geopoints_utilities import GeoUtilities +from src.common.logger import LoggingUtil -def guess_variable_name(url) -> str: +class GeoPointsURL: """ - Simply search the given URL for occurances of ither fort or swan. Choose the variable approapriately. User may always - override using --variable_name + Class for geo-point functionality. - Parameters: - url: (str). A valid urls - Returns: - varname: . Guess is varname is zeta or swan_HS based on url nomenclature and specifications in the var_mapper dict """ - varname = None - for key, value in var_mapper.items(): - if isinstance(key, str) and key.casefold() in url.casefold(): - varname = value - break - return varname + def __init__(self, app_name='GeoPointsURL.TEST', _logger=None): + """ + Entry point for the GeoPointsURL class + """ + # if a reference to a logger was passed in use it + if _logger is not None: + # get a handle to a logger + self.logger = _logger + else: + # get the log level and directory from the environment. + log_level, log_path = LoggingUtil.prep_for_logging() + # create a logger + self.logger = LoggingUtil.init_logging(app_name, level=log_level, line_format='medium', log_file_path=log_path) -def strip_ensemble_from_url(urls) -> str: - """ - We mandate that the URLs input to this fetcher are those used to access the TDS server used in APSViz. The "ensemble" information will be in position .split('/')[-2] - eg. 'http://tds.renci.org/thredds/dodsC/2021/nam/2021052318/hsofs/hatteras.renci.org/hsofs-nam-bob-2021/nowcast/fort.63.nc' - - Parameters: - urls: list(str). list of valid urls - Returns: - Ensemble: - """ - url = grab_first_url_from_urllist(urls) - try: - words = url.split('/') - ensemble = words[-2] # Usually nowcast,forecast, etc - except IndexError as e: - logger.exception(f'strip_ensemble_from_url Unexpected failure try next:') - return ensemble + # Define some basic mappings for URL to variables names. Can override using CI variables + self.var_mapper = {'fort': 'zeta', 'swan': 'swan_HS'} + # create the utility class + self.geo_utils = GeoUtilities(_logger=self.logger) -def first_true(iterable, default=False, pred=None): - """ - itertools recipe found in the Python 3 docs - Returns the first true value in the iterable. - If no true value is found, returns *default* - If *pred* is not None, returns the first item - for which pred(item) is true. + def guess_variable_name(self, url) -> str: + """ + Search the given URL for occurrences of either fort or swan and choose the variable appropriately. 
- first_true([a,b,c], x) --> a or b or c or x - first_true([a,b], x, f) --> a if f(a) else b if f(b) else x + User may always override using --variable_name - """ - return next(filter(pred, iterable), default) + Parameters: + url: (str) valid urls + Returns: + varname: Guess is varname is zeta or swan_HS based on url nomenclature and specifications in the var_mapper dict + """ + varname = None + for key, value in self.var_mapper.items(): + if isinstance(key, str) and key.casefold() in url.casefold(): + varname = value + break + + return varname + + def strip_ensemble_from_url(self, urls) -> str: + """ + We mandate that the URLs input to this fetcher are those used to access the TDS server used in APSViz. + The "ensemble" information will be in position .split('/')[-2] + + e.g., 'https://tds.renci.org/thredds/dodsC/2021/nam/2021052318/hsofs/hatteras.renci.org/hsofs-nam-bob-2021/nowcast/fort.63.nc' + + Parameters: + urls: list(str) list of valid urls + Returns: + Ensemble: + """ + url = self.grab_first_url_from_url_list(urls) + ensemble = None -def grab_first_url_from_urllist(urls) -> str: - """ - eg. 'http://tds.renci.org/thredds/dodsC/2021/nam/2021052318/hsofs/hatteras.renci.org/hsofs-nam-bob-2021/nowcast/fort.63.nc' - - Parameters: - urls: list(str). list of valid urls - Returns: - url: . Fetch first available, valid url in the list - """ - if not isinstance(urls, list): - logger.error('first url: URLs must be in list form') - sys.exit(1) - url = first_true(urls) - return url - - -def main(args): - variable_name = args.variable_name - url = args.url - lon = args.lon - lat = args.lat - nearest_neighbors = args.kmax - ndays = args.ndays # Look back/forward - - logger.info('Input URL word is %s', url) - - if variable_name is None: - variable_name = guess_variable_name(url) - if variable_name is None: - logger.error('Variable name invald or not identified') - sys.exit(1) - logger.debug(f' Identified variable name is {variable_name}') - - ensemble = strip_ensemble_from_url([url]) - if args.ensemble is not None: # Else use the ensemble present in the input URL. Allow us to input a forecast but choose the nowcast - ensemble = args.ensemble - logger.debug(f'Input URL ensemble determined to be {ensemble}') - - # Try to setup proper header names for ADC/SWN and for nowcast/forecasr - dataproduct = 'Forecast' - if ensemble == 'nowcast': - dataproduct = 'Nowcast' - # Now figure out data source: adcirc or swan - datasrc = 'APS' - if variable_name == 'swan_HS': - datasrc = 'SWAN' - headername = f'{datasrc} {dataproduct}' - logger.debug(f' Header name defined to be {headername}') - - if ndays <= 0: - logger.debug(f'Build list of URLs to fetch: ndays lookback is {ndays}') - rpl = genurls.generate_urls_from_times(url=url, timein=None, timeout=None, ndays=ndays, grid_name=None, instance_name=None, config_name=None) - new_urls = rpl.build_url_list_from_template_url_and_offset(ensemble=ensemble) - logger.info('New URL list %s', new_urls) - else: - new_urls = [url] - logger.debug('Number of URL to try and process is: %s', len(new_urls)) - - logger.info('Lon: %s, Lat: %s', lon, lat) - logger.debug('Selected nearest neighbors values is: %s', nearest_neighbors) - - if len(new_urls) == 0: - logger.error('No URLs identified given the input URL: %s. 
Abort', url) - sys.exit(1) - - data_list = list() - exclude_list = list() - - t0 = tm.time() - for url in new_urls: - logger.debug('URL: %s', url) try: - df_product_data, df_product_metadata, df_excluded = utilities.Combined_pipeline(url, variable_name, lon, lat, nearest_neighbors) - #df_product_data.to_csv(f'Product_data.csv',header=args.keep_headers) - #df_product_metadata.to_csv(f'Product_meta.csv',header=args.keep_headers) - data_list.append(df_product_data) - exclude_list.append(df_excluded) - except (OSError, FileNotFoundError): - logger.warning('Current URL was not found: %s. Try another...', url) - pass - logger.info('Fetching Runtime was: %s seconds', tm.time() - t0) - - #If absolutely nothing comes back return a None - try: - df = pd.concat(data_list, axis=0) - df.columns = [headername] - df = (df.reset_index().drop_duplicates(subset='index', keep='last').set_index('index').sort_index()) - df_excluded = pd.concat(exclude_list, axis=0) - df.index = df.index.strftime('%Y-%m-%d %H:%M:%S') - df.index.name = 'time' - logger.debug('Dimension of final data array: %s', df.shape) - logger.debug('Dimension of excluded URL list array: %s', df_excluded.shape) - except ValueError: - logger.info('No data found for the specified lon/lat air. Return None') + words = url.split('/') + + ensemble = words[-2] # Usually nowcast, forecast, etc. + except IndexError: + self.logger.exception('strip_ensemble_from_url Unexpected failure try next') + + return ensemble + + @staticmethod + def first_true(iterable, default=False, pred=None): + """ + itertools recipe found in the Python 3 docs + Returns the first true value in the iterable. + If no true value is found, returns *default* + If *pred* is not None, returns the first item + for which pred(item) is true. + + first_true([a, b, c], x) --> a or b or c or x + first_true([a, b], x, f) --> a if f(a) else b if f(b) else x + + """ + return next(filter(pred, iterable), default) + + def grab_first_url_from_url_list(self, urls) -> str: + """ + e.g., https://tds.renci.org/thredds/dodsC/2021/nam/2021052318/hsofs/hatteras.renci.org/hsofs-nam-bob-2021/nowcast/fort.63.nc + + Parameters: + urls: list(str). list of valid urls + Returns: + url: . Fetch first available, valid url in the list + """ + # init the return + url = None + + if not isinstance(urls, list): + self.logger.error('first url: URLs must be in list form') + else: + url = self.first_true(urls) + + return url + + def run(self, args): + """ + initiates the process + + :param args: + :return: + """ + # assign the incoming run arguments + variable_name = args.variable_name + url = args.url + lon = args.lon + lat = args.lat + nearest_neighbors = args.kmax + n_days = args.ndays # Look back/forward + + self.logger.info('Input URL word is %s', url) + + if variable_name is None: + variable_name = self.guess_variable_name(url) + + if variable_name is None: + raise Exception('Variable name invalid or not identified') + + self.logger.debug('Identified variable name is %s', variable_name) + + ensemble = self.strip_ensemble_from_url([url]) + + if args.ensemble is not None: # Else use the ensemble present in the input URL. 
Allow us to input a forecast but choose the nowcast + ensemble = args.ensemble + + self.logger.debug('Input URL ensemble determined to be %s', ensemble) + + # Try to set up proper header names for ADC/SWN and for nowcast/forecast + dataproduct = 'Forecast' + + if ensemble == 'nowcast': + dataproduct = 'Nowcast' + + # Now figure out the data source: adcirc or swan + data_src = 'APS' + + if variable_name == 'swan_HS': + data_src = 'SWAN' + + header_name = data_src + dataproduct + + self.logger.debug('Header name defined to be %s ', header_name) + + if n_days <= 0: + self.logger.debug('Build list of URLs to fetch: n_days lookback is %s', n_days) + + rpl = GenerateURLsFromTimes(_logger=self.logger, url=url, time_in=None, time_out=None, n_days=n_days, grid_name=None, instance_name=None, + config_name=None) + + new_urls = rpl.build_url_list_from_template_url_and_offset(ensemble=ensemble) + + self.logger.info('New URL list %s', new_urls) + else: + new_urls = [url] + + self.logger.debug('Number of URL to try and process is: %s', len(new_urls)) + + self.logger.info('Lon: %s, Lat: %s, Selected nearest neighbors values is: %s', lon, lat, nearest_neighbors) + + if len(new_urls) == 0: + raise Exception('No URLs identified given the input URL: %s. Abort') + + data_list = [] + exclude_list = [] + + t0 = tm.time() + + for url in new_urls: + self.logger.debug('URL: %s', url) + + try: + df_product_data, df_excluded = self.geo_utils.combined_pipeline(url, variable_name, lon, lat, nearest_neighbors) + # , df_product_metadata + # df_product_data.to_csv(f'Product_data.csv', header=args.keep_headers) + # df_product_metadata.to_csv(f'Product_meta.csv', header=args.keep_headers) + data_list.append(df_product_data) + exclude_list.append(df_excluded) + except (OSError, FileNotFoundError): + self.logger.warning('Current URL was not found: %s. Try another...', url) + + self.logger.info('Fetching Runtime was: %s seconds', tm.time() - t0) + + # init the return df = None - # Final data outputs - # df.to_csv('Product_data_geopoints.csv') - # df_excluded.to_csv('Product_excluded_geopoints.csv') + # If absolutely nothing comes back return a None + try: + df = pd.concat(data_list, axis=0) + df.columns = [header_name] + df = (df.reset_index().drop_duplicates(subset='index', keep='last').set_index('index').sort_index()) + df_excluded = pd.concat(exclude_list, axis=0) + df.index = df.index.strftime('%Y-%m-%d %H:%M:%S') + df.index.name = 'time' + + self.logger.debug('Dimension of final data array: %s', df.shape) + self.logger.debug('Dimension of excluded URL list array: %s', df_excluded.shape) + except ValueError: + self.logger.info('No data found for the specified lon/lat air. Return None') - logger.info('Finished. Runtime was: %s seconds', tm.time() - t0) - return df + # Final data outputs + # df.to_csv('Product_data_geopoints.csv') + # df_excluded.to_csv('Product_excluded_geopoints.csv') + + self.logger.info('Finished. 
Runtime was: %s seconds', tm.time() - t0) + return df if __name__ == '__main__': - ret_val = 0 + # Main entry point for local testing + + # init the return + RET_VAL = 0 + + # setup a logger for testing + logger = LoggingUtil.init_logging("GeoPointsURL.test", level=10, line_format='medium', log_file_path='./geopoints_url-test.log') try: + from argparse import ArgumentParser + parser = ArgumentParser() - parser.add_argument('--lon', action='store', dest='lon', default=None, type=float, help='lon: longitiude value for time series extraction') + + parser.add_argument('--lon', action='store', dest='lon', default=None, type=float, help='lon: longitude value for time series extraction') parser.add_argument('--lat', action='store', dest='lat', default=None, type=float, help='lat: latitude value for time series extraction') parser.add_argument('--variable_name', action='store', dest='variable_name', default=None, type=str, help='Optional variable name of interest from the supplied url') @@ -204,21 +271,25 @@ def main(args): help='Choose overriding ensemble such as nowcast. Else internal code extracts from the URL') parser.add_argument('--ndays', action='store', dest='ndays', default=0, type=int, help='ndays to scan: Default=0, <0 means look back. >0 means look forward') - args = parser.parse_args() + + cli_args = parser.parse_args() # log the input args - logger.debug('input args: %s', args) + logger.debug('Input args: %s', cli_args) + + # instantiate the geo-point URL class + gp_url = GeoPointsURL(logger) # Call the runner - df = main(args) + df_out = gp_url.run(cli_args) - if df is not None: - logger.debug('Final output df:%s:%s', df.head(5), df.shape) + if df_out is not None: + logger.debug('Final output df: %s:%s', df_out.head(5), df_out.shape) else: logger.debug('Final output df is None: No data found') except Exception: - logger.exception("Exit: exception occured") - ret_val = 1 + logger.exception("Exit: exception occurred") + RET_VAL = 1 - sys.exit(ret_val) + sys.exit(RET_VAL) diff --git a/src/common/geopoints_urls_from_times.py b/src/common/geopoints_urls_from_times.py new file mode 100644 index 0000000..0d3688e --- /dev/null +++ b/src/common/geopoints_urls_from_times.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python + +# SPDX-FileCopyrightText: 2022 Renaissance Computing Institute. All rights reserved. +# SPDX-FileCopyrightText: 2023 Renaissance Computing Institute. All rights reserved. +# +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-License-Identifier: LicenseRef-RENCI +# SPDX-License-Identifier: MIT + +""" + Time series extraction + + Authors: Jeffrey L. Tilson +""" + +import os +import datetime as dt +from src.common.geopoints_utilities import GeoUtilities +from src.common.logger import LoggingUtil + + +class GenerateURLsFromTimes: + """ + Class that attempts to create a list of valid TDS Urls based on the input time conditions and possibly a YAML file + that contains the URL structure + This is NOT expected to be highly generalized and is intended for the ADDA/AST pipelines + + We hardwire the concept that hurricane data files timestep every 6 hours. + + It is okay to have one or two "wrong" urls in the list as the Harvester should be pretty resilient. + + If the caller elects to define output URLs based on times/ndays, then a YAML describing the desired structure is required. + If the caller elects to also supply a URL, then the output URL structure will be gleaned from it. 
+ regardless of instance + + Lastly, the final product data series may extend before or after the stop/start times. + + As an example, if grabbing a nowcast, the data series may begin 6 hours before the indicated url time. + + Pass in a URL and the instance and gridname are scraped from it. + If building from a YAML, you must specify these terms. + + Possible scenarios: (options such as instance/ensemble can be applied to all) + 1) Input timein/timeout and the config_name YML (nominal name is url_framework.yml). + This will generate a set of URLs between the two time ranges. + This can work for a Hurricane, BUT timein/timeout must be ADVISORY values + 2) Input timeout and offset and the config_name YML (nominal name is url_framework.yml). + This will generate a set of URLs between the two time ranges. + This can work for a Hurricane, BUT timeout must be ADVISORY values + 3) Input URL and offset only. + This will scrape the time/advisory from the URL and offset it in 6-hour steps and + generate a set of URLs between the two time/advisory ranges. + This can work for hurricanes + + starttime is the selected time to begin the building of the list (YYYY-mm-dd HH:MM:SS) + stoptime is the selected time to end the building of the list (YYYY-mm-dd HH:MM:SS) + + Parameters: + url: (str) A single URL from which more URLs may be built + ndays: (int) Number of look back/ahead days from the stoptime value + config_name: (str) path/filename to yaml file that contains the INSTANCE mappings + hurricane_yaml_source=None: (str) This is a special case. + If you want to build Hurricane URLs from a YAML, then you will need to specify the subdir name directly, e.g. 'al09'. + This will replace the default value of nam. + hurricane_yaml_year: (str) is part of the Hurricane special case. No way to dig out the year directory name without the user specifying it + only needed for YAML-based hurricane construction, e.g. .../thredds/dodsC/2021/al09/11/ec95d/... + """ + def __str__(self): + return self.__class__.__name__ + + def __init__(self, _app_name='GenerateURLsFromTimes.TEST', _logger=None, url=None, time_in=None, time_out=None, n_days=None, grid_name=None, + instance_name=None, config_name=None, hurricane_yaml_year=None, hurricane_yaml_source=None): + # get a handle to a logger + self.logger = _logger + + self.utils = GeoUtilities(_logger=self.logger) + + stop_time = None + + # The Hurricane special terms are only used if you are requesting to build from a YAML AND the caller wants Hurricane data + # If a URL is passed in, then gridname and instance can be gotten from it. + # ensemble values are expected to be changed by the user + self.config_name = config_name + if url is not None: + words = url.split('/') + self.ensemble = words[-2] + self.instance_name = words[-3] + self.grid_name = words[-5] + try: + stop_time = dt.datetime.strptime(words[-6], '%Y%m%d%H').strftime('%Y-%m-%d %H:%M:%S') # Can be overridden by args.stop_time + except ValueError: # Must be a hurricane + stop_time = words[-6] + self.url = url + # If no url, then build URLs from a YAML. This requires the caller to specify gridname, instance, and ensemble + else: + self.instance_name = instance_name # This is for potentially mapping new instances to urls + self.grid_name = grid_name + if self.instance_name is None: + raise Exception('Must specify an instance value if building URLs based on a YAML. None specified: Abort') + + if self.grid_name is None: + raise Exception('Must specify a grid_name if building URLs based on a YAML. 
None specified: Abort') + + self.hurricane_yaml_source = hurricane_yaml_source + self.hurricane_yaml_year = hurricane_yaml_year # Cannot span multiple years using Hurricane-YAML construction + + # timeout MUST be supplied somehow + if time_out is None and stop_time is None: + raise Exception('timeout is not set and no URL provided: Abort') + + if time_out is not None: + stop_time = time_out + + # Find time in + if time_in is None: + if n_days is None: + raise Exception('No timein or ndays specified.') + + start_time = self.utils.construct_start_time_from_offset(stop_time, n_days) # Will return an advisory value if appropriate + else: + start_time = time_in + + self.start_time = start_time + self.stop_time = stop_time + self.n_days = n_days + + self.logger.debug('Current time (or advisory) range is %s to %s. Specified ndays is %s', self.start_time, self.stop_time, self.n_days) + + if url is not None: + self.logger.debug('Current estimated ensemble: %s, instance: %s and gridname: %s', self.ensemble, self.instance_name, self.grid_name) + + def build_url_list_from_template_url_and_times(self, ensemble='nowcast') -> list: + """ + We seek to build a set of compatible URLs spanning the input time range based on the + structure of the input URL. We expect the caller to provide a proper ensemble value + for the new URLs. + We expect no changes in the grid name. Only change in the ensemble and times are expected + + Parameters: + ensemble: (str) Caller specified ensemble. This way one could input a namforecast url but request nowcasts, e.g. + + Returns: + urls: list(str). List of valid URLs for processing + """ + url = self.url + time_range = (self.start_time, self.stop_time) # This could also be an advisory range + list_of_times = self.utils.generate_six_hour_time_steps_from_range(time_range) + list_of_instances = self.utils.generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) + + urls = [] + + for time, instance in zip(list_of_times, list_of_instances): + words = url.split('/') + words[-2] = ensemble + words[-3] = self.instance_name + words[-6] = str(time) # Need to ensure because we could have an advisory come in + new_url = '/'.join(words) + + if new_url not in urls: + urls.append(new_url) + + self.logger.debug('Constructed %s urls of ensemble %s', urls, ensemble) + + return urls + + def build_url_list_from_template_url_and_offset(self, ensemble='nowcast') -> list: + """ + We seek to build a set of compatible URLs starting from the URL embedded time + and walking back/forward offset days while using the provided ensemble value. + e.g., you might send in a forecast and want back a list of nowcasts for the same grid + structure of the input URL. We expect the caller to provide a proper ensemble value + for the new URLs. + We expect no changes in the grid name. Only change in the ensemble and times are expected + + Parameters: + ensemble: (str) (def of "nowcast") The desired ensemble word for the resultant urls + + Returns: + urls: list(str). 
List of valid URLs for processing + """ + url = self.url + time_value = self.stop_time # Could also be an advisory + offset = self.n_days + + if offset > 0: + self.logger.warning('Offset >0 specified: Behavior is not tested') + + # time_in = url.split('/')[-6] # Maybe need to check for a Hurricane Advisory also + list_of_times = self.utils.generate_six_hour_time_steps_from_offset(time_value, offset) + list_of_instances = self.utils.generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) + + urls = [] + + for time, instance in zip(list_of_times, list_of_instances): + words = url.split('/') + words[-2] = ensemble + words[-3] = self.instance_name + words[-6] = str(time) # Need this in case it is an advisory value + newurl = '/'.join(words) + if newurl not in urls: + urls.append(newurl) + self.logger.debug('Constructed %s urls of ensemble %s', urls, ensemble) + return urls + + @staticmethod + def load_config(config_name): + """ + placeholder method to load the config file + + :param config_name: + :return: + """ + return config_name + + # Approach Used by ADDA + def build_url_list_from_yaml_and_times(self, ensemble='nowcast') -> list: + """ + We seek to build a set of compatible URLs spanning the input time range based on the + structure of asgs urls in the config_name. The structure of the output URLs will be based on the + entries in the associated YAML file. Since no url will be provided, we must ask the caller to provide + the gridname, ensemble, and instance. We expect the caller to provide a proper Instance value + for the new URLs. + We REQUIRE the grid name. Only changes in the ensemble and times are expected + + This uses the following class variables: + time_range: (tuple) (datetime, datetime). Time range inclusive (could also be hurricane advisories) + instance: (str) if set then used for all urls. If not, attempt to find it in the YAML + gridname: (str) name for the grid + + Parameters: + ensemble: (str) ensemble name (defaults to nowcast) + + Returns: + urls: list(str). List of valid URLs for processing + """ + config = None + + if self.config_name is None: + raise Exception('self.config_name is None. Cannot use the YAML generators: Abort') + + try: + config = self.load_config(self.config_name) + except FileNotFoundError as e: # OSError: + raise FileNotFoundError(f'No URL structural config yml file found: {self.config_name}: Abort') from e + + time_range = (self.start_time, self.stop_time) # Could also be a range of advisories + list_of_times = self.utils.generate_six_hour_time_steps_from_range(time_range) + list_of_instances = self.utils.generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) + + urls = [] + + self.logger.debug('list_of_times: %s', list_of_times) + self.logger.debug('list_of_instances: %s', list_of_instances) + + for time, instance in zip(list_of_times, list_of_instances): + url = self.utils.construct_url_from_yaml(config, time, self.instance_name, ensemble, self.grid_name, + hurricane_yaml_year=self.hurricane_yaml_year, hurricane_yaml_source=self.hurricane_yaml_source) + if url not in urls: + urls.append(url) + + self.logger.debug('Constructed %s urls of ensemble %s based on the YML', urls, ensemble) + return urls + + # Approach Used by ADDA + def build_url_list_from_yaml_and_offset(self, ensemble='nowcast') -> list: + """ + We seek to build a set of compatible URLs spanning the input time range based on the + structure of asgs urls in the config_name. 
The structure of the output URLs will be based on the + entries in the associated YAML file. Since, no url will be provided, we must ask the caller to provide + the gridname, ensemble, and instance. We expect the caller to provide a proper Instance value + for the new URLs. + We REQUIRE the grid name. Only change in the ensemble and times are expected + + Uses the following class variables: + offset: (int). The offset in days + instance: (str) if set then used for all urls + gridname: (str) name for the grid + ensemble: (str) ensemble name (dafaults to nowcast) + + Parameters: + ensemble: (str) ensemble name (dafaults to nowcast) + + Returns: + urls: list(str). List of valid URLs for processing + """ + config = None + + if self.config_name is None: + raise Exception('self.config_name is None. Cannot use the YAML generators: Abort') + + try: + config = self.load_config(self.config_name) + except OSError as e: + raise OSError(f'No URL structural config yml file {self.config_name} found: Abort') from e + + time_value = self.stop_time # Could also be an advisory + offset = self.n_days + if offset > 0: + self.logger.warning('Offset >0 specified: Behavior is not tested') + + list_of_times = self.utils.generate_six_hour_time_steps_from_offset(time_value, offset) + list_of_instances = self.utils.generate_list_of_instances(list_of_times, self.grid_name, self.instance_name) + + urls = [] + + for time, instance in zip(list_of_times, list_of_instances): + url = self.utils.construct_url_from_yaml(config, time, self.instance_name, ensemble, self.grid_name, + hurricane_yaml_year=self.hurricane_yaml_year, hurricane_yaml_source=self.hurricane_yaml_source) + if url not in urls: + urls.append(url) + + self.logger.warning('Constructed %s urls of ensemble %s based on the YML and offset', urls, ensemble) + + return urls + + +class GenerateURLsEntry: + """ + Class that has an entry point to build urls + + """ + def __str__(self): + return self.__class__.__name__ + + def __init__(self, _app_name='GenerateURLsEntry.TEST', _logger=None): + """ + inits the class + + :param _logger: + """ + # if a reference to a logger was passed in use it + if _logger is not None: + # get a handle to a logger + self.logger = _logger + else: + # get the log level and directory from the environment. 
+ log_level, log_path = LoggingUtil.prep_for_logging() + + # create a logger + self.logger = LoggingUtil.init_logging(_app_name, level=log_level, line_format='medium', log_file_path=log_path) + + def run(self, args): + """ + A simple main method to demonstrate the use of this class + """ + + config_name = args.config_name if args.config_name is not None else os.path.join(os.path.dirname(__file__), '../config', 'url_framework.yml') + + # Set up IO env + self.logger.debug("Product Level Working in %s.", os.getcwd()) + + if args.instance_name is not None: + self.logger.debug('Ignoring args.instance_name for the testing sequence') + + # + # Need to specify precedence in the arguments provided for testing main + # + + if args.url is not None: + self.logger.debug('Selecting a template-url generation method') + if args.timein is not None: + self.logger.debug('Selecting a specific time-range procedure') + rpl = GenerateURLsFromTimes(_logger=self.logger, url=args.url, time_in=args.timein, time_out=args.timeout, n_days=None, + grid_name=None, instance_name=None, config_name=None) + new_urls = rpl.build_url_list_from_template_url_and_times(ensemble=args.ensemble) + else: + self.logger.debug('Selecting time+ndays procedure') + rpl = GenerateURLsFromTimes(_logger=self.logger, url=args.url, time_in=None, time_out=args.timeout, n_days=args.ndays, grid_name=None, + instance_name=None, config_name=None) + new_urls = rpl.build_url_list_from_template_url_and_offset(ensemble=args.ensemble) + else: + self.logger.debug('Selecting a YAML generation method') + if args.grid_name is None or args.instance_name is None or config_name is None: + raise Exception('YAML-based procedures requires gridname, instance_name and config_name') + if args.hurricane_yaml_year is not None and args.hurricane_yaml_source is not None: + self.logger.debug('Detected values required for building YAML-based Hurricane urls') + if args.timein is not None: + self.logger.debug('Selecting a specific time-range procedure') + rpl = GenerateURLsFromTimes(_logger=self.logger, time_in=args.timein, time_out=args.timeout, n_days=None, grid_name=args.grid_name, + instance_name=args.instance_name, config_name=args.config_name, + hurricane_yaml_year=args.hurricane_yaml_year, hurricane_yaml_source=args.hurricane_yaml_source) + new_urls = rpl.build_url_list_from_yaml_and_times(ensemble=args.ensemble) + else: + self.logger.debug('Selecting time+ndays procedure') + rpl = GenerateURLsFromTimes(_logger=self.logger, time_in=None, time_out=args.timeout, n_days=args.ndays, grid_name=args.grid_name, + instance_name=args.instance_name, config_name=args.config_name, + hurricane_yaml_year=args.hurricane_yaml_year, hurricane_yaml_source=args.hurricane_yaml_source) + new_urls = rpl.build_url_list_from_yaml_and_times(ensemble=args.ensemble) + + self.logger.debug('New urls: %s', new_urls) + + +if __name__ == '__main__': + # setup a logger for testing + logger = LoggingUtil.init_logging("GenerateURLsFromTimes.test", level=10, line_format='medium', + log_file_path='./geopoints_url_from_times-test.log') + + from argparse import ArgumentParser + + parser = ArgumentParser() + parser.add_argument('--url', default=None, action='store', dest='url', help='Input URL that may be used to build new output urls', type=str) + parser.add_argument('--ndays', default=None, action='store', dest='ndays', help='Day lag (usually < 0)', type=int) + parser.add_argument('--timeout', default=None, action='store', dest='timeout', help='YYYY-mm-dd HH:MM:SS. 
Latest day of analysis', type=str) + parser.add_argument('--timein', default=None, action='store', dest='timein', help='YYYY-mm-dd HH:MM:SS .Start day of analysis. ', type=str) + parser.add_argument('--config_name', action='store', dest='config_name', default=None, + help='String: yml config which contains URL structural information') + parser.add_argument('--instance_name', action='store', dest='instance_name', default=None, + help='String: Choose instance name. Required if using a YAML-based URL construction') + parser.add_argument('--grid_name', action='store', dest='grid_name', default=None, + help='String: Choose grid_name. Required if using a YAML-based URL construction') + parser.add_argument('--ensemble', action='store', dest='ensemble', default='nowcast', help='String: Specify ensemble name ') + parser.add_argument('--hurricane_yaml_year', action='store', dest='hurricane_yaml_year', default=None, + help='String: Needed only for Hurricane/YML procedures') + parser.add_argument('--hurricane_yaml_source', action='store', dest='hurricane_yaml_source', default=None, + help='String: Needed only for Hurricane/YML procedures') + + cli_args = parser.parse_args() + + # log the input args + logger.debug('input cli_args: %s', cli_args) + + gen_entry = GenerateURLsEntry(_logger=logger) + + gen_entry.run(cli_args) diff --git a/src/common/geopoints_utilities.py b/src/common/geopoints_utilities.py new file mode 100644 index 0000000..859953e --- /dev/null +++ b/src/common/geopoints_utilities.py @@ -0,0 +1,797 @@ +# SPDX-FileCopyrightText: 2022 Renaissance Computing Institute. All rights reserved. +# SPDX-FileCopyrightText: 2023 Renaissance Computing Institute. All rights reserved. +# SPDX-FileCopyrightText: 2024 Renaissance Computing Institute. All rights reserved. +# +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-License-Identifier: LicenseRef-RENCI +# SPDX-License-Identifier: MIT + +""" + Time series extraction + + Authors: Jeffrey L. Tilson, Brian O. 
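For reference, the programmatic equivalent of the url/ndays branch exercised above might look like the following; the template URL, window, and offset are placeholders, and the import of GenerateURLsFromTimes depends on where the class lives in this package:

    import logging

    rpl = GenerateURLsFromTimes(
        _logger=logging.getLogger('url-demo'),
        url=('https://example.org/thredds/dodsC/2024/nam/2024010100/'
             'hsofs/host.example/instance1/namforecast/fort.63.nc'),
        time_in=None, time_out='2024-01-02 00:00:00', n_days=-2,
        grid_name=None, instance_name=None, config_name=None)

    nowcast_urls = rpl.build_url_list_from_template_url_and_offset(ensemble='nowcast')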
Blanton 8/2024 +""" +import re +import time as tm +import datetime as dt +import numpy as np +import pandas as pd +import xarray as xr + +from scipy import spatial as sp + + +class GeoUtilities: + """ + Class that has a number of static methods used throughout this component + + """ + def __init__(self, _app_name='GeoUtilities.TEST', _logger=None): + """ + inits the class + + :param _app_name: + :param _logger: + """ + # get a handle to a logger + self.logger = _logger + + self.k_max = 10 + self.got_kdtree = None + self.tol = 10e-5 + self.debug = True # False + + # Specify available reanalysis years + self.y_min = 1979 + self.y_max = 2023 + self.years = list(range(self.y_min, self.y_max + 1)) + + # logger.debug('utilities:y_min, y_max: %s, %s', y_min,y_max) + + self.file_ext = '.d0.no-unlim.T.rc.nc' + # self.file_ext='.d4.no-unlim.T.rc.nc'; + # self.logger.debug('utilities:file_ext: %s', file_ext) + + # Default standard location is on the primary RENCI TDS + # self.url_dir_format="https://tds.renci.org/thredds/dodsC/Reanalysis/ADCIRC/ERA5/hsofs/%d-post" + # self.url_dir_format="https://tds.renci.org/thredds/dodsC/Reanalysis/ADCIRC/ERA5/ec95d/%d" + self.url_dir_format = "https://tdsres.apps.renci.org/thredds/dodsC/ReanalysisV2/ADCIRC/ERA5/hsofs.V2/%d-post" + + self.keep_hours = [0, 6, 12, 18] + + @staticmethod + def get_adcirc_grid_from_ds(ds): + """ + creates an ad dict + """ + ag_dict: dict = {'lon': ds['x'][:], 'lat': ds['y'][:], 'ele': ds['element'][:, :] - 1, 'depth': ds['depth'][:], 'latmin': np.mean(ds['y'][:])} + + return ag_dict + + @staticmethod + def attach_element_areas(ag_dict): + """ + gets the element areas + """ + x = ag_dict['lon'].values + y = ag_dict['lat'].values + e = ag_dict['ele'].values + + # COMPUTE GLOBAL DX,DY, Len, angles + i1 = e[:, 0] + i2 = e[:, 1] + i3 = e[:, 2] + + x1 = x[i1] + x2 = x[i2] + x3 = x[i3] + + y1 = y[i1] + y2 = y[i2] + y3 = y[i3] + + # coordinate deltas + dx23 = x2 - x3 + dx31 = x3 - x1 + dx12 = x1 - x2 + dy23 = y2 - y3 + dy31 = y3 - y1 + dy12 = y1 - y2 + + # lengths of sides + a = np.sqrt(dx12 * dx12 + dy12 * dy12) + b = np.sqrt(dx31 * dx31 + dy31 * dy31) + c = np.sqrt(dx23 * dx23 + dy23 * dy23) + + ag_dict['areas'] = (x1 * dy23 + x2 * dy31 + x3 * dy12) / 2. 
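The 'areas' expression is the standard signed-area (shoelace) formula applied to every element at once. A scalar sanity check on the unit right triangle, using the same deltas:

    # Unit right triangle (0,0), (1,0), (0,1); its area is 0.5
    x1, y1, x2, y2, x3, y3 = 0.0, 0.0, 1.0, 0.0, 0.0, 1.0
    dy23, dy31, dy12 = y2 - y3, y3 - y1, y1 - y2
    area = (x1 * dy23 + x2 * dy31 + x3 * dy12) / 2.0
    assert area == 0.5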
+ ag_dict['edge_lengths'] = [a, b, c] + ag_dict['dl'] = np.mean(ag_dict['edge_lengths'], axis=0) + + return ag_dict + + def basis2d_within_element(self, phi): + """ + gets the basis 2d elements + """ + interior_status = np.all(phi[:] <= 1 + self.tol, axis=1) & np.all(phi[:] >= 0 - self.tol, axis=1) + + return interior_status + + @staticmethod + def basis2d(ag_dict, xy_list, j): + """ + performs basis 2D operations + + """ + # check length of j and xy_list + # check for the necessary arrays in ag_dict + + # nodes for the elements in j + n3 = ag_dict['ele'][j] + + x = ag_dict['lon'][n3].values + x1 = x[:, 0] + x2 = x[:, 1] + x3 = x[:, 2] + + y = ag_dict['lat'][n3].values + y1 = y[:, 0] + y2 = y[:, 1] + y3 = y[:, 2] + + area_j = ag_dict['areas'][j] + xp = xy_list[:, 0] + yp = xy_list[:, 1] + + # Basis function 1 + a = (x2 * y3) - (x3 * y2) + b = y2 - y3 + c = -(x2 - x3) + phi0 = (a + b * xp + c * yp) / (2.0 * area_j) + + # Basis function 2 + a = (x3 * y1) - (x1 * y3) + b = y3 - y1 + c = -(x3 - x1) + phi1 = (a + b * xp + c * yp) / (2.0 * area_j) + + # Basis function 3 + a = (x1 * y2) - (x2 * y1) + b = y1 - y2 + c = -(x1 - x2) + phi2 = (a + b * xp + c * yp) / (2.0 * area_j) + + return np.array([phi0, phi1, phi2]).T + + @staticmethod + def get_adcirc_time_from_ds(ds): + """ + gets the ADCIRC time from the dataset + """ + return {'time': ds['time']} + + @staticmethod + def f63_to_xr(url): + """ + returns the dataset without certain variables + """ + dropvars = ['neta', 'nvel', 'max_nvdll', 'max_nvell'] + + return xr.open_dataset(url, drop_variables=dropvars) + + @staticmethod + def get_adcirc_slice_from_ds(ds, v, it=0): + """ + gets ADCIRC data from the dataset + """ + ad_var_dict = {} + + var = ds.variables[v] + + if re.search('max', v) or re.search('depth', v): + var_d = var[:] # the actual data + else: + if ds.variables[v].dims[0] == 'node': + # self.logger.debug('ds: transposed data found') + var_d = var[it, :].T # the actual data + elif ds.variables[v].dims[0] == 'time': + var_d = var[:, it] # the actual data + else: + raise Exception(f'Unexpected leading variable name: {ds.variables[v].dims}. Abort') + + # var_d[var_d.mask] = np.nan + ad_var_dict['var'] = var_d.data + + return ad_var_dict + + def compute_tree(self, ag_dict): + """ + Given lon,lat,ele in ag_dict,compute element centroids and + generate the ADCIRC grid KDTree + returns ag_dict with tree + """ + + t0 = tm.time() + + try: + x = ag_dict['lon'].values.ravel() # ravel; not needed + y = ag_dict['lat'].values.ravel() + e = ag_dict['ele'].values + except Exception as e: + raise Exception('Did not find lon,lat,ele data in ag_dict.') from e + + xe = np.mean(x[e], axis=1) + ye = np.mean(y[e], axis=1) + + # Still want to build up the data for ag_dict, we just do not need the tree reevaluated for every year + if self.got_kdtree is None: + ag_dict['tree'] = tree = sp.KDTree(np.c_[xe, ye]) + self.got_kdtree = tree + else: + ag_dict['tree'] = self.got_kdtree + + self.logger.debug('Build annual KDTree time is: %s seconds', tm.time() - t0) + + return ag_dict + + def compute_query(self, xy_list, ag_dict, kmax=10): + """ + Generate the kmax-set of nearest neighbors to each lon,lat pair in xylist. + Each test point (each lon/lat pair) gets associated distance (dd) and element (j) objects + At this stage it is possible that some test points are not interior to the nearest element. + We will subsequently check that. 
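compute_tree and compute_query amount to a SciPy KDTree over element centroids, queried for the k nearest candidate elements per point. A toy query with made-up centroids shows the array shapes the rest of the pipeline relies on:

    import numpy as np
    from scipy import spatial as sp

    centroids = np.array([[-79.0, 32.0], [-79.5, 32.5], [-80.0, 33.0]])  # toy element centroids
    tree = sp.KDTree(centroids)
    dd, j = tree.query(np.array([[-79.6, 32.6]]), k=2)
    # dd: distances, j: candidate element indices, both shaped (n_points, k)
    print(dd.shape, j[0])  # (1, 2) [1 2]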
+ + dd: num points by neighbors + j: num points by neighbors + """ + t0 = tm.time() + ag_results = {} + + dd, j = ag_dict['tree'].query(xy_list, k=kmax) + + if kmax == 1: + dd = dd.reshape(-1, 1) + j = j.reshape(-1, 1) + + ag_results['distance'] = dd + ag_results['elements'] = j + ag_results['number_neighbors'] = kmax + ag_results['geopoints'] = xy_list # We shall use this later + + self.logger.debug('KDTree query of size: %s took: %s seconds', kmax, tm.time() - t0) + + return ag_results + + def compute_basis_representation(self, xy_list, ag_dict, ag_results): + """ + For each test point with kmax number_neighbors, compute linear basis for + each neighbor. + + Then, check which, if any, element the test point actually resides within. + If none, then the returned basis functions (i.e., interpolation weights) are set to nans. + + If an input point is an "exact" grid point (i.e., ADCIRC grid node), then ambiguity + may arise regarding the best element and multiple True statuses can occur. + + Here we also keep the nearest element value and is done by reverse iterating in the zip function + """ + + # First, build all the basis weights and determine if it was an interior or not + t0 = tm.time() + kmax = ag_results['number_neighbors'] + j = ag_results['elements'] + phival_list = [] + within_interior = [] + + for k_value in range(0, kmax): + phi_val = self.basis2d(ag_dict, xy_list, j[:, k_value]) + phival_list.append(phi_val) + within_interior.append(self.basis2d_within_element(phi_val)) + + # detailed_weights_elements(phival_list, j) + + # Second only retain the "interior" results or nans if none + final_weights = np.full((phival_list[0].shape[0], phival_list[0].shape[1]), np.nan) + final_jvals = np.full(j.T[0].shape[0], -99999) + final_status = np.full(within_interior[0].shape[0], False) + + # Loop backwards. thus keeping the "nearest" True for each geopoints for each k in kmax + for pvals, jvals, testvals in zip(phival_list[::-1], j.T[::-1], within_interior[::-1]): # THis loops over Kmax values + final_weights[testvals] = pvals[testvals] + final_jvals[testvals] = jvals[testvals] + final_status[testvals] = testvals[testvals] + + ag_results['final_weights'] = final_weights + ag_results['final_jvals'] = final_jvals + ag_results['final_status'] = final_status + self.logger.info('Compute of basis took: %s seconds', tm.time() - t0) + + # Keep the list if the user needs to know after the fact + outside_elements = np.argwhere(np.isnan(final_weights).all(axis=1)).ravel() + ag_results['outside_elements'] = outside_elements + return ag_results + + def detailed_weights_elements(self, phival_list, j): + """ + This is only used for understanding better the detailed behavior of a particular grid + It is not invoked for general use + """ + for pvals, jvals in zip(phival_list, j.T): + df_pvals = pd.DataFrame(pvals, columns=['Phi0', 'Phi1', 'Phi2']) + df_pvals.index = df_pvals.index + df_jvals = pd.DataFrame(jvals + 1, columns=['Element+1']) + df = pd.concat([df_pvals, df_jvals], axis=1) + df.index = df.index + 1 + df.index = df.index.astype(int) + + self.logger.debug('Dataframe: %s', df.loc[2].to_frame().T) + + @staticmethod + def water_level_reductions(t, data_list, final_weights): + """ + Each data_list is a df for a single point containing 3 columns, one for + each node in the containing element. 
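basis2d evaluates the three linear (P1, barycentric) shape functions of each candidate element at the query point, and basis2d_within_element keeps points whose weights all fall in [0, 1] within the tolerance. A self-contained check on a single triangle, using only the formulas above:

    # Unit right triangle and an interior query point
    x1, y1, x2, y2, x3, y3 = 0.0, 0.0, 1.0, 0.0, 0.0, 1.0
    xp, yp = 0.25, 0.25
    area = (x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2)) / 2.0

    phi0 = ((x2 * y3 - x3 * y2) + (y2 - y3) * xp - (x2 - x3) * yp) / (2.0 * area)
    phi1 = ((x3 * y1 - x1 * y3) + (y3 - y1) * xp - (x3 - x1) * yp) / (2.0 * area)
    phi2 = ((x1 * y2 - x2 * y1) + (y1 - y2) * xp - (x1 - x2) * yp) / (2.0 * area)

    assert abs(phi0 + phi1 + phi2 - 1.0) < 1e-12                      # partition of unity
    assert all(-1e-12 <= p <= 1 + 1e-12 for p in (phi0, phi1, phi2))  # point is inside

    def f(x, y):
        # any linear field is reproduced exactly by these weights
        return 2.0 + 3.0 * x - 1.5 * y

    assert abs(phi0 * f(x1, y1) + phi1 * f(x2, y2) + phi2 * f(x3, y3) - f(xp, yp)) < 1e-12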
+ These columns are reduced using the final_weights previously calculated + + A final df is returned with index=time and a single column for each of the + input test points (some of which may be partially or completely nan) + """ + try: + final_list = [] + + for index, dataseries, weights in zip(range(0, len(data_list)), data_list, final_weights): + reduced_data = np.matmul(dataseries.values, weights.T) + df = pd.DataFrame(reduced_data, index=t, columns=[f'P{index + 1}']) + final_list.append(df) + + df_final_data = pd.concat(final_list, axis=1) + except Exception: + df_final_data = None + + return df_final_data + + def water_level_selection(self, t, data_list, final_weights): + """ + Each data_list is a df for a single point containing three columns, one for each node in the containing element. + We choose the first column in the list that has any number of values. + Moving forward, one can make this approach better by choosing the highest weighted object with actual values + + A final df is returned with index=time and a single column for each of the + input test points (some of which may be partially or completely nan) + """ + final_list = [] + + # Index is a loop over multiple possible lon/lat pairs + for index, data_series, weights in zip(range(0, len(data_list)), data_list, final_weights): + df_single = pd.DataFrame(index=t) + count = 0 + + for vertex in data_series.columns: # Loop over the 3 vertices and their weights in order + count += 1 + df_single[f'P{vertex}'] = data_series[vertex].values + if df_single.count()[0] > 0: # df.notna().sum() + final_list.append(df_single) + self.logger.debug('Inserted one chosen df_single with non nan values for index %s at count number %s', index, count) + break + + self.logger.debug('Do Selection water series update') + try: + df_final_data = pd.concat(final_list, axis=1) + except Exception as e: + df_final_data = None + self.logger.debug('This Exception usually simply means no data at the chosen lon/lat. But Exception is %s', e) + + return df_final_data + + @staticmethod + def generate_metadata(ag_results): + """ + Here we want to simply help the user by reporting back the lon/lat values for each geo-point. + This should be the same as the input dataset. + + -99999 indicates an element was not found in the grid. + """ + + df_lonlat = pd.DataFrame(ag_results['geopoints'], columns=['LON', 'LAT']) + df_elements = pd.DataFrame(ag_results['final_jvals'] + 1, columns=['Element (1-based)']) + df_elements.replace(-99998, -99999, inplace=True) + + df_meta = pd.concat([df_lonlat, df_elements], axis=1) + df_meta['Point'] = df_meta.index + 1 + df_meta.set_index('Point', inplace=True) + df_meta.rename('P{}'.format, inplace=True) + + return df_meta + + def construct_reduced_water_level_data_from_ds(self, ds, ag_dict, ag_results, variable_name=None): + """ + This method acquires ADCIRC water levels for the list of geopoints/elements. + For each specified point in the grid, the resulting time series is reduced to a single time series using + a (basis 2d) weighted sum. 
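water_level_reductions is a per-point weighted sum: the (ntime x 3) matrix of nodal series multiplied by the three basis weights yields one series per query point. In miniature, with toy numbers and weights that sum to 1:

    import numpy as np
    import pandas as pd

    t = pd.date_range('2024-01-01', periods=3, freq='h')
    dataseries = pd.DataFrame({0: [1.0, 1.1, 1.2],
                               1: [2.0, 2.1, 2.2],
                               2: [3.0, 3.1, 3.2]}, index=t)  # levels at the 3 element nodes
    weights = np.array([0.5, 0.25, 0.25])

    reduced = np.matmul(dataseries.values, weights.T)          # shape (len(t),)
    df = pd.DataFrame(reduced, index=t, columns=['P1'])
    print(df)  # first row: 0.5*1.0 + 0.25*2.0 + 0.25*3.0 = 1.75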
+ + For a non-nan value to result in the final data, the product data must: + 1) Be non-nan for each time series at a specified time tick + 2) The test point must be inside the specified element + """ + + if variable_name is None: + raise Exception('User MUST supply the correct variable name') + + self.logger.debug('Variable name is: %s', variable_name) + t0 = tm.time() + + data_list = [] + t1 = tm.time() + final_weights = ag_results['final_weights'] + final_jvals = ag_results['final_jvals'] + self.logger.debug('Time to acquire weigths and jvals: %s', tm.time() - t1) + + t1 = tm.time() + ac_dict = self.get_adcirc_time_from_ds(ds) + t = ac_dict['time'].values + e = ag_dict['ele'].values + self.logger.debug('Time to acquire time and element values: %s', tm.time() - t1) + + self.logger.debug('Before removal of out-of-triangle jvals: %s', final_jvals.shape) + + mask = final_jvals == -99999 + final_jvals = final_jvals[~mask] + self.logger.debug('After removal of out-of-triangle jvals: %s', final_jvals.shape) + + t1 = tm.time() + for v_station in final_jvals: + ad_vardict = self.get_adcirc_slice_from_ds(ds, variable_name, it=e[v_station]) + df = pd.DataFrame(ad_vardict['var']) + data_list.append(df) + + self.logger.debug('Time to TDS fetch annual all test station (triplets) was: %s seconds', tm.time() - t1) + + # logger.info('Selecting the weighted mean time series') + # df_final=WaterLevelReductions(t, data_list, final_weights) + + self.logger.debug('Selecting the greedy alg: first in list with not all nans time series') + df_final = self.water_level_selection(t, data_list, final_weights) + + t0 = tm.time() + df_meta = self.generate_metadata(ag_results) # This is here mostly for future considerations + self.logger.debug('Time to reduce annual: %s, test stations is: %s seconds', len(final_jvals), tm.time() - t0) + ag_results['final_reduced_data'] = df_final + ag_results['final_meta_data'] = df_meta + + return ag_results + + # NOTE We do not need to rebuild the tree for each year since the grid is unchanged. + def combined_pipeline(self, url, variable_name, lon, lat, nearest_neighbors=10): + """ + Interpolate for one year. + + df_excluded_geopoints lists only those stations excluded by element tests. 
+ Some could be all nans due to dry points + + No flanks removed in this method as the caller may want to see everything + """ + t0 = tm.time() + geopoints = np.array([[lon, lat]]) + ds = self.f63_to_xr(url) + ag_dict = self.get_adcirc_grid_from_ds(ds) + ag_dict = self.attach_element_areas(ag_dict) + + self.logger.info('Compute_pipeline initiation: %s seconds', tm.time() - t0) + self.logger.info('Start annual KDTree pipeline LON: %s LAT: %s', geopoints[0][0], geopoints[0][1]) + + ag_dict = self.compute_tree(ag_dict) + ag_results = self.compute_query(geopoints, ag_dict, kmax=nearest_neighbors) + ag_results = self.compute_basis_representation(geopoints, ag_dict, ag_results) + ag_results = self.construct_reduced_water_level_data_from_ds(ds, ag_dict, ag_results, variable_name=variable_name) + + self.logger.debug('Basis function Tolerance value is: %s', self.tol) + self.logger.debug('List of %s stations not assigned to any grid element follows for kmax: %s', len(ag_results["outside_elements"]), + nearest_neighbors) + + t0 = tm.time() + df_product_data = ag_results['final_reduced_data'] + df_product_metadata = ag_results['final_meta_data'] + df_excluded_geopoints = pd.DataFrame(geopoints[ag_results['outside_elements']], index=ag_results['outside_elements'] + 1, + columns=['lon', 'lat']) + + self.logger.debug('Compute_pipeline cleanup: %s seconds', tm.time() - t0) + self.logger.debug('Finished annual Combined_pipeline') + + return df_product_data, df_excluded_geopoints # , df_product_metadata + + @staticmethod + def is_hurricane(test_val) -> bool: + """ + Determine of the input test val is a Date, an Int or something else + Parameters: + test_val: For a valid time enter a str with dformat %Y-%m-%d %H:%M:%S or %Y%m%d%H + For a valid hurricane enter an int + """ + is_hurricane = False + + try: + test = dt.datetime.strptime(test_val, '%Y-%m-%d %H:%M:%S') # If fails then not a datetime + except (ValueError, TypeError): + try: + test = dt.datetime.strptime(test_val, '%Y%m%d%H') + except Exception: + try: + out_id = int(test_val) + is_hurricane = True + except ValueError as e: + raise ValueError(f'test indicates not a hurricane nor a casting. Perhaps a format issue?. Got {test_val}: Abort') from e + + return is_hurricane + + def generate_six_hour_time_steps_from_range(self, time_range) -> list: + """ + Given the input time tuple, return the inclusive set of times that occur on + the daily 6-hour mark. So on output we would have 00Z,06Z,12Z,18Z times only + + Parameters: + time_range: Tuple (date time,date time) of the start and end times (datetime objects) + + Returns: + list_of_times: list of (str) times in the format: %Y%m%d%H + + """ + if self.is_hurricane(time_range[0]): + self.logger.debug('Determined input time_range URL is a Hurricane') + list_of_times = self.generate_six_hour_time_advisories_from_range(time_range) + else: + list_of_times = self.generate_six_hour_time_castings_from_range(time_range) + + return list_of_times + + def generate_six_hour_time_castings_from_range(self, time_range) -> list: + """ + A non-hurricane + Advisory. We need to distinguish between the two. 
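End to end, combined_pipeline needs a fort.63-style dataset URL, a variable name, and one lon/lat pair. A hedged usage sketch: the URL is a placeholder that must point at a real TDS dataset, 'zeta' is the usual ADCIRC water-level variable but should be confirmed against the file, and a logger has to be supplied because the class logs through self.logger:

    import logging

    geo = GeoUtilities(_logger=logging.getLogger('geopoints.demo'))

    url = 'https://tds.example.org/thredds/dodsC/some/reanalysis/fort.63.nc'  # placeholder
    df_levels, df_excluded = geo.combined_pipeline(url, variable_name='zeta',
                                                   lon=-79.6725, lat=32.8597,
                                                   nearest_neighbors=10)
    # df_levels: one time-indexed column (P1); df_excluded: points outside every element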
Note, we can be promiscuous here + with the URLs, since urls that do not exist will get trapped by Harvester + Parameters: + time_range: tuple (datetime, datetime) + Returns: + list_of_times: list of times/advisories in a string format to build new urls + """ + + start_time = dt.datetime.strptime(time_range[0], '%Y-%m-%d %H:%M:%S') + stop_time = dt.datetime.strptime(time_range[1], '%Y-%m-%d %H:%M:%S') + pd_time = pd.date_range(start=start_time, end=stop_time, freq='h') # Doesn't land on 00,06,12,18 + + list_of_times = [] + + for time in pd_time: + if time.hour in self.keep_hours: + list_of_times.append(time.strftime('%Y%m%d%H')) + + # Keep input entry as well? + list_of_times.append(stop_time.strftime('%Y%m%d%H')) + list_of_times.sort() + + return list_of_times + + @staticmethod + def generate_six_hour_time_advisories_from_range(advisory_range) -> list: + """ + Advisory range has no specific time meaning other than generally being every 6 hours + So simply accept the range as fact. The INPUT advisory number is NOT retained in the + generated list + + Save Advisories in a leading zero format: "{:02d}".format(adv) + + Parameters: + advisory_range: tuple (int,int) + Returns: + list_of_advisories: list of times/advisories in a string format to build new urls + includes the input time_step.advisory in the final list + """ + # How many 6-hour periods can we identify? We need to choose a startpoint. Use the highest time and look back + start_adv = int(advisory_range[0]) + stop_adv = int(advisory_range[1]) + + if start_adv > stop_adv: + start_adv, stop_adv = stop_adv, start_adv + + list_of_advisories = [] + for inc in range(start_adv, stop_adv): + list_of_advisories.append(f'{inc:02d}') + + list_of_advisories = [i for i in list_of_advisories if int(i) > 0] + + # Should we retain the input value? + list_of_advisories.append(f'{stop_adv:02d}') + + # A last ditch sort to be sure + list_of_advisories.sort() + + return list_of_advisories + + # Generates a proper list of times/advisories depending on whether it is a Hurricane or not + def generate_six_hour_time_steps_from_offset(self, time_value, offset) -> list: + """ + For an arbitrary URL, we could have a conventional now/forecast OR a Hurricane + Advisory. We need to distinguish between the two. Note, we can be promiscuous here + with the URLs, since urls that do not exist will get trapped by Harvester + Parameters: + time_value: (datetime) Either the time or advisory value from an asgs url + offset: (int) Number of DAYS to look back/forward from time_value + if time_value is an Advisory then we look back a number of STEPS + corresponding to 6 hour intervals based on offset + Returns: + list_of_times: list of times/advisories in a string format to build new urls + """ + if self.is_hurricane(time_value): + self.logger.debug('Determined input URL is a Hurricane') + list_of_times = self.generate_six_hour_time_advisories_from_offset(time_value, offset) + else: + list_of_times = self.generate_six_hour_time_castings_from_offset(time_value, offset) + + return list_of_times + + def generate_six_hour_time_castings_from_offset(self, time_value, offset) -> list: + """ + Start with the str_time and build a list of 6-hour steps for up to offset days + We expect the input time to be a stop_time and the offsets to be < 0.
But, though + the overall code has not been tested for it, we simply reorder the times + as necessary and proceed + + Parameters: + time_value: (datetime) start time + offset: (int) Number of DAYS to look back/forward from time_value + + Returns: + time_list: list of times in a string format to build new urls + """ + stop_time = dt.datetime.strptime(time_value, '%Y-%m-%d %H:%M:%S') + start_time = stop_time + dt.timedelta(days=offset) + + if start_time > stop_time: + self.logger.warning('Stoptime < starttime. Supplied offset was %s days: Reordering', offset) + start_time, stop_time = stop_time, start_time + + return self.generate_six_hour_time_steps_from_range((start_time.strftime('%Y-%m-%d %H:%M:%S'), stop_time.strftime('%Y-%m-%d %H:%M:%S'))) + + @staticmethod + def generate_six_hour_time_advisories_from_offset(str_time, offset) -> list: + """ + Start with the str_time and build a list of 6-hour steps for up to offset days + We expect the input time to be an Advisory number (int). We also anticipate offsets to be < 0, + since offset > 0 would apply to Hurricane advisories not performed (but you could do it) + + Here we assume each index is a 6-hour time step. So we just need to decide how many to look back for. + Harvester will quietly ignore urls that do not exist + + Save Advisories in a leading zero-padded format: "{:02d}".format(adv) + + Parameters: + str_time: (str) time + offset: (int) Number of DAYS to look back/forward from str_time + + Returns: + list_of_advisories: list of advisories in a string format to build new urls + """ + list_of_advisories = [] + stop_advisory = int(str_time) + num_6hour_look_asides = int(24 * offset / 6) + range_values = [0, num_6hour_look_asides] + range_values.sort() # sorts ascending order + + for inc in range(*range_values): + list_of_advisories.append(f'{stop_advisory + inc:02d}') + + list_of_advisories = [i for i in list_of_advisories if int(i) >= 0] + + # Keep the input value? + list_of_advisories.append(f'{stop_advisory:02d}') + + # A last ditch sort to be sure + list_of_advisories.sort() + + return list_of_advisories + + @staticmethod + def grab_years_from_time_list(list_of_times) -> list: + """ + Process the input time list to extract a list of Years (str) + Note: This could be a list of Advisories as well. If so, + + return the advisory number, though it will probably not be used + + Parameters: + list_of_times: List of (str) time in the format %Y%m%d%H + Returns: + list of year values + + """ + list_of_years = [] + + for time in list_of_times: + try: + value = dt.datetime.strptime(time, '%Y%m%d%H').year + except TypeError: + value = time + list_of_years.append(value) + + return list_of_years + + @staticmethod + def generate_list_of_instances(list_of_times, in_gridname, in_instance): + """ + This function matches every entry in the list_of_times with an associated instance. + The structure of this code is such that, in the future, we may have scenarios where + the value of the instance may change for a given year. + + Currently, though, we will simply build a list of identical instances. + The value of the selected instance may be passed in by the caller + + Parameters: + :param list_of_times: list (str)(%Y%m%d%H) ordered set of times from which to build new URLs + :param in_gridname: current gridname from a representative INPUT url + :param in_instance: current instance from a representative INPUT url + + Returns: + instance_list: ordered list of instances to use for building a set of new urls.
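Both branches reduce to small, checkable operations: thin an hourly pandas range down to the 00/06/12/18Z ticks, or walk a zero-padded advisory counter back in 6-hour steps. A condensed illustration of both ideas (not the exact code paths above):

    import pandas as pd

    keep_hours = [0, 6, 12, 18]
    ticks = pd.date_range('2024-01-01 03:00:00', '2024-01-02 05:00:00', freq='h')
    times = sorted({t.strftime('%Y%m%d%H') for t in ticks if t.hour in keep_hours})
    print(times)  # ['2024010106', '2024010112', '2024010118', '2024010200']

    stop_adv, offset_days = 25, -1
    steps = int(24 * offset_days / 6)                        # -4
    advisories = sorted(f'{stop_adv + inc:02d}' for inc in range(steps, 1))
    print(advisories)  # ['21', '22', '23', '24', '25']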
+ """ + num_entries = len(list_of_times) + + # gridname = in_gridname # Get default values + instance = in_instance + + instance_list = num_entries * [instance] + + return instance_list + + + # Expect this to be part of a looped list of times from which appending will be applied + def construct_url_from_yaml(self, config, intime, instance, ensemble, gridname, hurricane_yaml_year=None, hurricane_yaml_source=None): + """ + Given a single time (%Y%m%d%H) or advisory, the gridname, instance, and ensemble values + use the entries in config to build a proper URL + If applying to Hurricanes, we need to also applyld_url_list_from_yaml_and_timest the values for hurricane_yaml_year, and + hurricane_yaml_source + """ + # hurricane_yaml_source is a special case scenario + if self.is_hurricane(intime): + self.logger.debug('Request for YAML build of Hurricane URL. subdir is %s', hurricane_yaml_source) + intime = str(intime) + subdir = hurricane_yaml_year # This is certainly NOT generalized + source = hurricane_yaml_source + else: + subdir = dt.datetime.strptime(intime, '%Y%m%d%H').year + source = 'nam' + + cfg = config['ADCIRC'] + url = cfg["baseurl"] + cfg["dodsCpart"] % ( + subdir, source, intime, cfg["AdcircGrid"] % (gridname), cfg["Machine"], cfg["Instance"] % (instance), cfg["Ensemble"] % (ensemble), + cfg["fortNumber"]) + + return url + + def construct_start_time_from_offset(self, stop_time, n_days): + """ + Construct an appropriate start_time given the stop_time and offset. + NOTE if this is a Hurricane advisory, we return an appropriate + advisory assuming each advisory is 6 hours in duration. No + negative advisories are returned + + Parameters: + stop_time (str) (%Y-%m-%d %H:%M:%S) + n_days: (int) number of 24-hour days to look back/forward + + """ + try: + if self.is_hurricane(stop_time): + num_6hour_look_asides = int(24 * n_days / 6) + stop_adv = int(stop_time) + start_adv = stop_adv + num_6hour_look_asides # We normally assume offset is negative but that is not enforced + + return start_adv + + t_stop = dt.datetime.strptime(stop_time, '%Y-%m-%d %H:%M:%S') + t_start = t_stop + dt.timedelta(days=n_days) + start_time = t_start.strftime('%Y-%m-%d %H:%M:%S') + + return start_time + + except Exception as e: + raise Exception('Fell out the bottom of construct_start_time_from_offset. Abort') from e diff --git a/src/common/utilities.py b/src/common/utilities.py deleted file mode 100644 index db0fa2f..0000000 --- a/src/common/utilities.py +++ /dev/null @@ -1,432 +0,0 @@ -''' -MIT License - -Copyright (c) 2022, 2023, 2024 Renaissance Computing Institute - -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -''' - -""" - Time series extraction - - Authors: Jeffrey L. Tilson, Brian O. Blanton 8/2024 -""" - -import sys -import numpy as np -import pandas as pd -import re -import xarray as xr -from scipy import spatial as sp -from datetime import date, datetime -import time as tm - -# load the logger class -from src.common.logger import LoggingUtil - -# pylint: skip-file - -# get the log level and directory from the environment (or default). -log_level, log_path = LoggingUtil.prep_for_logging() - -# create a logger -logger = LoggingUtil.init_logging("geopoints_url", level=log_level, line_format='long', log_file_path=log_path) - -#logger.debug("utilities:Xarray Version: %s', xr.__version__) -Kmax=10 -got_kdtree=None -TOL=10e-5 -debug=True # False - -# Specify available reanalysis years -Ymin=1979 -Ymax=2023 -YEARS=[item for item in range(Ymin, Ymax+1)] -#logger.debug('utilities:Ymin, Ymax: %s, %s', Ymin,Ymax) - -fileext='.d0.no-unlim.T.rc.nc' -#fileext='.d4.no-unlim.T.rc.nc'; -#logger.debug('utilities:fileext: %s', fileext) - -# Default standard location is on the primary RENCI TDS -#urldirformat="http://tds.renci.org/thredds/dodsC/Reanalysis/ADCIRC/ERA5/hsofs/%d-post" -urldirformat="https://tdsres.apps.renci.org/thredds/dodsC/ReanalysisV2/ADCIRC/ERA5/hsofs.V2/%d-post" -#urldirformat="http://tds.renci.org/thredds/dodsC/Reanalysis/ADCIRC/ERA5/ec95d/%d" - -def get_adcirc_grid_from_ds(ds): - """ - """ - agdict = {} - agdict['lon'] = ds['x'][:] - agdict['lat'] = ds['y'][:] - agdict['ele'] = ds['element'][:,:] - 1 - agdict['depth'] = ds['depth'][:] - agdict['latmin'] = np.mean(ds['y'][:]) # needed for scaling lon/lat plots - return agdict - -def attach_element_areas(agdict): - """ - """ - x=agdict['lon'].values - y=agdict['lat'].values - e=agdict['ele'].values - - # COMPUTE GLOBAL DX,DY, Len, angles - i1=e[:,0] - i2=e[:,1] - i3=e[:,2] - - x1=x[i1];x2=x[i2];x3=x[i3]; - y1=y[i1];y2=y[i2];y3=y[i3]; - - # coordinate deltas - dx23=x2-x3 - dx31=x3-x1 - dx12=x1-x2 - dy23=y2-y3 - dy31=y3-y1 - dy12=y1-y2 - - # lengths of sides - a = np.sqrt(dx12*dx12 + dy12*dy12) - b = np.sqrt(dx31*dx31 + dy31*dy31) - c = np.sqrt(dx23*dx23 + dy23*dy23) - - agdict['areas'] = ( x1*dy23 + x2*dy31 + x3*dy12 )/2. 
- agdict['edge_lengths']=[a, b, c] - agdict['dl']=np.mean(agdict['edge_lengths'],axis=0) - - return agdict - -def basis2d_withinElement(phi): - """ - """ - interior_status = np.all(phi[:]<=1+TOL,axis=1) & np.all(phi[:]>=0-TOL,axis=1) - return interior_status - -def basis2d(agdict,xylist,j): - """ - """ - # check length of j and xylist - # check for needed arrays in agdict - phi=[] - #nodes for the elements in j - n3=agdict['ele'][j] - x=agdict['lon'][n3].values - x1=x[:,0];x2=x[:,1];x3=x[:,2]; - y=agdict['lat'][n3].values - y1=y[:,0];y2=y[:,1];y3=y[:,2]; - areaj=agdict['areas'][j] - xp=xylist[:,0] - yp=xylist[:,1] - # Basis function 1 - a=(x2*y3-x3*y2) - b=(y2-y3) - c=-(x2-x3) - phi0=(a+b*xp+c*yp)/(2.0*areaj) - # Basis function 2 - a=(x3*y1-x1*y3) - b=(y3-y1) - c=-(x3-x1) - phi1=(a+b*xp+c*yp)/(2.0*areaj) - # Basis function 3 - a=(x1*y2-x2*y1) - b=(y1-y2) - c=-(x1-x2) - phi2=(a+b*xp+c*yp)/(2.0*areaj) - return np.array([phi0, phi1, phi2]).T - -def get_adcirc_time_from_ds(ds): - """ - """ - return {'time': ds['time']} - -def f63_to_xr(url): - """ - """ - dropvars=['neta', 'nvel', 'max_nvdll', 'max_nvell'] - return xr.open_dataset(url,drop_variables=dropvars) - -def get_adcirc_slice_from_ds(ds,v,it=0): - """ - """ - advardict = {} - var = ds.variables[v] - if re.search('max', v) or re.search('depth', v): - var_d = var[:] # the actual data - else: - if ds.variables[v].dims[0] == 'node': - #logger.debug('ds: transposed data found') - var_d = var[it,:].T # the actual data - elif ds.variables[v].dims[0] == 'time': - var_d = var[:,it] # the actual data - else: - logger.debug('Unexpected leading variable name: %s. Abort', ds.variables[v].dims) - sys.exit(1) - #var_d[var_d.mask] = np.nan - advardict['var'] = var_d.data - return advardict - -def ComputeTree(agdict): - """ - Given lon,lat,ele in agdict,compute element centroids and - generate the ADCIRC grid KDTree - returns agdict with tree - """ - - global got_kdtree # Try not to if already done - t0=tm.time() - try: - x=agdict['lon'].values.ravel() # ravel; not needed - y=agdict['lat'].values.ravel() - e=agdict['ele'].values - except Exception as e: - logger.debug('Did not find lon,lat,ele data in agdict.') - sys.exit(1) - xe=np.mean(x[e],axis=1) - ye=np.mean(y[e],axis=1) - if got_kdtree is None: # Still want to build up the data for agdict, we just do not need the tree reevaluated for every year - agdict['tree']=tree = sp.KDTree(np.c_[xe,ye]) - got_kdtree=tree - else: - agdict['tree']=got_kdtree - logger.debug('Build annual KDTree time is: %s seconds', tm.time()-t0) - return agdict - -def ComputeQuery(xylist, agdict, kmax=10): - """ - Generate the kmax-set of nearest neighbors to each lon,lat pair in xylist. - Each test point (each lon/lat pair) gets associated distance (dd) and element (j) objects - At this stage it is possible that some test points are not interior to the nearest element. We will - subsequently check that. - - dd: num points by neighbors - j: num points by neighbors - """ - t0=tm.time() - agresults=dict() - dd, j = agdict['tree'].query(xylist, k=kmax) - if kmax==1: - dd=dd.reshape(-1,1) - j=j.reshape(-1,1) - agresults['distance']=dd - agresults['elements']=j - agresults['number_neighbors']=kmax - agresults['geopoints']=xylist # We shall use this later - logger.debug('KDTree query of size: %s took: %s seconds', kmax, tm.time()-t0) - return agresults - -def ComputeBasisRepresentation(xylist, agdict, agresults): - """ - For each test point with kmax number_neighbors, compute linear basis for - each neighbor. 
Then, check which, if any, element the test point actually resides within. - If none, then the returned basis functions (i.e., interpolation weights) are set to nans. - - If an input point is an "exact" grid point (i.e., ADCIRC grid node), then ambiguity - may arise regarding the best element and multiple True statuses can occur. Here we - also keep the nearest element value. We do this by reverse iterating in the zip function - """ - - # First build all the basis weights and determine if it was interior or not - t0=tm.time() - kmax = agresults['number_neighbors'] - j = agresults['elements'] - phival_list=list() - within_interior=list() - for k_value in range(0,kmax): - phival=basis2d(agdict,xylist,j[:,k_value]) - phival_list.append(phival) - within_interior.append(basis2d_withinElement(phival)) - - #detailed_weights_elements(phival_list, j) - - # Second only retain the "interior" results or nans if none - final_weights= np.full( (phival_list[0].shape[0],phival_list[0].shape[1]),np.nan) - final_jvals = np.full( j.T[0].shape[0],-99999) - final_status = np.full( within_interior[0].shape[0],False) - # Loop backwards. thus keeping the "nearest" True for each geopoints for each k in kmax - for pvals,jvals,testvals in zip(phival_list[::-1], j.T[::-1], within_interior[::-1]): # THis loops over Kmax values - final_weights[testvals] = pvals[testvals] - final_jvals[testvals]=jvals[testvals] - final_status[testvals] = testvals[testvals] - - agresults['final_weights']=final_weights - agresults['final_jvals']=final_jvals - agresults['final_status']=final_status - logger.info('Compute of basis took: %s seconds', tm.time()-t0) - # Keep the list if the user needs to know after the fact - outside_elements = np.argwhere(np.isnan(final_weights).all(axis=1)).ravel() - agresults['outside_elements']=outside_elements - return agresults - -def detailed_weights_elements(phival_list, j): - """ - This is only used for understanding better the detailed behavior of a particular grid - It is not invoked for general use - """ - for pvals,jvals in zip(phival_list,j.T): - df_pvals = pd.DataFrame(pvals, columns=['Phi0','Phi1','Phi2']) - df_pvals.index = df_pvals.index - df_jvals = pd.DataFrame(jvals+1,columns=['Element+1']) - df = pd.concat([df_pvals,df_jvals],axis=1) - df.index = df.index+1 - df.index = df.index.astype(int) - logger.debug('Dataframe: %s', df.loc[2].to_frame().T) - -def WaterLevelReductions(t, data_list, final_weights): - """ - Each data_list is a df for a single point containing 3 columns, one for - each node in the containing element. - These columns are reduced using the final_weights previously calculated - - A final df is returned with index=time and a single column for each of the - input test points (some of which may be partially or completely nan) - """ - try: - final_list = list() - for index,dataseries,weights in zip(range(0,len(data_list)), data_list,final_weights): - reduced_data = np.matmul(dataseries.values, weights.T) - df = pd.DataFrame(reduced_data, index=t, columns=[f'P{index+1}']) - final_list.append(df) - df_final_data = pd.concat(final_list, axis=1) - except Exception as e: - df_final_data=None - return df_final_data - -def WaterLevelSelection(t, data_list, final_weights): - """ - Each data_list is a df for a single point containing 3 columns, one for - each node in the containing element. - We choose the first column in the list that has any number of values. 
Moving forward - one can make this approach better by chosing the heighest weighted object with - actual values - - A final df is returned with index=time and a single column for each of the - input test points (some of which may be partially or completely nan) - """ - final_list = list() - ## Index is a loop over multiple possible lon/lat pairs - for index,dataseries,weights in zip(range(0,len(data_list)), data_list,final_weights): - df_single=pd.DataFrame(index=t) - count=0 - for vertex in dataseries.columns: # Loop over the 3 vertices and their weights in order - count +=1 - df_single[f'P{vertex}']=dataseries[vertex].values - if df_single.count()[0] > 0 : # df.notna().sum() - final_list.append(df_single) - logger.debug('Inserted one chosen df_single with non nan values for index %s at count number %s', index,count) - break - logger.debug('Do Selection water series update') - try: - df_final_data = pd.concat(final_list, axis=1) - except Exception as e: - df_final_data=None - logger.debug(f'This Exception usually simply means no data at the chosen lon/lat. But Exception is {e}') - return df_final_data - -def GenerateMetadata(agresults): - """ - Here we want to simply assist the user by reporting back the lon/lat values for each geopoint. - This should be the same as the input dataset. -99999 indicates an element was not found in the grid. - """ - - df_lonlat=pd.DataFrame(agresults['geopoints'], columns=['LON','LAT']) - df_elements = pd.DataFrame(agresults['final_jvals']+1, columns=['Element (1-based)']) - df_elements.replace(-99998,-99999,inplace=True) - df_meta=pd.concat( [df_lonlat,df_elements], axis=1) - df_meta['Point']=df_meta.index+1 - df_meta.set_index('Point', inplace=True) - df_meta.rename('P{}'.format, inplace=True) - - return df_meta - -def ConstructReducedWaterLevelData_from_ds(ds, agdict, agresults, variable_name=None): - """ - This method acquires ADCIRC water levels for the list of geopoints/elements. - For each specified point in the grid, the resulting time series' are reduced to a single time series using - a (basis2d) weighted sum. 
For a non-nan value to result in the final data, the product data must: - 1) Be non-nan for each time series at the specified time tick - 2) The test point must be interior to the specified element - """ - - if variable_name is None: - logger.error('User MUST supply the correct variable name') - sys.exit(1) - logger.debug('Variable name is: %s', variable_name) - t0 = tm.time() - - data_list=list() - t1=tm.time() - final_weights = agresults['final_weights'] - final_jvals = agresults['final_jvals'] - logger.debug('Time to acquire weigths and jvals: %s',tm.time()-t1) - - t1=tm.time() - acdict=get_adcirc_time_from_ds(ds) - t=acdict['time'].values - e = agdict['ele'].values - logger.debug('Time to acquire time and element values: %s', tm.time()-t1) - - logger.debug('Before removal of out-of-triangle jvals: %s', final_jvals.shape) - mask =(final_jvals == -99999) - final_jvals=final_jvals[~mask] - logger.debug(f'After removal of out-of-triangle jvals: %s', final_jvals.shape) - - t1=tm.time() - for vstation in final_jvals: - advardict = get_adcirc_slice_from_ds(ds,variable_name,it=e[vstation]) - df = pd.DataFrame(advardict['var']) - data_list.append(df) - logger.debug('Time to TDS fetch annual all test station (triplets) was: %s seconds', tm.time()-t1) - - #logger.info('Selecting the weighted mean time series') - #df_final=WaterLevelReductions(t, data_list, final_weights) - - logger.debug('Selecting the greedy alg: first in list with not all nans time series') - df_final=WaterLevelSelection(t, data_list, final_weights) - - t0=tm.time() - df_meta=GenerateMetadata(agresults) # This is here mostly for future considerations - logger.debug('Time to reduce annual: %s, test stations is: %s seconds', len(final_jvals), tm.time()-t0) - agresults['final_reduced_data']=df_final - agresults['final_meta_data']=df_meta - - return agresults - -# NOTE We do not need to rebuild the tree for each year since the grid is unchanged. -def Combined_pipeline(url, variable_name, lon, lat, nearest_neighbors=10): - """ - Interpolate for one year. - - df_excluded_geopoints lists only those stations excluded by element tests. 
- Some could be all nans due to dry points - - No flanks removed in this method as the caller may want to see everything - """ - t0=tm.time() - geopoints=np.array([[lon,lat]]) - ds = f63_to_xr(url) - agdict=get_adcirc_grid_from_ds(ds) - agdict=attach_element_areas(agdict) - logger.info('Compute_pipeline initiation: %s seconds', tm.time()-t0) - - logger.info('Start annual KDTree pipeline LON: %s LAT: %s', geopoints[0][0], geopoints[0][1]) - agdict=ComputeTree(agdict) - agresults=ComputeQuery(geopoints, agdict, kmax=nearest_neighbors) - agresults=ComputeBasisRepresentation(geopoints, agdict, agresults) - agresults=ConstructReducedWaterLevelData_from_ds(ds, agdict, agresults, variable_name=variable_name) - - logger.debug('Basis function Tolerance value is: %s', TOL) - logger.debug('List of %s stations not assigned to any grid element follows for kmax: %s', len(agresults["outside_elements"]), nearest_neighbors) - t0=tm.time() - df_product_data=agresults['final_reduced_data'] - df_product_metadata=agresults['final_meta_data'] - df_excluded_geopoints=pd.DataFrame(geopoints[agresults['outside_elements']], index=agresults['outside_elements']+1, columns=['lon','lat']) - logger.debug('Compute_pipeline cleanup: %s seconds', tm.time()-t0) - logger.debug('Finished annual Combined_pipeline') - - return df_product_data, df_product_metadata, df_excluded_geopoints diff --git a/src/common/utils.py b/src/common/utils.py index be2e80f..64cde19 100644 --- a/src/common/utils.py +++ b/src/common/utils.py @@ -14,7 +14,7 @@ import os import shutil -from enum import Enum, EnumType +from enum import Enum class GenUtils: @@ -73,7 +73,7 @@ def handle_instance_name(request_type: str, instance_name: Enum, reset: bool): ret_val = f'{request_type} instance name set to: {instance_name}' elif instance_name is None and os.path.exists(file_path): # open the config file for reading - with open(file_path, 'r') as fp: + with open(file_path, 'r', encoding='utf-8') as fp: # save the instance name in the file ret_val = fp.read() diff --git a/src/server.py b/src/server.py index ce730c6..0321408 100644 --- a/src/server.py +++ b/src/server.py @@ -14,8 +14,8 @@ import os import uuid import csv -from enum import EnumType +from enum import EnumType from typing import Union from fastapi import FastAPI, Query, Depends @@ -87,10 +87,6 @@ async def get_ui_data(grid_type: Union[str, None] = Query(default=None), event_t
   use_new_wb: Use the new catalog workbench code
   use_v3_sp: Use the new v3 data stored procedure """ - # pylint: disable=unused-argument - # pylint: disable=too-many-arguments - # pylint: disable=too-many-locals - # init the returned data and HTML status code ret_val: dict = {} status_code: int = 200 @@ -376,10 +372,6 @@ async def get_ui_data_file(file_name: Union[str, None] = Query(default='apsviz.j
   use_new_wb: Use the new catalog workbench code
   use_v3_sp: Use the new v3 data stored procedure """ - # pylint: disable=unused-argument - # pylint: disable=too-many-arguments - # pylint: disable=too-many-locals - # init the returned HTML status code status_code: int = 200 diff --git a/src/test/test_geopoints_url.py b/src/test/test_geopoints_url.py index 3004c90..65af126 100644 --- a/src/test/test_geopoints_url.py +++ b/src/test/test_geopoints_url.py @@ -6,7 +6,7 @@ import os from collections import namedtuple -import src.common.geopoints_url as gu +from src.common.geopoints_url import GeoPointsURL def test_geopoints_url(): @@ -46,8 +46,11 @@ def test_geopoints_url(): # init the named tuple for the nowcast call args = argsNT(-79.6725155674, 32.8596518752, None, 10, None, url, True, 'nowcast', 0) + # instantiate the geo-point URL class + gp_url = GeoPointsURL() + # call the function - df_nc = gu.main(args) + df_nc = gp_url.run(args) # check the return assert df_nc is not None @@ -55,8 +58,11 @@ def test_geopoints_url(): # init the named tuple for the forecast call args = argsNT(-79.6725155674, 32.8596518752, None, 10, None, url, True, None, 0) + # instantiate the geo-point URL class + gp_url = GeoPointsURL() + # call the function, - df_fc = gu.main(args) + df_fc = gp_url.run(args) # check the return assert df_fc is not None diff --git a/src/test/test_security.py b/src/test/test_security.py index 8528be7..036bedb 100644 --- a/src/test/test_security.py +++ b/src/test/test_security.py @@ -89,5 +89,5 @@ def test_access(): # execute the post ret_val = requests.get('http://localhost:4000/get_ui_data_secure?met_class=synoptic&limit=2', headers=auth_header, timeout=10) - # was the call unsuccessful + # assert if the call was unsuccessful assert ret_val.status_code == 200