Commit
enable file validation to check the emptiness of h5 file with context manager
danielfromearth committed Nov 10, 2023
1 parent 17e9d3c commit c822a9c
Showing 2 changed files with 108 additions and 35 deletions.
7 changes: 3 additions & 4 deletions concatenator/group_handling.py
@@ -6,6 +6,7 @@
"""
import re

import h5py
import netCDF4 as nc
import numpy as np
import xarray as xr
@@ -78,7 +79,7 @@ def walk(


def flatten_grouped_dataset(
nc_dataset: nc.Dataset, file_to_subset: str, ensure_all_dims_are_coords: bool = False
nc_dataset: nc.Dataset | h5py.File, ensure_all_dims_are_coords: bool = False
) -> tuple[nc.Dataset, list[str], list[str]]:
"""
Transform a netCDF4 Dataset that has groups to an xarray compatible
@@ -104,9 +105,7 @@
flattened.
"""
# Close the existing read-only dataset and reopen in append mode
nc_dataset.close()
nc_dataset = nc.Dataset(file_to_subset, "r+")

# nc_dataset.close()
dimensions = {}

# for var_name in list(nc_dataset.variables.keys()):
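The signature change above drops the file_to_subset path argument: flatten_grouped_dataset no longer closes and reopens the file itself, so callers pass an already-open handle (netCDF4 or h5py) and manage its lifetime. A minimal sketch of the new calling convention, with a placeholder filename:

import netCDF4 as nc

from concatenator.group_handling import flatten_grouped_dataset

# Hypothetical usage; "granule.nc4" is a placeholder path.
with nc.Dataset("granule.nc4", "r+") as nc_dataset:
    flat_dataset, coord_vars, _ = flatten_grouped_dataset(
        nc_dataset, ensure_all_dims_are_coords=True
    )
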
136 changes: 105 additions & 31 deletions concatenator/stitchee.py
@@ -2,9 +2,13 @@
import logging
import os
import time
from collections.abc import Generator
from contextlib import contextmanager
from logging import Logger
from pathlib import Path
from warnings import warn

import h5py
import netCDF4 as nc
import xarray as xr

@@ -19,6 +23,48 @@
default_logger = logging.getLogger(__name__)


@contextmanager
def open_netcdf_dataset_or_hdf_file(
file_to_open: str | Path, mode="r"
) -> Generator[tuple[object, str], None, None]:
print("entered the context manager")
suffix = Path(file_to_open).suffix.lower()

if suffix in [".nc", ".nc4", ".netcdf", ".netcdf4"]:
managed_resource = nc.Dataset(file_to_open, mode)
file_kind = "netcdf"
elif suffix in [".h5", ".hdf", ".hdf5"]:
managed_resource = h5py.File(file_to_open, mode)
file_kind = "hdf"
else:
raise TypeError("Unexpected file extension, <%s>." % suffix)

try:
yield managed_resource, file_kind
except BlockingIOError as err:
print("File is already opened, and needs to be closed before trying this again.")
raise err
# except OSError:
# print("Could not open file in append mode using netCDF4. Trying h5py..")
# managed_resource = h5py.File(file_to_open, mode)
# yield managed_resource
# # any cleanup that should only be done on failure
# # raise
except Exception:
# print('caught:', e)
raise
else:
# any cleanup that should only be done on success
print("no exception was thrown")
finally:
# any cleanup that should always be done
try:
managed_resource.close()
except Exception:
print("Could not close the resource.")
print("exited the context manager")


def stitchee(
files_to_concat: list[str],
output_file: str,
@@ -53,6 +99,8 @@ def stitchee(
if num_input_files < 1:
logger.info("No non-empty netCDF files found. Exiting.")
return ""
else:
logger.info(f"{num_input_files} workable files identified.")

if concat_dim and (concat_method == "xarray-combine"):
warn(
@@ -67,29 +115,33 @@
# The group structure is flattened.
start_time = time.time()
logger.info(" ..file %03d/%03d <%s>..", i + 1, num_input_files, filepath)
flat_dataset, coord_vars, _ = flatten_grouped_dataset(
nc.Dataset(filepath, "r"), filepath, ensure_all_dims_are_coords=True
)

logger.info("Removing duplicate dimensions")
flat_dataset = remove_duplicate_dims(flat_dataset)

logger.info("Opening flattened file with xarray.")
xrds = xr.open_dataset(
xr.backends.NetCDF4DataStore(flat_dataset),
decode_times=False,
decode_coords=False,
drop_variables=coord_vars,
)

benchmark_log["flattening"] = time.time() - start_time

# The flattened file is written to disk.
# flat_file_path = add_label_to_path(filepath, label="_flat_intermediate")
# xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars})
# intermediate_flat_filepaths.append(flat_file_path)
# xrdataset_list.append(xr.open_dataset(flat_file_path))
xrdataset_list.append(xrds)
# ncds = nc.Dataset(filepath, "r")
# ncds.close()

with open_netcdf_dataset_or_hdf_file(filepath, "r+") as (nc_dataset, file_kind):
flat_dataset, coord_vars, _ = flatten_grouped_dataset(
nc_dataset, ensure_all_dims_are_coords=True
)

logger.info("Removing duplicate dimensions")
flat_dataset = remove_duplicate_dims(flat_dataset)

logger.info("Opening flattened file with xarray.")
xrds = xr.open_dataset(
xr.backends.NetCDF4DataStore(flat_dataset),
decode_times=False,
decode_coords=False,
drop_variables=coord_vars,
)

benchmark_log["flattening"] = time.time() - start_time

# The flattened file is written to disk.
# flat_file_path = add_label_to_path(filepath, label="_flat_intermediate")
# xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars})
# intermediate_flat_filepaths.append(flat_file_path)
# xrdataset_list.append(xr.open_dataset(flat_file_path))
xrdataset_list.append(xrds)

# Flattened files are concatenated together (Using XARRAY).
start_time = time.time()
@@ -161,12 +213,13 @@ def stitchee(


def _validate_workable_files(files_to_concat, logger) -> tuple[list[str], int]:
"""Remove files from list that are not open-able as netCDF or that are empty."""
"""Remove files from a list that are not open-able as netCDF or that are empty."""
workable_files = []
for file in files_to_concat:
try:
with nc.Dataset(file, "r") as dataset:
is_empty = _is_file_empty(dataset)
with open_netcdf_dataset_or_hdf_file(file, mode="r") as (dataset, file_kind):
# with nc.Dataset(file, "r") as dataset:
is_empty = _is_file_empty(dataset, file_kind)
if is_empty is False:
workable_files.append(file)
except OSError:
@@ -177,13 +230,34 @@ def _validate_workable_files(files_to_concat, logger) -> tuple[list[str], int]:
return workable_files, number_of_workable_files


def _is_file_empty(parent_group: nc.Dataset | nc.Group) -> bool:
def _is_file_empty(parent_group: nc.Dataset | nc.Group | h5py.File, file_kind: str) -> bool:
"""
Test whether all variables in a dataset have size 0.
"""
for var in parent_group.variables.values():
if var.size != 0:
if file_kind == "hdf":

def _is_hdf_group_and_subgroups_empty(data_new, group):
is_empty = True
for key, item in data_new[group].items():
group_path = f"{group}{key}"
if isinstance(item, h5py.Dataset):
if item.size != 0:
is_empty = False
elif isinstance(item, h5py.Group):
is_empty = _is_hdf_group_and_subgroups_empty(
data_new, data_new[group_path].name + "/"
)

if is_empty is False:
return False

if _is_hdf_group_and_subgroups_empty(parent_group, parent_group.name) is False:
return False
for child_group in parent_group.groups.values():
return _is_file_empty(child_group)

else:
for var in parent_group.variables.values():
if var.size != 0:
return False
for child_group in parent_group.groups.values():
return _is_file_empty(child_group, file_kind)
return True
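
To see how the recursive HDF5 emptiness check behaves, here is a throwaway sketch against an in-memory file; the group and dataset names are illustrative, and _is_file_empty is a private helper imported only for demonstration.

import h5py

from concatenator.stitchee import _is_file_empty

# Illustrative only: build an in-memory HDF5 file and exercise the "hdf" branch.
with h5py.File("empty_check.h5", "w", driver="core", backing_store=False) as f:
    f.create_group("science")                         # group with no datasets
    print(_is_file_empty(f, "hdf"))                   # True: no variable has data yet
    f["science"].create_dataset("time", data=[1, 2])  # add a non-empty dataset
    print(_is_file_empty(f, "hdf"))                   # False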
