From 33a25aeef4bed791878ae3dbb7340f6113fc4d5d Mon Sep 17 00:00:00 2001
From: danielfromearth
Date: Tue, 5 Mar 2024 18:19:08 -0500
Subject: [PATCH] add history_json to output global attributes, built from in-files

---
 concatenator/attribute_handling.py      | 170 ++++++------------------
 concatenator/group_handling.py          |  18 ++-
 concatenator/harmony/download_worker.py |   3 +-
 concatenator/harmony/service_adapter.py |  12 ++
 concatenator/stitchee.py                |   4 +-
 5 files changed, 68 insertions(+), 139 deletions(-)

diff --git a/concatenator/attribute_handling.py b/concatenator/attribute_handling.py
index f1d3c43..0efb374 100644
--- a/concatenator/attribute_handling.py
+++ b/concatenator/attribute_handling.py
@@ -96,132 +96,6 @@ def _flatten_coordinate_attribute(attribute_string: str) -> str:
     )
 
 
-# def construct_history(input_files: list, granule_urls):
-#     """Construct history JSON entry for this concatenation operation
-#
-#     https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42
-#     https://wiki.earthdata.nasa.gov/pages/viewpage.action?spaceKey=TRT&title=Formal+Schema+for+history_json
-#
-#     Parameters
-#     ----------
-#     input_files: List of input files
-#     granule_urls: List of granule URLs
-#
-#     Returns
-#     -------
-#     a history JSON dictionary, constructed for this concatenation operation
-#     """
-#
-#     history_json = {
-#         "date_time": datetime.now(tz=timezone.utc).isoformat(),
-#         "derived_from": granule_urls,
-#         "program": 'stitchee',
-#         "version": importlib_metadata.distribution('stitchee').version,
-#         "parameters": f'input_files={input_files}',
-#         "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S1262025641-LARC_CLOUD",
-#         "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json"
-#     }
-#     return history_json
-#
-#
-# def retrieve_history(dataset):
-#     """
-#     Retrieve history_json field from NetCDF dataset, if it exists
-#
-#     Parameters
-#     ----------
-#     dataset: NetCDF Dataset representing a single granule
-#
-#     Returns
-#     -------
-#     a history JSON dictionary, constructed for this concatenation operation
-#     """
-#     if 'history_json' not in dataset.ncattrs():
-#         return []
-#     history_json = dataset.getncattr('history_json')
-#     return json.loads(history_json)
-
-# def set_output_attributes(input_dataset: Dataset, output_dataset: Dataset,
-#                           request_parameters: dict) -> None:
-#     """ Set the global attributes of the merged output file.
-#
-#     These begin as the global attributes of the input granule, but are updated to also include
-#     the provenance data via an updated `history` CF attribute (or `History`
-#     if that is already present), and a `history_json` attribute that is
-#     compliant with the schema defined at the URL specified by
-#     `HISTORY_JSON_SCHEMA`.
-#
-#     `projection` is not included in the output parameters, as this is not
-#     an original message parameter. It is a derived `pyproj.Proj` instance
-#     that is defined by the input `crs` parameter.
-#
-#     `x_extent` and `y_extent` are not serializable, and are instead
-#     included by `x_min`, `x_max` and `y_min` `y_max` accordingly.
-#
-#     Parameters
-#     ----------
-#     input_dataset : Dataset
-#     output_dataset : Dataset
-#     request_parameters : dict
-#     """
-#     # Get attributes from input file
-#     output_attributes = read_attrs(input_dataset)
-#
-#     # Reconstruct parameters' dictionary with only keys that correspond to non-null values.
-#     valid_request_parameters = {parameter_name: parameter_value
-#                                 for parameter_name, parameter_value
-#                                 in request_parameters.items()
-#                                 if parameter_value is not None}
-#
-#     # Remove unnecessary and unserializable request parameters
-#     for surplus_key in ['projection', 'x_extent', 'y_extent']:
-#         valid_request_parameters.pop(surplus_key, None)
-#
-#     # Retrieve `granule_url` and replace the `input_file` attribute.
-#     # This ensures `history_json` refers to the archived granule location, rather
-#     # than a temporary file in the Docker container.
-#     valid_request_parameters['input_file'] = valid_request_parameters.pop('granule_url', None)
-#
-#     # Preferentially use `history`, unless `History` is already present in the
-#     # input file.
-#     cf_att_name = 'History' if hasattr(input_dataset, 'History') else 'history'
-#     input_history = getattr(input_dataset, cf_att_name, None)
-#
-#     # Create new history_json attribute
-#     new_history_json_record = create_history_record(input_history, valid_request_parameters)
-#
-#     # Extract existing `history_json` from input granule
-#     if hasattr(input_dataset, 'history_json'):
-#         old_history_json = json.loads(output_attributes['history_json'])
-#         if isinstance(old_history_json, list):
-#             output_history_json = old_history_json
-#         else:
-#             # Single `history_record` element.
-#             output_history_json = [old_history_json]
-#     else:
-#         output_history_json = []
-#
-#     # Append `history_record` to the existing `history_json` array:
-#     output_history_json.append(new_history_json_record)
-#     output_attributes['history_json'] = json.dumps(output_history_json)
-#
-#     # Create history attribute
-#     history_parameters = {parameter_name: parameter_value
-#                           for parameter_name, parameter_value
-#                           in new_history_json_record['parameters'].items()
-#                           if parameter_name != 'input_file'}
-#
-#     new_history_line = ' '.join([new_history_json_record['date_time'],
-#                                  new_history_json_record['program'],
-#                                  new_history_json_record['version'],
-#                                  json.dumps(history_parameters)])
-#
-#     output_history = '\n'.join(filter(None, [input_history, new_history_line]))
-#     output_attributes[cf_att_name] = output_history
-#
-#     output_dataset.setncatts(output_attributes)
-
-
 def create_new_attributes(input_dataset: xr.Dataset, request_parameters: dict) -> dict:
     """Set the global attributes of the merged output file.
@@ -330,6 +204,44 @@ def create_history_record(input_history: str, request_parameters: dict) -> dict:
 
     return history_record
 
 
-# def read_attrs(dataset: Union[Dataset, Variable]) -> Dict:
-#     """ Read attributes from either a NetCDF4 Dataset or variable object.
""" -# return dataset.__dict__ +def retrieve_history(dataset: netCDF4.Dataset) -> dict: + """ + Retrieve history_json field from NetCDF dataset, if it exists + + Parameters + ---------- + dataset: NetCDF Dataset representing a single granule + + Returns + ------- + A history_json field + """ + if "history_json" not in dataset.ncattrs(): + return {} + history_json = dataset.getncattr("history_json") + return json.loads(history_json) + + +def construct_history(input_files: list, granule_urls: list) -> dict: + """ + Construct history JSON entry for this concatenation operation + https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42 + + Parameters + ---------- + input_files: List of input files + + Returns + ------- + History JSON constructed for this concat operation + """ + history_json = { + "$schema": HISTORY_JSON_SCHEMA, + "date_time": datetime.now(tz=timezone.utc).isoformat(), + "program": PROGRAM, + "version": VERSION, + "parameters": f"input_files={input_files}", + "derived_from": granule_urls, + "program_ref": PROGRAM_REF, + } + return history_json diff --git a/concatenator/group_handling.py b/concatenator/group_handling.py index 277011c..e30b2f2 100644 --- a/concatenator/group_handling.py +++ b/concatenator/group_handling.py @@ -4,6 +4,7 @@ Functions for converting multidimensional data structures between a group hierarchy and a flat structure """ + import re import netCDF4 as nc @@ -103,7 +104,7 @@ def flatten_grouped_dataset( netCDF4 Dataset that does not contain groups and that has been flattened. """ - # Close the existing read-only dataset and reopen in append mode + # Close the existing read-only dataset and reopen in 'append' mode nc_dataset.close() nc_dataset = nc.Dataset(file_to_subset, "r+") @@ -159,7 +160,7 @@ def flatten_grouped_dataset( def regroup_flattened_dataset( - dataset: xr.Dataset, output_file: str + dataset: xr.Dataset, output_file: str, history_to_append: str | None ) -> None: # pylint: disable=too-many-branches """ Given a list of xarray datasets, combine those datasets into a @@ -176,7 +177,10 @@ def regroup_flattened_dataset( """ with nc.Dataset(output_file, mode="w", format="NETCDF4") as base_dataset: # Copy global attributes - base_dataset.setncatts(dataset.attrs) + output_attributes = dataset.attrs + if history_to_append is not None: + output_attributes["history_json"] = history_to_append + base_dataset.setncatts(output_attributes) # Create Groups group_lst = [] @@ -283,9 +287,11 @@ def _calculate_chunks(dim_sizes: list, default_low_dim_chunksize=4000) -> tuple: number_of_dims = len(dim_sizes) if number_of_dims <= 3: chunk_sizes = tuple( - default_low_dim_chunksize - if ((s > default_low_dim_chunksize) and (number_of_dims > 1)) - else s + ( + default_low_dim_chunksize + if ((s > default_low_dim_chunksize) and (number_of_dims > 1)) + else s + ) for s in dim_sizes ) else: diff --git a/concatenator/harmony/download_worker.py b/concatenator/harmony/download_worker.py index 3f502e3..b9cf089 100644 --- a/concatenator/harmony/download_worker.py +++ b/concatenator/harmony/download_worker.py @@ -35,8 +35,7 @@ def multi_core_download( Returns ------- - list - list of downloaded files as pathlib.Path objects + list of downloaded files as pathlib.Path objects """ if process_count is None: diff --git a/concatenator/harmony/service_adapter.py b/concatenator/harmony/service_adapter.py index 0b8d562..d4034ba 100644 --- a/concatenator/harmony/service_adapter.py +++ b/concatenator/harmony/service_adapter.py @@ -1,15 +1,18 @@ +import json from 
 from pathlib import Path
 from shutil import copyfile
 from tempfile import TemporaryDirectory
 from urllib.parse import urlsplit
 from uuid import uuid4
 
+import netCDF4 as nc
 import pystac
 from harmony.adapter import BaseHarmonyAdapter
 from harmony.util import bbox_to_geometry, stage
 from pystac import Item
 from pystac.item import Asset
 
+from concatenator.attribute_handling import construct_history, retrieve_history
 from concatenator.harmony.download_worker import multi_core_download
 from concatenator.harmony.util import (
     _get_netcdf_urls,
@@ -98,10 +101,18 @@ def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
                 )
                 self.logger.info("Finished granule downloads.")
 
+                history_json: list[dict] = []
                 for file_count, file in enumerate(input_files):
                     file_size = sizeof_fmt(file.stat().st_size)
                     self.logger.info(f"File {file_count} is size <{file_size}>. Path={file}")
 
+                    with nc.Dataset(file, "r") as dataset:
+                        history_json.extend(retrieve_history(dataset))
+
+                history_json.append(construct_history(input_files, netcdf_urls))
+
+                new_history_json = json.dumps(history_json, default=str)
+
                 self.logger.info("Running Stitchee..")
                 output_path = str(Path(temp_dir).joinpath(filename).resolve())
 
@@ -112,6 +123,7 @@ def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
                     write_tmp_flat_concatenated=False,
                     keep_tmp_files=False,
                     concat_dim="mirror_step",  # This is currently set only for TEMPO
+                    history_to_append=new_history_json,
                     logger=self.logger,
                 )
                 self.logger.info("Stitchee completed.")
diff --git a/concatenator/stitchee.py b/concatenator/stitchee.py
index 24cf87d..0f18fca 100644
--- a/concatenator/stitchee.py
+++ b/concatenator/stitchee.py
@@ -30,7 +30,7 @@ def stitchee(
     concat_method: str = "xarray-concat",
     concat_dim: str = "",
     concat_kwargs: dict | None = None,
-    history_to_append: dict | None = None,
+    history_to_append: str | None = None,
     logger: Logger = default_logger,
 ) -> str:
     """Concatenate netCDF data files along an existing dimension.
@@ -145,7 +145,7 @@ def stitchee(
     # The group hierarchy of the concatenated file is reconstructed (using XARRAY).
     start_time = time.time()
     logger.info("Reconstructing groups within concatenated file...")
-    regroup_flattened_dataset(combined_ds, output_file)  # , new_global_attributes)
+    regroup_flattened_dataset(combined_ds, output_file, history_to_append)
     benchmark_log["reconstructing_groups"] = time.time() - start_time
 
     logger.info("--- Benchmark results ---")
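
A minimal usage sketch (not part of the patch itself) of the provenance flow the diff adds, mirroring the new logic in HarmonyAdapter.process_catalog but runnable outside of Harmony. The granule file names, URLs, and output path below are hypothetical placeholders; it assumes netCDF4 and the patched concatenator package are installed.

# Hedged sketch of the history_json provenance flow added by this patch.
import json

import netCDF4 as nc

from concatenator.attribute_handling import construct_history, retrieve_history

input_files = ["granule_1.nc", "granule_2.nc"]  # hypothetical local granule files
granule_urls = [
    "https://example.com/granule_1.nc",  # hypothetical archive locations
    "https://example.com/granule_2.nc",
]

# Gather any history_json entries already stored in the input granules.
history_json: list[dict] = []
for path in input_files:
    with nc.Dataset(path, "r") as dataset:
        history_json.extend(retrieve_history(dataset))

# Append a new entry describing this concatenation operation.
history_json.append(construct_history(input_files, granule_urls))

# The serialized list is what process_catalog passes to stitchee() as
# history_to_append; regroup_flattened_dataset() then writes it into the
# output file's global attributes as "history_json".
new_history_json = json.dumps(history_json, default=str)
print(new_history_json)

Each run appends one record to the history_json list, so provenance already carried by the input granules is preserved alongside the entry describing the new concatenation.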