-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtestfwk.py
278 lines (234 loc) · 8.64 KB
/
testfwk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import os, re, sys
def testfwk_print(value):
print(value)
def cleanup_workflow():
    """Hand the known entries of the ``work`` directory to testfwk_remove_files.

    The directory itself comes last in the list, after the entries inside it.
    ``testfwk_remove_files`` expands every path with ``glob``, so entries that
    do not exist are never passed on for removal.
    """
    work_entries = [
        'work/gc-run.lib',
        'work/jobs',
        'work/output',
        'work/params.dat.gz',
        'work/params.map.gz',
        'work/sandbox',
        'work',
    ]
    testfwk_remove_files(work_entries)
def cmp_obj(obj_x, obj_y, desc=''):
    """Recursively compare two objects and describe the first difference.

    Supported are ``list``, ``tuple``, ``dict`` and ``set`` containers as well
    as ``str``, ``int``, ``float`` and ``None`` leaves.

    :param obj_x: first object
    :param obj_y: second object
    :param desc: prefix put in front of the returned message; the recursion
        uses it to record where inside the structure the difference is
    :return: ``None`` if no difference was found, otherwise a string that
        describes the first difference (or the unsupported type) encountered
    """
    from python_compat import sorted, izip, set

    # Exact type comparison on purpose: a list never equals a tuple here.
    if type(obj_x) != type(obj_y):
        return desc + 'different types %s %s' % (type(obj_x), type(obj_y))

    if isinstance(obj_x, (list, tuple, dict, set)):
        if len(obj_x) != len(obj_y):
            return desc + 'different number of elements len(%r)=%d len(%r)=%d' % (
                obj_x, len(obj_x), obj_y, len(obj_y))
        if isinstance(obj_x, (dict, set)):
            # Unordered containers: pair the items (keys for dicts) in a
            # deterministic order given by their string representation.
            item_iter = enumerate(izip(sorted(obj_x, key=str), sorted(obj_y, key=str)))
        else:
            item_iter = enumerate(izip(obj_x, obj_y))
        for idx, (x_key, y_key) in item_iter:
            result = cmp_obj(x_key, y_key, desc + 'items (#%d):' % idx)
            if result is not None:
                return result
        if isinstance(obj_x, dict):
            # The keys were found to be equal above, so obj_y[x_key] exists.
            # Sort with key=str like the key comparison does; a plain sort
            # raises TypeError for mixed-type keys on Python 3.
            for x_key in sorted(obj_x, key=str):
                result = cmp_obj(obj_x[x_key], obj_y[x_key],
                    desc + 'values (key:%s):' % repr(x_key))
                if result is not None:
                    return result
    elif isinstance(obj_x, (str, int, float)):
        if obj_x != obj_y:
            return desc + 'different objects %r %r' % (obj_x, obj_y)
    elif isinstance(obj_x, type(None)):
        # Both objects have the same type, so both are None.
        assert obj_x == obj_y
    else:
        # Keep the location prefix so nested unsupported values can be found.
        return desc + 'unknown type %s' % type(obj_x)
def create_config(*args, **kwargs):
    """Forward all arguments to ``grid_control.config.create_config``.

    The import happens at call time, not when this module is loaded.

    :return: whatever ``grid_control.config.create_config`` returns
    """
    from grid_control.config import create_config as gc_create_config
    return gc_create_config(*args, **kwargs)
def testfwk_create_workflow(user_config_dict=None, states=None):
    """Build a ``Workflow`` from a default config merged with user settings.

    :param user_config_dict: optional mapping ``section -> {option: value}``;
        each section is merged into (and overrides) the defaults below
    :param states: optional iterable of ``(state, detail)`` pairs; each one is
        passed to ``config.set_state(True, state, detail)``
    :return: the workflow; the config used to create it is attached as
        ``workflow.testsuite_config``
    :raises KeyError: if ``GC_TESTSUITE_BASE`` is not set in the environment
    """
    from grid_control.workflow import Workflow

    executable = os.path.join(os.environ['GC_TESTSUITE_BASE'], 'testfwk.py')
    config_dict = {
        'global': {'task': 'UserTask', 'backend': 'Host'},
        'jobs': {'nseeds': 0, 'random variables': ''},
        'task': {'wall time': '1', 'executable': executable},
    }
    if user_config_dict:
        for section in user_config_dict:
            section_dict = config_dict.setdefault(section, {})
            section_dict.update(user_config_dict[section])

    config = create_config(config_dict=config_dict)
    if states:
        for (state, detail) in states:
            config.set_state(True, state, detail)

    workflow = Workflow(config, 'global')
    workflow.testsuite_config = config
    return workflow
def testsuite_display_job_obj_list(job_db, *args, **kwargs):
    """Print one summary line for every job selected from ``job_db``.

    Extra arguments are forwarded to ``job_db.get_job_list``. A line holds the
    job number, the state name padded to eight characters, the ``gc_id`` (or
    ``<no id>``) and, if present, the ``legacy_gc_id`` in parentheses. Jobs
    for which ``job_db.get_job`` returns nothing are shown as
    ``<no stored data>``.
    """
    from grid_control.job_db import Job

    for jobnum in job_db.get_job_list(*args, **kwargs):
        job_obj = job_db.get_job(jobnum)
        infos = [str(jobnum)]
        if job_obj:
            infos.append(Job.enum2str(job_obj.state).ljust(8))
            infos.append(job_obj.gc_id or '<no id>')
            legacy_gc_id = job_obj.get('legacy_gc_id')
            if legacy_gc_id:
                infos.append('(%s)' % legacy_gc_id)
        else:
            infos.append('<no stored data>')
        testfwk_print(' '.join(infos))
def testfwk_format_exception(ex_tuple):
    """Format exception information with ``hpfwk.hpf_debug.format_exception``.

    :param ex_tuple: exception information; callers in this file pass
        ``sys.exc_info()`` or an ``(etype, value, traceback)`` tuple
    :return: the result of ``format_exception`` called with
        ``show_file_stack=1`` and ``show_exception_stack=2``
    """
    from hpfwk.hpf_debug import format_exception as hpf_format_exception
    formatted = hpf_format_exception(ex_tuple, show_file_stack=1, show_exception_stack=2)
    return formatted
def function_factory(*values, **kwargs):
    """Create a stand-in function that returns the given values one by one.

    Each call of the returned function yields the next value; once only a
    single value is left, that value is returned on every further call. The
    remaining values are kept in the ``values`` attribute of the function.

    :param values: return values, handed out in order
    :param display: keyword only, default True; print ``repr((args, kwargs))``
        of every call through ``testfwk_print``
    :param display_first: keyword only, default True; when False the first
        positional argument is dropped from the call arguments before printing
    :return: the generated function
    """
    show_call = kwargs.pop('display', True)
    show_first_arg = kwargs.pop('display_first', True)

    def fun(*args, **kwargs):
        if show_call:
            shown_args = args
            if not show_first_arg:
                shown_args = args[1:]
            testfwk_print(repr((shown_args, kwargs)))
        remaining = fun.values
        # Advance to the next value, but never drop the last one.
        if len(remaining) > 1:
            fun.values = remaining[1:]
        return remaining[0]

    fun.values = values
    return fun
def get_logger():
    """Return the first handler attached to the root logger.

    Despite the name, the returned object is an entry of the root logger's
    ``handlers`` list, not a logger.

    :raises IndexError: if the root logger has no handlers
    """
    import logging
    root_logger = logging.getLogger()
    return root_logger.handlers[0]
def testfwk_remove_files(files):
    """Expand the given glob patterns and remove everything that matches.

    :param files: iterable of path patterns understood by ``glob.glob``
    :return: the result of ``grid_control.utils.remove_files`` called with the
        list of all matches, in pattern order
    """
    import glob
    from grid_control.utils import remove_files

    matched = [path for pattern in files for path in glob.glob(pattern)]
    return remove_files(matched)
def run_test(exit_fun=None, cleanup_fun=None):
    """Run the doctests via ``doctest.testmod()`` and exit the interpreter.

    :param exit_fun: optional callable, always invoked after the tests ran
    :param cleanup_fun: optional callable, only invoked when no test failed
    :raises SystemExit: always; the exit code is the number of failed tests,
        capped at 255
    """
    import doctest

    kwargs = {}
    # Only request unified diffs if this doctest version offers the flag.
    if hasattr(doctest, 'REPORT_UDIFF'):
        kwargs['optionflags'] = doctest.REPORT_UDIFF
    num_failed = doctest.testmod(**kwargs)[0]

    if exit_fun is not None:
        exit_fun()
    if (cleanup_fun is not None) and (num_failed == 0):
        cleanup_fun()
    # Exit statuses are truncated to 8 bits, so e.g. 256 failures would be
    # reported as success; cap the value to keep any failure non-zero.
    sys.exit(min(num_failed, 255))
def testfwk_set_path(new_path):
try:
path = testfwk_set_path.backup
except Exception:
testfwk_set_path.backup = os.environ['PATH']
path = os.environ['PATH']
os.environ['PATH'] = str.join(':', [os.path.abspath(new_path)] + path.split(':')[1:])
def setup(fn, term='gc_color256'):
    """Prepare the process environment for running the test file ``fn``.

    In this order the function: adjusts ``sys.path``, exports
    ``GC_TESTSUITE_BASE``, imports ``hpfwk`` and initialises its plugins from
    the ``grid_control_tests`` directory, changes the working directory to the
    directory of ``fn``, optionally sets ``GC_TERM``, pushes two
    ``TestsuiteStream`` objects as the std streams of ``GCStreamHandler`` and
    replaces ``traceback.print_exception``.

    :param fn: path of the test file; only its directory part is used
    :param term: value stored in the ``GC_TERM`` environment variable;
        ``None`` leaves the variable untouched
    """
    def add_path(dn):
        # Index 1 keeps sys.path[0] in front; a later call therefore ends up
        # before the entries added by earlier calls.
        sys.path.insert(1, os.path.abspath(dn))
    # NOTE(review): drops the last sys.path entry - the reason is not visible
    # in this file; confirm before changing.
    sys.path.pop()
    dn_fn = os.path.dirname(fn)
    if not dn_fn:
        # fn has no directory part - use the current directory
        dn_fn = os.curdir
    dir_testsuite = os.path.abspath(os.path.dirname(__file__))
    os.environ['GC_TESTSUITE_BASE'] = dir_testsuite
    add_path(os.path.join(os.path.dirname(__file__), '..', 'packages')) # gc dir (pip installed)
    add_path(dn_fn) # test dir
    add_path(dir_testsuite) # testsuite base dir
    hpfwk = __import__('hpfwk') # to properly setup HPF_STARTUP_DIRECTORY
    hpfwk.init_hpf_plugins(os.path.join(dir_testsuite, 'grid_control_tests'))
    # The paths above were made absolute before the working directory changes.
    os.chdir(dn_fn)
    if term is not None:
        os.environ['GC_TERM'] = term
    from grid_control.logging_setup import GCStreamHandler
    GCStreamHandler.push_std_stream(TestsuiteStream(), TestsuiteStream())
    def testfwktestfwk_print_exception(etype, value, traceback, limit=None, file=None):
        # Same signature as traceback.print_exception; 'limit' is accepted but
        # not used. Output goes to 'file', or to sys.stderr if none is given.
        file = file or sys.stderr
        file.write(testfwk_format_exception((etype, value, traceback)) + '\n')
        file.flush()
    import traceback
    # From here on traceback.print_exception emits the testfwk formatting.
    traceback.print_exception = testfwktestfwk_print_exception
def str_dict_testsuite(mapping, keys=None):
    """Render ``mapping`` as a ``{key: value, ...}`` string in a fixed order.

    :param mapping: mapping to display
    :param keys: keys to show, in display order; keys missing from ``mapping``
        are skipped. Defaults to all keys of ``mapping`` in sorted order.
    :return: string with ``repr(key): repr(value)`` entries joined by ``', '``
    """
    if keys is None:
        keys = list(mapping.keys())
        keys.sort()
    entries = ['%r: %r' % (key, mapping[key]) for key in keys if key in mapping]
    return '{' + ', '.join(entries) + '}'
def try_catch(fun, catch='!!!', catch_value=None):
    """Call ``fun`` and print its result or a summary of what it raised.

    * No exception: the return value of ``fun`` is printed.
    * ``SystemExit``: prints ``Exit with <arg>`` if ``catch`` is
      ``'SystemExit'``, otherwise ``failed SystemExit``.
    * Any other ``Exception``: the formatted exception is searched line by
      line. It counts as caught when a line (ignoring surrounding whitespace)
      starts with ``catch`` and, if ``catch_value`` is given, that line or a
      later one contains ``catch_value``. Then ``caught`` is printed,
      otherwise the ``repr`` of the whole formatted message.

    :param fun: callable without arguments
    :param catch: expected start of a line in the formatted exception, or
        ``'SystemExit'``
    :param catch_value: optional text that must also appear in the message
    """
    try:
        testfwk_print(fun())
    except SystemExit:
        if catch == 'SystemExit':
            exit_args = sys.exc_info()[1].args
            # A single argument is shown bare. Zero or several arguments are
            # shown as a tuple; formatting the args tuple directly raised
            # TypeError in these cases.
            if len(exit_args) == 1:
                exit_args = exit_args[0]
            testfwk_print('Exit with %r' % (exit_args,))
        else:
            testfwk_print('failed SystemExit')
    except Exception:
        messages = testfwk_format_exception(sys.exc_info())
        caught = False
        matching = False
        for line in messages.splitlines():
            if catch and line.strip().startswith(catch):
                matching = True
                if not catch_value:
                    caught = True
            # Once the expected line was seen, catch_value may show up on
            # that line or on any following one.
            if matching and catch_value and (catch_value in line):
                caught = True
        if caught:
            testfwk_print('caught')
        else:
            testfwk_print(repr(messages))
    from hpfwk import clear_current_exception
    clear_current_exception()
def write_file(fn, content):
    """Write ``content`` to the file ``fn``, replacing previous content.

    :param fn: path of the file to write (opened in text mode ``'w'``)
    :param content: string to write
    """
    fp = open(fn, 'w')
    try:
        fp.write(content)
    finally:
        # Close the file even if the write fails.
        fp.close()
class DummyObj(object):
    """Plain object whose attributes are given as keyword arguments."""

    def __init__(self, **struct):
        """Store every keyword argument as an attribute of the same name."""
        for name, value in struct.items():
            setattr(self, name, value)

    def __repr__(self):
        """Return ``ClassName({...})`` with the attributes in sorted order."""
        cls_name = self.__class__.__name__
        return '%s(%s)' % (cls_name, str_dict_testsuite(self.__dict__))
class TestsuiteStream(object):
    """File-like object that normalises text and prints it via testfwk_print.

    Values that differ between runs or machines (timestamps, ``GC...`` ids,
    directories, some exception names) are replaced by fixed placeholders.
    With ``display_commands`` enabled, ANSI control sequences are shown as
    readable ``<...>`` markers; otherwise ``ANSI.strip_fmt`` is applied to
    every line.
    """
    # Optional hook applied to the list of output lines, shared by all
    # instances; set it with set_modify().
    _modify = None

    def __init__(self, display_commands=False):
        """
        :param display_commands: show control sequences as markers (True) or
            strip them from the output (False)
        """
        self._display_commands = display_commands

    def flush(self):
        """Do nothing; only present to satisfy the stream interface."""
        pass

    def set_modify(cls, fun):
        """Install ``fun`` as the line hook.

        ``fun`` receives the list of output lines and returns the lines to
        emit. It is stored on the class it is called on, while ``_transform``
        reads it from ``TestsuiteStream``.
        """
        cls._modify = staticmethod(fun)
    set_modify = classmethod(set_modify)

    def write(self, value):
        """Print the transformed lines of ``value``; empty lines as ``-----``."""
        for line in self._transform(value):
            if not line:
                msg = '-' * 5
            else:
                msg = line.rstrip()
            testfwk_print(msg)

    def _transform(self, value):
        """Yield the normalised output lines for ``value``.

        :raises KeyError: if ``GC_TESTSUITE_BASE`` or ``GC_PACKAGES_PATH`` is
            missing from the environment
        """
        from grid_control_gui.ansi import ANSI
        if self._display_commands:
            # Replace every ANSI attribute value by its attribute name.
            for attr_name in ANSI.__dict__:
                if (attr_name != 'esc') and getattr(ANSI, attr_name):
                    value = value.replace(str(getattr(ANSI, attr_name)), '<%s>' % attr_name)
            # Raw strings: '\[' in a normal string literal is an invalid
            # escape sequence; the re module resolves '\x1b' itself.
            value = re.sub(r'\x1b\[([0-9]*);([0-9]*)H', r'<move:\1,\2>', value)
            value = re.sub(r'\x1b\[([0-9]*)A', r'<move_up:\1>', value)
            value = re.sub(r'\x1b\[38;5;([0-9]*)m', r'<grayscale:\1>', value)
            # NOTE(review): r'\r' is a backslash followed by 'r', not a
            # carriage return - confirm whether '\r' was intended.
            value = value.replace(r'\r', '<move:line-start>')
            value = value.replace('\n', '<newline>\n')
        # Exception names that are shown under one common name
        value = value.replace('IOError', 'XXError')
        value = value.replace('OSError', 'XXError')
        value = value.replace('BadGzipFile', 'XXError')
        value = value.replace('python_compat', 'py_th_on_com_pat')
        # Machine dependent directories
        value = value.replace(os.environ['GC_TESTSUITE_BASE'], '<testsuite dir>')
        value = value.replace(os.environ['GC_PACKAGES_PATH'], '<gc package dir>')
        value = re.sub(r'\'.*/debug.log\'', '\'.../debug.log\'', value)
        # Date + time has to be replaced before the date-only pattern below.
        value = re.sub(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', '0000-00-00 00:00:00', value)
        # The 8 digit substitution runs first; the 12 digit pattern then
        # matches what is left of a longer id.
        value = re.sub(r'GC[0-9a-f]{8}', 'GC00000000', value)
        value = re.sub(r'GC[0-9a-f]{12}', 'GC0000000000', value)
        value = re.sub(r'\d\d\d\d-\d\d-\d\d', '0000-00-00', value)
        value = re.sub(r'\(\d+:\d\d:\d\d\)', '(XX:XX:XX)', value)
        value = re.sub(r'Using batch system: .*', 'Using batch system: ---', value)
        value = re.sub(r'\| \d+\)', '| ...)', value)
        value = value.replace('\t', ' ').replace('<module>', '?')
        iter_lines = value.splitlines()
        if TestsuiteStream._modify:
            iter_lines = TestsuiteStream._modify(iter_lines)
        for line in iter_lines:
            if not self._display_commands:
                yield ANSI.strip_fmt(line).rstrip()
            elif ANSI.strip_fmt(line).rstrip() == line.rstrip():
                yield line.rstrip()
            else:
                # Formatting is still present - show the line unambiguously.
                yield repr(line.rstrip())