diff --git a/parm/soca/obsprep/obsprep_config.yaml b/parm/soca/obsprep/obsprep_config.yaml index 905f49be6..02f4edb5d 100644 --- a/parm/soca/obsprep/obsprep_config.yaml +++ b/parm/soca/obsprep/obsprep_config.yaml @@ -244,6 +244,9 @@ observations: provider: GTS dmpdir subdir: atmos type: bufr + window: + back: 4 + forward: 4 dmpdir regex: 'gdas.*.subpfl.*.bufr_d' - obs space: diff --git a/ush/soca/prep_ocean_obs.py b/ush/soca/prep_ocean_obs.py index da7b2da6a..e751964ab 100644 --- a/ush/soca/prep_ocean_obs.py +++ b/ush/soca/prep_ocean_obs.py @@ -138,39 +138,51 @@ def initialize(self): interval = timedelta(hours=assim_freq * i) window_cdates.append(cdate + interval) - input_files = prep_ocean_obs_utils.obs_fetch(self.task_config, - self.task_config, - obsprep_space, - window_cdates) + # fetch the obs files to DATA directory and get the list of files and cycles + fetched_files = prep_ocean_obs_utils.obs_fetch(self.task_config, + self.task_config, + obsprep_space, + window_cdates) - if not input_files: + if not fetched_files: logger.warning(f"No files found for obs source {obtype}, skipping") break # go to next observer in OBS_YAML - obsprep_space['input files'] = input_files obsprep_space['window begin'] = self.window_begin obsprep_space['window end'] = self.window_end - ioda_filename = f"{RUN}.t{cyc:02d}z.{obs_space_name}.{cdatestr}.nc4" - obsprep_space['output file'] = ioda_filename ioda_config_file = obtype + '2ioda.yaml' + obsprep_space['conversion config file'] = ioda_config_file # set up the config file for conversion to IODA for bufr and # netcdf files respectively if obsprep_space['type'] == 'bufr': + # create a pre-filled template file for the bufr2ioda converter, + # which will be overwritten for each input cycle bufrconv_config = { 'RUN': RUN, 'current_cycle': cdate, 'DMPDIR': COMIN_OBS, 'COM_OBS': COMIN_OBS, 'OCEAN_BASIN_FILE': OCEAN_BASIN_FILE} - obsprep_space['conversion config file'] = ioda_config_file - bufr2iodapy = BUFR2IODA_PY_DIR + '/bufr2ioda_' + obtype + '.py' + bufr2iodapy = os.path.join(BUFR2IODA_PY_DIR, f'bufr2ioda_{obtype}.py') obsprep_space['bufr2ioda converter'] = bufr2iodapy - tmpl_filename = 'bufr2ioda_' + obtype + '.yaml' + tmpl_filename = f"bufr2ioda_{obtype}.yaml" bufrconv_template = os.path.join(BUFR2IODA_TMPL_DIR, tmpl_filename) + output_files = [] # files to save to COM directory + bufrconv_files = [] # files needed to populate the IODA converter config + # for each cycle of the retrieved obs bufr files... + for input_file, cycle in fetched_files: + cycletime = cycle[8:10] + ioda_filename = f"{RUN}.t{cycletime}z.{obs_space_name}.{cycle}.nc4" + output_files.append(ioda_filename) + bufrconv_files.append((cycle, input_file, ioda_filename)) + + obsprep_space['output file'] = output_files + obsprep_space['bufrconv files'] = bufrconv_files try: bufrconv = parse_j2yaml(bufrconv_template, bufrconv_config) + bufrconv.update(obsprep_space) bufrconv.save(ioda_config_file) except Exception as e: logger.warning(f"An exeception {e} occured while trying to create BUFR2IODA config") @@ -180,7 +192,10 @@ def initialize(self): obsspaces_to_convert.append({"obs space": obsprep_space}) elif obsprep_space['type'] == 'nc': - obsprep_space['conversion config file'] = ioda_config_file + + obsprep_space['input files'] = [f[0] for f in fetched_files] + ioda_filename = f"{RUN}.t{cyc:02d}z.{obs_space_name}.{cdatestr}.nc4" + obsprep_space['output file'] = [ioda_filename] save_as_yaml(obsprep_space, ioda_config_file) obsspaces_to_convert.append({"obs space": obsprep_space}) @@ -192,7 +207,7 @@ def initialize(self): logger.critical("Ill-formed OBS_YAML or OBSPREP_YAML file, exiting") raise - # yes, there is redundancy between the yamls fed to the ioda converter and here, + # yes, there is redundancy between the yamls fed to the ioda converters and here, # this seems safer and easier than being selective about the fields save_as_yaml({"observations": obsspaces_to_convert}, self.task_config.conversion_list_file) @@ -258,16 +273,18 @@ def finalize(self): obsspaces_to_save = YAMLFile(self.task_config.save_list_file) - for obsspace_to_save in obsspaces_to_save['observations']: - - output_file = os.path.basename(obsspace_to_save['output file']) - conv_config_file = os.path.basename(obsspace_to_save['conversion config file']) - output_file_dest = os.path.join(COMOUT_OBS, output_file) + for obs_space in obsspaces_to_save['observations']: + files_to_save = [] + conv_config_file = os.path.basename(obs_space['conversion config file']) conv_config_file_dest = os.path.join(COMOUT_OBS, conv_config_file) + files_to_save.append([conv_config_file, conv_config_file_dest]) + + for output_file in obs_space['output file']: + output_file_dest = os.path.join(COMOUT_OBS, output_file) + files_to_save.append([output_file, output_file_dest]) try: - FileHandler({'copy': [[output_file, output_file_dest]]}).sync() - FileHandler({'copy': [[conv_config_file, conv_config_file_dest]]}).sync() + FileHandler({'copy': files_to_save}).sync() except Exception as e: logger.warning(f"An exeception {e} occured while trying to run gen_bufr_json") except OSError: diff --git a/ush/soca/prep_ocean_obs_utils.py b/ush/soca/prep_ocean_obs_utils.py index 9ecb06464..c8f42a375 100755 --- a/ush/soca/prep_ocean_obs_utils.py +++ b/ush/soca/prep_ocean_obs_utils.py @@ -2,7 +2,7 @@ import os import fnmatch import subprocess -from wxflow import FileHandler, Logger +from wxflow import FileHandler, Logger, YAMLFile logger = Logger() @@ -36,10 +36,10 @@ def obs_fetch(config, task_config, obsprep_space, cycles): for root, _, files in os.walk(full_input_dir): for filename in fnmatch.filter(files, dumpdir_regex): - target_file = PDY + cyc + '-' + filename - matching_files.append((full_input_dir, filename, target_file)) + target_file = f"{PDY}{cyc}-{filename}" + matching_files.append((full_input_dir, filename, target_file, f"{PDY}{cyc}")) - for full_input_dir, filename, target_file in matching_files: + for full_input_dir, filename, target_file, _ in matching_files: file_path = os.path.join(full_input_dir, filename) file_destination = os.path.join(COMIN_OBS, target_file) file_copy.append([file_path, file_destination]) @@ -50,7 +50,7 @@ def obs_fetch(config, task_config, obsprep_space, cycles): FileHandler({'copy': file_copy}).sync() # return the modified file names for the IODA converters - return [f[2] for f in matching_files] + return [(f[2], f[3]) for f in matching_files] def run_netcdf_to_ioda(obsspace_to_convert, OCNOBS2IODAEXEC): @@ -69,11 +69,18 @@ def run_netcdf_to_ioda(obsspace_to_convert, OCNOBS2IODAEXEC): def run_bufr_to_ioda(obsspace_to_convert): logger.info(f"running run_bufr_to_ioda on {obsspace_to_convert['name']}") bufrconv_yaml = obsspace_to_convert['conversion config file'] + bufrconv_config = YAMLFile(bufrconv_yaml) bufr2iodapy = obsspace_to_convert['bufr2ioda converter'] - try: - subprocess.run(['python', bufr2iodapy, '-c', bufrconv_yaml], check=True) - return 0 - except subprocess.CalledProcessError as e: - logger.warning(f"bufr2ioda converter failed with error >{e}<, \ - return code {e.returncode}") - return e.returncode + obtype = obsspace_to_convert['name'] + + for cycle, input_file, output_file in obsspace_to_convert['bufrconv files']: + bufrconv_config['input_file'] = input_file + bufrconv_config['output_file'] = output_file + bufrconv_config['cycle_datetime'] = cycle + config_filename = f"{cycle}.{bufrconv_yaml}" + bufrconv_config.save(config_filename) + try: + subprocess.run(['python', bufr2iodapy, '-c', config_filename], check=True) + except subprocess.CalledProcessError as e: + logger.warning(f"bufr2ioda converter failed with error >{e}<, \ + return code {e.returncode}")