Skip to content

Commit

Permalink
Merge branch 'master' of github.com:nens/threedi-results-analysis int…
Browse files Browse the repository at this point in the history
…o hoan-update
  • Loading branch information
hoanphungt committed Dec 6, 2024
2 parents 64326ec + 266f4ca commit 9ff8ae9
Show file tree
Hide file tree
Showing 20 changed files with 600 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Compatibility with Python 3.12 (#1061)
- Bumped h5py to 3.10.0 and scipy to 1.13.0 for python 3.12 compatibility (#1061)
- Added Processing Algorithm "Extract structure control actions" (#926)
- Fixed attributeError when loading a QGIS project (#1063)
- Fix in Rasters to NetCDF algorithm to properly convert the units Enum to string (#1067)

Expand Down
11 changes: 11 additions & 0 deletions datasource/result_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,14 @@

# TODO: QH is also defined above.
LAYER_OBJECT_TYPE_MAPPING = dict([(a[0], a[1]) for a in layer_information])

"""
Maps action type to affected structure.
This can be used when no control action is present for an object.
"""
ACTION_TYPE_ATTRIBUTE_MAP = {
"set_crest_level": {"variable": "dpumax", "unit": "m MSL", "applicable_structures": ["v2_weir", "v2_orifice"]},
"set_pump_capacity": {"variable": "capacity", "unit": "", "applicable_structures": None},
"set_discharge_coefficients": {"variable": "discharge_coefficient_positive", "unit": "", "applicable_structures": None},
"set_gate_level": {"variable": None, "unit": "m MSL", "applicable_structures": None}
}
14 changes: 2 additions & 12 deletions datasource/test_datasources.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from threedigrid.admin import gridresultadmin
from threedigrid.admin.constants import NO_DATA_VALUE
from threedi_results_analysis.datasource.threedi_results import find_aggregation_netcdf
from threedi_results_analysis.datasource.threedi_results import normalized_object_type
from threedi_results_analysis.datasource.threedi_results import ThreediResult
from threedi_results_analysis.tests.utilities import TemporaryDirectory
from threedigrid.admin import gridresultadmin
from threedigrid.admin.constants import NO_DATA_VALUE

import mock
import numpy as np
Expand Down Expand Up @@ -110,16 +110,6 @@ def test_get_timeseries_filter_node(threedi_result):
np.testing.assert_equal(time_series[:, 1], data.return_value[:, 0])


def test_get_timeseries_filter_content_pk(threedi_result):
with mock.patch(
"threedigrid.orm.base.models.Model.get_filtered_field_value"
) as data:
data.return_value = np.ones((len(threedi_result.timestamps), 1))
time_series = threedi_result.get_timeseries("s1", content_pk=5)
np.testing.assert_equal(time_series[:, 0], threedi_result.get_timestamps())
np.testing.assert_equal(time_series[:, 1], data.return_value[:, 0])


def test_get_timeseries_filter_fill_value(threedi_result):
with mock.patch(
"threedigrid.orm.base.models.Model.get_filtered_field_value"
Expand Down
203 changes: 180 additions & 23 deletions datasource/threedi_results.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from functools import cached_property
from threedigrid.admin.constants import NO_DATA_VALUE
from threedi_results_analysis.datasource.result_constants import LAYER_OBJECT_TYPE_MAPPING
from threedi_results_analysis.datasource.result_constants import (
ACTION_TYPE_ATTRIBUTE_MAP,
)
from threedi_results_analysis.datasource.result_constants import (
LAYER_OBJECT_TYPE_MAPPING,
)
from threedi_results_analysis.datasource.result_constants import SUBGRID_MAP_VARIABLES
from threedigrid.admin.constants import NO_DATA_VALUE
from threedigrid.admin.gridadmin import GridH5Admin
from threedigrid.admin.gridresultadmin import GridH5AggregateResultAdmin
from threedigrid.admin.gridresultadmin import GridH5ResultAdmin
from threedigrid.admin.gridresultadmin import GridH5StructureControl
from threedigrid.admin.gridresultadmin import GridH5WaterQualityResultAdmin
from threedigrid.admin.structure_controls.models import StructureControlSourceTypes
from threedigrid.admin.structure_controls.models import StructureControlTypes

import glob
import h5py
Expand Down Expand Up @@ -37,6 +45,7 @@ class ThreediResult():
- GridH5ResultAdmin
- GridH5AggregateResultAdmin
- GridH5WaterQualityResultAdmin
- GridH5StructureControl
For more information about threedigrid see
https://threedigrid.readthedocs.io/en/latest/
Expand Down Expand Up @@ -119,7 +128,30 @@ def available_water_quality_vars(self):
@property
def available_vars(self):
"""Return a list of all available variables"""
return self.available_subgrid_map_vars + self.available_aggregation_vars + self.available_water_quality_vars
return self.available_subgrid_map_vars + self.available_aggregation_vars + self.available_water_quality_vars + self.available_structure_control_actions_vars

@property
def available_structure_control_actions_vars(self):
"""Return a list of all structure control actions variables"""
ga = self.structure_control_actions_result_admin
if not ga:
return []
available_vars = []
for control_type in StructureControlTypes.__members__.values():
control_type_data = getattr(ga, control_type.name)
action_types = np.unique(control_type_data.action_type)
for action_type in action_types:
source_types = [cta.source_type.value for cta in control_type_data.group_by_action_type(action_type)]
var = {
"name": action_type[4:].replace("_", " ").capitalize(), # sanitize
"unit": ACTION_TYPE_ATTRIBUTE_MAP[action_type]["unit"],
"parameters": action_type,
"types": source_types
}
if var not in available_vars:
available_vars.append(var)

return available_vars

@cached_property
def timestamps(self):
Expand Down Expand Up @@ -180,13 +212,13 @@ def get_gridadmin(self, variable=None):
"""Return the gridadmin where the variable is stored. If no variable is
given, a gridadmin without results is returned.
Results are either stored in the 'results_3di.nc', 'aggregate_results_3di.nc'
or 'water_quality_results_3di.nc'. These make use of the GridH5ResultAdmin,
GridH5AggregateResultAdmin or GridH5WaterQualityResultAdmin to query the data
Results are either stored in the 'results_3di.nc', 'aggregate_results_3di.nc',
'water_quality_results_3di.nc' or 'structure_control_actions_3di.nc'. These make use of the GridH5ResultAdmin,
GridH5AggregateResultAdmin, GridH5WaterQualityResultAdmin or GridH5StructureControl to query the data
respectively.
:param variable: str of the variable name, e.g. 's1', 'q_pump'
:return: handle to GridAdminResult, AggregateGridAdminResult or GridH5WaterQualityResultAdmin
:return: handle to GridAdminResult, AggregateGridAdminResult, GridH5WaterQualityResultAdmin or GridH5StructureControl
"""
if variable is None:
return self.gridadmin
Expand All @@ -196,45 +228,132 @@ def get_gridadmin(self, variable=None):
return self.aggregate_result_admin
elif variable in [v["parameters"] for v in self.available_water_quality_vars]:
return self.water_quality_result_admin
elif variable in [v["parameters"] for v in self.available_structure_control_actions_vars]:
return self.structure_control_actions_result_admin
else:
raise AttributeError(f"Unknown subgrid or aggregate or water quality variable: {variable}")

def get_timeseries(
self, nc_variable, node_id=None, content_pk=None, fill_value=None
self, nc_variable, node_id=None, fill_value=None, selected_object_type=None
):
"""Return a time series array of the given variable
A 2d array is given, with first column being the timestamps in seconds.
The next columns are the values of the nodes of the given variable.
You can also filter on a specific node using node_id or content_pk,
You can also filter on a specific node using node_id,
in which case only the timeseries of the given node is returned.
If there is no values of the given variable, only the timestamps are
returned, i.e. an array of shape (n, 1) with n being the timestamps.
:param nc_variable:
:param node_id:
:param content_pk:
:param fill_value:
:param selected_object_type: layer type of selected feature
:return: 2D array, first column being the timestamps
"""
ga = self.get_gridadmin(nc_variable)

filtered_result = ga.get_model_instance_by_field_name(nc_variable).timeseries(
indexes=slice(None)
)
if node_id:
filtered_result = filtered_result.filter(id=node_id)
elif content_pk:
filtered_result = filtered_result.filter(content_pk=content_pk)
if isinstance(ga, GridH5StructureControl):
# GridH5StructureControl has a different interface compared to the other GridAdmin structures
return self.get_structure_control_action_timeseries(ga, nc_variable, node_id, selected_object_type, fill_value)
else:
filtered_result = ga.get_model_instance_by_field_name(nc_variable).timeseries(
indexes=slice(None)
)
if node_id:
filtered_result = filtered_result.filter(id=node_id)
values = self.get_timeseries_values(filtered_result, nc_variable)
if fill_value is not None:
values[values == NO_DATA_VALUE] = fill_value

timestamps = self.get_timestamps(nc_variable)
timestamps = timestamps.reshape(-1, 1) # reshape (n,) to (n, 1)

return np.hstack([timestamps, values])

def get_structure_control_action_timeseries(self, ga, nc_variable, node_id, selected_object_type, fill_value):
assert nc_variable
assert node_id
timestamps = []
values = []

prev_value = None
for control_type in StructureControlTypes.__members__.values():
control_type_data = getattr(ga, control_type.name)
structure_controls_for_id = control_type_data.group_by_grid_id(node_id)
structure_controls = [sc for sc in structure_controls_for_id if sc.action_type == nc_variable]

# It could be that the same action is applied on nodes, lines and pumps, we need to find the right one.
desired_type = StructureControlSourceTypes.LINES if selected_object_type == "flowline" else StructureControlSourceTypes.PUMPS
structure_controls = [sc for sc in structure_controls_for_id if sc.source_type == desired_type]

for structure_control in structure_controls:
assert len(structure_control.time) == len(structure_control.action_value_1)
for time_key, value in zip(structure_control.time, structure_control.action_value_1):
if prev_value is not None:
timestamps.append(time_key)
values.append(prev_value)
prev_value = value
timestamps.append(time_key)
values.append(value)

# Retrieve gridadmin structure
if selected_object_type == "flowline":
structure = self.gridadmin.lines.filter(id=node_id)
elif selected_object_type == "pump":
structure = self.gridadmin.pumps.filter(id=node_id)
else:
raise NotImplementedError("Plotting node control actions is not yet implemented")

# Check whether this object's content type is applicable for this control action
applicable_structures = ACTION_TYPE_ATTRIBUTE_MAP[nc_variable]["applicable_structures"]
if applicable_structures:
content_type = structure.content_type[0].decode()
if content_type not in applicable_structures:
# This action is not applicable to this object
logger.info(f"Parameter {nc_variable} not applicable for type {str(content_type)}")
return np.column_stack(([], []))

orig_timestamps = self.get_timestamps()
max_time_stamp_orig = max(orig_timestamps)
min_time_stamp_orig = min(orig_timestamps)
if timestamps:
assert values
max_time_stamp_structure = max(timestamps)
# Possibly append the structure control actions till the end
if values and max_time_stamp_orig > max_time_stamp_structure:
values.append(values[-1])
timestamps.append(max_time_stamp_structure)
values.append(values[-1])
timestamps.append(max_time_stamp_orig)

affected_nc_variable = ACTION_TYPE_ATTRIBUTE_MAP[nc_variable]["variable"]
if affected_nc_variable:
# Check if we need to prepend the plot with non-controlled (static) values
orig_value = getattr(structure, affected_nc_variable)[0]
if not timestamps:
# No actions at all, take original value to plot
assert not values
values = [orig_value, orig_value]
timestamps = [min_time_stamp_orig, max_time_stamp_orig]
else:
min_time_stamp_structure = min(timestamps)
if min_time_stamp_orig < min_time_stamp_structure:
values.insert(0, orig_value)
timestamps.insert(0, min_time_stamp_structure)
values.insert(0, orig_value)
timestamps.insert(0, min_time_stamp_orig)

if not values:
return np.column_stack(([], []))

values = self.get_timeseries_values(filtered_result, nc_variable)
if fill_value is not None:
values[values == NO_DATA_VALUE] = fill_value
for index in range(len(values)):
if values[index] == NO_DATA_VALUE:
values[index] = fill_value

timestamps = self.get_timestamps(nc_variable)
timestamps = timestamps.reshape(-1, 1) # reshape (n,) to (n, 1)
return np.hstack([timestamps, values])
return np.column_stack((timestamps, values))

def get_values_by_timestep_nr(self, variable, timestamp_idx, node_ids):
"""Return an array of values of the given variable on the specified
Expand Down Expand Up @@ -317,7 +436,7 @@ def result_admin(self):
# Note: passing a file-like object due to an issue in threedigrid
# https://github.com/nens/threedigrid/issues/183
file_like_object_h5 = open(h5, 'rb')
file_like_object_h5.startswith = lambda x: ''
file_like_object_h5.startswith = lambda x: False
file_like_object_nc = open(self.file_path, 'rb')
return GridH5ResultAdmin(file_like_object_h5, file_like_object_nc)

Expand Down Expand Up @@ -353,6 +472,22 @@ def water_quality_result_admin(self):
file_like_object_nc = open(wq_path, 'rb')
return GridH5WaterQualityResultAdmin(file_like_object_h5, file_like_object_nc)

@cached_property
def structure_control_actions_result_admin(self):
try:
# Note: both of these might raise the FileNotFoundError
sca_path = find_structure_control_actions_netcdf(self.file_path)
h5 = self.h5_path
except FileNotFoundError:
logger.exception("Structure control actions result not found")
return None
# Note: passing a file-like object due to an issue in threedigrid
# https://github.com/nens/threedigrid/issues/183
file_like_object_h5 = open(h5, 'rb')
file_like_object_h5.startswith = lambda x: False
file_like_object_nc = open(sca_path, 'rb')
return GridH5StructureControl(file_like_object_h5, file_like_object_nc)

@property
def short_model_slug(self):
model_slug = self.gridadmin.model_slug
Expand Down Expand Up @@ -388,6 +523,28 @@ def find_aggregation_netcdf(netcdf_file_path):
)


def find_structure_control_actions_netcdf(netcdf_file_path):
"""An ad-hoc way to find the structure control actions netcdf file
Args:
netcdf_file_path: path to the result netcdf
Returns:
the structure control actions netcdf path
Raises:
FileNotFoundError if nothing is found
"""
pattern = "structure_control_actions_3di.nc"
result_dir = os.path.dirname(netcdf_file_path)
sca_result_files = glob.glob(os.path.join(result_dir, pattern))
if sca_result_files:
return sca_result_files[0]
raise FileNotFoundError(
"'structure_control_actions_3di.nc' file not found relative to %s" % result_dir
)


def find_water_quality_netcdf(netcdf_file_path):
"""An ad-hoc way to find the water quality netcdf file
Expand Down
2 changes: 1 addition & 1 deletion dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
Dependency("Mako", "mako", "", False),
Dependency("cftime", "cftime", ">=1.5.0", False), # threedigrid[results]
Dependency("alembic", "alembic", "==1.8.*", False),
Dependency("threedigrid", "threedigrid", "==2.2.*", False),
Dependency("threedigrid", "threedigrid", "==2.3.*", False),
Dependency("threedi-schema", "threedi_schema", "==0.219.*", False),
Dependency("threedi-modelchecker", "threedi_modelchecker", "==2.6.*", False),
Dependency("threedidepth", "threedidepth", "==0.6.3", False),
Expand Down
Loading

0 comments on commit 9ff8ae9

Please sign in to comment.