run_mpas_analysis
#!/usr/bin/env python
"""
Runs MPAS-Analysis via a configuration file (e.g. `config.analysis`)
specifying analysis options.
Authors
-------
Xylar Asay-Davis, Phillip J. Wolfram
"""
from __future__ import absolute_import, division, print_function, \
unicode_literals
import matplotlib as mpl
import argparse
import traceback
import sys
import pkg_resources
import shutil
import os
from collections import OrderedDict
from mpas_analysis.configuration import MpasAnalysisConfigParser
from mpas_analysis.shared.io.utility import build_config_full_path, \
make_directories
from mpas_analysis.shared.html import generate_html
from mpas_analysis.shared import AnalysisTask
def build_analysis_list(config, refConfig): # {{{
"""
Build a list of analysis tasks. New tasks should be added here, following
the approach used for existing analysis tasks.
Parameters
----------
config : ``MpasAnalysisConfigParser`` object
contains config options
refConfig : ``MpasAnalysisConfigParser`` object
contains config options for a reference run, or ``None`` if no config
file for a reference run was specified
Returns
-------
analyses : list of ``AnalysisTask`` objects
A list of all analysis tasks
Authors
-------
Xylar Asay-Davis
"""
# choose the right rendering backend, depending on whether we're displaying
# to the screen
if not config.getboolean('plot', 'displayToScreen'):
mpl.use('Agg')
# analysis can only be imported after the right MPL renderer is selected
from mpas_analysis import ocean
from mpas_analysis import sea_ice
from mpas_analysis.shared.climatology import MpasClimatologyTask
from mpas_analysis.shared.time_series import MpasTimeSeriesTask
analyses = []
# Ocean Analyses
oceanClimatolgyTask = MpasClimatologyTask(config=config,
componentName='ocean')
oceanTimeSeriesTask = MpasTimeSeriesTask(config=config,
componentName='ocean')
oceanIndexTask = MpasTimeSeriesTask(config=config,
componentName='ocean',
section='index')
analyses.append(oceanClimatolgyTask)
analyses.append(ocean.ClimatologyMapMLD(config, oceanClimatolgyTask,
refConfig))
analyses.append(ocean.ClimatologyMapSST(config, oceanClimatolgyTask,
refConfig))
analyses.append(ocean.ClimatologyMapSSS(config, oceanClimatolgyTask,
refConfig))
analyses.append(ocean.ClimatologyMapSSH(config, oceanClimatolgyTask,
refConfig))
analyses.append(ocean.ClimatologyMapSoseTemperature(
config, oceanClimatolgyTask, refConfig))
analyses.append(ocean.ClimatologyMapSoseSalinity(
config, oceanClimatolgyTask, refConfig))
analyses.append(ocean.ClimatologyMapAntarcticMelt(config,
oceanClimatolgyTask,
refConfig))
analyses.append(ocean.TimeSeriesAntarcticMelt(config, oceanTimeSeriesTask,
refConfig))
analyses.append(ocean.TimeSeriesTemperatureAnomaly(config,
oceanTimeSeriesTask))
analyses.append(ocean.TimeSeriesSalinityAnomaly(config,
oceanTimeSeriesTask))
analyses.append(ocean.TimeSeriesOHCAnomaly(config,
oceanTimeSeriesTask,
refConfig))
analyses.append(ocean.TimeSeriesSST(config, oceanTimeSeriesTask,
refConfig))
analyses.append(ocean.MeridionalHeatTransport(config, oceanClimatolgyTask,
refConfig))
analyses.append(ocean.StreamfunctionMOC(config, oceanClimatolgyTask))
analyses.append(ocean.IndexNino34(config, oceanIndexTask, refConfig))
# Sea Ice Analyses
seaIceClimatolgyTask = MpasClimatologyTask(config=config,
componentName='seaIce')
seaIceTimeSeriesTask = MpasTimeSeriesTask(config=config,
componentName='seaIce')
analyses.append(seaIceClimatolgyTask)
analyses.append(sea_ice.ClimatologyMapSeaIceConc(
config=config, mpasClimatologyTask=seaIceClimatolgyTask,
hemisphere='NH', refConfig=refConfig))
analyses.append(sea_ice.ClimatologyMapSeaIceThick(
config=config, mpasClimatologyTask=seaIceClimatolgyTask,
hemisphere='NH', refConfig=refConfig))
analyses.append(sea_ice.ClimatologyMapSeaIceConc(
config=config, mpasClimatologyTask=seaIceClimatolgyTask,
hemisphere='SH', refConfig=refConfig))
analyses.append(sea_ice.ClimatologyMapSeaIceThick(
config=config, mpasClimatologyTask=seaIceClimatolgyTask,
hemisphere='SH', refConfig=refConfig))
analyses.append(seaIceTimeSeriesTask)
analyses.append(sea_ice.TimeSeriesSeaIce(config, seaIceTimeSeriesTask,
refConfig))
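# A new ocean analysis task would be appended here following the same
# pattern as the tasks above, e.g. (hypothetical task name):
# analyses.append(ocean.ClimatologyMapMyField(config, oceanClimatolgyTask,
#                                             refConfig))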
return analyses # }}}
def determine_analyses_to_generate(analyses): # {{{
"""
Build a list of analysis tasks to run based on the 'generate' config
option (or command-line flag) and prerequisites and subtasks of each
requested task. Each task's ``setup_and_check`` method is called in the
process.
Parameters
----------
analyses : list of ``AnalysisTask`` objects
A list of all analysis tasks
Returns
-------
analysesToGenerate : ``OrderedDict`` of ``AnalysisTask`` objects
A dictionary of the analysis tasks to run, with (taskName, subtaskName) as keys
Authors
-------
Xylar Asay-Davis
"""
analysesToGenerate = OrderedDict()
# check which analyses we actually want to generate and only keep those
for analysisTask in analyses:
# update the dictionary with this task and perhaps its subtasks
add_task_and_subtasks(analysisTask, analysesToGenerate)
return analysesToGenerate # }}}
def add_task_and_subtasks(analysisTask, analysesToGenerate,
callCheckGenerate=True):
# {{{
"""
If a task has been requested through the generate config option or
if it is a prerequisite of a requested task, add it to the dictionary of
tasks to generate.
Parameters
----------
analysisTask : ``AnalysisTask``
A task to be added
analysesToGenerate : ``OrderedDict`` of ``AnalysisTask``
The dictionary of analysis tasks to be generated, which this call may
update to include this task and its subtasks
callCheckGenerate : bool
Whether the ``check_generate`` method should be called for this task to
see if it has been requested. We skip this for subtasks and
prerequisites, since they are needed by another task regardless of
whether the user specifically requested them.
Authors
-------
Xylar Asay-Davis
"""
key = (analysisTask.taskName, analysisTask.subtaskName)
if key in analysesToGenerate.keys():
# The task was already added
assert(analysisTask._setupStatus == 'success')
return
# for each analysis task, check if we want to generate this task
# and if the analysis task has a valid configuration
taskTitle = analysisTask.printTaskName
if callCheckGenerate and not analysisTask.check_generate():
# we don't need to add this task -- it wasn't requested
return
# first, we should try to add the prerequisites of this task and of its
# subtasks (if those prerequisites aren't themselves subtasks of this task)
prereqs = analysisTask.runAfterTasks
for subtask in analysisTask.subtasks:
for prereq in subtask.runAfterTasks:
if prereq not in analysisTask.subtasks:
prereqs.append(prereq)
for prereq in prereqs:
add_task_and_subtasks(prereq, analysesToGenerate,
callCheckGenerate=False)
if prereq._setupStatus != 'success':
# a prereq failed setup_and_check
print("ERROR: prerequisite task {} of analysis task {}"
" failed during check,\n"
" so this task will not be run".format(
prereq.printTaskName, taskTitle))
analysisTask._setupStatus = 'fail'
return
# make sure all prereqs have been set up successfully before trying to
# set up this task -- this task's setup may depend on setup in the prereqs
try:
analysisTask.setup_and_check()
except BaseException:
traceback.print_exc(file=sys.stdout)
print("ERROR: analysis task {} failed during check and "
"will not be run".format(taskTitle))
analysisTask._setupStatus = 'fail'
return
# next, we should try to add the subtasks. This is done after the current
# analysis task has been set up in case subtasks depend on information
# from the parent task
for subtask in analysisTask.subtasks:
add_task_and_subtasks(subtask, analysesToGenerate,
callCheckGenerate=False)
if subtask._setupStatus != 'success':
# a subtask failed setup_and_check
print("ERROR: a subtask of analysis task {}"
" failed during check,\n"
" so this task will not be run".format(taskTitle))
analysisTask._setupStatus = 'fail'
return
analysesToGenerate[key] = analysisTask
analysisTask._setupStatus = 'success'
# }}}
def update_generate(config, generate): # {{{
"""
Update the 'generate' config option using a string from the command line.
Parameters
----------
config : ``MpasAnalysisConfigParser`` object
contains config options
generate : str
a comma-separated string of generate flags: either names of analysis
tasks or commands of the form ``all_<tag>`` or ``no_<tag>`` indicating
that analyses with a given tag should be included or excluded.
Authors
-------
Xylar Asay-Davis
"""
# overwrite the 'generate' option in config with a string that parses to
# a list of strings
generateList = generate.split(',')
generateString = ', '.join(["'{}'".format(element)
for element in generateList])
generateString = '[{}]'.format(generateString)
config.set('output', 'generate', generateString) # }}}
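# For example (task names here are illustrative), ``--generate
# climatologyMapSST,all_timeSeries`` sets the 'generate' option in the
# 'output' section to "['climatologyMapSST', 'all_timeSeries']".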
def run_analysis(config, analyses): # {{{
"""
Run all the tasks, either in serial or in parallel
Parameters
----------
config : ``MpasAnalysisConfigParser`` object
contains config options
analyses : OrderedDict of ``AnalysisTask`` objects
A dictionary of analysis tasks to run with (task, subtask) names as
keys
Authors
-------
Xylar Asay-Davis
"""
# write the config file to the log directory
logsDirectory = build_config_full_path(config, 'output',
'logsSubdirectory')
mainRunName = config.get('runs', 'mainRunName')
configFileName = '{}/config.{}'.format(logsDirectory, mainRunName)
configFile = open(configFileName, 'w')
config.write(configFile)
configFile.close()
taskCount = config.getWithDefault('execute', 'parallelTaskCount',
default=1)
isParallel = taskCount > 1 and len(analyses) > 1
for analysisTask in analyses.values():
if not analysisTask.runAfterTasks and not analysisTask.subtasks:
analysisTask._runStatus.value = AnalysisTask.READY
else:
analysisTask._runStatus.value = AnalysisTask.BLOCKED
tasksWithErrors = []
runningTasks = {}
# run each analysis task
while True:
# loop until all tasks have either succeeded or failed
for analysisTask in analyses.values():
if analysisTask._runStatus.value == AnalysisTask.BLOCKED:
prereqs = analysisTask.runAfterTasks + analysisTask.subtasks
prereqStatus = [prereq._runStatus.value for prereq in prereqs]
if any([runStatus == AnalysisTask.FAIL for runStatus in
prereqStatus]):
# a prerequisite failed so this task cannot succeed
analysisTask._runStatus.value = AnalysisTask.FAIL
elif all([runStatus == AnalysisTask.SUCCESS for runStatus in
prereqStatus]):
# no unfinished prerequisites so we can run this task
analysisTask._runStatus.value = AnalysisTask.READY
unfinishedCount = 0
for analysisTask in analyses.values():
if analysisTask._runStatus.value not in [AnalysisTask.SUCCESS,
AnalysisTask.FAIL]:
unfinishedCount += 1
if unfinishedCount <= 0:
# we're done
break
# launch new tasks
for key, analysisTask in analyses.items():
if analysisTask._runStatus.value == AnalysisTask.READY:
if isParallel:
print('Running {}'.format(analysisTask.printTaskName))
analysisTask._runStatus.value = AnalysisTask.RUNNING
analysisTask.start()
runningTasks[key] = analysisTask
if len(runningTasks.keys()) >= taskCount:
break
else:
analysisTask.run(writeLogFile=False)
if isParallel:
# wait for a task to finish
analysisTask = wait_for_task(runningTasks)
key = (analysisTask.taskName, analysisTask.subtaskName)
runningTasks.pop(key)
taskTitle = analysisTask.printTaskName
if analysisTask._runStatus.value == AnalysisTask.SUCCESS:
print(" Task {} has finished successfully.".format(
taskTitle))
elif analysisTask._runStatus.value == AnalysisTask.FAIL:
print("ERROR in task {}. See log file {} for details".format(
taskTitle, analysisTask._logFileName))
tasksWithErrors.append(taskTitle)
else:
print("Unexpected status from in task {}. This may be a "
"bug.".format(taskTitle))
else:
if analysisTask._runStatus.value == AnalysisTask.FAIL:
sys.exit(1)
if not isParallel and config.getboolean('plot', 'displayToScreen'):
import matplotlib.pyplot as plt
plt.show()
# exit with an error code if any tasks failed
errorCount = len(tasksWithErrors)
if errorCount == 1:
print("There were errors in task {}".format(tasksWithErrors[0]))
sys.exit(1)
elif errorCount > 0:
print("There were errors in {} tasks: {}".format(
errorCount, ', '.join(tasksWithErrors)))
sys.exit(1)
# }}}
def wait_for_task(runningTasks, timeout=0.1): # {{{
"""
Wait for any of the currently running tasks to finish, and return that task.
Parameters
----------
runningTasks : dict of ``AnalysisTask``
The tasks that are currently running, with (taskName, subtaskName) as keys
timeout : float, optional
The interval (in seconds) to wait for each task before checking the next
Returns
-------
analysisTask : ``AnalysisTask``
A task that finished
Authors
-------
Xylar Asay-Davis
"""
# necessary to have a timeout so we can kill the whole thing
# with a keyboard interrupt
while True:
for analysisTask in runningTasks.values():
analysisTask.join(timeout=timeout)
if not analysisTask.is_alive():
return analysisTask # }}}
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("--setup_only", dest="setup_only", action='store_true',
help="If only the setup phase, not the run or HTML "
"generation phases, should be executed.")
parser.add_argument("--html_only", dest="html_only", action='store_true',
help="If only the setup and HTML generation phases, "
"not the run phase, should be executed.")
parser.add_argument("-g", "--generate", dest="generate",
help="A list of analysis modules to generate "
"(nearly identical generate option in config file).",
metavar="ANALYSIS1[,ANALYSIS2,ANALYSIS3,...]")
parser.add_argument("-l", "--list", dest="list", action='store_true',
help="List the available analysis tasks")
parser.add_argument("-p", "--purge", dest="purge", action='store_true',
help="Purge the analysis by deleting the output"
"directory before running")
parser.add_argument('configFiles', metavar='CONFIG',
type=str, nargs='*', help='config file')
args = parser.parse_args()
for configFile in args.configFiles:
if not os.path.exists(configFile):
raise OSError('Config file {} not found.'.format(configFile))
# add config.default to cover defaults not included in the config files
# provided on the command line
if pkg_resources.resource_exists('mpas_analysis', 'config.default'):
defaultConfig = pkg_resources.resource_filename('mpas_analysis',
'config.default')
configFiles = [defaultConfig] + args.configFiles
else:
print('WARNING: Did not find config.default. Assuming other config '
'file(s) contain a\n'
'full set of configuration options.')
defaultConfig = None
configFiles = args.configFiles
config = MpasAnalysisConfigParser()
config.read(configFiles)
if args.list:
analyses = build_analysis_list(config, refConfig=None)
for analysisTask in analyses:
print('task: {}'.format(analysisTask.taskName))
print(' component: {}'.format(analysisTask.componentName))
print(' tags: {}'.format(', '.join(analysisTask.tags)))
sys.exit(0)
if config.has_option('runs', 'referenceRunConfigFile'):
refConfigFile = config.get('runs', 'referenceRunConfigFile')
if not os.path.exists(refConfigFile):
raise OSError('A reference config file {} was specified but the '
'file does not exist'.format(refConfigFile))
refConfigFiles = [refConfigFile]
if defaultConfig is not None:
refConfigFiles = [defaultConfig] + refConfigFiles
refConfig = MpasAnalysisConfigParser()
refConfig.read(refConfigFiles)
# replace the log directory so log files get written to this run's
# log directory, not the reference run's
logsDirectory = build_config_full_path(config, 'output',
'logsSubdirectory')
refConfig.set('output', 'logsSubdirectory', logsDirectory)
print('Comparing to reference run {} rather than observations. \n'
'Make sure that MPAS-Analysis has been run previously with the '
'ref config file.'.format(refConfig.get('runs', 'mainRunName')))
else:
refConfig = None
if args.purge:
outputDirectory = config.get('output', 'baseDirectory')
if not os.path.exists(outputDirectory):
print('Output directory {} does not exist.\n'
'No purge necessary.'.format(outputDirectory))
else:
print('Deleting contents of {}'.format(outputDirectory))
shutil.rmtree(outputDirectory)
if args.generate:
update_generate(config, args.generate)
if refConfig is not None:
# we want to use the "generate" option from the current run, not
# the reference config file
refConfig.set('output', 'generate', config.get('output', 'generate'))
logsDirectory = build_config_full_path(config, 'output',
'logsSubdirectory')
make_directories(logsDirectory)
analyses = build_analysis_list(config, refConfig)
analyses = determine_analyses_to_generate(analyses)
if not args.setup_only and not args.html_only:
run_analysis(config, analyses)
if not args.setup_only:
generate_html(config, analyses, refConfig)
# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python