Skip to content

Commit

Permalink
Add observation window to Argo float obs and other BUFR-sourced obs (#…
Browse files Browse the repository at this point in the history
…1423)

# Description

Adds observation window to Argo float obs (4 cycles back and forward)
and same facility for other BUFR-sourced obs. Since current BUFR to IODA
converters handle one input file at a time, this is the interface this
PR assumes, and so puts multiple argo IODA files, one per cycle, in the
`obs` directory under `COM`.

Tested with argo

# Companion PRs

NA

# Issues

Partially addresses #1132

# Automated CI tests to run in Global Workflow
<!-- Which Global Workflow CI tests are required to adequately test this
PR? -->
- [ ] atm_jjob <!-- JEDI atm single cycle DA !-->
- [ ] C96C48_ufs_hybatmDA <!-- JEDI atm cycled DA !-->
- [ ] C96C48_hybatmaerosnowDA  <!-- JEDI aero/snow cycled DA !-->
- [x] C48mx500_3DVarAOWCDA <!-- JEDI low-res marine 3DVar cycled DA !-->
- [ ] C48mx500_hybAOWCDA <!-- JEDI marine hybrid envar cycled DA !-->
- [ ] C96C48_hybatmDA <!-- GSI atm cycled DA !-->
  • Loading branch information
AndrewEichmann-NOAA authored Jan 6, 2025
1 parent c665ec9 commit a36255a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 32 deletions.
3 changes: 3 additions & 0 deletions parm/soca/obsprep/obsprep_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ observations:
provider: GTS
dmpdir subdir: atmos
type: bufr
window:
back: 4
forward: 4
dmpdir regex: 'gdas.*.subpfl.*.bufr_d'

- obs space:
Expand Down
57 changes: 37 additions & 20 deletions ush/soca/prep_ocean_obs.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,39 +138,51 @@ def initialize(self):
interval = timedelta(hours=assim_freq * i)
window_cdates.append(cdate + interval)

input_files = prep_ocean_obs_utils.obs_fetch(self.task_config,
self.task_config,
obsprep_space,
window_cdates)
# fetch the obs files to DATA directory and get the list of files and cycles
fetched_files = prep_ocean_obs_utils.obs_fetch(self.task_config,
self.task_config,
obsprep_space,
window_cdates)

if not input_files:
if not fetched_files:
logger.warning(f"No files found for obs source {obtype}, skipping")
break # go to next observer in OBS_YAML

obsprep_space['input files'] = input_files
obsprep_space['window begin'] = self.window_begin
obsprep_space['window end'] = self.window_end
ioda_filename = f"{RUN}.t{cyc:02d}z.{obs_space_name}.{cdatestr}.nc4"
obsprep_space['output file'] = ioda_filename
ioda_config_file = obtype + '2ioda.yaml'
obsprep_space['conversion config file'] = ioda_config_file

# set up the config file for conversion to IODA for bufr and
# netcdf files respectively
if obsprep_space['type'] == 'bufr':
# create a pre-filled template file for the bufr2ioda converter,
# which will be overwritten for each input cycle
bufrconv_config = {
'RUN': RUN,
'current_cycle': cdate,
'DMPDIR': COMIN_OBS,
'COM_OBS': COMIN_OBS,
'OCEAN_BASIN_FILE': OCEAN_BASIN_FILE}
obsprep_space['conversion config file'] = ioda_config_file
bufr2iodapy = BUFR2IODA_PY_DIR + '/bufr2ioda_' + obtype + '.py'
bufr2iodapy = os.path.join(BUFR2IODA_PY_DIR, f'bufr2ioda_{obtype}.py')
obsprep_space['bufr2ioda converter'] = bufr2iodapy
tmpl_filename = 'bufr2ioda_' + obtype + '.yaml'
tmpl_filename = f"bufr2ioda_{obtype}.yaml"
bufrconv_template = os.path.join(BUFR2IODA_TMPL_DIR, tmpl_filename)
output_files = [] # files to save to COM directory
bufrconv_files = [] # files needed to populate the IODA converter config
# for each cycle of the retrieved obs bufr files...
for input_file, cycle in fetched_files:
cycletime = cycle[8:10]
ioda_filename = f"{RUN}.t{cycletime}z.{obs_space_name}.{cycle}.nc4"
output_files.append(ioda_filename)
bufrconv_files.append((cycle, input_file, ioda_filename))

obsprep_space['output file'] = output_files
obsprep_space['bufrconv files'] = bufrconv_files

try:
bufrconv = parse_j2yaml(bufrconv_template, bufrconv_config)
bufrconv.update(obsprep_space)
bufrconv.save(ioda_config_file)
except Exception as e:
logger.warning(f"An exeception {e} occured while trying to create BUFR2IODA config")
Expand All @@ -180,7 +192,10 @@ def initialize(self):
obsspaces_to_convert.append({"obs space": obsprep_space})

elif obsprep_space['type'] == 'nc':
obsprep_space['conversion config file'] = ioda_config_file

obsprep_space['input files'] = [f[0] for f in fetched_files]
ioda_filename = f"{RUN}.t{cyc:02d}z.{obs_space_name}.{cdatestr}.nc4"
obsprep_space['output file'] = [ioda_filename]
save_as_yaml(obsprep_space, ioda_config_file)

obsspaces_to_convert.append({"obs space": obsprep_space})
Expand All @@ -192,7 +207,7 @@ def initialize(self):
logger.critical("Ill-formed OBS_YAML or OBSPREP_YAML file, exiting")
raise

# yes, there is redundancy between the yamls fed to the ioda converter and here,
# yes, there is redundancy between the yamls fed to the ioda converters and here,
# this seems safer and easier than being selective about the fields
save_as_yaml({"observations": obsspaces_to_convert}, self.task_config.conversion_list_file)

Expand Down Expand Up @@ -258,16 +273,18 @@ def finalize(self):

obsspaces_to_save = YAMLFile(self.task_config.save_list_file)

for obsspace_to_save in obsspaces_to_save['observations']:

output_file = os.path.basename(obsspace_to_save['output file'])
conv_config_file = os.path.basename(obsspace_to_save['conversion config file'])
output_file_dest = os.path.join(COMOUT_OBS, output_file)
for obs_space in obsspaces_to_save['observations']:
files_to_save = []
conv_config_file = os.path.basename(obs_space['conversion config file'])
conv_config_file_dest = os.path.join(COMOUT_OBS, conv_config_file)
files_to_save.append([conv_config_file, conv_config_file_dest])

for output_file in obs_space['output file']:
output_file_dest = os.path.join(COMOUT_OBS, output_file)
files_to_save.append([output_file, output_file_dest])

try:
FileHandler({'copy': [[output_file, output_file_dest]]}).sync()
FileHandler({'copy': [[conv_config_file, conv_config_file_dest]]}).sync()
FileHandler({'copy': files_to_save}).sync()
except Exception as e:
logger.warning(f"An exeception {e} occured while trying to run gen_bufr_json")
except OSError:
Expand Down
31 changes: 19 additions & 12 deletions ush/soca/prep_ocean_obs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import fnmatch
import subprocess
from wxflow import FileHandler, Logger
from wxflow import FileHandler, Logger, YAMLFile

logger = Logger()

Expand Down Expand Up @@ -36,10 +36,10 @@ def obs_fetch(config, task_config, obsprep_space, cycles):

for root, _, files in os.walk(full_input_dir):
for filename in fnmatch.filter(files, dumpdir_regex):
target_file = PDY + cyc + '-' + filename
matching_files.append((full_input_dir, filename, target_file))
target_file = f"{PDY}{cyc}-{filename}"
matching_files.append((full_input_dir, filename, target_file, f"{PDY}{cyc}"))

for full_input_dir, filename, target_file in matching_files:
for full_input_dir, filename, target_file, _ in matching_files:
file_path = os.path.join(full_input_dir, filename)
file_destination = os.path.join(COMIN_OBS, target_file)
file_copy.append([file_path, file_destination])
Expand All @@ -50,7 +50,7 @@ def obs_fetch(config, task_config, obsprep_space, cycles):
FileHandler({'copy': file_copy}).sync()

# return the modified file names for the IODA converters
return [f[2] for f in matching_files]
return [(f[2], f[3]) for f in matching_files]


def run_netcdf_to_ioda(obsspace_to_convert, OCNOBS2IODAEXEC):
Expand All @@ -69,11 +69,18 @@ def run_netcdf_to_ioda(obsspace_to_convert, OCNOBS2IODAEXEC):
def run_bufr_to_ioda(obsspace_to_convert):
logger.info(f"running run_bufr_to_ioda on {obsspace_to_convert['name']}")
bufrconv_yaml = obsspace_to_convert['conversion config file']
bufrconv_config = YAMLFile(bufrconv_yaml)
bufr2iodapy = obsspace_to_convert['bufr2ioda converter']
try:
subprocess.run(['python', bufr2iodapy, '-c', bufrconv_yaml], check=True)
return 0
except subprocess.CalledProcessError as e:
logger.warning(f"bufr2ioda converter failed with error >{e}<, \
return code {e.returncode}")
return e.returncode
obtype = obsspace_to_convert['name']

for cycle, input_file, output_file in obsspace_to_convert['bufrconv files']:
bufrconv_config['input_file'] = input_file
bufrconv_config['output_file'] = output_file
bufrconv_config['cycle_datetime'] = cycle
config_filename = f"{cycle}.{bufrconv_yaml}"
bufrconv_config.save(config_filename)
try:
subprocess.run(['python', bufr2iodapy, '-c', config_filename], check=True)
except subprocess.CalledProcessError as e:
logger.warning(f"bufr2ioda converter failed with error >{e}<, \
return code {e.returncode}")

0 comments on commit a36255a

Please sign in to comment.