Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding binsparse mapping #22

Merged
merged 7 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions marquette/conf/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: MERIT
data_path: /projects/mhpi/data/${name}
zone: 73
zone: 74
create_edges:
buffer: 0.3334
dx: 2000
Expand All @@ -19,7 +19,7 @@ create_N:
create_TMs:
MERIT:
save_sparse: True
TM: ${data_path}/zarr/TMs/MERIT_FLOWLINES_${zone}
TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_streamflow:
version: merit_conus_v3.0
Expand Down
115 changes: 102 additions & 13 deletions marquette/merit/_TM_calculations.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import logging
from pathlib import Path

import binsparse
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
import zarr
from omegaconf import DictConfig
from scipy import sparse
from tqdm import tqdm

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,6 +58,85 @@ def create_HUC_MERIT_TM(
xr_dataset.to_zarr(zarr_path, mode="w")


def format_pairs(gage_output: dict):
pairs = []
for comid, edge_id in zip(gage_output["comid_idx"], gage_output["edge_id_idx"]):
for edge in edge_id:
# Check if upstream is a list (multiple connections)
if isinstance(edge, list):
for _id in edge:
# Replace None with np.NaN for consistency
if _id is None:
_id = np.NaN
pairs.append((comid, _id))
else:
# Handle single connection (not a list)
if edge is None:
edge = np.NaN
pairs.append((comid, edge))

return pairs


def create_coo_data(sparse_matrix, root: zarr.Group):
"""
Creates coordinate format (COO) data from river graph output for a specific gage.

This function processes the river graph data (specifically the 'ds' and 'up' arrays)
to create a list of pairs representing connections in the graph. These pairs are then
stored in a Zarr dataset within a group specific to a gage, identified by 'padded_gage_id'.

Parameters:
gage_output: The output from a river graph traversal, containing 'ds' and 'up' keys.
padded_gage_id (str): The identifier for the gage, used to create a specific group in Zarr.
root (zarr.Group): The root Zarr group where the dataset will be stored.

"""
values = sparse_matrix["values"]
pairs = format_pairs(sparse_matrix)

# Create a Zarr dataset for this specific gage
root.create_dataset("pairs", data=np.array(pairs), chunks=(10000,), dtype="int32")
root.array("values", data=np.array(values), chunks=(10000,), dtype="float32")


def create_sparse_MERIT_FLOW_TM(
cfg: DictConfig, edges: zarr.hierarchy.Group
) -> zarr.hierarchy.Group:
"""
Creating a sparse TM that maps MERIT basins to their reaches. Flow predictions are distributed
based on reach length/ total merit reach length
:param cfg:
:param edges:
:param huc_to_merit_TM:
:return:
"""
log.info("Using Edge COMIDs for TM")
COMIDs = np.unique(edges.merit_basin[:]) # already sorted
gage_coo_root = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="a")
merit_basin = edges.merit_basin[:]
river_graph_len = edges.len[:]
river_graph = {"values": [], "comid_idx": [], "edge_id_idx": []}
for comid_idx, basin_id in enumerate(
tqdm(
COMIDs,
desc="Creating a sparse TM Mapping MERIT basins to their edges",
ncols=140,
ascii=True,
)
):
col_indices = np.where(merit_basin == basin_id)[0]
total_length = np.sum(river_graph_len[col_indices])
if total_length == 0:
print("Basin not found:", basin_id)
continue
proportions = river_graph_len[col_indices] / total_length
river_graph["comid_idx"].append(comid_idx)
river_graph["edge_id_idx"].append(col_indices.tolist())
river_graph["values"].extend(proportions.tolist())
create_coo_data(river_graph, gage_coo_root)


def create_MERIT_FLOW_TM(
cfg: DictConfig, edges: zarr.hierarchy.Group
) -> zarr.hierarchy.Group:
Expand Down Expand Up @@ -113,19 +194,27 @@ def create_MERIT_FLOW_TM(
for idx, proportion in zip(indices, proportions):
column_index = np.where(river_graph_ids == river_graph_ids[idx])[0][0]
data_np[i][column_index] = proportion
data_array = xr.DataArray(
data=data_np,
dims=["COMID", "EDGEID"], # Explicitly naming the dimensions
coords={"COMID": COMIDs, "EDGEID": river_graph_ids}, # Adding coordinates
)
xr_dataset = xr.Dataset(
data_vars={"TM": data_array},
attrs={"description": "MERIT -> Edge Transition Matrix"},
)
log.info("Writing MERIT TM to zarr store")
zarr_path = Path(cfg.create_TMs.MERIT.TM)
xr_dataset.to_zarr(zarr_path, mode="w")
# zarr_hierarchy = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="r")

if cfg.create_TMs.MERIT.save_sparse:
log.info("Writing to sparse matrix")
gage_coo_root = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="a")
matrix = sparse.csr_matrix(data_np)
binsparse.write(gage_coo_root, "TM", matrix)
log.info("Sparse matrix written")
else:
data_array = xr.DataArray(
data=data_np,
dims=["COMID", "EDGEID"], # Explicitly naming the dimensions
coords={"COMID": COMIDs, "EDGEID": river_graph_ids}, # Adding coordinates
)
xr_dataset = xr.Dataset(
data_vars={"TM": data_array},
attrs={"description": "MERIT -> Edge Transition Matrix"},
)
log.info("Writing MERIT TM to zarr store")
zarr_path = Path(cfg.create_TMs.MERIT.TM)
xr_dataset.to_zarr(zarr_path, mode="w")
# zarr_hierarchy = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="r")


def join_geospatial_data(cfg: DictConfig) -> gpd.GeoDataFrame:
Expand Down
4 changes: 4 additions & 0 deletions marquette/merit/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from marquette.merit._TM_calculations import (
create_HUC_MERIT_TM,
create_MERIT_FLOW_TM,
# create_sparse_MERIT_FLOW_TM,
join_geospatial_data,
)

Expand Down Expand Up @@ -237,4 +238,7 @@ def create_TMs(cfg: DictConfig, edges: zarr.Group) -> None:
log.info("MERIT -> FLOWLINE data already exists in zarr format")
else:
log.info("Creating MERIT -> FLOWLINE TM")
# if cfg.create_TMs.MERIT.save_sparse:
# create_sparse_MERIT_FLOW_TM(cfg, edges)
# else:
create_MERIT_FLOW_TM(cfg, edges)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
git+https://github.com/ivirshup/binsparse-python.git@main
dask[complete]
polars
dask-expr
Expand Down
Loading