Reservoir indices (#25)
* formatted files, created a script to move the hydrolakes shp file into zarr

* added reservoir code
taddyb authored Jul 7, 2024
1 parent 1f2e8c0 commit f0ba6b5
Showing 9 changed files with 203 additions and 64 deletions.
16 changes: 8 additions & 8 deletions marquette/__main__.py
@@ -5,7 +5,7 @@
import zarr
from omegaconf import DictConfig

log = logging.getLogger(__name__)
log = logging.getLogger(name=__name__)


@hydra.main(
@@ -24,12 +24,8 @@ def main(cfg: DictConfig) -> None:
if cfg.name.lower() == "hydrofabric":
raise ImportError("Hydrofabric functionality not yet supported")
elif cfg.name.lower() == "merit":
from marquette.merit.create import (
create_edges,
create_N,
create_TMs,
write_streamflow,
)
from marquette.merit.create import (create_edges, create_N, create_TMs,
map_lake_points, write_streamflow)

start = time.perf_counter()
log.info(f"Creating MERIT {cfg.zone} River Graph")
@@ -44,6 +40,9 @@ def main(cfg: DictConfig) -> None:
log.info("Converting Streamflow to zarr")
write_streamflow(cfg, edges)

log.info("Mapping Lake Pour Points to Edges")
map_lake_points(cfg, edges)

log.info("Running Data Post-Processing Extensions")
run_extensions(cfg, edges)

@@ -87,7 +86,8 @@ def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import calculate_incremental_drainage_area
from marquette.merit.extensions import \
calculate_incremental_drainage_area

log.info("Adding edge/catchment area input data to your MERIT River Graph")
if "incremental_drainage_area" in edges:
3 changes: 3 additions & 0 deletions marquette/conf/config.yaml
@@ -28,6 +28,9 @@ create_streamflow:
predictions: /projects/mhpi/yxs275/DM_output/dPL_local_daymet_new_attr_water_loss_v6v10_random_batch_all_merit_forward/
start_date: 01-01-1980
end_date: 12-31-2020
map_lake_points:
lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
extensions:
- soils_data
- pet_forcing
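The `${zone}` token in the new `map_lake_points.lake_points` path is standard Hydra/OmegaConf interpolation, resolved against the top-level `zone` key at access time. A minimal sketch of how it resolves, using a made-up zone value:

from omegaconf import OmegaConf

# Minimal interpolation sketch; "74" is a made-up zone value.
cfg = OmegaConf.create(
    {
        "zone": "74",
        "map_lake_points": {
            "lake_points": "/projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp",
        },
    }
)
# Prints .../RIV_lake_intersection_74.shp
print(cfg.map_lake_points.lake_points)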
62 changes: 62 additions & 0 deletions marquette/merit/_map_lake_points.py
@@ -0,0 +1,62 @@
import logging
from pathlib import Path

import geopandas as gpd
import numpy as np
import zarr
from omegaconf import DictConfig
from tqdm import tqdm

log = logging.getLogger(name=__name__)

def _map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
"""A function that reads in a gdf of hydrolakes information, finds its corresponding edge, then saves the data
Parameters
----------
cfg: DictConfig
The configuration object
edges: zarr.Group
The zarr group containing the edges
"""
data_path = Path(cfg.map_lake_points.lake_points)
if not data_path.exists():
msg = "Cannot find the lake points file"
log.exception(msg)
raise FileNotFoundError(msg)
gdf = gpd.read_file(data_path)
lake_comids = gdf["COMID"].astype(int).values
    edges_comids: np.ndarray = edges["merit_basin"][:].astype(np.int32)  # type: ignore

hylak_id = np.full(len(edges_comids), -1, dtype=np.int32)
grand_id = np.full_like(edges_comids, -1, dtype=np.int32)
lake_area = np.full_like(edges_comids, -1, dtype=np.float32)
vol_total = np.full_like(edges_comids, -1, dtype=np.float32)
depth_avg = np.full_like(edges_comids, -1, dtype=np.float32)

for idx, lake_id in enumerate(tqdm(
lake_comids,
desc="Mapping Lake COMIDS to edges",
ncols=140,
ascii=True,
    )):
jdx = np.where(edges_comids == lake_id)[0]
if not jdx.size:
log.info(f"No lake found for COMID {lake_id}")
else:
            # Assuming the pour point is at the end of the COMID
edge_idx = jdx[-1]
lake_row = gdf.iloc[idx]
hylak_id[edge_idx] = lake_row["Hylak_id"]
grand_id[edge_idx] = lake_row["Grand_id"]
lake_area[edge_idx] = lake_row["Lake_area"]
vol_total[edge_idx] = lake_row["Vol_total"]
depth_avg[edge_idx] = lake_row["Depth_avg"]

edges.array("hylak_id", data=hylak_id)
edges.array("grand_id", data=grand_id)
edges.array("lake_area", data=lake_area)
edges.array("vol_total", data=vol_total)
edges.array("depth_avg", data=depth_avg)

log.info("Wrote Lake data for zones to zarr")
50 changes: 28 additions & 22 deletions marquette/merit/create.py
@@ -10,30 +10,19 @@
from omegaconf import DictConfig
from tqdm import tqdm

from marquette.merit._connectivity_matrix import (
create_gage_connectivity,
map_gages_to_zone,
new_zone_connectivity,
)
from marquette.merit._connectivity_matrix import (create_gage_connectivity,
map_gages_to_zone,
new_zone_connectivity)
from marquette.merit._edge_calculations import (
calculate_num_edges,
create_segment,
find_flowlines,
many_segment_to_edge_partition,
singular_segment_to_edge_partition,
sort_xarray_dataarray,
)
calculate_num_edges, create_segment, find_flowlines,
many_segment_to_edge_partition, singular_segment_to_edge_partition,
sort_xarray_dataarray)
from marquette.merit._map_lake_points import _map_lake_points
from marquette.merit._streamflow_conversion_functions import (
calculate_huc10_flow_from_individual_files,
calculate_merit_flow,
separate_basins,
)
from marquette.merit._TM_calculations import (
create_HUC_MERIT_TM,
create_MERIT_FLOW_TM,
# create_sparse_MERIT_FLOW_TM,
join_geospatial_data,
)
calculate_huc10_flow_from_individual_files, calculate_merit_flow,
separate_basins)
from marquette.merit._TM_calculations import ( # create_sparse_MERIT_FLOW_TM,
create_HUC_MERIT_TM, create_MERIT_FLOW_TM, join_geospatial_data)

log = logging.getLogger(__name__)

@@ -242,3 +231,20 @@ def create_TMs(cfg: DictConfig, edges: zarr.Group) -> None:
# create_sparse_MERIT_FLOW_TM(cfg, edges)
# else:
create_MERIT_FLOW_TM(cfg, edges)


def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
"""Maps HydroLAKES pour points to the corresponding edge
Parameters
----------
cfg: DictConfig
The configuration object
edges: zarr.Group
The zarr group containing the edges
"""
if "hylak_id" in edges:
log.info("HydroLakes Intersection already exists in Zarr format")
else:
log.info("Mapping HydroLakes Pour Points to Edges")
_map_lake_points(cfg, edges)
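A minimal sketch of invoking the new step on its own, assuming an existing edges store (the zarr path and zone here are hypothetical); the `hylak_id` guard makes a repeat call a no-op:

import zarr
from omegaconf import OmegaConf

from marquette.merit.create import map_lake_points

cfg = OmegaConf.load("marquette/conf/config.yaml")
cfg.zone = "74"  # made-up zone
edges = zarr.open_group("/path/to/edges.zarr", mode="a")  # hypothetical path

map_lake_points(cfg, edges)  # writes hylak_id, grand_id, lake_area, vol_total, depth_avg
map_lake_points(cfg, edges)  # logs that the intersection already exists and returns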
66 changes: 38 additions & 28 deletions marquette/merit/extensions.py
@@ -224,19 +224,19 @@ def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> None:
)


def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
"""Creates Q` summed data for all edges in a given MERIT zone
Parameters:
----------
cfg: DictConfig
The configuration object.
edges: zarr.Group
The edges group in the MERIT zone
"""
n = 1 # number of splits (used for reducing memory load)
n = 1 # number of splits (used for reducing memory load)
cp.cuda.runtime.setDevice(2) # manually setting the device to 2

streamflow_group = Path(
f"/projects/mhpi/data/MERIT/streamflow/zarr/{cfg.create_streamflow.version}/{cfg.zone}"
)
@@ -245,16 +245,15 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
streamflow_zarr: zarr.Group = zarr.open_group(streamflow_group, mode="r")
streamflow_time = streamflow_zarr.time[:]
# _, counts = cp.unique(edges.segment_sorting_index[:], return_counts=True) # type: ignore
dim_0 : int = streamflow_zarr.time.shape[0] # type: ignore
dim_1 : int = streamflow_zarr.COMID.shape[0] # type: ignore
dim_0: int = streamflow_zarr.time.shape[0] # type: ignore
dim_1: int = streamflow_zarr.COMID.shape[0] # type: ignore
edge_ids = np.array(edges.id[:])
edges_segment_sorting_index = cp.array(edges.segment_sorting_index[:])
edge_index_mapping = {v: i for i, v in enumerate(edge_ids)}

q_prime_np = np.zeros([dim_0, dim_1]).transpose(1, 0)
streamflow_data = cp.array(streamflow_zarr.streamflow[:])


# Generating a networkx DiGraph object
df_path = Path(f"{cfg.create_edges.edges}").parent / f"{cfg.zone}_graph_df.csv"
if df_path.exists():
@@ -264,47 +263,58 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
target = []
for idx, _ in enumerate(
tqdm(
edges.id[:],
desc="creating graph in dictionary form",
ascii=True,
ncols=140,
)
):
edges.id[:],
desc="creating graph in dictionary form",
ascii=True,
ncols=140,
)
):
if edges.ds[idx] != "0_0":
source.append(idx)
target.append(edge_index_mapping[edges.ds[idx]])
df = pd.DataFrame({"source": source, "target": target})
df.to_csv(df_path, index=False)
G = nx.from_pandas_edgelist(df=df, create_using=nx.DiGraph(),)

time_split = np.array_split(streamflow_time, n) #type: ignore
G = nx.from_pandas_edgelist(
df=df,
create_using=nx.DiGraph(),
)

time_split = np.array_split(streamflow_time, n) # type: ignore
for idx, time_range in enumerate(time_split):
q_prime_cp = cp.zeros([time_range.shape[0], dim_1]).transpose(1, 0)

for jdx, _ in enumerate(tqdm(
edge_ids,
desc=f"calculating q` sum data for part {idx}",
ascii=True,
ncols=140,
)):
for jdx, _ in enumerate(
tqdm(
edge_ids,
desc=f"calculating q` sum data for part {idx}",
ascii=True,
ncols=140,
)
):
streamflow_ds_id = edges_segment_sorting_index[jdx]
# num_edges_in_comid = counts[streamflow_ds_id]
try:
graph = nx.descendants(G, jdx, backend="cugraph")
graph.add(jdx) # Adding the idx to ensure it's counted
downstream_idx = np.array(list(graph)) # type: ignore
downstream_comid_idx = cp.unique(edges_segment_sorting_index[downstream_idx]) # type: ignore
q_prime_cp[downstream_comid_idx] += streamflow_data[time_range, streamflow_ds_id] # type: ignore
downstream_comid_idx = cp.unique(
edges_segment_sorting_index[downstream_idx]
) # type: ignore
q_prime_cp[downstream_comid_idx] += streamflow_data[
time_range, streamflow_ds_id
] # type: ignore
except nx.exception.NetworkXError:
                # This means there is no connectivity from this basin. It's a one-node graph
q_prime_cp[streamflow_ds_id] = streamflow_data[time_range, streamflow_ds_id]

q_prime_cp[streamflow_ds_id] = streamflow_data[
time_range, streamflow_ds_id
]

print("Saving GPU Memory to CPU; freeing GPU Memory")
q_prime_np[:, time_range] = cp.asnumpy(q_prime_cp)
del q_prime_cp
cp.get_default_memory_pool().free_all_blocks()

edges.array(
name="summed_q_prime",
data=q_prime_np.transpose(1, 0),
)
)
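The accumulation above routes each edge's local flow to every downstream edge via `nx.descendants`. A toy version on a three-edge chain with made-up flows (it prints [1. 3. 7.]):

import networkx as nx
import numpy as np

# Chain 0 -> 1 -> 2: edge 0 drains through edges 1 and 2.
G = nx.DiGraph([(0, 1), (1, 2)])
local_q = np.array([1.0, 2.0, 4.0])  # made-up local flows per edge
q_prime = np.zeros(3)

for j in G.nodes:
    downstream = nx.descendants(G, j) | {j}  # downstream edges plus the edge itself
    q_prime[list(downstream)] += local_q[j]

print(q_prime)  # [1. 3. 7.]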
8 changes: 2 additions & 6 deletions marquette/merit/map.py
@@ -12,12 +12,8 @@
from scipy.sparse import csr_matrix
from tqdm import tqdm

from marquette.merit._graph import (
Segment,
data_to_csv,
get_edge_counts,
segments_to_edges,
)
from marquette.merit._graph import (Segment, data_to_csv, get_edge_counts,
segments_to_edges)

log = logging.getLogger(__name__)

2 changes: 2 additions & 0 deletions requirements-cuda.txt
@@ -2,3 +2,5 @@
cudf-cu12==24.6.*
dask-cudf-cu12==24.6.*
cugraph-cu12==24.6.*
cuspatial-cu12==24.6.*
cuproj-cu12==24.6.*
59 changes: 59 additions & 0 deletions scripts/hydrolakes_to_zarr.py
@@ -0,0 +1,59 @@
import argparse
from pathlib import Path

import geopandas as gpd
import numcodecs
import zarr
from shapely.wkt import dumps
from tqdm import tqdm


def split_hydrolakes(input_path: Path, output_path: Path) -> None:

print(f"Reading gdf file: {input_path}")
gdf = gpd.read_file(filename=input_path)

print("Writing geometries")
geometries = gdf["geometry"].apply(lambda geom: dumps(geom)).values

# Create a Zarr store
root: zarr.Group = zarr.open_group(output_path, mode="a")

# Create datasets for each column
for column in tqdm(
gdf.columns,
desc="writing gdf to zarr",
ncols=140,
ascii=True,
):
if column == "geometry":
root.array(column, data=geometries, dtype=object, object_codec=numcodecs.VLenUTF8())
root.attrs["crs"] = gdf.crs.to_string()
else:
data = gdf[column].values
            if data.dtype == "O":
                # Object (string) columns need a variable-length UTF-8 codec;
                # write the column's own values (not the geometries)
                root.array(column, data=data, dtype=object, object_codec=numcodecs.VLenUTF8())
else:
root.array(column, data=data)

print(f"Processing complete! Zarr store saved to: {output_path}")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convert a shapefile to a Zarr store")
parser.add_argument("input_shp", type=Path, help="Path to the input shapefile")
parser.add_argument(
"output_zarr", type=Path, help="Path to save the output Zarr store"
)

args = parser.parse_args()

input_path = Path(args.input_shp)
output_path = Path(args.output_zarr)
if not input_path.exists():
raise FileNotFoundError(f"Input shapefile not found: {input_path}")
if output_path.exists():
raise FileExistsError(f"Output Zarr store already exists: {output_path}")

split_hydrolakes(input_path, output_path)
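A usage sketch under stated assumptions: the shapefile name is hypothetical (the real HydroLAKES file name may differ), and `scripts/` is importable from the working directory. The same conversion runs via the argparse CLI above with `python scripts/hydrolakes_to_zarr.py input.shp output.zarr`.

from pathlib import Path

from scripts.hydrolakes_to_zarr import split_hydrolakes

split_hydrolakes(
    Path("HydroLAKES_polys_v10.shp"),  # hypothetical input shapefile
    Path("hydrolakes.zarr"),           # output Zarr store
)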
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -4,6 +4,7 @@
import numpy as np
import pytest


@pytest.fixture
def sample_gage_cfg():
with hydra.initialize(
