Skip to content

Commit

Permalink
Structure control action plotting (#1069)
Browse files Browse the repository at this point in the history
  • Loading branch information
benvanbasten-ns authored Dec 5, 2024
1 parent cde9e9d commit 266f4ca
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 62 deletions.
11 changes: 11 additions & 0 deletions datasource/result_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,14 @@

# TODO: QH is also defined above.
LAYER_OBJECT_TYPE_MAPPING = dict([(a[0], a[1]) for a in layer_information])

"""
Maps action type to affected structure.
This can be used when no control action is present for an object.
"""
ACTION_TYPE_ATTRIBUTE_MAP = {
"set_crest_level": {"variable": "dpumax", "unit": "m MSL", "applicable_structures": ["v2_weir", "v2_orifice"]},
"set_pump_capacity": {"variable": "capacity", "unit": "", "applicable_structures": None},
"set_discharge_coefficients": {"variable": "discharge_coefficient_positive", "unit": "", "applicable_structures": None},
"set_gate_level": {"variable": None, "unit": "m MSL", "applicable_structures": None}
}
14 changes: 2 additions & 12 deletions datasource/test_datasources.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from threedigrid.admin import gridresultadmin
from threedigrid.admin.constants import NO_DATA_VALUE
from threedi_results_analysis.datasource.threedi_results import find_aggregation_netcdf
from threedi_results_analysis.datasource.threedi_results import normalized_object_type
from threedi_results_analysis.datasource.threedi_results import ThreediResult
from threedi_results_analysis.tests.utilities import TemporaryDirectory
from threedigrid.admin import gridresultadmin
from threedigrid.admin.constants import NO_DATA_VALUE

import mock
import numpy as np
Expand Down Expand Up @@ -110,16 +110,6 @@ def test_get_timeseries_filter_node(threedi_result):
np.testing.assert_equal(time_series[:, 1], data.return_value[:, 0])


def test_get_timeseries_filter_content_pk(threedi_result):
with mock.patch(
"threedigrid.orm.base.models.Model.get_filtered_field_value"
) as data:
data.return_value = np.ones((len(threedi_result.timestamps), 1))
time_series = threedi_result.get_timeseries("s1", content_pk=5)
np.testing.assert_equal(time_series[:, 0], threedi_result.get_timestamps())
np.testing.assert_equal(time_series[:, 1], data.return_value[:, 0])


def test_get_timeseries_filter_fill_value(threedi_result):
with mock.patch(
"threedigrid.orm.base.models.Model.get_filtered_field_value"
Expand Down
203 changes: 180 additions & 23 deletions datasource/threedi_results.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from functools import cached_property
from threedigrid.admin.constants import NO_DATA_VALUE
from threedi_results_analysis.datasource.result_constants import LAYER_OBJECT_TYPE_MAPPING
from threedi_results_analysis.datasource.result_constants import (
ACTION_TYPE_ATTRIBUTE_MAP,
)
from threedi_results_analysis.datasource.result_constants import (
LAYER_OBJECT_TYPE_MAPPING,
)
from threedi_results_analysis.datasource.result_constants import SUBGRID_MAP_VARIABLES
from threedigrid.admin.constants import NO_DATA_VALUE
from threedigrid.admin.gridadmin import GridH5Admin
from threedigrid.admin.gridresultadmin import GridH5AggregateResultAdmin
from threedigrid.admin.gridresultadmin import GridH5ResultAdmin
from threedigrid.admin.gridresultadmin import GridH5StructureControl
from threedigrid.admin.gridresultadmin import GridH5WaterQualityResultAdmin
from threedigrid.admin.structure_controls.models import StructureControlSourceTypes
from threedigrid.admin.structure_controls.models import StructureControlTypes

import glob
import h5py
Expand Down Expand Up @@ -37,6 +45,7 @@ class ThreediResult():
- GridH5ResultAdmin
- GridH5AggregateResultAdmin
- GridH5WaterQualityResultAdmin
- GridH5StructureControl
For more information about threedigrid see
https://threedigrid.readthedocs.io/en/latest/
Expand Down Expand Up @@ -119,7 +128,30 @@ def available_water_quality_vars(self):
@property
def available_vars(self):
"""Return a list of all available variables"""
return self.available_subgrid_map_vars + self.available_aggregation_vars + self.available_water_quality_vars
return self.available_subgrid_map_vars + self.available_aggregation_vars + self.available_water_quality_vars + self.available_structure_control_actions_vars

@property
def available_structure_control_actions_vars(self):
"""Return a list of all structure control actions variables"""
ga = self.structure_control_actions_result_admin
if not ga:
return []
available_vars = []
for control_type in StructureControlTypes.__members__.values():
control_type_data = getattr(ga, control_type.name)
action_types = np.unique(control_type_data.action_type)
for action_type in action_types:
source_types = [cta.source_type.value for cta in control_type_data.group_by_action_type(action_type)]
var = {
"name": action_type[4:].replace("_", " ").capitalize(), # sanitize
"unit": ACTION_TYPE_ATTRIBUTE_MAP[action_type]["unit"],
"parameters": action_type,
"types": source_types
}
if var not in available_vars:
available_vars.append(var)

return available_vars

@cached_property
def timestamps(self):
Expand Down Expand Up @@ -180,13 +212,13 @@ def get_gridadmin(self, variable=None):
"""Return the gridadmin where the variable is stored. If no variable is
given, a gridadmin without results is returned.
Results are either stored in the 'results_3di.nc', 'aggregate_results_3di.nc'
or 'water_quality_results_3di.nc'. These make use of the GridH5ResultAdmin,
GridH5AggregateResultAdmin or GridH5WaterQualityResultAdmin to query the data
Results are either stored in the 'results_3di.nc', 'aggregate_results_3di.nc',
'water_quality_results_3di.nc' or 'structure_control_actions_3di.nc'. These make use of the GridH5ResultAdmin,
GridH5AggregateResultAdmin, GridH5WaterQualityResultAdmin or GridH5StructureControl to query the data
respectively.
:param variable: str of the variable name, e.g. 's1', 'q_pump'
:return: handle to GridAdminResult, AggregateGridAdminResult or GridH5WaterQualityResultAdmin
:return: handle to GridAdminResult, AggregateGridAdminResult, GridH5WaterQualityResultAdmin or GridH5StructureControl
"""
if variable is None:
return self.gridadmin
Expand All @@ -196,45 +228,132 @@ def get_gridadmin(self, variable=None):
return self.aggregate_result_admin
elif variable in [v["parameters"] for v in self.available_water_quality_vars]:
return self.water_quality_result_admin
elif variable in [v["parameters"] for v in self.available_structure_control_actions_vars]:
return self.structure_control_actions_result_admin
else:
raise AttributeError(f"Unknown subgrid or aggregate or water quality variable: {variable}")

def get_timeseries(
self, nc_variable, node_id=None, content_pk=None, fill_value=None
self, nc_variable, node_id=None, fill_value=None, selected_object_type=None
):
"""Return a time series array of the given variable
A 2d array is given, with first column being the timestamps in seconds.
The next columns are the values of the nodes of the given variable.
You can also filter on a specific node using node_id or content_pk,
You can also filter on a specific node using node_id,
in which case only the timeseries of the given node is returned.
If there is no values of the given variable, only the timestamps are
returned, i.e. an array of shape (n, 1) with n being the timestamps.
:param nc_variable:
:param node_id:
:param content_pk:
:param fill_value:
:param selected_object_type: layer type of selected feature
:return: 2D array, first column being the timestamps
"""
ga = self.get_gridadmin(nc_variable)

filtered_result = ga.get_model_instance_by_field_name(nc_variable).timeseries(
indexes=slice(None)
)
if node_id:
filtered_result = filtered_result.filter(id=node_id)
elif content_pk:
filtered_result = filtered_result.filter(content_pk=content_pk)
if isinstance(ga, GridH5StructureControl):
# GridH5StructureControl has a different interface compared to the other GridAdmin structures
return self.get_structure_control_action_timeseries(ga, nc_variable, node_id, selected_object_type, fill_value)
else:
filtered_result = ga.get_model_instance_by_field_name(nc_variable).timeseries(
indexes=slice(None)
)
if node_id:
filtered_result = filtered_result.filter(id=node_id)
values = self.get_timeseries_values(filtered_result, nc_variable)
if fill_value is not None:
values[values == NO_DATA_VALUE] = fill_value

timestamps = self.get_timestamps(nc_variable)
timestamps = timestamps.reshape(-1, 1) # reshape (n,) to (n, 1)

return np.hstack([timestamps, values])

def get_structure_control_action_timeseries(self, ga, nc_variable, node_id, selected_object_type, fill_value):
assert nc_variable
assert node_id
timestamps = []
values = []

prev_value = None
for control_type in StructureControlTypes.__members__.values():
control_type_data = getattr(ga, control_type.name)
structure_controls_for_id = control_type_data.group_by_grid_id(node_id)
structure_controls = [sc for sc in structure_controls_for_id if sc.action_type == nc_variable]

# It could be that the same action is applied on nodes, lines and pumps, we need to find the right one.
desired_type = StructureControlSourceTypes.LINES if selected_object_type == "flowline" else StructureControlSourceTypes.PUMPS
structure_controls = [sc for sc in structure_controls_for_id if sc.source_type == desired_type]

for structure_control in structure_controls:
assert len(structure_control.time) == len(structure_control.action_value_1)
for time_key, value in zip(structure_control.time, structure_control.action_value_1):
if prev_value is not None:
timestamps.append(time_key)
values.append(prev_value)
prev_value = value
timestamps.append(time_key)
values.append(value)

# Retrieve gridadmin structure
if selected_object_type == "flowline":
structure = self.gridadmin.lines.filter(id=node_id)
elif selected_object_type == "pump":
structure = self.gridadmin.pumps.filter(id=node_id)
else:
raise NotImplementedError("Plotting node control actions is not yet implemented")

# Check whether this object's content type is applicable for this control action
applicable_structures = ACTION_TYPE_ATTRIBUTE_MAP[nc_variable]["applicable_structures"]
if applicable_structures:
content_type = structure.content_type[0].decode()
if content_type not in applicable_structures:
# This action is not applicable to this object
logger.info(f"Parameter {nc_variable} not applicable for type {str(content_type)}")
return np.column_stack(([], []))

orig_timestamps = self.get_timestamps()
max_time_stamp_orig = max(orig_timestamps)
min_time_stamp_orig = min(orig_timestamps)
if timestamps:
assert values
max_time_stamp_structure = max(timestamps)
# Possibly append the structure control actions till the end
if values and max_time_stamp_orig > max_time_stamp_structure:
values.append(values[-1])
timestamps.append(max_time_stamp_structure)
values.append(values[-1])
timestamps.append(max_time_stamp_orig)

affected_nc_variable = ACTION_TYPE_ATTRIBUTE_MAP[nc_variable]["variable"]
if affected_nc_variable:
# Check if we need to prepend the plot with non-controlled (static) values
orig_value = getattr(structure, affected_nc_variable)[0]
if not timestamps:
# No actions at all, take original value to plot
assert not values
values = [orig_value, orig_value]
timestamps = [min_time_stamp_orig, max_time_stamp_orig]
else:
min_time_stamp_structure = min(timestamps)
if min_time_stamp_orig < min_time_stamp_structure:
values.insert(0, orig_value)
timestamps.insert(0, min_time_stamp_structure)
values.insert(0, orig_value)
timestamps.insert(0, min_time_stamp_orig)

if not values:
return np.column_stack(([], []))

values = self.get_timeseries_values(filtered_result, nc_variable)
if fill_value is not None:
values[values == NO_DATA_VALUE] = fill_value
for index in range(len(values)):
if values[index] == NO_DATA_VALUE:
values[index] = fill_value

timestamps = self.get_timestamps(nc_variable)
timestamps = timestamps.reshape(-1, 1) # reshape (n,) to (n, 1)
return np.hstack([timestamps, values])
return np.column_stack((timestamps, values))

def get_values_by_timestep_nr(self, variable, timestamp_idx, node_ids):
"""Return an array of values of the given variable on the specified
Expand Down Expand Up @@ -317,7 +436,7 @@ def result_admin(self):
# Note: passing a file-like object due to an issue in threedigrid
# https://github.com/nens/threedigrid/issues/183
file_like_object_h5 = open(h5, 'rb')
file_like_object_h5.startswith = lambda x: ''
file_like_object_h5.startswith = lambda x: False
file_like_object_nc = open(self.file_path, 'rb')
return GridH5ResultAdmin(file_like_object_h5, file_like_object_nc)

Expand Down Expand Up @@ -353,6 +472,22 @@ def water_quality_result_admin(self):
file_like_object_nc = open(wq_path, 'rb')
return GridH5WaterQualityResultAdmin(file_like_object_h5, file_like_object_nc)

@cached_property
def structure_control_actions_result_admin(self):
try:
# Note: both of these might raise the FileNotFoundError
sca_path = find_structure_control_actions_netcdf(self.file_path)
h5 = self.h5_path
except FileNotFoundError:
logger.exception("Structure control actions result not found")
return None
# Note: passing a file-like object due to an issue in threedigrid
# https://github.com/nens/threedigrid/issues/183
file_like_object_h5 = open(h5, 'rb')
file_like_object_h5.startswith = lambda x: False
file_like_object_nc = open(sca_path, 'rb')
return GridH5StructureControl(file_like_object_h5, file_like_object_nc)

@property
def short_model_slug(self):
model_slug = self.gridadmin.model_slug
Expand Down Expand Up @@ -388,6 +523,28 @@ def find_aggregation_netcdf(netcdf_file_path):
)


def find_structure_control_actions_netcdf(netcdf_file_path):
"""An ad-hoc way to find the structure control actions netcdf file
Args:
netcdf_file_path: path to the result netcdf
Returns:
the structure control actions netcdf path
Raises:
FileNotFoundError if nothing is found
"""
pattern = "structure_control_actions_3di.nc"
result_dir = os.path.dirname(netcdf_file_path)
sca_result_files = glob.glob(os.path.join(result_dir, pattern))
if sca_result_files:
return sca_result_files[0]
raise FileNotFoundError(
"'structure_control_actions_3di.nc' file not found relative to %s" % result_dir
)


def find_water_quality_netcdf(netcdf_file_path):
"""An ad-hoc way to find the water quality netcdf file
Expand Down
2 changes: 1 addition & 1 deletion dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
Dependency("Mako", "mako", "", False),
Dependency("cftime", "cftime", ">=1.5.0", False), # threedigrid[results]
Dependency("alembic", "alembic", "==1.8.*", False),
Dependency("threedigrid", "threedigrid", "==2.2.*", False),
Dependency("threedigrid", "threedigrid", "==2.3.*", False),
Dependency("threedi-schema", "threedi_schema", "==0.219.*", False),
Dependency("threedi-modelchecker", "threedi_modelchecker", "==2.6.*", False),
Dependency("threedidepth", "threedidepth", "==0.6.3", False),
Expand Down
2 changes: 1 addition & 1 deletion tool_animation/map_animator.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def _get_active_parameter_config(self):
available_wq_vars = threedi_result.available_water_quality_vars[:] # a copy

parameter_config = generate_parameter_config(
available_subgrid_vars, agg_vars=agg_vars, wq_vars=available_wq_vars
available_subgrid_vars, agg_vars=agg_vars, wq_vars=available_wq_vars, sca_vars=None
)

def _intersection(a: List, b: List):
Expand Down
5 changes: 3 additions & 2 deletions tool_graph/graph_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ def timeseries_table(self, parameters, absolute, time_units):

if (parameters not in threedi_result.available_subgrid_map_vars and
parameters not in threedi_result.available_aggregation_vars and
parameters not in [v["parameters"] for v in threedi_result.available_water_quality_vars]):
parameters not in [v["parameters"] for v in threedi_result.available_water_quality_vars] and
parameters not in [v["parameters"] for v in threedi_result.available_structure_control_actions_vars]):
logger.warning(f"Parameter {parameters} not available in result {self.result.value.text()}")
return EMPTY_TIMESERIES

Expand All @@ -166,7 +167,7 @@ def timeseries_table(self, parameters, absolute, time_units):
return EMPTY_TIMESERIES

timeseries = threedi_result.get_timeseries(
parameters, node_id=self.object_id.value, fill_value=np.NaN
parameters, node_id=self.object_id.value, fill_value=np.NaN, selected_object_type=self.object_type.value
)
if timeseries.shape[1] == 1:
logger.info("1-element timeserie, plotting empty serie")
Expand Down
Loading

0 comments on commit 266f4ca

Please sign in to comment.