diff --git a/Core/LAMBDA/viz_functions/image_based/viz_hand_fim_processing/lambda_function.py b/Core/LAMBDA/viz_functions/image_based/viz_hand_fim_processing/lambda_function.py
index 783a726d..ff1c17da 100644
--- a/Core/LAMBDA/viz_functions/image_based/viz_hand_fim_processing/lambda_function.py
+++ b/Core/LAMBDA/viz_functions/image_based/viz_hand_fim_processing/lambda_function.py
@@ -107,7 +107,6 @@ def lambda_handler(event, context):
         s3_path_piece = ''
 
     # Get db table names and setup db connection
-    print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
    db_schema = db_fim_table.split(".")[0]
    db_table = db_fim_table.split(".")[-1]
    if any(x in db_schema for x in ["aep", "fim_catchments", "catfim"]):
@@ -121,7 +120,7 @@ def lambda_handler(event, context):
    if "catchments" in db_fim_table:
        df_inundation = create_inundation_catchment_boundary(huc8, branch)

-        print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
+        # print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
        try:
            df_inundation.to_postgis(f"{db_table}", con=process_db.engine, schema=db_schema, if_exists='append')
        except Exception as e:
@@ -130,13 +129,11 @@ def lambda_handler(event, context):
            process_db.engine.dispose()
    else:
-        print(f"Processing FIM for huc {huc8} and branch {branch}")
+        print(f"Processing HUC-branch {huc8_branch} for {fim_config_name} for {date}T{hour}:00:00Z")
        s3_path_piece = '/'.join([run_values[by] for by in process_by])
        subsetted_data = f"{data_prefix}/{product}/{fim_config_name}/workspace/{date}/{hour}/data/{s3_path_piece}_data.csv"

-        print(f"Processing HUC {huc8} for {fim_config_name} for {date}T{hour}:00:00Z")
-
        if input_variable == 'stage':
            stage_lookup = s3_csv_to_df(data_bucket, subsetted_data)
            stage_lookup = stage_lookup.set_index('hydro_id')
@@ -154,14 +151,14 @@ def lambda_handler(event, context):
            stage_lookup = pd.DataFrame()
            df_zero_stage_records = pd.DataFrame()
            if catch_exists and hand_exists and rating_curve_exists:
-                print("->Calculating flood depth")
+                # print("->Calculating flood depth")
                stage_lookup, df_zero_stage_records = calculate_stage_values(rating_curve_key, data_bucket, subsetted_data, huc8_branch)  # get stages
            else:
                print(f"catchment, hand, or rating curve are missing for huc {huc8} and branch {branch}:\nCatchment exists: {catch_exists} ({catchment_key})\nHand exists: {hand_exists} ({hand_key})\nRating curve exists: {rating_curve_exists} ({rating_curve_key})")

        # If not a reference/egis fim run, Upload zero_stage reaches for tracking / FIM cache
        if fim_run_type != 'reference' and not df_zero_stage_records.empty:
-            print(f"Adding zero stage data to {db_table}_zero_stage")# Only process inundation configuration if available data
+            # print(f"Adding zero stage data to {db_table}_zero_stage")# Only process inundation configuration if available data
            df_zero_stage_records = df_zero_stage_records.reset_index()
            df_zero_stage_records.drop(columns=['hydro_id','feature_id'], inplace=True)
            df_zero_stage_records.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False)
@@ -184,7 +181,7 @@ def lambda_handler(event, context):
            df_no_inundation = stage_lookup.merge(df_inundation.drop_duplicates(), on=['hand_id'],how='left',indicator=True)
            df_no_inundation = df_no_inundation.loc[df_no_inundation['_merge'] == 'left_only']
            if df_no_inundation.empty == False:
-                print(f"Adding {len(df_no_inundation)} reaches with NaN inundation to zero_stage table")
zero_stage table") + # print(f"Adding {len(df_no_inundation)} reaches with NaN inundation to zero_stage table") df_no_inundation.drop(df_no_inundation.columns.difference(['hand_id','rc_discharge_cms','note']), axis=1, inplace=True) df_no_inundation['note'] = "Error - No inundation returned from hand processing." df_no_inundation.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False) @@ -193,7 +190,7 @@ def lambda_handler(event, context): if df_inundation.empty: return - print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data + # print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data try: df_inundation.drop(columns=['hydro_id', 'feature_id'], inplace=True) df_inundation.to_sql(db_table, con=process_db.engine, schema=db_schema, if_exists='append', index=False) @@ -218,7 +215,7 @@ def lambda_handler(event, context): df_inundation['branch'] = branch df_inundation = df_inundation.rename(columns={"index": "oid"}) - print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data + # print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data try: df_inundation.to_postgis(f"{db_table}", con=process_db.engine, schema=db_schema, if_exists='append') except Exception as e: @@ -228,8 +225,6 @@ def lambda_handler(event, context): print(f"Successfully processed tif for HUC {huc8} and branch {branch} for {product} for {reference_time}") - return - def create_inundation_catchment_boundary(huc8, branch): """ Creates the catchment boundary polygons @@ -238,7 +233,7 @@ def create_inundation_catchment_boundary(huc8, branch): catchment_dataset = None try: - print("--> Connecting to S3 datasets") + # print("--> Connecting to S3 datasets") tries = 0 raster_open_success = False while tries < 3: @@ -249,12 +244,12 @@ def create_inundation_catchment_boundary(huc8, branch): except Exception as e: tries += 1 time.sleep(30) - print(f"Failed to open datasets. Trying again in 30 seconds - ({e})") + # print(f"Failed to open datasets. Trying again in 30 seconds - ({e})") if not raster_open_success: raise HANDDatasetReadError("Failed to open HAND and Catchment datasets") - print("--> Setting up mapping array") + # print("--> Setting up mapping array") profile = catchment_dataset.profile # get the rasterio profile so the output can use the profile and match the input # noqa # set the output nodata to 0 @@ -262,7 +257,7 @@ def create_inundation_catchment_boundary(huc8, branch): profile['dtype'] = "int32" # Open the output raster using rasterio. This will allow the inner function to be parallel and write to it - print("--> Setting up windows") + # print("--> Setting up windows") # Get the list of windows according to the raster metadata so they can be looped through windows = subdivide(riowindows.Window(0, 0, width=catchment_dataset.width, height=catchment_dataset.height), 1024, 1024) @@ -297,7 +292,7 @@ def process(window): except Exception as e: tries += 1 time.sleep(10) - print(f"Failed to open catchment. Trying again in 10 seconds - ({e})") + # print(f"Failed to open catchment. 

        if not catchment_open_success:
            raise HANDDatasetReadError("Failed to open Catchment dataset window")
@@ -320,7 +315,7 @@ def process(window):
    if catchment_dataset is not None:
        catchment_dataset.close()

-    print("Generating polygons")
+    # print("Generating polygons")
    crs = 'EPSG:3338' if str(huc8).startswith('19') else 'EPSG:5070'
    df_final = gpd.GeoDataFrame(geoms, columns=['hydro_id', 'geom'], crs=crs, geometry="geom")
    df_final = df_final.dissolve(by="hydro_id")
@@ -328,10 +323,10 @@
    df_final = df_final.set_crs('epsg:3857')

    if df_final.index.has_duplicates:
-        print("dropping duplicates")
+        # print("dropping duplicates")
        df_final = df_final.drop_duplicates()

-    print("Adding additional metadata columns")
+    # print("Adding additional metadata columns")
    df_final = df_final.reset_index()
    df_final = df_final.rename(columns={"index": "hydro_id"})
    df_final['fim_version'] = FIM_VERSION
@@ -357,7 +352,7 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
    if not os.path.exists('/tmp/raw_rasters/'):
        os.mkdir('/tmp/raw_rasters/')

-    print("--> Connecting to S3 datasets")
+    # print("--> Connecting to S3 datasets")
    tries = 0
    raster_open_success = False
    while tries < 3:
@@ -374,7 +369,7 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
    if not raster_open_success:
        raise HANDDatasetReadError("Failed to open HAND and Catchment datasets")

-    print("--> Setting up mapping array")
+    # print("--> Setting up mapping array")
    catchment_nodata = int(catchment_dataset.nodata)  # get no_data value for catchment raster
    valid_catchments = stage_lookup.index.tolist()  # parse lookup to get features with >0 stages  # noqa
    hydroids = stage_lookup.index.tolist()  # parse lookup to get all features
@@ -399,7 +394,7 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
    profile['dtype'] = "int32"

    # Open the output raster using rasterio. This will allow the inner function to be parallel and write to it
-    print("--> Setting up windows")
+    # print("--> Setting up windows")

    # Get the list of windows according to the raster metadata so they can be looped through
    windows = subdivide(riowindows.Window(0, 0, width=hand_dataset.width, height=hand_dataset.height), 1024, 1024)
@@ -510,7 +505,7 @@ def process(window):
    if catchment_dataset is not None:
        catchment_dataset.close()

-    print("Generating polygons")
+    # print("Generating polygons")
    crs = 'EPSG:3338' if str(huc8).startswith('19') else 'EPSG:5070'
    df_final = gpd.GeoDataFrame(geoms, columns=['hydro_id', 'geom'], crs=crs, geometry="geom")
    df_final = df_final.dissolve(by="hydro_id")
@@ -521,17 +516,17 @@
    df_final = df_final.join(stage_lookup).dropna()

    if df_final.index.has_duplicates:
-        print("dropping duplicates")
+        # print("dropping duplicates")
        df_final = df_final.drop_duplicates()

-    print("Converting m columns to ft")
+    # print("Converting m columns to ft")
    df_final['rc_stage_ft'] = (df_final['rc_stage_m'] * 3.28084).astype(int)
    df_final['rc_previous_stage_ft'] = round(df_final['rc_previous_stage_m'] * 3.28084, 2)
    df_final['rc_discharge_cfs'] = round(df_final['rc_discharge_cms'] * 35.315, 2)
    df_final['rc_previous_discharge_cfs'] = round(df_final['rc_previous_discharge_cms'] * 35.315, 2)
    df_final = df_final.drop(columns=["rc_stage_m", "rc_previous_stage_m", "rc_discharge_cms", "rc_previous_discharge_cms"])

-    print("Adding additional metadata columns")
+    # print("Adding additional metadata columns")
    df_final = df_final.reset_index()
    df_final = df_final.rename(columns={"index": "hydro_id"})
    df_final['fim_version'] = FIM_VERSION
@@ -562,9 +557,9 @@ def s3_csv_to_df(bucket, key, columns=None):
    for i in range(5):
        try:
            # Read S3 csv file into Pandas DataFrame
-            print(f"Reading {key} from {bucket} into DataFrame")
+            # print(f"Reading {key} from {bucket} into DataFrame")
            df = pd.read_csv(f"s3://{bucket}/{key}", **extra_pd_args)
-            print("DataFrame creation Successful")
+            # print("DataFrame creation Successful")
        except Exception as e:
            if i == 4: print(f"Failed to read from S3:\n{e}")
            continue
@@ -599,13 +594,13 @@ def calculate_stage_values(hydrotable_key, subsetted_streams_bucket, subsetted_s
    df_forecast = df_forecast.drop(columns=['huc8_branch', 'huc'])
    df_forecast = df_forecast.set_index('hydro_id')

-    print(f"Removing {len(df_forecast[df_forecast['stage_m'].isna()])} reaches with a NaN interpolated stage")
+    # print(f"Removing {len(df_forecast[df_forecast['stage_m'].isna()])} reaches with a NaN interpolated stage")
    df_zero_stage = df_forecast[df_forecast['stage_m'].isna()].copy()
    df_zero_stage['note'] = "NaN Stage After Hydrotable Lookup"
    df_forecast = df_forecast[~df_forecast['stage_m'].isna()]

    stage0 = df_forecast['stage_m'] == 0
-    print(f"Removing {len(df_forecast[stage0])} reaches with a 0 interpolated stage")
+    # print(f"Removing {len(df_forecast[stage0])} reaches with a 0 interpolated stage")
    df_zero_stage = pd.concat([df_zero_stage, df_forecast[stage0]], axis=0)
    df_zero_stage['note'] = np.where(df_zero_stage.note.isnull(), "0 Stage After Hydrotable Lookup", "NaN")
    df_forecast = df_forecast[~stage0]
@@ -613,7 +608,7 @@ def calculate_stage_values(hydrotable_key, subsetted_streams_bucket, subsetted_s
    df_zero_stage.drop(columns=['discharge_cms', 'stage_m', 'rc_stage_m', 'rc_previous_stage_m', 'rc_previous_discharge_cms'], inplace=True)

    df_forecast = df_forecast.join(df_hydro_max)
-    print(f"{len(df_forecast)} reaches will be processed")
print(f"{len(df_forecast)} reaches will be processed") return df_forecast, df_zero_stage @@ -663,14 +658,18 @@ def interpolate_stage(df_row, df_hydro): if hydrotable_index >= len(stages): exceeds_max = True hydrotable_index = hydrotable_index - 1 - + hydrotable_previous_index = hydrotable_index-1 if CACHE_FIM_RESOLUTION_FT == 1 or exceeds_max: + # This is a simpler case because this is the resoluation at which the hydrotable is provided rounded_stage = stages[hydrotable_index] + rc_discharge = discharges[hydrotable_index] + rc_previous_stage = stages[hydrotable_previous_index] + rc_previous_discharge = discharges[hydrotable_previous_index] else: rounded_stage = round_m_to_nearest_ft_resolution(interpolated_stage, CACHE_FIM_RESOLUTION_FT, CACHE_FIM_RESOLUTION_ROUNDING) - rc_previous_stage = stages[hydrotable_previous_index] - rc_discharge = discharges[hydrotable_index] - rc_previous_discharge = discharges[hydrotable_previous_index] - + rc_discharge = round(np.interp(rounded_stage, stages, discharges), 2) + rc_previous_stage = max(rounded_stage - CACHE_FIM_RESOLUTION_FT, 0) + rc_previous_discharge = round(np.interp(rc_previous_stage, stages, discharges), 2) + return interpolated_stage, rounded_stage, rc_previous_stage, rc_discharge, rc_previous_discharge \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/fim_caching_templates/0_create_or_truncate_tables.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/fim_caching_templates/0_create_or_truncate_tables.sql index b4104d5f..897bd15a 100644 --- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/fim_caching_templates/0_create_or_truncate_tables.sql +++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/fim_caching_templates/0_create_or_truncate_tables.sql @@ -23,7 +23,7 @@ CREATE TABLE {db_fim_table} forecast_stage_ft double precision, rc_discharge_cfs double precision, rc_previous_discharge_cfs double precision, - rc_stage_ft integer, + rc_stage_ft double precision, rc_previous_stage_ft integer, max_rc_stage_ft double precision, max_rc_discharge_cfs double precision, @@ -37,7 +37,7 @@ DROP TABLE IF EXISTS {db_fim_table}_geo; CREATE TABLE {db_fim_table}_geo ( hand_id integer, - rc_stage_ft integer, + rc_stage_ft double precision, geom geometry(geometry, 3857) );