Merge pull request NCAR#78 from nusbaume/adf_gen_timeseries
Add single-variable time series file generation function from ADF
TeaganKing authored Mar 22, 2024
2 parents 86a9373 + 962e21d commit dec3cef
Showing 5 changed files with 566 additions and 51 deletions.
README.md (15 changes: 6 additions & 9 deletions)
@@ -4,15 +4,9 @@ Python Framework for Generating Diagnostics from CESM

## Project Vision

CUPiD is a collaborative effort that unifies all CESM component diagnostics and provides
CUPiD is a “one stop shop” that enables and integrates timeseries file generation, data standardization, diagnostics, and metrics from all CESM components.

- Python code that
1. runs in an easy-to-generate conda environment, and
1. can be launched via CIME workflow or independently
- Diagnostics for single/multiple runs and single/multiple components
- Ability to call post-processing tools that other groups are working on
- An API that makes it easy to include outside code
- Ongoing support and software maintenance
This collaborative effort aims to simplify the user experience of running diagnostics by calling post-processing tools directly from CUPiD, running all component diagnostics from the same tool as either part of the CIME workflow or independently, and sharing python code and a standard conda environment across components.

## Installing

@@ -94,4 +88,7 @@ if not serial:
client = Client(cluster)

client
```

### Timeseries File Generation
CUPiD can also generate single-variable timeseries files from history files for all components. To generate timeseries files, edit the timeseries section of `config.yml` (sketched below) to fit your preferences, and then run `cupid-run config.yml -ts`.
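For reference, here is a minimal sketch of what that section might look like, inferred from the keys `cupid/run.py` reads; the case name, variables, and years are placeholder values, not shipped defaults:

```yaml
timeseries:
  case_name: 'b.e23_alpha16b.BLT1850.ne30_t232.054'  # placeholder case name
  num_procs: 8            # worker count for parallel timeseries generation
  ts_done: [False]        # whether timeseries files already exist
  overwrite_ts: [False]   # whether to overwrite existing timeseries files

  atm:
    vars: ['PSL', 'TS']   # placeholder history variables to extract
    derive_vars: []       # variables derived from other history variables
    hist_str: 'h0'        # history file identifier
    start_years: [1]      # placeholder start year
    end_years: [100]      # placeholder end year
    level: 'lev'          # vertical coordinate handling, if applicable
  # ocn, lnd, ice, and glc take the same per-component keys
```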
cupid/run.py (133 changes: 93 additions & 40 deletions)
@@ -2,42 +2,82 @@

import click
import os
import sys
from glob import glob
import papermill as pm
import intake
import cupid.util
import cupid.timeseries
from dask.distributed import Client
import dask
import time
import ploomber
import yaml

CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])


CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option("--serial", "-s", is_flag=True, help="Do not use LocalCluster objects")
@click.option("--time-series", "-ts", is_flag=True,
help="Run time series generation scripts prior to diagnostics")
@click.option(
"--time-series",
"-ts",
is_flag=True,
help="Run time series generation scripts prior to diagnostics",
)
@click.argument("config_path")
def run(config_path, serial=False, time_series=False):
"""
Main engine to set up running all the notebooks.
"""

# Abort if run with --time-series (until feature is added)
if time_series:
sys.tracebacklimit = 0
raise NotImplementedError("--time-series option not implemented yet")

# Get control structure
control = cupid.util.get_control_dict(config_path)
cupid.util.setup_book(config_path)

#####################################################################
# Managing global parameters

global_params = dict()

if "global_params" in control:
global_params = control["global_params"]
####################################################################

if time_series:
timeseries_params = control["timeseries"]

# general timeseries arguments for all components
num_procs = timeseries_params["num_procs"]



for component in ['atm', 'ocn', 'lnd', 'ice', 'glc']:
cupid.timeseries.create_time_series(
component,
timeseries_params[component]["vars"],
timeseries_params[component]["derive_vars"],
[timeseries_params["case_name"]], # could also grab from compute_notebooks section of config file
timeseries_params[component]["hist_str"],
[global_params["CESM_output_dir"] + "/" + timeseries_params["case_name"] + f"/{component}/hist/"], # could also grab from compute_notebooks section of config file
[global_params["CESM_output_dir"]+'/'+timeseries_params['case_name']+f'/{component}/proc/tseries/'],
# Note that timeseries output will eventually go in /glade/derecho/scratch/${USER}/archive/${CASE}/${component}/proc/tseries/
timeseries_params["ts_done"],
timeseries_params["overwrite_ts"],
timeseries_params[component]["start_years"], # could get from yaml file in adf_quick_run.parameter_groups.none.config_fil_str, or for other notebooks config files, eg ocean_surface.parameter_gropus.none.mom6_tools_config.start_date
timeseries_params[component]["end_years"], # could get from yaml file in adf_quick_run.parameter_groups.none.config_fil_str, or for other notebooks config files, eg ocean_surface.parameter_gropus.none.mom6_tools_config.end_date
timeseries_params[component]["level"],
num_procs,
serial,
)
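For orientation, the positional arguments above imply a signature roughly like the following stub; these parameter names are guesses from the call site, not the actual definition in `cupid/timeseries.py`:

```python
# Hypothetical stub inferred from the call site above; the real function in
# cupid/timeseries.py (ported from ADF) may use different parameter names.
def create_time_series(
    component,      # model component: "atm", "ocn", "lnd", "ice", or "glc"
    diag_var_list,  # history variables to split into single-variable files
    derive_vars,    # variables computed from other history variables
    case_names,     # list of case names
    hist_str,       # history file identifier, e.g. "h0"
    hist_locs,      # list of directories holding history files
    ts_dir,         # list of output directories for timeseries files
    ts_done,        # flags: skip cases whose timeseries already exist
    overwrite_ts,   # flags: regenerate existing timeseries files
    start_years,    # first year to process, per case
    end_years,      # last year to process, per case
    level,          # vertical level/coordinate handling
    num_procs,      # number of processes used per component
    serial,         # if True, run without a dask LocalCluster
):
    ...
```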

# Grab paths

run_dir = os.path.realpath(os.path.expanduser(control['data_sources']['run_dir']))
output_dir = run_dir + "/computed_notebooks/" + control['data_sources']['sname']
temp_data_path = run_dir + "/temp_data"
nb_path_root = os.path.realpath(os.path.expanduser(control['data_sources']['nb_path_root']))
run_dir = os.path.realpath(os.path.expanduser(control["data_sources"]["run_dir"]))
output_dir = run_dir + "/computed_notebooks/" + control["data_sources"]["sname"]
temp_data_path = run_dir + "/temp_data"
nb_path_root = os.path.realpath(
os.path.expanduser(control["data_sources"]["nb_path_root"])
)

#####################################################################
# Managing catalog-related stuff
@@ -46,67 +86,71 @@ def run(config_path, serial=False, time_series=False):

cat_path = None

if 'path_to_cat_json' in control['data_sources']:
if "path_to_cat_json" in control["data_sources"]:
use_catalog = True
full_cat_path = os.path.realpath(os.path.expanduser(control['data_sources']['path_to_cat_json']))
full_cat_path = os.path.realpath(
os.path.expanduser(control["data_sources"]["path_to_cat_json"])
)
full_cat = intake.open_esm_datastore(full_cat_path)

# Doing initial subsetting on full catalog, e.g. to only use certain cases

if 'subset' in control['data_sources']:
first_subset_kwargs = control['data_sources']['subset']
if "subset" in control["data_sources"]:
first_subset_kwargs = control["data_sources"]["subset"]
cat_subset = full_cat.search(**first_subset_kwargs)
# This pulls out the name of the catalog from the path
cat_subset_name = full_cat_path.split("/")[-1].split('.')[0] + "_subset"
cat_subset.serialize(directory=temp_data_path, name=cat_subset_name, catalog_type="file")
cat_subset_name = full_cat_path.split("/")[-1].split(".")[0] + "_subset"
cat_subset.serialize(
directory=temp_data_path, name=cat_subset_name, catalog_type="file"
)
cat_path = temp_data_path + "/" + cat_subset_name + ".json"
else:
cat_path = full_cat_path
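Putting the pieces together, the `data_sources` block consumed above might look like the following sketch; the paths and the subset column/value pair are illustrative placeholders, and `subset` entries are forwarded unchanged to intake-esm's `search()`:

```yaml
data_sources:
  run_dir: ../examples/coupled_model        # placeholder; where output is written
  sname: quick-run                          # placeholder short name for the book
  nb_path_root: ../nblibrary                # placeholder; where notebooks live
  path_to_cat_json: /path/to/catalog.json   # optional; enables the catalog
  subset:                                   # optional initial subsetting
    case: b.e23_alpha16b.BLT1850.ne30_t232.054  # hypothetical column: value
```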


#####################################################################
# Managing global parameters

global_params = dict()

if 'global_params' in control:
global_params = control['global_params']


#####################################################################
# Ploomber - making a DAG

dag = ploomber.DAG(executor=ploomber.executors.Serial())
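For readers new to Ploomber, here is a minimal self-contained sketch of the pattern being set up here: a serially executed DAG whose tasks run notebooks through papermill (the notebook path and task name are invented for illustration):

```python
from pathlib import Path

import ploomber
from ploomber.products import File
from ploomber.tasks import NotebookRunner

# DAG whose tasks run one at a time, mirroring the executor choice above.
dag = ploomber.DAG(executor=ploomber.executors.Serial())

# Each task executes a notebook via papermill and stores the executed copy
# as its product; "analysis.ipynb" and "output/" are placeholder paths.
NotebookRunner(
    Path("analysis.ipynb"),
    File("output/analysis.ipynb"),
    dag=dag,
    name="analysis",
)

dag.build()  # run every task in dependency order
```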


#####################################################################
# Organizing notebooks - holdover from manually managing dependencies before

all_nbs = dict()

for nb, info in control['compute_notebooks'].items():
for nb, info in control["compute_notebooks"].items():

all_nbs[nb] = info

# Setting up notebook tasks

for nb, info in all_nbs.items():

global_params['serial'] = serial
global_params["serial"] = serial
if "dependency" in info:
cupid.util.create_ploomber_nb_task(nb, info, cat_path, nb_path_root, output_dir, global_params, dag, dependency = info["dependency"])
cupid.util.create_ploomber_nb_task(
nb,
info,
cat_path,
nb_path_root,
output_dir,
global_params,
dag,
dependency=info["dependency"],
)

else:
cupid.util.create_ploomber_nb_task(nb, info, cat_path, nb_path_root, output_dir, global_params, dag)
cupid.util.create_ploomber_nb_task(
nb, info, cat_path, nb_path_root, output_dir, global_params, dag
)
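As a sketch of how a dependency is declared, a `compute_notebooks` entry might look like the following; the notebook names are illustrative, and the `parameter_groups` layout is inferred from the comments in the timeseries section above. `compute_scripts` entries (handled below) follow the same pattern:

```yaml
compute_notebooks:
  index:
    parameter_groups:
      none: {}
  adf_quick_run:
    dependency: index      # run only after the "index" notebook completes
    parameter_groups:
      none:
        config_fil_str: /path/to/adf_config.yaml  # placeholder path
```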

#####################################################################
# Organizing scripts

if 'compute_scripts' in control:
if "compute_scripts" in control:

all_scripts = dict()

for script, info in control['compute_scripts'].items():
for script, info in control["compute_scripts"].items():

all_scripts[script] = info

@@ -115,14 +159,23 @@ def run(config_path, serial=False, time_series=False):
for script, info in all_scripts.items():

if "dependency" in info:
cupid.util.create_ploomber_script_task(script, info, cat_path, nb_path_root, global_params, dag, dependency = info["dependency"])
cupid.util.create_ploomber_script_task(
script,
info,
cat_path,
nb_path_root,
global_params,
dag,
dependency=info["dependency"],
)

else:
cupid.util.create_ploomber_script_task(script, info, cat_path, nb_path_root, global_params, dag)
cupid.util.create_ploomber_script_task(
script, info, cat_path, nb_path_root, global_params, dag
)

# Run the full DAG

dag.build()

return None

