From b69c0e6ad4d10e93bca5b52b34d6fb839ce63075 Mon Sep 17 00:00:00 2001
From: Yumeng Chen
Date: Wed, 22 Jan 2025 09:32:48 +0000
Subject: [PATCH 1/3] bug fixes for nextsimdg

---
 config/samples/nextsim.dg.yml  |   3 +
 config/samples/python_dahu.src |   2 +-
 models/nextsim/dg/default.yml  |  10 +--
 models/nextsim/dg/forcing.py   |   2 +-
 models/nextsim/dg/model.py     | 125 ++++++++++++++++++---------
 models/nextsim/dg/namelist.py  |  12 ++--
 scripts/preprocess.py          |   1 +
 utils/job_submitters/base.py   |   2 +-
 utils/job_submitters/oar.py    |  37 +++++++---
 9 files changed, 112 insertions(+), 82 deletions(-)

diff --git a/config/samples/nextsim.dg.yml b/config/samples/nextsim.dg.yml
index 1db7ae2..de2d7a5 100644
--- a/config/samples/nextsim.dg.yml
+++ b/config/samples/nextsim.dg.yml
@@ -7,7 +7,9 @@ job_submit:
   scheduler: oar
   ppn: 32
   project: pr-sasip
+  # one can choose the `devel` queue or the `standard` queue
   queue: devel
+  walltime: 1800
   run_separate_jobs: True
 
 nproc: 32 ##total number of processors
@@ -63,6 +65,7 @@ model_def:
   nextsim.dg:
     config_file: '/bettik/yumengch-ext/NEDAS/models/nextsim/dg/default.yml'
     model_env: '/bettik/yumengch-ext/NEDAS/models/nextsim/dg/setup-gricad.src'
+    model_config_file: '/bettik/yumengch-ext/NEDAS/models/nextsim/dg/template.cfg'
     nproc_per_run: 8 ##number of processors to run a forecast
     parallel_mode: openmp
     walltime: 10000 ##walltime in seconds
diff --git a/config/samples/python_dahu.src b/config/samples/python_dahu.src
index c6cd2ec..9ff50f3 100644
--- a/config/samples/python_dahu.src
+++ b/config/samples/python_dahu.src
@@ -1,5 +1,5 @@
 source $HOME/.bashrc
 source /applis/environments/conda.sh
 
-conda activate NEDAS
+conda activate nedas
 export PYTHONPATH=$PYTHONPATH:/bettik/yumengch-ext/NEDAS
\ No newline at end of file
diff --git a/models/nextsim/dg/default.yml b/models/nextsim/dg/default.yml
index ed19aee..61aad1c 100644
--- a/models/nextsim/dg/default.yml
+++ b/models/nextsim/dg/default.yml
@@ -4,11 +4,11 @@
 ## instead of converting from lon,lat which I think is unnecessary
 grid_def:
   proj: '+proj=stere +a=6378273 +b=6356889.448910593 +lat_0=90. +lon_0=-45. +lat_ts=60.'
-  xstart:
-  xend:
-  ystart:
-  yend:
-  dx:
+  xstart: -2.5e6
+  xend: 2.498e6
+  ystart: -2e6
+  yend: 2.5e6
+  dx: 3e3
 restart_dt: 24 ##model restart interval (hours)
 
 ##physical parameters
diff --git a/models/nextsim/dg/forcing.py b/models/nextsim/dg/forcing.py
index bb156a7..ca2f7b5 100644
--- a/models/nextsim/dg/forcing.py
+++ b/models/nextsim/dg/forcing.py
@@ -312,7 +312,7 @@ def perturb_forcing(forcing_options:dict, file_options:dict, i_ens: int, time: d
     os.makedirs(os.path.join(pert_path, f'ensemble_{i_ens}'), exist_ok=True)
     # time index and time array
     time_index: np.ndarray[typing.Any, np.dtype[np.int64]]
-    time_array: np.ndarray[typing.Any, np.dtype[np.float64]]
+    time_array: list[datetime]
     for forcing_name in forcing_options:
         if forcing_name not in file_options: continue
 
diff --git a/models/nextsim/dg/model.py b/models/nextsim/dg/model.py
index 1870857..bd2669c 100644
--- a/models/nextsim/dg/model.py
+++ b/models/nextsim/dg/model.py
@@ -39,7 +39,13 @@ def __init__(self, config_file=None, parse_args=False, **kwargs):
         self.z_units = 'm'
 
         # construct grid obj based on config
-        self.grid = Grid.regular_grid(Proj(self.proj), self.xstart, self.xend, self.ystart, self.yend, self.dx)
+        self.grid = Grid.regular_grid(
+            Proj(self.grid_def['proj']),
+            float(self.grid_def['xstart']),
+            float(self.grid_def['xend']),
+            float(self.grid_def['ystart']),
+            float(self.grid_def['yend']),
+            float(self.grid_def['dx']))
         self.mask = np.full(self.grid.x.shape, False) ##model grid points that are masked (land?)
 
     def filename(self, **kwargs):
@@ -159,9 +165,20 @@ def preprocess(self, task_id:int=0, **kwargs):
             ensemble member id
         - time : datetime
             start time of the forecast
+        - time_start : datetime
+            initial time of the forecast cycles
         - path : str
             path to the working directory of the ensemble member
-        These are defined in the configuration file model_def:nextsim.dg section,
+        - forecast_period : int
+            length of the forecast in hours
+        - restart_dir : str
+            the directory holding restart files from the
+            previous cycle, i.e., the model run directory
+            of that cycle. In the initial cycle, this
+            directory is given by `ens_init_dir` in the
+            `nextsim.dg` entry of the `model_def` section
+            of the NEDAS config file.
+        These are defined in the `config_file` entry of the model_def:nextsim.dg section,
         and parse_config will bring them into this class as
         - self.files : dict
             This section contains the filenames for the restart file.
@@ -185,10 +202,7 @@ def preprocess(self, task_id:int=0, **kwargs):
         # directory where files are being collected to, and where the model will be run
         run_dir = os.path.join(kwargs['path'], ens_mem_dir)
         makedir(run_dir)
-        # kwargs['restart_dir'] is where to obtain the files, see scripts/preprocess.py how it's defined
-        # at time_start files are from model_def:nextsim.dg:ens_init_dir
-        # during cycling, the files come from work_dir/cycle/prev_time/model_name
-
+
         # get all required filenames for the initial ensemble
         # 1. get current and next time
         time = kwargs['time']
@@ -197,77 +211,69 @@ def preprocess(self, task_id:int=0, **kwargs):
         # 2. get the restart and forcing filenames
         file_options_restart = self.files['restart']
-        fname_restart:str = restart.get_restart_filename(file_options_restart, ens_mem_id, time)
-        # the restart file needs to be obtained from prev_time instead during cycling,
-        # not just copying from files:restart:format path again
-        # TODO: also, get_restart_filename doesn't seem to need ens_mem_id input, format doesn't involve {i}
-        fname_restart_src = os.path.join(kwargs['restart_dir'], ens_mem_dir, os.path.basename(fname_restart))
-        # TODO: at time_start this is pointing to ens_init_dir/ens_mem/fname; but it seems ens_init_dir only contain a single copy, not an ensemble
-        # temporary fix, if cannot find ens_init_dir/ens_mem/fname, use fname instead:
-        if not os.path.exists(fname_restart_src):
-            fname_restart_src = fname_restart
-
-        file_options_forcing:dict[str, str] = self.files['forcing']
+        # prefer the restart file cycled into restart_dir; fall back to the initial ensemble file
+        fname_restart_init:str = restart.get_restart_filename(file_options_restart, ens_mem_id, time)
+        fname_restart:str = os.path.join(kwargs['restart_dir'], ens_mem_dir, os.path.basename(fname_restart_init))
+        if not os.path.exists(fname_restart):
+            fname_restart = fname_restart_init
+
+        file_options_forcing:dict[str, dict] = self.files['forcing']
         fname_forcing:dict[str, str] = dict()
         for forcing_name in file_options_forcing:
             fname_forcing[forcing_name] = forcing.get_forcing_filename(file_options_forcing[forcing_name],
                                                                        ens_mem_id, time)
-        # note: it is okay to always copy forcing files from original directory over to the run_dir
-        # there is no need to cycle these forcing variables
 
         # no need for perturbation if not specified in yaml file
         if not hasattr(self, 'perturb'):
-            print ('We do no perturbations as perturb section is not specified in the model configuration.', flush=True)
+            print('No perturbation is applied as the perturb section '
+                  'is not specified in the model configuration.',
+                  flush=True)
             # we do not perturb the restart file
             # simply link the restart files
-            os.system(f'ln -s {fname_restart_src} {run_dir}')
+            os.system(f'ln -s {fname_restart} {run_dir}')
             # we do not perturb the forcing file
             # simply link the forcing files
             for forcing_name in file_options_forcing:
                 os.system(f'ln -s {fname_forcing[forcing_name]} {run_dir}')
             return
+
         # 3. add perturbations
         # here, if the 'restart' section is not under the perturb section
         # we only link the restart file to each ensemble directory
-        if 'restart' not in self.perturb:
+        if 'restart' not in self.perturb or kwargs['time'] != kwargs['time_start']:
             # we do not perturb the restart file
             # simply link the restart files
-            os.system(f'ln -s {fname_restart_src} {run_dir}')
-            return
+            os.system(f'ln -s {fname_restart} {run_dir}')
+        else:
+            restart_options = self.perturb['restart']
+            # copy restart files to the ensemble member directory
+            fname = os.path.join(run_dir, os.path.basename(fname_restart))
+            subprocess.run(['cp', '-v', fname_restart, fname])
+            # prepare the restart file options for the perturbation
+            file_options_rst = {'fname': fname,
+                                'lon_name':file_options_restart['lon_name'],
+                                'lat_name':file_options_restart['lat_name']}
+            # perturb the restart file
+            restart.perturb_restart(restart_options, file_options_rst)
+
         if 'forcing' not in self.perturb:
             # we do not perturb the forcing file
             # simply link the forcing files
             for forcing_name in file_options_forcing:
                 os.system(f'ln -s {fname_forcing[forcing_name]} {run_dir}')
-            return
-
-        # 3. add perturbations
-        restart_options = self.perturb['restart']
-        # copy restart files to the ensemble member directory
-        fname = os.path.join(run_dir, os.path.basename(fname_restart))
-        subprocess.run(['cp', '-v', fname_restart_src, fname])
-        # prepare the restart file options for the perturbation
-        file_options = {'fname': fname,
-                        'lon_name':file_options_restart['lon_name'],
-                        'lat_name':file_options_restart['lat_name']}
-        # perturb the restart file
-        restart.perturb_restart(restart_options, file_options)
-
-        forcing_options = self.perturb['forcing']
-        # construct file options for forcing
-        file_options:dict = dict()
-        for forcing_name in forcing_options:
-            # we ignore entries that are not in the files options
-            # e.g., path
-            if forcing_name not in fname_forcing: continue
-            fname = os.path.join(run_dir,
-                                 os.path.basename(fname_forcing[forcing_name])
-                                 )
-            # the forcing file options for the perturbation
-            file_options[forcing_name] = {'fname_src': fname_forcing[forcing_name],
-                                          'fname': fname,
-                                          **file_options_forcing[forcing_name]}
-        # add forcing perturbations
-        forcing.perturb_forcing(forcing_options, file_options, ens_mem_id, time, next_time)
+        else:
+            forcing_options = self.perturb['forcing']
+            for forcing_name in forcing_options:
+                # we ignore entries that are not in the files options
+                # e.g., path
+                if forcing_name not in fname_forcing: continue
+                fname = os.path.join(run_dir,
+                                     os.path.basename(fname_forcing[forcing_name])
+                                     )
+                # the forcing file options for the perturbation
+                file_options_forcing[forcing_name]['fname_src'] = fname_forcing[forcing_name]
+                file_options_forcing[forcing_name]['fname'] = fname
+            # add forcing perturbations
+            forcing.perturb_forcing(forcing_options, file_options_forcing, ens_mem_id, time, next_time)
 
     def run(self, task_id=0, **kwargs):
         """Run nextsim.dg model forecast"""
@@ -286,7 +292,7 @@ def run(self, task_id=0, **kwargs):
         else:
             run_dir = kwargs['path']
         makedir(run_dir)
-        namelist.make_namelist(run_dir, **kwargs)
+        namelist.make_namelist(self.files, run_dir, **kwargs)
 
         ##build shell commands for running the model
         shell_cmd = "echo starting the script...; "
@@ -313,7 +319,8 @@ def run(self, task_id=0, **kwargs):
 
     def run_batch(self, task_id=0, **kwargs):
         """Run nextsim.dg model ensemble forecast, use job array to spawn the member runs"""
         kwargs = super().parse_kwargs(**kwargs)
-        assert kwargs['use_job_array'], "use_job_array shall be True if running ensemble in batch mode."
+        assert self.use_job_array, \
+            "use_job_array must be True when running the ensemble in batch mode."
         time = kwargs['time']
         forecast_period = kwargs['forecast_period']
@@ -325,8 +332,10 @@ def run_batch(self, task_id=0, **kwargs):
             ens_dir = os.path.join(run_dir, f"ens_{member+1:02}")
             makedir(ens_dir)
 
+            kwargs['member'] = member
             ##this creates run_dir/ens_??/nextsim.cfg
-            namelist.make_namelist(ens_dir, member=member, **kwargs)
+            namelist.make_namelist(self.files, self.model_config_file,
+                                   ens_dir, **kwargs)
 
         ##build shell commands for running the model using job array
         shell_cmd = "echo starting the script...; "
@@ -344,7 +353,7 @@ def run_batch(self, task_id=0, **kwargs):
         else:
             raise TypeError(f"unknown parallel mode '{self.parallel_mode}'")
 
-        run_job(shell_cmd, job_name='nextsim.dg.ens_run', nproc=self.nproc_per_run, array_size=nens, run_dir=run_dir, **kwargs)
+        run_job(shell_cmd, job_name='nextsim.dg.ens_run', use_job_array=self.use_job_array, nproc=self.nproc_per_run, array_size=nens, run_dir=run_dir, **kwargs)
 
         ##check if the restart files at next_time are produced
         fname_restart = restart.get_restart_filename(self.files['restart'], 1, next_time)
diff --git a/models/nextsim/dg/namelist.py b/models/nextsim/dg/namelist.py
index b28c46d..977a74a 100644
--- a/models/nextsim/dg/namelist.py
+++ b/models/nextsim/dg/namelist.py
@@ -3,7 +3,7 @@ from utils.conversion import dt1h
 from . import restart, forcing
 
-def make_namelist(path='.', **kwargs):
+def make_namelist(file_options:dict, model_config_file:str, ens_dir='.', **kwargs):
     ens_mem_id:int = kwargs['member'] + 1 ##TODO: member could be None for deterministic runs
     time = kwargs['time']
     forecast_period = kwargs['forecast_period']
@@ -12,17 +12,17 @@ def make_namelist(path='.', **kwargs):
     # read the config file
     model_config = configparser.ConfigParser()
     model_config.optionxform = str
-    model_config.read('template.cfg')
+    model_config.read(model_config_file)
 
     ##change the restart file name
-    file_options_restart = kwargs['files']['restart']
+    file_options_restart = file_options['restart']
     fname_restart:str = restart.get_restart_filename(file_options_restart, ens_mem_id, time)
-    model_config['model']['init_file'] = fname_restart
+    model_config['model']['init_file'] = os.path.join(ens_dir, os.path.basename(fname_restart))
     model_config['model']['start'] = time.strftime("%Y-%m-%dT%H:%M:%SZ")
     model_config['model']['stop'] = next_time.strftime("%Y-%m-%dT%H:%M:%SZ")
     model_config['ConfigOutput']['start'] = time.strftime("%Y-%m-%dT%H:%M:%SZ")
 
     # changing the forcing file in ERA5Atmosphere
-    file_options_forcing:dict[str, str] = kwargs['files']['forcing']
+    file_options_forcing:dict[str, dict] = file_options['forcing']
     fname_atmos_forcing = forcing.get_forcing_filename(file_options_forcing['atmosphere'],
                                                        1, time)
     fname_atmos_forcing = os.path.basename(fname_atmos_forcing)
@@ -34,7 +34,7 @@ def make_namelist(path='.', **kwargs):
         model_config['TOPAZOcean']['file'] = fname_ocn_forcing
 
     # dump the config to new file
-    config_file = os.path.join(path, 'nextsim.cfg')
+    config_file = os.path.join(ens_dir, 'nextsim.cfg')
     with open(config_file, 'w') as configfile:
         model_config.write(configfile)
 
diff --git a/scripts/preprocess.py b/scripts/preprocess.py
index a6e61ac..1c8eccd 100644
--- a/scripts/preprocess.py
+++ b/scripts/preprocess.py
@@ -41,6 +41,7 @@ def preprocess(c, model_name):
             'path': path,
             'member': mem_id,
             'time': c.time,
+            'time_start': c.time_start,
             'forecast_period': c.cycle_period,
             **c.job_submit,
         }
diff --git a/utils/job_submitters/base.py b/utils/job_submitters/base.py
index 130709d..53f1a9c 100644
--- a/utils/job_submitters/base.py
+++ b/utils/job_submitters/base.py
@@ -31,7 +31,7 @@ def __init__(self, **kwargs):
         self.walltime = kwargs.get('walltime', 3600)
         self.run_separate_jobs = kwargs.get('run_separate_jobs', False)
         self.use_job_array = kwargs.get('use_job_array', False)
-        self.array_size = 1
+        self.array_size = kwargs.get('array_size', 1)
 
     @property
     def nproc(self):
diff --git a/utils/job_submitters/oar.py b/utils/job_submitters/oar.py
index 0ccaae1..94520d3 100644
--- a/utils/job_submitters/oar.py
+++ b/utils/job_submitters/oar.py
@@ -1,4 +1,5 @@
 import os
+import stat
 import subprocess
 import tempfile
 from time import sleep
@@ -19,12 +20,12 @@ def __init__(self, **kwargs):
         ##host specific settings
         if self.host == 'gricad':
             p = subprocess.run("hostname", capture_output=True, text=True)
-            if 'dahu' in p.stdout: ##TODO: what is the typical hostname for compute nodes? change the statement here accordingly.
+            if p.stdout.replace('\n', '').replace(' ', '') in ['dahu-oar3', 'f-dahu']:
                 self.job_submit_node = None ##no need to ssh for oarsub when already on a submit node
             else:
                 ##job submit node based on queue type
                 if self.queue == 'devel':
-                    self.job_submit_node = 'oar-dahu3'
+                    self.job_submit_node = 'dahu-oar3'
                 else:
                     self.job_submit_node = 'f-dahu'
 
@@ -87,7 +88,8 @@ def submit_job_and_monitor(self, commands):
         job_script.write(f"#OAR -n {self.job_name}\n")
         job_script.write(f"#OAR -l /nodes={self.nnode}/core={self.ppn},walltime={seconds_to_timestr(self.walltime)}\n")
         job_script.write(f"#OAR --project {self.project}\n")
-        job_script.write(f"#OAR -t {self.queue}\n")
+        if self.queue == 'devel':
+            job_script.write(f"#OAR -t {self.queue}\n")
 
         log_file = os.path.join(self.run_dir, f"{self.job_name}.%jobid%.stdout")
         job_script.write(f"#OAR --stdout {log_file}\n")
@@ -102,11 +104,14 @@ def submit_job_and_monitor(self, commands):
 
         self.job_script = job_script.name
 
+        # the job script has to be executable on the OAR system
+        st = os.stat(self.job_script)
+        os.chmod(self.job_script, st.st_mode | stat.S_IEXEC)
+
         ##submit the job script
         if self.job_submit_node:
-            submit_cmd = ['ssh', self.job_submit_node, f"'oarsub -S {self.job_script}'"]
-            ##TODO: maybe the ssh command need a '' wrap around the oarsub command
-            # (sending it to ssh server to be run), check if it works
+            submit_cmd = ['ssh', self.job_submit_node,
+                          f"oarsub -S {self.job_script}"]
         else:
             submit_cmd = ["oarsub", "-S", f"{self.job_script}"]
         process = subprocess.run(submit_cmd, capture_output=True)
@@ -117,7 +122,7 @@ def submit_job_and_monitor(self, commands):
 
         ##monitor the queue for job completion
         if self.use_job_array:
-            self.job_id = int(s.split('OAR_ARRAY_ID=')[-1])
+            self.job_id = int(s.replace(' ', '').replace('\n', '').split('OAR_ARRAY_ID=')[-1])
             while True:
                 sleep(20)
                 p = subprocess.run(['ssh', self.job_submit_node,
@@ -135,6 +140,18 @@ def submit_job_and_monitor(self, commands):
                     raise RuntimeError(f"Error job array {self.job_id}")
 
         else:
-            ##TODO: also add capture of normal job id and monitor here
-            pass
-
+            self.job_id = int(s.replace(' ', '').replace('\n', '').split('OAR_JOB_ID=')[-1])
+            while True:
+                sleep(20)
+                p = subprocess.run(['ssh', self.job_submit_node,
+                                    'oarstat', '-f', f'--job {self.job_id}',
+                                    '| grep "state = "'], capture_output=True)
+                s = p.stdout.decode('utf-8').replace(' ', '').split('\n')[:-1]
+                s = s[0]
+                print(s, flush=True)
+                jobs_status = s.split('state=')[-1].replace(' ', '')
+                # end this loop once the job has terminated
+                if jobs_status == 'Terminated':
+                    break
+                if jobs_status == 'Error':
+                    raise RuntimeError(f"Error in job {self.job_id}")
\ No newline at end of file

From a34938c1f65783f1a235d074c404c101b508220c Mon Sep 17 00:00:00 2001
From: Yumeng Chen
Date: Wed, 22 Jan 2025 15:43:42 +0000
Subject: [PATCH 2/3] fix namelist arguments for non-job-array cases; note that
 :func:`model.run_batch` is tested but `model.run` is not tested at all

---
 models/nextsim/dg/model.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/models/nextsim/dg/model.py b/models/nextsim/dg/model.py
index bd2669c..1e4b5c3 100644
--- a/models/nextsim/dg/model.py
+++ b/models/nextsim/dg/model.py
@@ -292,7 +292,7 @@ def run(self, task_id=0, **kwargs):
         else:
             run_dir = kwargs['path']
         makedir(run_dir)
-        namelist.make_namelist(self.files, run_dir, **kwargs)
+        namelist.make_namelist(self.files, self.model_config_file, run_dir, **kwargs)
 
         ##build shell commands for running the model
         shell_cmd = "echo starting the script...; "

From d0235dbcb7964117b691be27534ed2eb8d3ba797 Mon Sep 17 00:00:00 2001
From: Ali Aydogdu
Date: Wed, 22 Jan 2025 22:31:44 +0100
Subject: [PATCH 3/3] make the cfg files more generic with respect to the user

---
 config/samples/python_dahu.src     | 2 +-
 models/nextsim/dg/setup-gricad.src | 2 +-
 scripts/dahu_submit_job.sh         | 7 ++++---
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/config/samples/python_dahu.src b/config/samples/python_dahu.src
index 9ff50f3..c33105c 100644
--- a/config/samples/python_dahu.src
+++ b/config/samples/python_dahu.src
@@ -2,4 +2,4 @@ source $HOME/.bashrc
 source /applis/environments/conda.sh
 
 conda activate nedas
-export PYTHONPATH=$PYTHONPATH:/bettik/yumengch-ext/NEDAS
\ No newline at end of file
+export PYTHONPATH=$PYTHONPATH:/bettik/${USER}/NEDAS
diff --git a/models/nextsim/dg/setup-gricad.src b/models/nextsim/dg/setup-gricad.src
index 22d5ec9..e7d3399 100755
--- a/models/nextsim/dg/setup-gricad.src
+++ b/models/nextsim/dg/setup-gricad.src
@@ -3,5 +3,5 @@
 . /applis/environments/conda.sh
 conda activate nextsimdg
 
-export NDG_BLD_DIR=/bettik/yumengch-ext/nextsimdg/build
+export NDG_BLD_DIR=/bettik/${USER}/nextsimdg/build
 export INPUT_DATA_DIR=/summer/sasip/model-forcings/nextsim-dg
diff --git a/scripts/dahu_submit_job.sh b/scripts/dahu_submit_job.sh
index d8eeef4..e7414a3 100755
--- a/scripts/dahu_submit_job.sh
+++ b/scripts/dahu_submit_job.sh
@@ -7,7 +7,7 @@
 #OAR --project pr-sasip
 #OAR -t devel
 
-USER=yumengch-ext
+DFLT_USER=yumengch-ext
 WD=/bettik/${USER}
 CD=${WD}/NEDAS
 
@@ -17,12 +17,13 @@ model=nextsim.dg
 
 source $HOME/.bashrc
 source /applis/environments/conda.sh
-#source /bettik/aydogdu-ext/pkgs/nedas-venv/nds/bin/activate
 conda activate nedas
 
 export PYTHONPATH=$PYTHONPATH:$WD/NEDAS
 
+sed "s;${DFLT_USER};${USER};g" $CD/config/samples/$model.yml > $CD/config/samples/${model}_${USER}.yml
+
 python $CD/scripts/run_expt.py \
-      --config_file=$CD/config/samples/$model.yml \
+      --config_file=$CD/config/samples/${model}_${USER}.yml \
       --nens $nens \
       --nproc $nproc \
       --work_dir $WD/DATA/$model/ndg_$(printf "%02d" $nens)
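
A note on the `float(...)` casts introduced around `Grid.regular_grid` in patch 1: if the YAML configs are loaded with PyYAML (an assumption; the loader itself is not shown in these patches), scalars in scientific notation without a signed exponent, such as the new `xstart: -2.5e6` or `dx: 3e3` in default.yml, resolve to strings rather than floats, because PyYAML's YAML 1.1 float resolver requires both a decimal point and an explicit exponent sign. A minimal sketch of the quirk and the defensive cast:

    # Minimal sketch, assuming the configs are read with PyYAML (pip install pyyaml).
    import yaml

    doc = """
    xstart: -2.5e6   # decimal point but unsigned exponent -> stays a string
    dx: 3e3          # no decimal point at all -> also a string
    ok: -2.5e+6      # decimal point and signed exponent -> parsed as a float
    """
    cfg = yaml.safe_load(doc)
    print([type(v).__name__ for v in cfg.values()])  # ['str', 'str', 'float']

    # hence the explicit casts when constructing the grid, as in patch 1:
    xstart = float(cfg['xstart'])  # float() accepts both the string and float forms

This is why the casts are a safer fix than rewriting the YAML values alone: they work whichever notation a user puts in grid_def.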
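A note on the reworked `make_namelist` in namelist.py: it is a plain configparser round-trip over the model's template config. The sketch below reproduces that flow self-contained; the `[model]` section and the `init_file`/`start`/`stop` keys are taken from the diff, while the template content itself is invented here for illustration (the real one lives in the file pointed to by `model_config_file`):

    import configparser
    import io
    import os
    from datetime import datetime, timedelta

    # illustrative stand-in for template.cfg; the real code calls
    # model_config.read(model_config_file) instead
    template = """
    [model]
    init_file =
    start =
    stop =
    """

    model_config = configparser.ConfigParser()
    model_config.optionxform = str          # keep option names case-sensitive, as namelist.py does
    model_config.read_string(template)

    time = datetime(2010, 1, 1)
    next_time = time + timedelta(hours=24)  # plays the role of forecast_period * dt1h
    ens_dir = './ens_01'

    model_config['model']['init_file'] = os.path.join(ens_dir, 'restart.nc')
    model_config['model']['start'] = time.strftime("%Y-%m-%dT%H:%M:%SZ")
    model_config['model']['stop'] = next_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    buf = io.StringIO()
    model_config.write(buf)
    print(buf.getvalue())  # what namelist.py writes out as ens_dir/nextsim.cfg

Setting `optionxform = str` matters because configparser lower-cases option names by default, while the nextsim-dg config format uses mixed-case keys (e.g. in the `ConfigOutput` and `ERA5Atmosphere` sections).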
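A note on the two polling loops now in `submit_job_and_monitor`: the job-array branch and the new single-job branch share the same shape (query `oarstat` on the submit node over ssh, grep the `state = ` lines, stop on `Terminated`, raise on `Error`). A sketch of how they could be folded into one helper; `wait_for_oar_job` is a hypothetical name, and the `oarstat` flags and output format are taken from the diff rather than verified against an OAR installation:

    import subprocess
    from time import sleep
    from typing import Optional

    def wait_for_oar_job(job_id: int, submit_node: Optional[str],
                         is_array: bool = False, poll_seconds: int = 20) -> None:
        """Hypothetical helper: block until an OAR job (or every task of a job
        array) reaches a final state; raise if any task ends in Error."""
        flag = '--array' if is_array else '--job'
        query = f'oarstat -f {flag} {job_id} | grep "state = "'
        # go through ssh unless we are already on a submission-capable node
        cmd = ['ssh', submit_node, query] if submit_node else ['bash', '-c', query]
        while True:
            sleep(poll_seconds)
            p = subprocess.run(cmd, capture_output=True)
            out = p.stdout.decode('utf-8').replace(' ', '')
            states = [line.split('state=')[-1]
                      for line in out.splitlines() if 'state=' in line]
            if any(s == 'Error' for s in states):
                raise RuntimeError(f"Error in OAR job {job_id}")
            # done once every reported state is Terminated
            if states and all(s == 'Terminated' for s in states):
                return

Both branches would then reduce to `wait_for_oar_job(self.job_id, self.job_submit_node, is_array=self.use_job_array)`, which also avoids the `s = s[0]` assumption in the single-job loop that `oarstat` returns exactly one state line.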