diff --git a/parm/snow/obs/config/bufr_sfcsno_mapping.yaml b/parm/snow/obs/config/bufr_sfcsno_mapping.yaml index 00ab1bc7d..31d042f27 100644 --- a/parm/snow/obs/config/bufr_sfcsno_mapping.yaml +++ b/parm/snow/obs/config/bufr_sfcsno_mapping.yaml @@ -17,20 +17,17 @@ bufr: query: "[*/CLON, */CLONH]" stationIdentification: query: "*/RPID" - stationElevation: query: "[*/SELV, */HSMSL]" + type: float # ObsValue totalSnowDepth: query: "[*/SNWSQ1/TOSD, */MTRMSC/TOSD, */STGDSNDM/TOSD]" transforms: - scale: 1000.0 - filters: - - bounding: - variable: totalSnowDepth - lowerBound: 0 - upperBound: 10000000 + groundState: + query: "[*/GRDSQ1/SOGR, */STGDSNDM/SOGR]" encoder: variables: @@ -65,7 +62,7 @@ encoder: coordinates: "longitude latitude" source: variables/stationIdentification longName: "Identification of Observing Location" - units: "m" + units: "index" # ObsValue - name: "ObsValue/totalSnowDepth" @@ -73,3 +70,10 @@ encoder: source: variables/totalSnowDepth longName: "Total Snow Depth" units: "mm" + + - name: "ObsValue/groundState" + coordinates: "longitude latitude" + source: variables/groundState + longName: "STATE OF THE GROUND" + units: "index" + diff --git a/sorc/bufr-query b/sorc/bufr-query index 97367fcd5..9e595b5a3 160000 --- a/sorc/bufr-query +++ b/sorc/bufr-query @@ -1 +1 @@ -Subproject commit 97367fcd59adf4863aba1a52189e20f9f66451af +Subproject commit 9e595b5a3bb91791f7f5ada456298e2b966e74f6 diff --git a/ush/ioda/bufr2ioda/bufr_sfcsno.py b/ush/ioda/bufr2ioda/bufr_sfcsno.py new file mode 100755 index 000000000..03a9102b4 --- /dev/null +++ b/ush/ioda/bufr2ioda/bufr_sfcsno.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 import sys import os import argparse import time import numpy as np import bufr from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder from bufr.encoders.netcdf import Encoder as netcdfEncoder from wxflow import Logger  # Initialize Logger # Get log level from the environment variable, default to 'INFO' if not set log_level = 
os.getenv('LOG_LEVEL', 'INFO') +logger = Logger('BUFR2IODA_sfcsno.py', level=log_level, colored_log=False) + + +def logging(comm, level, message): + """ + Logs a message to the console or log file, based on the specified logging level. + + This function ensures that logging is only performed by the root process (`rank 0`) + in a distributed computing environment. The function maps the logging level to + appropriate logger methods and defaults to the 'INFO' level if an invalid level is provided. + + Parameters: + comm: object + The communicator object, typically from a distributed computing framework + (e.g., MPI). It must have a `rank()` method to determine the process rank. + level: str + The logging level as a string. Supported levels are: + - 'DEBUG' + - 'INFO' + - 'WARNING' + - 'ERROR' + - 'CRITICAL' + If an invalid level is provided, a warning will be logged, and the level + will default to 'INFO'. + message: str + The message to be logged. + + Behavior: + - Logs messages only on the root process (`comm.rank() == 0`). + - Maps the provided logging level to a method of the logger object. + - Defaults to 'INFO' and logs a warning if an invalid logging level is given. + - Supports standard logging levels for granular control over log verbosity. + + Example: + >>> logging(comm, 'DEBUG', 'This is a debug message.') + >>> logging(comm, 'ERROR', 'An error occurred!') + + Notes: + - Ensure that a global `logger` object is configured before using this function. + - The `comm` object should conform to MPI-like conventions (e.g., `rank()` method). 
+ """

    if comm.rank() == 0:
        # Define a dictionary to map levels to logger methods
        log_methods = {
            'DEBUG': logger.debug,
            'INFO': logger.info,
            'WARNING': logger.warning,
            'ERROR': logger.error,
            'CRITICAL': logger.critical,
        }

        # Get the appropriate logging method, default to 'INFO'
        log_method = log_methods.get(level.upper(), logger.info)

        if log_method == logger.info and level.upper() not in log_methods:
            # Log a warning if the level is invalid
            logger.warning(f'log level = {level}: not a valid level --> set to INFO')

        # Call the logging method
        log_method(message)


def _mask_container(container, mask):

    new_container = bufr.DataContainer()
    for var_name in container.list():
        var = container.get(var_name)
        paths = container.get_paths(var_name)
        new_container.add(var_name, var[mask], paths)

    return new_container


def _make_description(mapping_path, update=False):

    description = bufr.encoders.Description(mapping_path)

    return description


def _make_obs(comm, input_path, mapping_path):
    """
    Create the ioda snow depth observations:
    - reads state of ground (sogr) and snow depth (snod)
    - applies sogr conditions to the missing snod values
    - removes the filled/missing snow values and creates the masked container

    Parameters
    ----------
    comm: object
        The communicator object (e.g., MPI)
    input_path: str
        The input bufr file
    mapping_path: str
        The input bufr2ioda mapping file
    """

    # Get container from mapping file first
    logging(comm, 'INFO', 'Get container from bufr')
    container = bufr.Parser(input_path, mapping_path).parse(comm)

    logging(comm, 'DEBUG', f'container list (original): {container.list()}')

    # Add new/derived data into container
    sogr = container.get('variables/groundState')
    snod = container.get('variables/totalSnowDepth')
    snod[(sogr <= 11.0) & snod.mask] = 0.0
    snod[(sogr == 15.0) & snod.mask] = 0.0
    container.replace('variables/totalSnowDepth', snod)
    snod_upd = 
container.get('variables/totalSnowDepth') + + masked_container = _mask_container(container, (~snod.mask)) + + return masked_container + + +def create_obs_group(input_path, mapping_path, env): + + comm = bufr.mpi.Comm(env["comm_name"]) + + description = _make_description(mapping_path, update=False) + container = _make_obs(comm, input_path, mapping_path) + + # Gather data from all tasks into all tasks. Each task will have the complete record + logging(comm, 'INFO', f'Gather data from all tasks into all tasks') + container.all_gather(comm) + + # Encode the data + logging(comm, 'INFO', f'Encode data') + data = next(iter(iodaEncoder(mapping_path).encode(container).values())) + + logging(comm, 'INFO', f'Return the encoded data') + + return data + + +def create_obs_file(input_path, mapping_path, output_path): + + comm = bufr.mpi.Comm("world") + container = _make_obs(comm, input_path, mapping_path) + container.gather(comm) + + description = _make_description(mapping_path, update=False) + + # Encode the data + if comm.rank() == 0: + netcdfEncoder(description).encode(container, output_path) + + logging(comm, 'INFO', f'Return the encoded data') + + +if __name__ == '__main__': + + start_time = time.time() + + bufr.mpi.App(sys.argv) + comm = bufr.mpi.Comm("world") + + # Required input arguments as positional arguments + parser = argparse.ArgumentParser(description="Convert BUFR to NetCDF using a mapping file.") + parser.add_argument('input', type=str, help='Input BUFR file') + parser.add_argument('mapping', type=str, help='BUFR2IODA Mapping File') + parser.add_argument('output', type=str, help='Output NetCDF file') + + args = parser.parse_args() + mapping = args.mapping + infile = args.input + output = args.output + + create_obs_file(infile, mapping, output) + + end_time = time.time() + running_time = end_time - start_time + logging(comm, 'INFO', f'Total running time: {running_time}')