add history_json to output global attributes, built from in-files
danielfromearth committed Mar 5, 2024
1 parent 73fdf57 commit 33a25ae
Showing 5 changed files with 68 additions and 139 deletions.
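In brief: the commit reads any existing history_json attribute from each input granule, constructs a new provenance record for the concatenation operation, and writes the combined list to the output file's global attributes. Below is a minimal sketch of the kind of record the new construct_history function builds; the field values are illustrative placeholders, not output from a real run.

from datetime import datetime, timezone

example_history_record = {
    "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json",
    "date_time": datetime.now(tz=timezone.utc).isoformat(),
    "program": "stitchee",
    "version": "x.y.z",  # placeholder; the real code reads the installed package version
    "parameters": "input_files=['granule1.nc', 'granule2.nc']",  # hypothetical file names
    "derived_from": ["https://example.com/granule1.nc", "https://example.com/granule2.nc"],  # hypothetical URLs
    "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S1262025641-LARC_CLOUD",
}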
170 changes: 41 additions & 129 deletions concatenator/attribute_handling.py
@@ -96,132 +96,6 @@ def _flatten_coordinate_attribute(attribute_string: str) -> str:
)


# def construct_history(input_files: list, granule_urls):
# """Construct history JSON entry for this concatenation operation
#
# https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42
# https://wiki.earthdata.nasa.gov/pages/viewpage.action?spaceKey=TRT&title=Formal+Schema+for+history_json
#
# Parameters
# ----------
# input_files: List of input files
# granule_urls: List of granule URLs
#
# Returns
# -------
# a history JSON dictionary, constructed for this concatenation operation
# """
#
# history_json = {
# "date_time": datetime.now(tz=timezone.utc).isoformat(),
# "derived_from": granule_urls,
# "program": 'stitchee',
# "version": importlib_metadata.distribution('stitchee').version,
# "parameters": f'input_files={input_files}',
# "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S1262025641-LARC_CLOUD",
# "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json"
# }
# return history_json
#
#
# def retrieve_history(dataset):
# """
# Retrieve history_json field from NetCDF dataset, if it exists
#
# Parameters
# ----------
# dataset: NetCDF Dataset representing a single granule
#
# Returns
# -------
# a history JSON dictionary, constructed for this concatenation operation
# """
# if 'history_json' not in dataset.ncattrs():
# return []
# history_json = dataset.getncattr('history_json')
# return json.loads(history_json)

# def set_output_attributes(input_dataset: Dataset, output_dataset: Dataset,
# request_parameters: dict) -> None:
# """ Set the global attributes of the merged output file.
#
# These begin as the global attributes of the input granule, but are updated to also include
# the provenance data via an updated `history` CF attribute (or `History`
# if that is already present), and a `history_json` attribute that is
# compliant with the schema defined at the URL specified by
# `HISTORY_JSON_SCHEMA`.
#
# `projection` is not included in the output parameters, as this is not
# an original message parameter. It is a derived `pyproj.Proj` instance
# that is defined by the input `crs` parameter.
#
# `x_extent` and `y_extent` are not serializable, and are instead
# included by `x_min`, `x_max` and `y_min` `y_max` accordingly.
#
# Parameters
# ----------
# input_dataset : Dataset
# output_dataset : Dataset
# request_parameters : dict
# """
# # Get attributes from input file
# output_attributes = read_attrs(input_dataset)
#
# # Reconstruct parameters' dictionary with only keys that correspond to non-null values.
# valid_request_parameters = {parameter_name: parameter_value
# for parameter_name, parameter_value
# in request_parameters.items()
# if parameter_value is not None}
#
# # Remove unnecessary and unserializable request parameters
# for surplus_key in ['projection', 'x_extent', 'y_extent']:
# valid_request_parameters.pop(surplus_key, None)
#
# # Retrieve `granule_url` and replace the `input_file` attribute.
# # This ensures `history_json` refers to the archived granule location, rather
# # than a temporary file in the Docker container.
# valid_request_parameters['input_file'] = valid_request_parameters.pop('granule_url', None)
#
# # Preferentially use `history`, unless `History` is already present in the
# # input file.
# cf_att_name = 'History' if hasattr(input_dataset, 'History') else 'history'
# input_history = getattr(input_dataset, cf_att_name, None)
#
# # Create new history_json attribute
# new_history_json_record = create_history_record(input_history, valid_request_parameters)
#
# # Extract existing `history_json` from input granule
# if hasattr(input_dataset, 'history_json'):
# old_history_json = json.loads(output_attributes['history_json'])
# if isinstance(old_history_json, list):
# output_history_json = old_history_json
# else:
# # Single `history_record` element.
# output_history_json = [old_history_json]
# else:
# output_history_json = []
#
# # Append `history_record` to the existing `history_json` array:
# output_history_json.append(new_history_json_record)
# output_attributes['history_json'] = json.dumps(output_history_json)
#
# # Create history attribute
# history_parameters = {parameter_name: parameter_value
# for parameter_name, parameter_value
# in new_history_json_record['parameters'].items()
# if parameter_name != 'input_file'}
#
# new_history_line = ' '.join([new_history_json_record['date_time'],
# new_history_json_record['program'],
# new_history_json_record['version'],
# json.dumps(history_parameters)])
#
# output_history = '\n'.join(filter(None, [input_history, new_history_line]))
# output_attributes[cf_att_name] = output_history
#
# output_dataset.setncatts(output_attributes)


def create_new_attributes(input_dataset: xr.Dataset, request_parameters: dict) -> dict:
"""Set the global attributes of the merged output file.
@@ -330,6 +204,44 @@ def create_history_record(input_history: str, request_parameters: dict) -> dict:
return history_record


# def read_attrs(dataset: Union[Dataset, Variable]) -> Dict:
# """ Read attributes from either a NetCDF4 Dataset or variable object. """
# return dataset.__dict__
def retrieve_history(dataset: netCDF4.Dataset) -> dict:
    """
    Retrieve history_json field from NetCDF dataset, if it exists

    Parameters
    ----------
    dataset: NetCDF Dataset representing a single granule

    Returns
    -------
    The parsed contents of the history_json attribute, or an empty dictionary if the attribute is absent
    """
    if "history_json" not in dataset.ncattrs():
        return {}
    history_json = dataset.getncattr("history_json")
    return json.loads(history_json)


def construct_history(input_files: list, granule_urls: list) -> dict:
    """
    Construct history JSON entry for this concatenation operation

    https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42

    Parameters
    ----------
    input_files: List of input files
    granule_urls: List of granule URLs

    Returns
    -------
    History JSON constructed for this concatenation operation
    """
    history_json = {
        "$schema": HISTORY_JSON_SCHEMA,
        "date_time": datetime.now(tz=timezone.utc).isoformat(),
        "program": PROGRAM,
        "version": VERSION,
        "parameters": f"input_files={input_files}",
        "derived_from": granule_urls,
        "program_ref": PROGRAM_REF,
    }
    return history_json
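
A rough usage sketch of how the two new functions combine into the string that ultimately lands in the output file's global attributes; it mirrors the service adapter change further below, with hypothetical local paths and granule URLs.

import json

import netCDF4 as nc

from concatenator.attribute_handling import construct_history, retrieve_history

input_files = ["granule1.nc", "granule2.nc"]  # hypothetical local paths
granule_urls = ["https://example.com/granule1.nc", "https://example.com/granule2.nc"]

history_json: list[dict] = []
for path in input_files:
    with nc.Dataset(path, "r") as dataset:
        # Carry forward any provenance records already present in the granule.
        history_json.extend(retrieve_history(dataset))

# Append a record describing this concatenation operation.
history_json.append(construct_history(input_files, granule_urls))

# Serialized JSON string, passed on as history_to_append.
new_history_json = json.dumps(history_json, default=str)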
18 changes: 12 additions & 6 deletions concatenator/group_handling.py
@@ -4,6 +4,7 @@
Functions for converting multidimensional data structures
between a group hierarchy and a flat structure
"""

import re

import netCDF4 as nc
@@ -103,7 +104,7 @@ def flatten_grouped_dataset(
netCDF4 Dataset that does not contain groups and that has been
flattened.
"""
# Close the existing read-only dataset and reopen in append mode
# Close the existing read-only dataset and reopen in 'append' mode
nc_dataset.close()
nc_dataset = nc.Dataset(file_to_subset, "r+")

@@ -159,7 +160,7 @@ def flatten_grouped_dataset(


def regroup_flattened_dataset(
dataset: xr.Dataset, output_file: str
dataset: xr.Dataset, output_file: str, history_to_append: str | None
) -> None: # pylint: disable=too-many-branches
"""
Given a list of xarray datasets, combine those datasets into a
@@ -176,7 +177,10 @@
"""
with nc.Dataset(output_file, mode="w", format="NETCDF4") as base_dataset:
# Copy global attributes
base_dataset.setncatts(dataset.attrs)
output_attributes = dataset.attrs
if history_to_append is not None:
output_attributes["history_json"] = history_to_append
base_dataset.setncatts(output_attributes)

# Create Groups
group_lst = []
@@ -283,9 +287,11 @@ def _calculate_chunks(dim_sizes: list, default_low_dim_chunksize=4000) -> tuple:
number_of_dims = len(dim_sizes)
if number_of_dims <= 3:
chunk_sizes = tuple(
default_low_dim_chunksize
if ((s > default_low_dim_chunksize) and (number_of_dims > 1))
else s
(
default_low_dim_chunksize
if ((s > default_low_dim_chunksize) and (number_of_dims > 1))
else s
)
for s in dim_sizes
)
else:
3 changes: 1 addition & 2 deletions concatenator/harmony/download_worker.py
@@ -35,8 +35,7 @@ def multi_core_download(
Returns
-------
list
list of downloaded files as pathlib.Path objects
list of downloaded files as pathlib.Path objects
"""

if process_count is None:
12 changes: 12 additions & 0 deletions concatenator/harmony/service_adapter.py
@@ -1,15 +1,18 @@
import json
from pathlib import Path
from shutil import copyfile
from tempfile import TemporaryDirectory
from urllib.parse import urlsplit
from uuid import uuid4

import netCDF4 as nc
import pystac
from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry, stage
from pystac import Item
from pystac.item import Asset

from concatenator.attribute_handling import construct_history, retrieve_history
from concatenator.harmony.download_worker import multi_core_download
from concatenator.harmony.util import (
_get_netcdf_urls,
@@ -98,10 +101,18 @@ def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
)
self.logger.info("Finished granule downloads.")

history_json: list[dict] = []
for file_count, file in enumerate(input_files):
file_size = sizeof_fmt(file.stat().st_size)
self.logger.info(f"File {file_count} is size <{file_size}>. Path={file}")

with nc.Dataset(file, "r") as dataset:
history_json.extend(retrieve_history(dataset))

history_json.append(construct_history(input_files, netcdf_urls))

new_history_json = json.dumps(history_json, default=str)

self.logger.info("Running Stitchee..")
output_path = str(Path(temp_dir).joinpath(filename).resolve())

@@ -112,6 +123,7 @@ def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
write_tmp_flat_concatenated=False,
keep_tmp_files=False,
concat_dim="mirror_step", # This is currently set only for TEMPO
history_to_append=new_history_json,
logger=self.logger,
)
self.logger.info("Stitchee completed.")
4 changes: 2 additions & 2 deletions concatenator/stitchee.py
@@ -30,7 +30,7 @@ def stitchee(
concat_method: str = "xarray-concat",
concat_dim: str = "",
concat_kwargs: dict | None = None,
history_to_append: dict | None = None,
history_to_append: str | None = None,
logger: Logger = default_logger,
) -> str:
"""Concatenate netCDF data files along an existing dimension.
@@ -145,7 +145,7 @@
# The group hierarchy of the concatenated file is reconstructed (using XARRAY).
start_time = time.time()
logger.info("Reconstructing groups within concatenated file...")
regroup_flattened_dataset(combined_ds, output_file) # , new_global_attributes)
regroup_flattened_dataset(combined_ds, output_file, history_to_append)
benchmark_log["reconstructing_groups"] = time.time() - start_time

logger.info("--- Benchmark results ---")
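
Note that history_to_append is now typed as a JSON string rather than a dict, matching the json.dumps call in the adapter, and it is forwarded unchanged to regroup_flattened_dataset, which writes it into the output file's history_json global attribute. A hedged call sketch follows; only concat_dim and history_to_append are taken from this diff, while the import path, positional argument, and output_file keyword are assumptions.

from concatenator.stitchee import stitchee  # assumed import path

new_history_json = "[]"  # placeholder; built via retrieve_history/construct_history as sketched above

output_path = stitchee(
    ["granule1.nc", "granule2.nc"],       # hypothetical input granules (assumed positional argument)
    output_file="combined.nc",            # assumed keyword for the hypothetical output path
    concat_dim="mirror_step",
    history_to_append=new_history_json,   # JSON string holding the combined provenance records
)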
