Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add observation window to Argo float obs and other BUFR-sourced obs #1423

Merged
6 commits merged on Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions parm/soca/obsprep/obsprep_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ observations:
provider: GTS
dmpdir subdir: atmos
type: bufr
window:
back: 4
forward: 4
dmpdir regex: 'gdas.*.subpfl.*.bufr_d'

- obs space:
Expand Down
57 changes: 37 additions & 20 deletions ush/soca/prep_ocean_obs.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,39 +138,51 @@ def initialize(self):
interval = timedelta(hours=assim_freq * i)
window_cdates.append(cdate + interval)

input_files = prep_ocean_obs_utils.obs_fetch(self.task_config,
self.task_config,
obsprep_space,
window_cdates)
# fetch the obs files to DATA directory and get the list of files and cycles
fetched_files = prep_ocean_obs_utils.obs_fetch(self.task_config,
self.task_config,
obsprep_space,
window_cdates)

if not input_files:
if not fetched_files:
logger.warning(f"No files found for obs source {obtype}, skipping")
break # go to next observer in OBS_YAML

obsprep_space['input files'] = input_files
obsprep_space['window begin'] = self.window_begin
obsprep_space['window end'] = self.window_end
ioda_filename = f"{RUN}.t{cyc:02d}z.{obs_space_name}.{cdatestr}.nc4"
obsprep_space['output file'] = ioda_filename
ioda_config_file = obtype + '2ioda.yaml'
obsprep_space['conversion config file'] = ioda_config_file

# set up the config file for conversion to IODA for bufr and
# netcdf files respectively
if obsprep_space['type'] == 'bufr':
# create a pre-filled template file for the bufr2ioda converter,
# which will be overwritten for each input cycle
bufrconv_config = {
'RUN': RUN,
'current_cycle': cdate,
'DMPDIR': COMIN_OBS,
'COM_OBS': COMIN_OBS,
'OCEAN_BASIN_FILE': OCEAN_BASIN_FILE}
obsprep_space['conversion config file'] = ioda_config_file
bufr2iodapy = BUFR2IODA_PY_DIR + '/bufr2ioda_' + obtype + '.py'
bufr2iodapy = os.path.join(BUFR2IODA_PY_DIR, f'bufr2ioda_{obtype}.py')
obsprep_space['bufr2ioda converter'] = bufr2iodapy
tmpl_filename = 'bufr2ioda_' + obtype + '.yaml'
tmpl_filename = f"bufr2ioda_{obtype}.yaml"
bufrconv_template = os.path.join(BUFR2IODA_TMPL_DIR, tmpl_filename)
output_files = [] # files to save to COM directory
bufrconv_files = [] # files needed to populate the IODA converter config
# for each cycle of the retrieved obs bufr files...
for input_file, cycle in fetched_files:
cycletime = cycle[8:10]
ioda_filename = f"{RUN}.t{cycletime}z.{obs_space_name}.{cycle}.nc4"
output_files.append(ioda_filename)
bufrconv_files.append((cycle, input_file, ioda_filename))

obsprep_space['output file'] = output_files
obsprep_space['bufrconv files'] = bufrconv_files

try:
bufrconv = parse_j2yaml(bufrconv_template, bufrconv_config)
bufrconv.update(obsprep_space)
bufrconv.save(ioda_config_file)
except Exception as e:
logger.warning(f"An exeception {e} occured while trying to create BUFR2IODA config")
Expand All @@ -180,7 +192,10 @@ def initialize(self):
obsspaces_to_convert.append({"obs space": obsprep_space})

elif obsprep_space['type'] == 'nc':
obsprep_space['conversion config file'] = ioda_config_file

obsprep_space['input files'] = [f[0] for f in fetched_files]
ioda_filename = f"{RUN}.t{cyc:02d}z.{obs_space_name}.{cdatestr}.nc4"
obsprep_space['output file'] = [ioda_filename]
save_as_yaml(obsprep_space, ioda_config_file)

obsspaces_to_convert.append({"obs space": obsprep_space})
Expand All @@ -192,7 +207,7 @@ def initialize(self):
logger.critical("Ill-formed OBS_YAML or OBSPREP_YAML file, exiting")
raise

# yes, there is redundancy between the yamls fed to the ioda converter and here,
# yes, there is redundancy between the yamls fed to the ioda converters and here,
# this seems safer and easier than being selective about the fields
save_as_yaml({"observations": obsspaces_to_convert}, self.task_config.conversion_list_file)

Expand Down Expand Up @@ -258,16 +273,18 @@ def finalize(self):

obsspaces_to_save = YAMLFile(self.task_config.save_list_file)

for obsspace_to_save in obsspaces_to_save['observations']:

output_file = os.path.basename(obsspace_to_save['output file'])
conv_config_file = os.path.basename(obsspace_to_save['conversion config file'])
output_file_dest = os.path.join(COMOUT_OBS, output_file)
for obs_space in obsspaces_to_save['observations']:
files_to_save = []
conv_config_file = os.path.basename(obs_space['conversion config file'])
conv_config_file_dest = os.path.join(COMOUT_OBS, conv_config_file)
files_to_save.append([conv_config_file, conv_config_file_dest])

for output_file in obs_space['output file']:
output_file_dest = os.path.join(COMOUT_OBS, output_file)
files_to_save.append([output_file, output_file_dest])

try:
FileHandler({'copy': [[output_file, output_file_dest]]}).sync()
FileHandler({'copy': [[conv_config_file, conv_config_file_dest]]}).sync()
FileHandler({'copy': files_to_save}).sync()
except Exception as e:
logger.warning(f"An exeception {e} occured while trying to run gen_bufr_json")
except OSError:
Expand Down
31 changes: 19 additions & 12 deletions ush/soca/prep_ocean_obs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import fnmatch
import subprocess
from wxflow import FileHandler, Logger
from wxflow import FileHandler, Logger, YAMLFile

logger = Logger()

Expand Down Expand Up @@ -36,10 +36,10 @@ def obs_fetch(config, task_config, obsprep_space, cycles):

for root, _, files in os.walk(full_input_dir):
for filename in fnmatch.filter(files, dumpdir_regex):
target_file = PDY + cyc + '-' + filename
matching_files.append((full_input_dir, filename, target_file))
target_file = f"{PDY}{cyc}-{filename}"
matching_files.append((full_input_dir, filename, target_file, f"{PDY}{cyc}"))

for full_input_dir, filename, target_file in matching_files:
for full_input_dir, filename, target_file, _ in matching_files:
file_path = os.path.join(full_input_dir, filename)
file_destination = os.path.join(COMIN_OBS, target_file)
file_copy.append([file_path, file_destination])
Expand All @@ -50,7 +50,7 @@ def obs_fetch(config, task_config, obsprep_space, cycles):
FileHandler({'copy': file_copy}).sync()

# return the modified file names for the IODA converters
return [f[2] for f in matching_files]
return [(f[2], f[3]) for f in matching_files]


def run_netcdf_to_ioda(obsspace_to_convert, OCNOBS2IODAEXEC):
Expand All @@ -69,11 +69,18 @@ def run_netcdf_to_ioda(obsspace_to_convert, OCNOBS2IODAEXEC):
def run_bufr_to_ioda(obsspace_to_convert):
logger.info(f"running run_bufr_to_ioda on {obsspace_to_convert['name']}")
bufrconv_yaml = obsspace_to_convert['conversion config file']
bufrconv_config = YAMLFile(bufrconv_yaml)
bufr2iodapy = obsspace_to_convert['bufr2ioda converter']
try:
subprocess.run(['python', bufr2iodapy, '-c', bufrconv_yaml], check=True)
return 0
except subprocess.CalledProcessError as e:
logger.warning(f"bufr2ioda converter failed with error >{e}<, \
return code {e.returncode}")
return e.returncode
obtype = obsspace_to_convert['name']

for cycle, input_file, output_file in obsspace_to_convert['bufrconv files']:
bufrconv_config['input_file'] = input_file
bufrconv_config['output_file'] = output_file
bufrconv_config['cycle_datetime'] = cycle
config_filename = f"{cycle}.{bufrconv_yaml}"
bufrconv_config.save(config_filename)
try:
subprocess.run(['python', bufr2iodapy, '-c', config_filename], check=True)
except subprocess.CalledProcessError as e:
logger.warning(f"bufr2ioda converter failed with error >{e}<, \
return code {e.returncode}")