Fixes 0.25ft resolution FIM cache bug
Refs #1048
shawncrawley authored and nickchadwick-noaa committed Jan 23, 2025
1 parent d93acb6 · commit 2d551fe
Showing 2 changed files with 38 additions and 39 deletions.
File 1 of 2: Python Lambda handler
@@ -107,7 +107,6 @@ def lambda_handler(event, context):
s3_path_piece = ''

# Get db table names and setup db connection
print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
db_schema = db_fim_table.split(".")[0]
db_table = db_fim_table.split(".")[-1]
if any(x in db_schema for x in ["aep", "fim_catchments", "catfim"]):
@@ -121,7 +120,7 @@ def lambda_handler(event, context):
if "catchments" in db_fim_table:
df_inundation = create_inundation_catchment_boundary(huc8, branch)

print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
# print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
try:
df_inundation.to_postgis(f"{db_table}", con=process_db.engine, schema=db_schema, if_exists='append')
except Exception as e:
@@ -130,13 +129,11 @@ def lambda_handler(event, context):
process_db.engine.dispose()

else:
print(f"Processing FIM for huc {huc8} and branch {branch}")
print(f"Processing HUC-branch {huc8_branch} for {fim_config_name} for {date}T{hour}:00:00Z")

s3_path_piece = '/'.join([run_values[by] for by in process_by])
subsetted_data = f"{data_prefix}/{product}/{fim_config_name}/workspace/{date}/{hour}/data/{s3_path_piece}_data.csv"

print(f"Processing HUC {huc8} for {fim_config_name} for {date}T{hour}:00:00Z")

if input_variable == 'stage':
stage_lookup = s3_csv_to_df(data_bucket, subsetted_data)
stage_lookup = stage_lookup.set_index('hydro_id')
@@ -154,14 +151,14 @@ def lambda_handler(event, context):
stage_lookup = pd.DataFrame()
df_zero_stage_records = pd.DataFrame()
if catch_exists and hand_exists and rating_curve_exists:
print("->Calculating flood depth")
# print("->Calculating flood depth")
stage_lookup, df_zero_stage_records = calculate_stage_values(rating_curve_key, data_bucket, subsetted_data, huc8_branch) # get stages
else:
print(f"catchment, hand, or rating curve are missing for huc {huc8} and branch {branch}:\nCatchment exists: {catch_exists} ({catchment_key})\nHand exists: {hand_exists} ({hand_key})\nRating curve exists: {rating_curve_exists} ({rating_curve_key})")

# If not a reference/egis fim run, Upload zero_stage reaches for tracking / FIM cache
if fim_run_type != 'reference' and not df_zero_stage_records.empty:
print(f"Adding zero stage data to {db_table}_zero_stage")# Only process inundation configuration if available data
# print(f"Adding zero stage data to {db_table}_zero_stage")# Only process inundation configuration if available data
df_zero_stage_records = df_zero_stage_records.reset_index()
df_zero_stage_records.drop(columns=['hydro_id','feature_id'], inplace=True)
df_zero_stage_records.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False)
@@ -184,7 +181,7 @@ def lambda_handler(event, context):
df_no_inundation = stage_lookup.merge(df_inundation.drop_duplicates(), on=['hand_id'],how='left',indicator=True)
df_no_inundation = df_no_inundation.loc[df_no_inundation['_merge'] == 'left_only']
if df_no_inundation.empty == False:
print(f"Adding {len(df_no_inundation)} reaches with NaN inundation to zero_stage table")
# print(f"Adding {len(df_no_inundation)} reaches with NaN inundation to zero_stage table")
df_no_inundation.drop(df_no_inundation.columns.difference(['hand_id','rc_discharge_cms','note']), axis=1, inplace=True)
df_no_inundation['note'] = "Error - No inundation returned from hand processing."
df_no_inundation.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False)
@@ -193,7 +190,7 @@ def lambda_handler(event, context):
if df_inundation.empty:
return

print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
# print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
try:
df_inundation.drop(columns=['hydro_id', 'feature_id'], inplace=True)
df_inundation.to_sql(db_table, con=process_db.engine, schema=db_schema, if_exists='append', index=False)
@@ -218,7 +215,7 @@ def lambda_handler(event, context):
df_inundation['branch'] = branch
df_inundation = df_inundation.rename(columns={"index": "oid"})

print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
# print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
try:
df_inundation.to_postgis(f"{db_table}", con=process_db.engine, schema=db_schema, if_exists='append')
except Exception as e:
@@ -228,8 +225,6 @@ def lambda_handler(event, context):

print(f"Successfully processed tif for HUC {huc8} and branch {branch} for {product} for {reference_time}")

return

def create_inundation_catchment_boundary(huc8, branch):
"""
Creates the catchment boundary polygons
@@ -238,7 +233,7 @@ def create_inundation_catchment_boundary(huc8, branch):

catchment_dataset = None
try:
print("--> Connecting to S3 datasets")
# print("--> Connecting to S3 datasets")
tries = 0
raster_open_success = False
while tries < 3:
@@ -249,20 +244,20 @@ def create_inundation_catchment_boundary(huc8, branch):
except Exception as e:
tries += 1
time.sleep(30)
print(f"Failed to open datasets. Trying again in 30 seconds - ({e})")
# print(f"Failed to open datasets. Trying again in 30 seconds - ({e})")

if not raster_open_success:
raise HANDDatasetReadError("Failed to open HAND and Catchment datasets")

print("--> Setting up mapping array")
# print("--> Setting up mapping array")
profile = catchment_dataset.profile # get the rasterio profile so the output can use the profile and match the input # noqa

# set the output nodata to 0
profile['nodata'] = 0
profile['dtype'] = "int32"

# Open the output raster using rasterio. This will allow the inner function to be parallel and write to it
print("--> Setting up windows")
# print("--> Setting up windows")

# Get the list of windows according to the raster metadata so they can be looped through
windows = subdivide(riowindows.Window(0, 0, width=catchment_dataset.width, height=catchment_dataset.height), 1024, 1024)
@@ -297,7 +292,7 @@ def process(window):
except Exception as e:
tries += 1
time.sleep(10)
print(f"Failed to open catchment. Trying again in 10 seconds - ({e})")
# print(f"Failed to open catchment. Trying again in 10 seconds - ({e})")

if not catchment_open_success:
raise HANDDatasetReadError("Failed to open Catchment dataset window")
@@ -320,18 +315,18 @@ def process(window):
if catchment_dataset is not None:
catchment_dataset.close()

print("Generating polygons")
# print("Generating polygons")
crs = 'EPSG:3338' if str(huc8).startswith('19') else 'EPSG:5070'
df_final = gpd.GeoDataFrame(geoms, columns=['hydro_id', 'geom'], crs=crs, geometry="geom")
df_final = df_final.dissolve(by="hydro_id")
df_final = df_final.to_crs(3857)
df_final = df_final.set_crs('epsg:3857')

if df_final.index.has_duplicates:
print("dropping duplicates")
# print("dropping duplicates")
df_final = df_final.drop_duplicates()

print("Adding additional metadata columns")
# print("Adding additional metadata columns")
df_final = df_final.reset_index()
df_final = df_final.rename(columns={"index": "hydro_id"})
df_final['fim_version'] = FIM_VERSION
@@ -357,7 +352,7 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
if not os.path.exists('/tmp/raw_rasters/'):
os.mkdir('/tmp/raw_rasters/')

print("--> Connecting to S3 datasets")
# print("--> Connecting to S3 datasets")
tries = 0
raster_open_success = False
while tries < 3:
@@ -374,7 +369,7 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
if not raster_open_success:
raise HANDDatasetReadError("Failed to open HAND and Catchment datasets")

print("--> Setting up mapping array")
# print("--> Setting up mapping array")
catchment_nodata = int(catchment_dataset.nodata) # get no_data value for catchment raster
valid_catchments = stage_lookup.index.tolist() # parse lookup to get features with >0 stages # noqa
hydroids = stage_lookup.index.tolist() # parse lookup to get all features
@@ -399,7 +394,7 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
profile['dtype'] = "int32"

# Open the output raster using rasterio. This will allow the inner function to be parallel and write to it
print("--> Setting up windows")
# print("--> Setting up windows")

# Get the list of windows according to the raster metadata so they can be looped through
windows = subdivide(riowindows.Window(0, 0, width=hand_dataset.width, height=hand_dataset.height), 1024, 1024)
@@ -510,7 +505,7 @@ def process(window):
if catchment_dataset is not None:
catchment_dataset.close()

print("Generating polygons")
# print("Generating polygons")
crs = 'EPSG:3338' if str(huc8).startswith('19') else 'EPSG:5070'
df_final = gpd.GeoDataFrame(geoms, columns=['hydro_id', 'geom'], crs=crs, geometry="geom")
df_final = df_final.dissolve(by="hydro_id")
@@ -521,17 +516,17 @@ def process(window):
df_final = df_final.join(stage_lookup).dropna()

if df_final.index.has_duplicates:
print("dropping duplicates")
# print("dropping duplicates")
df_final = df_final.drop_duplicates()

print("Converting m columns to ft")
# print("Converting m columns to ft")
df_final['rc_stage_ft'] = (df_final['rc_stage_m'] * 3.28084).astype(int)
df_final['rc_previous_stage_ft'] = round(df_final['rc_previous_stage_m'] * 3.28084, 2)
df_final['rc_discharge_cfs'] = round(df_final['rc_discharge_cms'] * 35.315, 2)
df_final['rc_previous_discharge_cfs'] = round(df_final['rc_previous_discharge_cms'] * 35.315, 2)
df_final = df_final.drop(columns=["rc_stage_m", "rc_previous_stage_m", "rc_discharge_cms", "rc_previous_discharge_cms"])

print("Adding additional metadata columns")
# print("Adding additional metadata columns")
df_final = df_final.reset_index()
df_final = df_final.rename(columns={"index": "hydro_id"})
df_final['fim_version'] = FIM_VERSION
@@ -562,9 +557,9 @@ def s3_csv_to_df(bucket, key, columns=None):
for i in range(5):
try:
# Read S3 csv file into Pandas DataFrame
print(f"Reading {key} from {bucket} into DataFrame")
# print(f"Reading {key} from {bucket} into DataFrame")
df = pd.read_csv(f"s3://{bucket}/{key}", **extra_pd_args)
print("DataFrame creation Successful")
# print("DataFrame creation Successful")
except Exception as e:
if i == 4: print(f"Failed to read from S3:\n{e}")
continue
@@ -599,21 +594,21 @@ def calculate_stage_values(hydrotable_key, subsetted_streams_bucket, subsetted_s
df_forecast = df_forecast.drop(columns=['huc8_branch', 'huc'])
df_forecast = df_forecast.set_index('hydro_id')

print(f"Removing {len(df_forecast[df_forecast['stage_m'].isna()])} reaches with a NaN interpolated stage")
# print(f"Removing {len(df_forecast[df_forecast['stage_m'].isna()])} reaches with a NaN interpolated stage")
df_zero_stage = df_forecast[df_forecast['stage_m'].isna()].copy()
df_zero_stage['note'] = "NaN Stage After Hydrotable Lookup"
df_forecast = df_forecast[~df_forecast['stage_m'].isna()]

stage0 = df_forecast['stage_m'] == 0
print(f"Removing {len(df_forecast[stage0])} reaches with a 0 interpolated stage")
# print(f"Removing {len(df_forecast[stage0])} reaches with a 0 interpolated stage")
df_zero_stage = pd.concat([df_zero_stage, df_forecast[stage0]], axis=0)
df_zero_stage['note'] = np.where(df_zero_stage.note.isnull(), "0 Stage After Hydrotable Lookup", "NaN")
df_forecast = df_forecast[~stage0]

df_zero_stage.drop(columns=['discharge_cms', 'stage_m', 'rc_stage_m', 'rc_previous_stage_m', 'rc_previous_discharge_cms'], inplace=True)

df_forecast = df_forecast.join(df_hydro_max)
print(f"{len(df_forecast)} reaches will be processed")
# print(f"{len(df_forecast)} reaches will be processed")

return df_forecast, df_zero_stage

@@ -663,14 +658,18 @@ def interpolate_stage(df_row, df_hydro):
if hydrotable_index >= len(stages):
exceeds_max = True
hydrotable_index = hydrotable_index - 1

hydrotable_previous_index = hydrotable_index-1
if CACHE_FIM_RESOLUTION_FT == 1 or exceeds_max:
# This is a simpler case because this is the resolution at which the hydrotable is provided
rounded_stage = stages[hydrotable_index]
rc_discharge = discharges[hydrotable_index]
rc_previous_stage = stages[hydrotable_previous_index]
rc_previous_discharge = discharges[hydrotable_previous_index]
else:
rounded_stage = round_m_to_nearest_ft_resolution(interpolated_stage, CACHE_FIM_RESOLUTION_FT, CACHE_FIM_RESOLUTION_ROUNDING)
rc_previous_stage = stages[hydrotable_previous_index]
rc_discharge = discharges[hydrotable_index]
rc_previous_discharge = discharges[hydrotable_previous_index]
rc_discharge = round(np.interp(rounded_stage, stages, discharges), 2)
rc_previous_stage = max(rounded_stage - CACHE_FIM_RESOLUTION_FT, 0)
rc_previous_discharge = round(np.interp(rc_previous_stage, stages, discharges), 2)

return interpolated_stage, rounded_stage, rc_previous_stage, rc_discharge, rc_previous_discharge
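
Note on the hunk above: before this commit, the else branch (taken when the cache is written at a finer resolution such as 0.25 ft) still pulled rc_discharge, rc_previous_stage, and rc_previous_discharge straight from the hydrotable rows, so the cached values stayed on the table's coarser increments even though rounded_stage had already been rounded to the quarter-foot. The fix interpolates the discharge at the rounded stage and takes the previous stage as one cache increment below it. The sketch below is a minimal, self-contained illustration of the corrected pattern, not the repository's code: the stage and discharge arrays, the sample stage of 2.6, and the round_to_resolution helper are assumed stand-ins for the real hydrotable and round_m_to_nearest_ft_resolution.

import numpy as np

# Illustrative rating curve: stages tabulated at 1-unit increments (values assumed for this sketch).
stages = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
discharges = np.array([0.0, 50.0, 140.0, 270.0, 430.0])

CACHE_FIM_RESOLUTION_FT = 0.25

def round_to_resolution(stage, resolution):
    # Hypothetical stand-in for round_m_to_nearest_ft_resolution: round up to the next increment.
    return np.ceil(stage / resolution) * resolution

interpolated_stage = 2.6  # sample stage interpolated from a forecast discharge (assumed)

# Previous behavior (simplified): index the coarse hydrotable rows directly.
hydrotable_index = int(np.searchsorted(stages, interpolated_stage))   # 3
old_rc_discharge = discharges[hydrotable_index]                       # 270.0, the value for stage 3
old_rc_previous_stage = stages[hydrotable_index - 1]                  # 2.0, a full table increment below

# Fixed behavior: interpolate at the quarter-foot rounded stage.
rounded_stage = round_to_resolution(interpolated_stage, CACHE_FIM_RESOLUTION_FT)    # 2.75
rc_discharge = round(np.interp(rounded_stage, stages, discharges), 2)               # 237.5
rc_previous_stage = max(rounded_stage - CACHE_FIM_RESOLUTION_FT, 0)                 # 2.5
rc_previous_discharge = round(np.interp(rc_previous_stage, stages, discharges), 2)  # 205.0

print(old_rc_discharge, old_rc_previous_stage)
print(rc_discharge, rc_previous_stage, rc_previous_discharge)

In the real function, the 1 ft / exceeds-max case keeps reading the hydrotable rows directly, as shown in the first branch above.
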
File 2 of 2: SQL table definition
@@ -23,7 +23,7 @@ CREATE TABLE {db_fim_table}
forecast_stage_ft double precision,
rc_discharge_cfs double precision,
rc_previous_discharge_cfs double precision,
rc_stage_ft integer,
rc_stage_ft double precision,
rc_previous_stage_ft integer,
max_rc_stage_ft double precision,
max_rc_discharge_cfs double precision,
@@ -37,7 +37,7 @@ DROP TABLE IF EXISTS {db_fim_table}_geo;

CREATE TABLE {db_fim_table}_geo (
hand_id integer,
rc_stage_ft integer,
rc_stage_ft double precision,
geom geometry(geometry, 3857)
);
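
The corresponding schema change: with the cache produced at 0.25 ft resolution, rc_stage_ft can land on quarter-foot values (2.25, 2.5, 2.75, ...), which an integer column would round or reject, so both tables move the column to double precision. A trivial illustration (the sample value is assumed):

rounded_stage_ft = 2.75                      # quarter-foot stage from the new interpolation path
stored_as_integer = int(rounded_stage_ft)    # 2, the fraction is lost in an integer column
stored_as_double = float(rounded_stage_ft)   # 2.75, preserved by a double precision column
print(stored_as_integer, stored_as_double)
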

