Skip to content

Commit

Permalink
added perturbations for envar
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumevernieres committed Jul 3, 2024
1 parent ee36b5e commit 8ae7f79
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 116 deletions.
15 changes: 9 additions & 6 deletions jobs/JGLOBAL_MARINE_BMAT
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,26 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "marinebmat" -c "base ocnanal marinebm
# Set variables used in the script
##############################################
# shellcheck disable=SC2153
GDATE=$(date --utc +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours")
gPDY=${GDATE:0:8}
gcyc=${GDATE:8:2}
GDUMP="gdas"
export GDATE=$(date --utc +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours")
export gPDY=${GDATE:0:8}
export gcyc=${GDATE:8:2}
export GDUMP="gdas"
export GDUMP_ENS="enkf${GDUMP}"


##############################################
# Begin JOB SPECIFIC work
##############################################

# Generate COM variables from templates
# TODO: This is temporary, the plan is to prepare the bmatrix at the end of the
# cycle, so the backgrounds should be from the current cycle, not the previous
RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \
COMIN_OCEAN_HISTORY_PREV:COM_OCEAN_HISTORY_TMPL \
COMIN_ICE_HISTORY_PREV:COM_ICE_HISTORY_TMPL

RUN="enkfgdas" YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \
COMIN_OCEAN_HISTORY_ENS_PREV:COM_OCEAN_HISTORY_TMPL \
COMIN_ICE_HISTORY_ENS_PREV:COM_ICE_HISTORY_TMPL

YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \
COMOUT_OCEAN_BMATRIX:COM_OCEAN_BMATRIX_TMPL \
COMOUT_ICE_BMATRIX:COM_ICE_BMATRIX_TMPL
Expand Down
4 changes: 2 additions & 2 deletions scripts/exglobal_marine_bmat_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@

# Create an instance of the MarineBMat task
marineBMat = MarineBMat(config)
marineBMat.initialize()
marineBMat.execute()


marineBMat.finalize()
2 changes: 2 additions & 0 deletions ush/python/pygfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from .task.aero_analysis import AerosolAnalysis
from .task.atm_analysis import AtmAnalysis
from .task.atmens_analysis import AtmEnsAnalysis
from .task.marine_bmat import MarineBMat
from .task.snow_analysis import SnowAnalysis
from .task.upp import UPP
from .task.oceanice_products import OceanIceProducts
from .task.gfs_forecast import GFSForecast
from .utils import marine_da_utils

__docformat__ = "restructuredtext"
__version__ = "0.1.0"
Expand Down
154 changes: 46 additions & 108 deletions ush/python/pygfs/task/marine_bmat.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
#!/usr/bin/env python3

import f90nml
import os
import glob
import gzip
import tarfile
from logging import getLogger
from typing import Dict, List, Any
import xarray as xr
import calc_scales
import subprocess
import pygfs.utils.marine_da_utils as mdau

from wxflow import (AttrDict,
FileHandler,
add_to_datetime, to_timedelta, to_YMDH,
add_to_datetime, to_timedelta,
chdir,
parse_j2yaml, save_as_yaml,
parse_j2yaml,
logit,
Executable,
WorkflowException,
jinja,
Task)

logger = getLogger(__name__.split('.')[-1])
Expand All @@ -31,8 +26,7 @@ class MarineBMat(Task):
@logit(logger, name="MarineBMat")
def __init__(self, config):
super().__init__(config)

_res_anl = self.task_config.OCNRES
_gPDY = self.task_config.gPDY
_gcyc_str = str(self.task_config.gcyc).zfill(2)
_cyc_str = str(self.task_config.cyc).zfill(2)
_home_gdas = os.path.join(self.task_config.HOMEgfs, 'sorc', 'gdas.cd')
Expand All @@ -50,83 +44,15 @@ def __init__(self, config):
'MARINE_WINDOW_END': _window_end,
'MARINE_WINDOW_MIDDLE': self.task_config.current_cycle,
'BERROR_YAML_DIR': os.path.join(_home_gdas, 'parm', 'soca', 'berror'),
'GRID_GEN_YAML': os.path.join(_home_gdas, 'parm', 'soca', 'gridgen', 'gridgen.yaml')
'GRID_GEN_YAML': os.path.join(_home_gdas, 'parm', 'soca', 'gridgen', 'gridgen.yaml'),
'MARINE_ENSDA_STAGE_BKG_YAML_TMPL': os.path.join(_home_gdas, 'parm', 'soca','ensda', 'stage_ens_mem.yaml.j2'),
'ens_dir': os.path.join(self.task_config.DATA, 'ens')
}
)

# Extend task_config with local_dict
self.task_config = AttrDict(**self.task_config, **local_dict)

@logit(logger)
def _run(self, exec_cmd):
"""Run the executable command
TODO: Move this method somewhere else
"""
logger.info(f"Executing {exec_cmd}")
try:
logger.debug(f"Executing {exec_cmd}")
exec_cmd()
except OSError:
raise OSError(f"Failed to execute {exec_cmd}")
except Exception:
raise WorkflowException(f"An error occured during execution of {exec_cmd}")

pass

@logit(logger)
def _link_executable(self, exe_name: str) -> None:
"""Link the executable to the DATA directory
TODO: Move this method somewhere else
"""
logger.info(f"Link executable {exe_name}")
logger.warn("Linking is not permitted per EE2.")
exe_src = os.path.join(self.task_config.EXECgfs, exe_name)
exe_dest = os.path.join(self.task_config.DATA, exe_name)
if os.path.exists(exe_dest):
os.remove(exe_dest)
os.symlink(exe_src, exe_dest)

@logit(logger)
def _prep_input_nml(self) -> None:
"""Prepare the input.nml file
TODO: Move this method somewhere else
"""
# stage input.nml
mom_input_nml_tmpl_src = os.path.join(self.task_config.HOMEgdas, 'parm', 'soca', 'fms', 'input.nml')
mom_input_nml_tmpl = os.path.join(self.task_config.DATA, 'mom_input.nml.tmpl')
FileHandler({'copy': [[mom_input_nml_tmpl_src, mom_input_nml_tmpl]]}).sync()

# swap date and stacksize
domain_stack_size = self.task_config.DOMAIN_STACK_SIZE
ymdhms = [int(s) for s in self.task_config.MARINE_WINDOW_END.strftime('%Y,%m,%d,%H,%M,%S').split(',')]
with open(mom_input_nml_tmpl, 'r') as nml_file:
nml = f90nml.read(nml_file)
nml['ocean_solo_nml']['date_init'] = ymdhms
nml['fms_nml']['domains_stack_size'] = int(domain_stack_size)
nml.write('mom_input.nml')

@logit(logger)
def _cice_hist2fms(self, input_filename, output_filename) -> None:
""" Reformat the CICE history file to be read by SOCA/FMS
Simple reformatting utility to allow soca/fms to read the CICE history files
"""

# open the CICE history file
ds = xr.open_dataset(input_filename)

if 'aicen' in ds.variables and 'hicen' in ds.variables and 'hsnon' in ds.variables:
logger.info(f"*** Already reformatted, skipping.")
return

# rename the dimensions to xaxis_1 and yaxis_1
ds = ds.rename({'ni': 'xaxis_1', 'nj': 'yaxis_1'})

# rename the variables
ds = ds.rename({'aice_h': 'aicen', 'hi_h': 'hicen', 'hs_h': 'hsnon'})

# Save the new netCDF file
ds.to_netcdf(output_filename, mode='w')

@logit(logger)
def initialize(self: Task) -> None:
"""Initialize a global B-matrix
Expand All @@ -150,7 +76,7 @@ def initialize(self: Task) -> None:
FileHandler(soca_fix_list).sync()

# prepare the MOM6 input.nml
self._prep_input_nml()
mdau.prep_input_nml(self.task_config)

# stage a single background
# TODO: check if we only need 1 deterministic background
Expand All @@ -165,7 +91,7 @@ def initialize(self: Task) -> None:
bkg_list.append([os.path.join(ice_bkg_dir, f"gdas.ice.t{self.task_config.gcyc_str}z.inst.f009.nc"),
os.path.join(self.task_config.DATA, "ice.bkg.nc")])
FileHandler({'copy': bkg_list}).sync()
self._cice_hist2fms("ice.bkg.nc", "ice.bkg.nc")
mdau.cice_hist2fms("ice.bkg.nc", "ice.bkg.nc")

# Copy MOM6 restart
# TODO: check if we can combine this with the step above
Expand Down Expand Up @@ -213,73 +139,75 @@ def initialize(self: Task) -> None:
diffhz_config.save(os.path.join(self.task_config.DATA, 'soca_parameters_diffusion_hz.yaml'))

# hybrid EnVAR case
if True:
# TODO(G): copy logic/steps from old script in the block below
# ensemble DA is temporarily off, check self.task_config.DOHYBVAR.
if self.task_config.DOHYBVAR == "YES" or self.task_config.NMEM_ENS > 3:
# stage ensemble membersfiles for use in hybrid background error
logger.debug("Stage ensemble files for DOHYBVAR {self.task_config.DOHYBVAR}")
logger.debug(f"Stage ensemble members for the hybrid background error")
mdau.stage_ens_mem(self.task_config)

# generate ensemble recentering YAML file
logger.debug("Generate ensemble recentering YAML file: {self.task_config.abcd_yaml}")
logger.debug("Generate ensemble recentering YAML file")
ensrecenter_config = parse_j2yaml(path=os.path.join(self.task_config.BERROR_YAML_DIR, 'soca_ensrecenter.yaml.j2'),
data=self.task_config)
ensrecenter_config.save(os.path.join(self.task_config.DATA, 'soca_ensrecenter.yaml'))

# generate hybrid weights YAML file
# generate ensemble weights YAML file
logger.debug("Generate ensemble recentering YAML file: {self.task_config.abcd_yaml}")
hybridweights_config = parse_j2yaml(path=os.path.join(self.task_config.BERROR_YAML_DIR, 'soca_ensweights.yaml.j2'),
data=self.task_config)
hybridweights_config.save(os.path.join(self.task_config.DATA, 'soca_ensweights.yaml'))

# need output dir for ensemble perturbations and static B-matrix
logger.debug("Create empty output [ensb, diagb] directories to receive output from executables")
newdirs = [
os.path.join(self.task_config.DATA, 'ensb'),
os.path.join(self.task_config.DATA, 'diagb'),
]
newdirs = [os.path.join(self.task_config.DATA, 'diagb')]
FileHandler({'mkdir': newdirs}).sync()

@logit(logger)
def gridgen(self: Task) -> None:
# link gdas_soca_gridgen.x
self._link_executable('gdas_soca_gridgen.x')
mdau.link_executable(self.task_config, 'gdas_soca_gridgen.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_soca_gridgen.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('gridgen.yaml')

self._run(exec_cmd)
mdau.run(exec_cmd)

@logit(logger)
def variance_partitioning(self: Task) -> None:
# link the variance partitioning executable, gdas_soca_diagb.x
self._link_executable('gdas_soca_diagb.x')
mdau.link_executable(self.task_config, 'gdas_soca_diagb.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_soca_diagb.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('soca_diagb.yaml')

self._run(exec_cmd)
mdau.run(exec_cmd)

@logit(logger)
def horizontal_diffusion(self: Task) -> None:
"""Generate the horizontal diffusion coefficients
"""
# link the executable that computes the correlation scales, gdas_soca_setcorscales.x,
# and prepare the command to run it
self._link_executable('gdas_soca_setcorscales.x')
mdau.link_executable(self.task_config, 'gdas_soca_setcorscales.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_soca_setcorscales.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('soca_setcorscales.yaml')

# create a files containing the correlation scales
self._run(exec_cmd)
mdau.run(exec_cmd)

# link the executable that computes the correlation scales, gdas_soca_error_covariance_toolbox.x,
# and prepare the command to run it
self._link_executable('gdas_soca_error_covariance_toolbox.x')
mdau.link_executable(self.task_config, 'gdas_soca_error_covariance_toolbox.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_soca_error_covariance_toolbox.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('soca_parameters_diffusion_hz.yaml')

# compute the coefficients of the diffusion operator
self._run(exec_cmd)
mdau.run(exec_cmd)

@logit(logger)
def vertical_diffusion(self: Task) -> None:
Expand All @@ -290,14 +218,14 @@ def vertical_diffusion(self: Task) -> None:

# link the executable that computes the correlation scales, gdas_soca_error_covariance_toolbox.x,
# and prepare the command to run it
self._link_executable('gdas_soca_error_covariance_toolbox.x')
mdau.link_executable(self.task_config, 'gdas_soca_error_covariance_toolbox.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_soca_error_covariance_toolbox.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('soca_parameters_diffusion_vt.yaml')

# compute the coefficients of the diffusion operator
self._run(exec_cmd)
mdau.run(exec_cmd)

@logit(logger)
def ensemble_perturbations(self: Task) -> None:
Expand All @@ -311,7 +239,14 @@ def ensemble_perturbations(self: Task) -> None:
accounting for the nonlinear steric recentering
- saving the recentered ensemble statistics
"""
# gdas_ens_handler.x
mdau.link_executable(self.task_config, 'gdas_ens_handler.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_ens_handler.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('soca_ensrecenter.yaml')

# compute the coefficients of the diffusion operator
mdau.run(exec_cmd)

@logit(logger)
def hybrid_weight(self: Task) -> None:
Expand All @@ -321,7 +256,12 @@ def hybrid_weight(self: Task) -> None:
variables.
TODO(G): Currently implemented for the specific case of the static ensemble members only
"""
# gdas_socahybridweights.x
mdau.link_executable(self.task_config, 'gdas_socahybridweights.x')
exec_cmd = Executable(self.task_config.APRUN_MARINEBMAT)
exec_name = os.path.join(self.task_config.DATA, 'gdas_socahybridweights.x')
exec_cmd.add_default_arg(exec_name)
exec_cmd.add_default_arg('soca_hybridweights.yaml')


@logit(logger)
def execute(self: Task) -> None:
Expand All @@ -330,14 +270,12 @@ def execute(self: Task) -> None:
This method will generate the full B-matrix according to the configuration.
"""
chdir(self.task_config.DATA)
self.initialize()
self.gridgen() # TODO: This should be optional in case the geometry file was staged
self.variance_partitioning()
self.horizontal_diffusion() # TODO: Make this optional once we've converged on an acceptable set of scales
self.vertical_diffusion()
self.ensemble_perturbations() # TODO: refactor this from the old scripts
self.hybrid_weight() # TODO: refactor this from the old scripts
self.finalize()

@logit(logger)
def finalize(self: Task) -> None:
Expand Down
Empty file.
Loading

0 comments on commit 8ae7f79

Please sign in to comment.