Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nextsimdg #8

Merged
merged 3 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/samples/nextsim.dg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ job_submit:
scheduler: oar
ppn: 32
project: pr-sasip
# one can choose `devel` queue or `standard` queue
queue: devel
walltime: 1800
myying marked this conversation as resolved.
Show resolved Hide resolved
run_separate_jobs: True

nproc: 32 ##total number of processors
Expand Down Expand Up @@ -63,6 +65,7 @@ model_def:
nextsim.dg:
config_file: '/bettik/yumengch-ext/NEDAS/models/nextsim/dg/default.yml'
model_env: '/bettik/yumengch-ext/NEDAS/models/nextsim/dg/setup-gricad.src'
model_config_file: '/bettik/yumengch-ext/NEDAS/models/nextsim/dg/template.cfg'
nproc_per_run: 8 ##number of processors to run a forecast
parallel_mode: openmp
walltime: 10000 ##walltime in seconds
Expand Down
4 changes: 2 additions & 2 deletions config/samples/python_dahu.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
source $HOME/.bashrc
source /applis/environments/conda.sh

conda activate NEDAS
export PYTHONPATH=$PYTHONPATH:/bettik/yumengch-ext/NEDAS
conda activate nedas
export PYTHONPATH=$PYTHONPATH:/bettik/${USER}/NEDAS
10 changes: 5 additions & 5 deletions models/nextsim/dg/default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
## instead of converting from lon,lat which I think is unnecessary
grid_def:
proj: '+proj=stere +a=6378273 +b=6356889.448910593 +lat_0=90. +lon_0=-45. +lat_ts=60.'
xstart:
xend:
ystart:
yend:
dx:
xstart: -2.5e6
xend: 2.498e6
ystart: -2e6
yend: 2.5e6
dx: 3e3

restart_dt: 24 ##model restart interval (hours)
##physical parameters
Expand Down
2 changes: 1 addition & 1 deletion models/nextsim/dg/forcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def perturb_forcing(forcing_options:dict, file_options:dict, i_ens: int, time: d
os.makedirs(os.path.join(pert_path, f'ensemble_{i_ens}'), exist_ok=True)
# time index and time array
time_index: np.ndarray[typing.Any, np.dtype[np.int64]]
time_array: np.ndarray[typing.Any, np.dtype[np.float64]]
time_array: list[datetime]

for forcing_name in forcing_options:
if forcing_name not in file_options: continue
Expand Down
125 changes: 67 additions & 58 deletions models/nextsim/dg/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ def __init__(self, config_file=None, parse_args=False, **kwargs):
self.z_units = 'm'

# construct grid obj based on config
self.grid = Grid.regular_grid(Proj(self.proj), self.xstart, self.xend, self.ystart, self.yend, self.dx)
self.grid = Grid.regular_grid(
Proj(self.grid_def['proj']),
float(self.grid_def['xstart']),
float(self.grid_def['xend']),
float(self.grid_def['ystart']),
float(self.grid_def['yend']),
float(self.grid_def['dx']))
self.mask = np.full(self.grid.x.shape, False) ##model grid points that are masked (land?)

def filename(self, **kwargs):
Expand Down Expand Up @@ -159,9 +165,20 @@ def preprocess(self, task_id:int=0, **kwargs):
ensemble member id
- time : datetime
start time of the forecast
- time_start: datetime
initial time of the forecast cycles
- path : str
path to the working directory of the ensemble member
These are defined in the configuration file model_def:nextsim.dg section,
- forecast_period : int
number of hours being forecasted by model
- restart_dir : str
the saved restart directory from the previous cycle,
which is the model run directory from
the previous cycle. In the initial cycle, this
directory is given as `ens_init_dir` defined in
`nextsim.dg` section of the `model_def` section
in nedas config file.
These are defined in the `config_file` entry of model_def:nextsim.dg section,
and parse_config will bring them in this class as
- self.files : dict
This section contains the filenames for the restart file.
Expand All @@ -185,10 +202,7 @@ def preprocess(self, task_id:int=0, **kwargs):
# directory where files are being collected to, and where the model will be run
run_dir = os.path.join(kwargs['path'], ens_mem_dir)
makedir(run_dir)
# kwargs['restart_dir'] is where to obtain the files, see scripts/preprocess.py how it's defined
# at time_start files are from model_def:nextsim.dg:ens_init_dir
# during cycling, the files come from work_dir/cycle/prev_time/model_name


# get all required filenames for the initial ensemble
# 1. get current and next time
time = kwargs['time']
Expand All @@ -197,77 +211,69 @@ def preprocess(self, task_id:int=0, **kwargs):

# 2. get the restart and forcing filename
file_options_restart = self.files['restart']
fname_restart:str = restart.get_restart_filename(file_options_restart, ens_mem_id, time)
# the restart file needs to be obtained from prev_time instead during cycling,
# not just copying from files:restart:format path again
# TODO: also, get_restart_filename doesn't seem to need ens_mem_id input, format doesn't involve {i}
fname_restart_src = os.path.join(kwargs['restart_dir'], ens_mem_dir, os.path.basename(fname_restart))
# TODO: at time_start this is pointing to ens_init_dir/ens_mem/fname; but it seems ens_init_dir only contain a single copy, not an ensemble
# temporary fix, if cannot find ens_init_dir/ens_mem/fname, use fname instead:
if not os.path.exists(fname_restart_src):
fname_restart_src = fname_restart

file_options_forcing:dict[str, str] = self.files['forcing']
# obtain restart file at initial cycling
fname_restart_init:str = restart.get_restart_filename(file_options_restart, ens_mem_id, time)
fname_restart:str = os.path.join(kwargs['restart_dir'], ens_mem_dir, os.path.basename(fname_restart_init))
if not os.path.exists(fname_restart):
fname_restart = fname_restart_init

file_options_forcing:dict[str, dict] = self.files['forcing']
fname_forcing:dict[str, str] = dict()
for forcing_name in file_options_forcing:
fname_forcing[forcing_name] = forcing.get_forcing_filename(file_options_forcing[forcing_name], ens_mem_id, time)
# note: it is okay to always copy forcing files from original directory over to the run_dir
# there is no need to cycle these forcing variables

# no need for perturbation if not specified in yaml file
if not hasattr(self, 'perturb'):
print ('We do no perturbations as perturb section is not specified in the model configuration.', flush=True)
print('We do no perturbations as perturb section '
'is not specified in the model configuration.',
flush=True)
# we do not perturb the restart file
# simply link the restart files
os.system(f'ln -s {fname_restart_src} {run_dir}')
os.system(f'ln -s {fname_restart} {run_dir}')
# we do not perturb the forcing file
# simply link the forcing files
for forcing_name in file_options_forcing:
os.system(f'ln -s {fname_forcing[forcing_name]} {run_dir}')
return

# 3. add perturbations
# here, if the 'restart' section is not under the perturb section
# we only link the restart file to each ensemble directory
if 'restart' not in self.perturb:
if 'restart' not in self.perturb or kwargs['time'] != kwargs['time_start']:
myying marked this conversation as resolved.
Show resolved Hide resolved
# we do not perturb the restart file
# simply link the restart files
os.system(f'ln -s {fname_restart_src} {run_dir}')
return
os.system(f'ln -s {fname_restart} {run_dir}')
else:
restart_options = self.perturb['restart']
# copy restart files to the ensemble member directory
fname = os.path.join(run_dir, os.path.basename(fname_restart))
subprocess.run(['cp', '-v', fname_restart, fname])
# prepare the restart file options for the perturbation
file_options_rst = {'fname': fname,
'lon_name':file_options_restart['lon_name'],
'lat_name':file_options_restart['lat_name']}
# perturb the restart file
restart.perturb_restart(restart_options, file_options_rst)

if 'forcing' not in self.perturb:
# we do not perturb the forcing file
# simply link the forcing files
for forcing_name in file_options_forcing:
os.system(f'ln -s {fname_forcing[forcing_name]} {run_dir}')
return

# 3. add perturbations
restart_options = self.perturb['restart']
# copy restart files to the ensemble member directory
fname = os.path.join(run_dir, os.path.basename(fname_restart))
subprocess.run(['cp', '-v', fname_restart_src, fname])
# prepare the restart file options for the perturbation
file_options = {'fname': fname,
'lon_name':file_options_restart['lon_name'],
'lat_name':file_options_restart['lat_name']}
# perturb the restart file
restart.perturb_restart(restart_options, file_options)

forcing_options = self.perturb['forcing']
# construct file options for forcing
file_options:dict = dict()
for forcing_name in forcing_options:
# we ignore entries that are not in the files options
# e.g., path
if forcing_name not in fname_forcing: continue
fname = os.path.join(run_dir,
os.path.basename(fname_forcing[forcing_name])
)
# the forcing file options for the perturbation
file_options[forcing_name] = {'fname_src': fname_forcing[forcing_name],
'fname': fname,
**file_options_forcing[forcing_name]}
# add forcing perturbations
forcing.perturb_forcing(forcing_options, file_options, ens_mem_id, time, next_time)
else:
forcing_options = self.perturb['forcing']
for forcing_name in forcing_options:
# we ignore entries that are not in the files options
# e.g., path
if forcing_name not in fname_forcing: continue
fname = os.path.join(run_dir,
os.path.basename(fname_forcing[forcing_name])
)
# the forcing file options for the perturbation
file_options_forcing[forcing_name]['fname_src'] = fname_forcing[forcing_name]
file_options_forcing[forcing_name]['fname'] = fname
# add forcing perturbations
forcing.perturb_forcing(forcing_options, file_options_forcing, ens_mem_id, time, next_time)

def run(self, task_id=0, **kwargs):
"""Run nextsim.dg model forecast"""
Expand All @@ -286,7 +292,7 @@ def run(self, task_id=0, **kwargs):
else:
run_dir = kwargs['path']
makedir(run_dir)
namelist.make_namelist(run_dir, **kwargs)
namelist.make_namelist(self.files, self.model_config_file, run_dir, **kwargs)

##build shell commands for running the model
shell_cmd = "echo starting the script...; "
Expand All @@ -313,7 +319,8 @@ def run(self, task_id=0, **kwargs):
def run_batch(self, task_id=0, **kwargs):
"""Run nextsim.dg model ensemble forecast, use job array to spawn the member runs"""
kwargs = super().parse_kwargs(**kwargs)
assert kwargs['use_job_array'], "use_job_array shall be True if running ensemble in batch mode."
assert self.use_job_array, \
"use_job_array shall be True if running ensemble in batch mode."

time = kwargs['time']
forecast_period = kwargs['forecast_period']
Expand All @@ -325,8 +332,10 @@ def run_batch(self, task_id=0, **kwargs):
ens_dir = os.path.join(run_dir, f"ens_{member+1:02}")
makedir(ens_dir)

kwargs['member'] = member
##this creates run_dir/ens_??/nextsim.cfg
namelist.make_namelist(ens_dir, member=member, **kwargs)
namelist.make_namelist(self.files, self.model_config_file,
ens_dir, **kwargs)

##build shell commands for running the model using job array
shell_cmd = "echo starting the script...; "
Expand All @@ -344,7 +353,7 @@ def run_batch(self, task_id=0, **kwargs):
else:
raise TypeError(f"unknown parallel mode '{self.parallel_mode}'")

run_job(shell_cmd, job_name='nextsim.dg.ens_run', nproc=self.nproc_per_run, array_size=nens, run_dir=run_dir, **kwargs)
run_job(shell_cmd, job_name='nextsim.dg.ens_run', use_job_array=self.use_job_array, nproc=self.nproc_per_run, array_size=nens, run_dir=run_dir, **kwargs)

##check if the restart files at next_time are produced
fname_restart = restart.get_restart_filename(self.files['restart'], 1, next_time)
Expand Down
12 changes: 6 additions & 6 deletions models/nextsim/dg/namelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from utils.conversion import dt1h
from . import restart, forcing

def make_namelist(path='.', **kwargs):
def make_namelist(file_options:dict, model_config_file:str, ens_dir='.', **kwargs):
ens_mem_id:int = kwargs['member'] + 1 ##TODO: member could be None for deterministic runs
time = kwargs['time']
forecast_period = kwargs['forecast_period']
Expand All @@ -12,17 +12,17 @@ def make_namelist(path='.', **kwargs):
# read the config file
model_config = configparser.ConfigParser()
model_config.optionxform = str
model_config.read('template.cfg')
model_config.read(model_config_file)
myying marked this conversation as resolved.
Show resolved Hide resolved

##change the restart file name
file_options_restart = kwargs['files']['restart']
file_options_restart = file_options['restart']
fname_restart:str = restart.get_restart_filename(file_options_restart, ens_mem_id, time)
model_config['model']['init_file'] = fname_restart
model_config['model']['init_file'] = os.path.join(ens_dir, os.path.basename(fname_restart))
model_config['model']['start'] = time.strftime("%Y-%m-%dT%H:%M:%SZ")
model_config['model']['stop'] = next_time.strftime("%Y-%m-%dT%H:%M:%SZ")
model_config['ConfigOutput']['start'] = time.strftime("%Y-%m-%dT%H:%M:%SZ")
# changing the forcing file in ERA5Atmosphere
file_options_forcing:dict[str, str] = kwargs['files']['forcing']
file_options_forcing:dict[str, str] = file_options['forcing']
fname_atmos_forcing = forcing.get_forcing_filename(file_options_forcing['atmosphere'],
1, time)
fname_atmos_forcing = os.path.basename(fname_atmos_forcing)
Expand All @@ -34,7 +34,7 @@ def make_namelist(path='.', **kwargs):
model_config['TOPAZOcean']['file'] = fname_ocn_forcing

# dump the config to new file
config_file = os.path.join(path, 'nextsim.cfg')
config_file = os.path.join(ens_dir, 'nextsim.cfg')
with open(config_file, 'w') as configfile:
model_config.write(configfile)

2 changes: 1 addition & 1 deletion models/nextsim/dg/setup-gricad.src
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
. /applis/environments/conda.sh
conda activate nextsimdg

export NDG_BLD_DIR=/bettik/yumengch-ext/nextsimdg/build
export NDG_BLD_DIR=/bettik/${USER}/nextsimdg/build
export INPUT_DATA_DIR=/summer/sasip/model-forcings/nextsim-dg
7 changes: 4 additions & 3 deletions scripts/dahu_submit_job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#OAR --project pr-sasip
#OAR -t devel

USER=yumengch-ext
DFLT_USER=yumengch-ext
WD=/bettik/${USER}
CD=${WD}/NEDAS

Expand All @@ -17,12 +17,13 @@ model=nextsim.dg

source $HOME/.bashrc
source /applis/environments/conda.sh
#source /bettik/aydogdu-ext/pkgs/nedas-venv/nds/bin/activate
conda activate nedas
export PYTHONPATH=$PYTHONPATH:$WD/NEDAS

sed "s;${DFLT_USER};${USER};g" $CD/config/samples/$model.yml > $CD/config/samples/${model}_${USER}.yml

python $CD/scripts/run_expt.py \
--config_file=$CD/config/samples/$model.yml \
--config_file=$CD/config/samples/${model}_${USER}.yml \
--nens $nens \
--nproc $nproc \
--work_dir $WD/DATA/$model/ndg_$(printf "%02d" $nens)
1 change: 1 addition & 0 deletions scripts/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def preprocess(c, model_name):
'path': path,
'member': mem_id,
'time': c.time,
'time_start': c.time_start,
'forecast_period': c.cycle_period,
**c.job_submit,
}
Expand Down
2 changes: 1 addition & 1 deletion utils/job_submitters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, **kwargs):
self.walltime = kwargs.get('walltime', 3600)
self.run_separate_jobs = kwargs.get('run_separate_jobs', False)
self.use_job_array = kwargs.get('use_job_array', False)
self.array_size = 1
self.array_size = kwargs.get('array_size', 1)

@property
def nproc(self):
Expand Down
Loading