Stage initial conditions stored on HPSS #2988

Open
3 tasks
aerorahul opened this issue Oct 7, 2024 · 4 comments · May be fixed by #3141
Labels
feature New feature or request

Comments

@aerorahul
Contributor

What new functionality do you need?

Most jobs require the initial conditions to be available on local disk. The stage_ic job copies/stages these initial conditions into the experiment's COM directory.
This issue extends that functionality to copy from HPSS (on HPSS-accessible machines) into COM.

What are the requirements for the new functionality?

The requirements for this functionality lie in two areas:

  • wxflow
    • hpss.py and htar.py have the code to pull from HPSS
  • global-workflow
    • The inverse of the archive job needs to be coded in stage_ic, providing a YAML with the location of the tarball and its contents (a rough fetch/unpack/stage sketch follows below).
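
A minimal, hypothetical sketch of that fetch/unpack/stage flow, driving the htar command line directly via subprocess rather than wxflow's Hsi/Htar wrappers; the tarball path, member list, and destination directories below are placeholders, not actual stage_ic configuration:

#!/usr/bin/env python3
# Hypothetical sketch only: fetch a tarball from HPSS with the htar CLI,
# unpack selected members, and stage them into a COM directory.
# All paths and file names are placeholders.

import os
import shutil
import subprocess

tarball = "/NCEPDEV/emc-global/1year/user/test_data/2024100100/atm_cold.tar"
members = ["gfs_ctrl.nc"] + [f"gfs_data.tile{n}.nc" for n in range(1, 7)]
workdir = "/path/to/DATA"                # scratch directory for extraction
comdir = "/path/to/COMOUT_ATMOS_INPUT"   # final destination in COM

os.makedirs(workdir, exist_ok=True)
os.makedirs(comdir, exist_ok=True)

# Fetch + unpack: htar extracts only the requested members from the HPSS tarball
subprocess.run(["htar", "-xvf", tarball] + members, cwd=workdir, check=True)

# Stage: copy the extracted files into the experiment's COM directory
for member in members:
    shutil.copy(os.path.join(workdir, member), comdir)

In the actual task, the tarball, member list, and destinations would come from a parsed YAML rather than being hard-coded.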

Acceptance Criteria

  • fetch tarball
  • unpack tarball
  • stage contents of the tarball

Suggest a solution (optional)

No response

@aerorahul added the feature and triage labels Oct 7, 2024
@WalterKolczynski-NOAA removed the triage label Oct 8, 2024
@aerorahul
Contributor Author

An example YAML extending stage_atm_cold.yaml:

untar:
    tarball : "{{ ATARDIR }}/{{ cycle_YMDH }}/atm_cold.tar"
    on_hpss: True
    contents:
        - gfs_ctrl.nc
        {% for ftype in ["gfs_data", "sfc_data"] %}
        {% for ntile in range(1, ntiles + 1) %}
        - {{ ftype }}.tile{{ ntile }}.nc
        {% endfor %} # ntile
        {% endfor %} # ftype
    destination: "{{ DATA }}"
atmosphere_cold:
    copy:
        - ["{{ DATA }}/gfs_ctrl.nc", "{{ COMOUT_ATMOS_INPUT }}"]
        {% for ftype in ["gfs_data", "sfc_data"] %}
        {% for ntile in range(1, ntiles + 1) %}
        - ["{{ DATA }}/{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_INPUT }}"]
        {% endfor %} # ntile
        {% endfor %} # ftype
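
As a rough illustration (not part of the proposal), the YAML above could be rendered and consumed with the same parse_j2yaml call used in the fetch.py stub further down; the context values here are placeholders:

# Sketch only: render the Jinja2-templated YAML above and read its two blocks.
# The context keys and values are illustrative placeholders.

from wxflow import parse_j2yaml

context = {
    "ATARDIR": "/NCEPDEV/emc-global/1year/user/test_data",
    "cycle_YMDH": "2024100100",
    "ntiles": 6,
    "DATA": "/path/to/DATA",
    "COMOUT_ATMOS_INPUT": "/path/to/COMOUT_ATMOS_INPUT",
}

parsed = parse_j2yaml("stage_atm_cold.yaml", context)

# The 'untar' block describes what to pull from HPSS and where to unpack it
tarball = parsed["untar"]["tarball"]
contents = parsed["untar"]["contents"]
destination = parsed["untar"]["destination"]

# The 'atmosphere_cold' block lists [source, destination] pairs to stage into COM
copies = parsed["atmosphere_cold"]["copy"]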

@DavidHuber-NOAA
Contributor

DavidHuber-NOAA commented Oct 8, 2024

Here are example scripts and a stub fetch.py module. To run this, modify the settings in fetch.sh and either launch it directly (./fetch.sh) or submit it to Slurm (sbatch fetch.sh).

After copying all of these files into a directory, make a subdirectory named fetch and place @aerorahul's YAML in it. Next, clone wxflow into the directory: git clone git@github.com:NOAA-EMC/wxflow.

fetch.sh

#!/usr/bin/env bash

#SBATCH -A fv3-cpu
#SBATCH -t 06:00:00
#SBATCH -p service
#SBATCH -n 1
#SBATCH -J fetch_from_hpss
#SBATCH -o fetch.out
#SBATCH --open-mode truncate

### ADJUST THESE SETTINGS FOR YOUR CASE ###
# Get the location where this script exists
script_dir="/scratch1/NCEPDEV/global/David.Huber/fetch"  # Change to your local directory

# Set the name of the yaml we will be working with
export fetch_yaml="stage_atm_cold.yaml"

# Set the directory on HPSS where data will be pulled from
export ATARDIR="/NCEPDEV/emc-global/1year/${USER}/test_data"

# Date (YYYYMMDDHH) for which the data is valid
export CDATE=2024100100

##########

# Load modules
module use /scratch1/NCEPDEV/nems/role.epic/spack-stack/spack-stack-1.6.0/envs/gsi-addon-dev-rocky8/install/modulefiles/Core
module load stack-intel stack-python
module load py-pyyaml
module load py-jinja2
module load py-python-dateutil
module load hpss

# Set job variables
export jobid="fetch.$$"
export pslot="fetch_test"
export COMROOT="/scratch1/NCEPDEV/stmp2/${USER}/COMROOT"
export ROTDIR="${COMROOT}/${pslot}"
export DATAROOT="/scratch1/NCEPDEV/stmp2/${USER}/RUNDIRS/${pslot}"
export DATA="${DATAROOT}/${jobid}"
export RUN="gdas"
export NET="gfs"
export SDATE=${CDATE}
export EDATE=${CDATE}
export PDY=$( echo ${CDATE} | cut -c1-8 )
export cyc=$( echo ${CDATE} | cut -c9-10 )
export cycle="t${cyc}z"
export RUN_ENVIR=emc
export assim_freq=6
export ntiles=6

export HOMEgfs="${script_dir}"
export PARMgfs="${script_dir}"

# Extend the PYTHONPATH into wxflow
wxflowPATH="${HOMEgfs}/wxflow/src"
PYTHONPATH="${PYTHONPATH}:${script_dir}:${wxflowPATH}"
export PYTHONPATH

declare -rx COMOUT_ATMOS_INPUT="${ROTDIR}/${RUN}.${PDY}/${cyc}/model/atmos/input"

# Now call exglobal_fetch.py
./exglobal_fetch.py

exglobal_fetch.py

#!/usr/bin/env python3

import os

from fetch import Fetch
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)


@logit(logger)
def main():

    config = cast_strdict_as_dtypedict(os.environ)

    # Instantiate the Fetch object
    fetch = Fetch(config)

    # Pull out all the configuration keys needed to run the fetch step
    keys = ['current_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR', 'fetch_yaml', 'ATARDIR', 'ntiles']

    fetch_dict = AttrDict()
    for key in keys:
        fetch_dict[key] = fetch.task_config.get(key)
        if fetch_dict[key] is None:
            print(f"Warning: key ({key}) not found in task_config!")

    # Also import all COMOUT* directory and template variables
    for key in fetch.task_config.keys():
        if key.startswith("COMOUT_"):
            fetch_dict[key] = fetch.task_config.get(key)
            if fetch_dict[key] is None:
                print(f"Warning: key ({key}) not found in task_config!")

    # Determine which archives to retrieve from HPSS
    # Read the input YAML file to get the list of tarballs on tape
    atardir_set = fetch.configure(fetch_dict)

    # Pull the data from tape or locally and store it in the specified destination
    fetch.execute_pull_data(atardir_set)


if __name__ == '__main__':
    main()
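
A hedged note on the cast_strdict_as_dtypedict call above: it converts the string-valued environment into a typed AttrDict, so values such as ntiles should arrive as integers rather than strings. A tiny illustration (assuming numeric strings are cast to int):

# Illustration only: show how the driver's configuration is derived from the
# environment. Assumes cast_strdict_as_dtypedict casts numeric strings to int.
import os

from wxflow import cast_strdict_as_dtypedict

os.environ["ntiles"] = "6"
config = cast_strdict_as_dtypedict(os.environ)
print(type(config["ntiles"]), config["ntiles"])  # expected: <class 'int'> 6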

Stub of fetch.py

#!/usr/bin/env python3

import os
from logging import getLogger
from typing import Any, Dict, List

from wxflow import (AttrDict, FileHandler, Hsi, Task,
                    logit, parse_j2yaml)

logger = getLogger(__name__.split('.')[-1])


class Fetch(Task):
    """Task to pull ROTDIR data from HPSS (or locally)
    """

    @logit(logger, name="Fetch")
    def __init__(self, config: Dict[str, Any]) -> None:
        """Constructor for the Fetch task
        The constructor is responsible for collecting necessary yamls based on
        the runtime options and RUN.

        Parameters
        ----------
        config : Dict[str, Any]
            Incoming configuration for the task from the environment

        Returns
        -------
        None
        """
        super().__init__(config)

        # Perhaps add other stuff to self.

    @logit(logger)
    def configure(self, fetch_dict: Dict[str, Any]):
        """Determine which tarballs will need to be extracted.

        Parameters
        ----------
        fetch_dict : Dict[str, Any]
            Task specific keys, e.g. COM directories, etc

        Returns
        -------
        ?: Dict[str, Any]
            Dictionary derived from the yaml file with necessary HPSS info.
        """

        self.hsi = Hsi()

        fetch_yaml = fetch_dict.fetch_yaml
        fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch")

        parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml),
                                    fetch_dict)

        return {}  # Needed parameters from parsed_fetch

    @logit(logger)
    def execute_pull_data(self, atardir_set: Dict[str, Any]) -> None:
        """Pull data from HPSS based on a yaml dict.

        Parameters
        ----------
        atardir_set: Dict[str, Any]
            Dict defining set of tarballs to pull and where to put them.

        Returns
        -------
        None
        """

        # Pull the data and place it where it needs to go

    # Other helper methods...
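
One possible shape for execute_pull_data, sketched here as a standalone function; it assumes atardir_set carries the parsed 'untar' block and the copy list from the YAML example above, and it stands in the htar command line and wxflow's FileHandler for whatever interface the final implementation settles on:

# Hypothetical sketch of the pull/stage step; not the final implementation.

import os
import subprocess
from typing import Any, Dict

from wxflow import FileHandler


def pull_data_sketch(atardir_set: Dict[str, Any]) -> None:
    untar = atardir_set["untar"]
    destination = untar["destination"]
    os.makedirs(destination, exist_ok=True)

    # Fetch and unpack the requested members from the HPSS tarball
    subprocess.run(["htar", "-xvf", untar["tarball"]] + list(untar["contents"]),
                   cwd=destination, check=True)

    # Stage the extracted files into COM. The YAML's copy entries give a source
    # file and a destination directory; expand them into explicit file targets
    # before handing them to wxflow's FileHandler.
    copies = []
    dest_dirs = set()
    for src, dest_dir in atardir_set["atmosphere_cold"]["copy"]:
        dest_dirs.add(dest_dir)
        copies.append([src, os.path.join(dest_dir, os.path.basename(src))])
    FileHandler({"mkdir": sorted(dest_dirs), "copy": copies}).sync()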

@DavidHuber-NOAA
Contributor

@DavidGrumm-NOAA I updated the sample scripts above after testing. The script can now be submitted to Slurm via sbatch as well.

@DavidGrumm-NOAA DavidGrumm-NOAA linked a pull request Dec 5, 2024 that will close this issue
@DavidGrumm-NOAA

DavidGrumm-NOAA commented Dec 6, 2024

To test my code, I ran create_experiment with the short YAML C48_ATM.yaml (which created /scratch1/NCEPDEV/global/David.Grumm/G_WF_2988/testroot_1/EXPDIR and COMROOT) by:

HPC_ACCOUNT="fv3-cpu" MY_TESTROOT="/scratch1/NCEPDEV/global/David.Grumm/G_WF_2988/testroot_1" RUNTESTS=${MY_TESTROOT} pslot="1306a_2988" ./create_experiment.py --yaml ../ci/cases/pr/C48_ATM.yaml

… which completed without error or warning messages.

From within that EXPDIR I ran rocotorun:
rocotorun -w ./1306a_2988/1306a_2988.xml -d ./1306a_2988/1306a_2988.db

… which completed without error or warning messages. There was also no output to stdout, which I did not expect, as I had placed a few diagnostic prints in my code. I verified that I am on my current branch.

Running rocotostat gives me:

       CYCLE                  TASK                       JOBID        STATE   EXIT STATUS   TRIES   DURATION
=============================================================================================================
202103231200          gfs_stage_ic   druby://10.184.8.62:37937   SUBMITTING             -       0        0.0
202103231200         gfs_fcst_seg0                           -            -             -       -          -
202103231200   gfs_atmos_prod_f000                           -            -             -       -          -
etc.
… and this appears unchanged (at least for the hour since I ran rocotorun)

I have 2 questions:

  1. Am I not running my code, since the diagnostic prints do not appear?
  2. Rocotostat indicates that a job has been submitted (presumably with another version of the code). Shouldn't I expect it to progress?
