-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathposdo.py
385 lines (368 loc) · 13.5 KB
/
posdo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
import code, pickle, re, readline, select, socket, sys, threading, time, zlib
# Format
#
# Client->Server
# <connect>
# Server->Client
# Length\n
# State: (task base, job_globals, [task_args])
# Code
# Client->Server
# Length\n
# Result: (task base, [task_results])
# <repeat>/<disconnect>
# Messages whose level is above this threshold are not printed.
dbg_lvl = 3


def dbg_out(lvl, *s):
    """Print a message when its level is within the ``dbg_lvl`` threshold.

    lvl -- level of the message; it is printed only if lvl <= dbg_lvl.
    s   -- message fragments; each is converted with str() and the pieces
           are concatenated without a separator.
    """
    if lvl > dbg_lvl:
        return
    print(''.join(map(str, s)))
def dmp(*s):
    """Send the fragments in ``s`` to dbg_out at level 5."""
    dbg_out(5, *s)
def dbg(*s):
    """Send the fragments in ``s`` to dbg_out at level 4."""
    dbg_out(4, *s)
def info(*s):
    """Send the fragments in ``s`` to dbg_out at level 3."""
    dbg_out(3, *s)
def wrn(*s):
    """Send the fragments in ``s`` to dbg_out at level 2."""
    dbg_out(2, *s)
def err(*s):
    """Send the fragments in ``s`` to dbg_out at level 1."""
    dbg_out(1, *s)
def cli_ok(*s):
    """Print a CLI response line.

    Every fragment in ``s`` is converted with str() and the pieces are
    concatenated without a separator. Unlike dbg_out, the line is printed
    unconditionally.
    """
    # Parenthesised single-argument print behaves the same on Python 2 and
    # matches the form already used by dbg_out.
    print(''.join(str(x) for x in s))
def cli_err(*s):
    """Print a CLI error line of the form ``Error: <message>``.

    The message is the str() of every fragment in ``s``, concatenated
    without a separator.
    """
    # Parenthesised single-argument print behaves the same on Python 2 and
    # matches the form already used by dbg_out.
    print(': '.join(('Error', ''.join(str(x) for x in s))))
class struct:
    """Plain attribute container.

    Its text form is the text form of its attribute dictionary and its
    length is the number of attributes set, so an instance with no
    attributes evaluates as false.
    """

    def __len__(self):
        return len(vars(self))

    def __str__(self):
        return str(vars(self))

    def __repr__(self):
        # Same rendering as __str__.
        return str(vars(self))
class PosdoException(Exception):
    """Error type raised by Job when loading, parsing or initialising fails."""
def so_read_line(so):
    """Read one line from socket ``so``.

    Data is received a single byte at a time, so nothing after the line
    terminator is consumed. Reading stops at a newline or when recv()
    returns an empty result.

    Returns the text read, without the newline.
    """
    pieces = []
    while True:
        ch = so.recv(1)
        if not ch or ch == '\n':
            break
        pieces.append(ch)
    return ''.join(pieces)
def so_read_block(so):
    """Read one length-prefixed, zlib-compressed block from socket ``so``.

    Wire format: a decimal byte count, a newline, then that many bytes of
    zlib-compressed payload.

    Returns the decompressed payload, or '' when the length line is empty,
    not a number, zero or negative.

    Raises socket.error if the peer stops sending before the announced
    number of bytes has arrived. socket.error is one of the exceptions the
    server loop in this file treats as a disconnect. Errors raised by
    zlib.decompress propagate unchanged.
    """
    # read length
    length_line = so_read_line(so)
    try:
        lblk = int(length_line)
    except ValueError:
        lblk = 0
    # if nothing to read, quit
    if lblk <= 0:
        return ''
    # read payload
    parts = []
    blk_read = 0
    while blk_read < lblk:
        part = so.recv(min(lblk - blk_read, 4096))
        if not part:
            raise socket.error("Socket dead")
        parts.append(part)
        blk_read += len(part)
    s = zlib.decompress(''.join(parts))
    dmp('=>', len(s), '\n', s)
    return s
def so_read_task(so):
    """Read one block from ``so`` and unpickle it into a 2-element pair.

    Returns the pair as a tuple. Unpacking fails with ValueError when the
    unpickled object does not have exactly two elements.
    """
    # NOTE(security): pickle.loads can execute arbitrary code; the payload
    # comes straight off the network, so only trusted peers may connect.
    payload = so_read_block(so)
    first, second = pickle.loads(payload)
    return first, second
def so_write_block(so, r):
    """Compress ``r`` with zlib and send it on ``so`` as one framed block.

    The frame is the decimal length of the compressed data, a newline,
    and the compressed data itself.
    """
    dmp('<=', len(r), r)
    packed = zlib.compress(r)
    frame = str(len(packed)) + '\n' + packed
    so.sendall(frame)
def so_write_task(so, s):
    """Pickle the 2-element pair ``s`` and send it on ``so`` as one block.

    ``s`` is unpacked into exactly two elements first, so anything else
    fails here before data is written.
    """
    head, body = s
    so_write_block(so, pickle.dumps((head, body)))
class Job(object):
    """One job submitted from the CLI, together with its scheduling state.

    The job source file is split at the first match of ``def.*job_worker``:
    the text before it (control code) is executed locally into ``inst``,
    the rest (worker code) is kept in ``code_worker`` for sending to UVs.

    Attributes not found on the Job itself are looked up in the namespace
    of the executed control code.
    """

    def __init__(self, name, path, args):
        self.name = name
        self.path = path
        self.args = args
        self.inst = None  # namespace of the executed control code, set by parse()
        # runtime info
        self.task_offset = 0  # index of the next fresh task argument
        self.tasks_done = False  # all tasks are issued, may not be processed yet
        self.tasks_outstanding = {}  # UV -> (task_base, nof_tasks)
        self.tasks_redo = []  # elements of the form (task_base, nof_tasks)

    def __getattr__(self, name):
        """Fall back to the control-code namespace for unknown attributes.

        Raises AttributeError (the exception getattr/hasattr expect) when
        the name is not available there either.
        """
        # Read 'inst' through __dict__ so a Job without that attribute
        # cannot recurse back into __getattr__.
        inst = self.__dict__.get('inst')
        if inst is not None and name in inst.__dict__:
            return inst.__dict__[name]
        raise AttributeError(name)

    def done(self):
        """Return True once every task has been issued and none is outstanding.

        When that is the case the job's job_finish() hook is called before
        returning.
        """
        is_done = self.tasks_done and len(self.tasks_outstanding) == 0
        if is_done:
            self.inst.job_finish()  # signal job finished
        return is_done

    def load(self, posdo):
        """Read the job source from ``self.path`` and parse it.

        Raises PosdoException if the file cannot be read or parse() fails.
        """
        try:
            with open(self.path, 'r') as job_file:
                job_str = job_file.read()
        except IOError as inst:
            raise PosdoException("%s: %s" % (self.name, inst))
        self.parse(job_str, posdo)

    def parse(self, job_str, posdo):
        """Split ``job_str`` into control and worker code and run the control code.

        The control code is executed into a fresh namespace stored in
        ``self.inst``; a PosdoAccessor for ``posdo`` is then added to that
        namespace under the name 'posdo'.

        Raises PosdoException if 'job_worker' cannot be found or either
        half does not compile.
        """
        worker_match = re.compile('def.*job_worker').search(job_str)
        if worker_match is None:
            raise PosdoException("%s: Unable to find 'job_worker'" % (self.name))
        offset = worker_match.start()
        self.code_control, self.code_worker = job_str[:offset], job_str[offset:]
        try:
            # For now we check validity by compiling it
            # At some point we may want to do something more elaborate
            control = compile(self.code_control, 'test', 'exec')
            compile(self.code_worker, 'test', 'exec')
        except Exception as inst:
            raise PosdoException('%s: Invalid job (%s)' % (self.name, inst))
        self.inst = struct()
        exec(control, self.inst.__dict__)
        # setup posdo accessor class
        posdo_accessor = PosdoAccessor(posdo, self.name)
        self.inst.__dict__.update({'posdo': posdo_accessor})

    def init(self):
        """Run the job's job_init() hook and read its options and globals.

        Options come from the optional job_get_options() hook as
        (power_scaling, tasks_redo, tasks_outstanding_max); when the hook
        is missing, fails, or returns None the defaults (1, 1, 0) are used.

        Raises PosdoException if job_init() returns a true value.
        """
        failed = self.inst.job_init(self.args)
        if failed:
            raise PosdoException('%s: Failed init' % (self.name))
        # Get job specific options (optional)
        try:
            options = self.inst.job_get_options()
            if options is None:
                raise ValueError
        except Exception:
            dbg('using option defaults')
            options = (1, 1, 0)
        (self.opt_power_scaling,
         self.opt_tasks_redo,
         self.opt_tasks_outstanding_max) = options
        dbg('options ', options)
        # Get job globals
        self.globals = self.inst.job_get_globals()

    def tasks_at_max(self):
        """Return True when the outstanding-task limit is set and reached.

        A limit of 0 (or less) means "no limit".
        """
        return (self.opt_tasks_outstanding_max > 0
                and len(self.tasks_outstanding) >= self.opt_tasks_outstanding_max)

    def uv_drop(self, uv):
        """Handle the loss of ``uv`` while it may hold a task of this job.

        The task is queued for redo when the tasks_redo option is set;
        otherwise job_notify_failure() is called for every task index.
        """
        if uv not in self.tasks_outstanding:
            return
        task_info = self.tasks_outstanding.pop(uv)
        if self.opt_tasks_redo:
            dbg('queueing task for redo ', task_info)
            self.tasks_redo.append(task_info)
        else:
            (task_offset, task_len) = task_info
            for i in range(task_offset, task_offset + task_len):
                self.inst.job_notify_failure(i)  # notify the job of the failure

    def uv_result_process(self, uv):
        """Read the results ``uv`` has sent and pass them to the job.

        Each result is handed to job_add_result() with its task index,
        starting from the index reported with the results.
        """
        result_nof, results = uv.result()
        for result in results:
            self.inst.job_add_result(result_nof, result)
            result_nof += 1
        self.tasks_outstanding.pop(uv)
        # based on how long this task took to complete, adjust
        # UV power rating
        if self.opt_power_scaling:
            uv.power_scale()

    def uv_task(self, uv):
        """Give ``uv`` up to ``uv.power`` task arguments to work on.

        Queued redo work is served first; otherwise fresh arguments are
        requested from the job, subject to its optional job_get_status().

        Returns False when a task was sent to ``uv`` and True when there
        was nothing to send.
        """
        uv_power = uv.power if self.opt_power_scaling else 1
        # Defaults cover every path that issues nothing, including a status
        # code this method does not know about.
        task_offset = 0
        task_len = 0
        # If we have a task that needs to be done,
        # redo it within UV's constraints.
        # Otherwise, generate a fresh task
        if len(self.tasks_redo) > 0:
            task_offset, task_len = self.tasks_redo[0]
            if task_len > uv_power:  # chop up task to UVs size
                self.tasks_redo[0] = (task_offset + uv_power, task_len - uv_power)
                task_len = uv_power
            else:  # entire redo task is consumed
                self.tasks_redo.pop(0)
            dbg('redoing task ', task_offset, ' ', task_len)
        else:  # this is a fresh task
            # find out if we are allowed to ask for more args
            try:
                job_status = self.inst.job_get_status()
            except Exception:
                # job_get_status() is optional; without it assume "running"
                job_status = 1, 0
            if job_status[0] == 0:  # job complete
                self.tasks_done = True
            elif job_status[0] == 1 and job_status[1] == 0:
                # job not complete and humming along; a non-zero second
                # field means it is pending more results, so issue nothing
                task_offset = self.task_offset
                task_len = uv_power
                self.task_offset += task_len
        # build the tasks args list
        task_args = []
        for i in range(task_len):
            arg = self.inst.job_get_arg(task_offset + i)
            if arg is None:
                # We got cut down, however, if job_get_status() exists, we need to
                # check it as well.
                try:
                    if self.inst.job_get_status()[0] == 0:
                        self.tasks_done = True
                except Exception:
                    # if job_get_status() doesn't exist then we are truly done
                    self.tasks_done = True
                break
            task_args.append(arg)
        if not task_args:
            return True
        uv.task(self, task_offset, task_args)
        self.tasks_outstanding[uv] = (task_offset, task_len)
        dbg('outstanding ', len(self.tasks_outstanding))
        return False
class Uv(struct):
    """Server-side record of one connected UV (a client that executes tasks).

    ``power`` is the number of task arguments handed to the UV per task;
    it is doubled or halved by power_scale() depending on how long the
    last task took.
    """

    def __init__(self, so, addr):
        self.addr = addr
        self.power = 1
        self.so = so
        self.task_time_last = 0  # glbl.now when the last task was sent; 0 = never
        self.task_offset = None  # base index of the last task sent
        self.task_job = None  # job whose code/globals this UV last received

    def drop(self):
        """Close the socket and tell the last job this UV worked for."""
        self.so.close()
        job = self.task_job
        if job:
            job.uv_drop(self)

    def power_scale(self):
        """Adjust ``power`` from the time elapsed since the last task was sent.

        Faster than glbl.task_params.time_min_sec doubles the power; slower
        than glbl.task_params.time_max_sec halves it, never below 1.
        """
        if self.task_time_last <= 0:
            return
        elapsed = glbl.now - self.task_time_last
        if elapsed < glbl.task_params.time_min_sec:
            self.power *= 2
            dbg('increased power of ', self.addr, ' to ', self.power)
        elif elapsed > glbl.task_params.time_max_sec and self.power > 1:
            # Floor division keeps power an integer even under true
            # division; it is later used as a task count.
            self.power //= 2
            dbg('decreased power of ', self.addr, ' to ', self.power)

    def result(self):
        """Read one reply from the UV and return its first element.

        Job.uv_result_process unpacks the returned value as
        (first task index, list of results).
        """
        task_results, dummy = so_read_task(self.so)
        return task_results

    def task(self, job, task_offset, task_args):
        """Send ``task_args`` (starting at index ``task_offset``) of ``job`` to the UV."""
        # If this UV has already done some work, then it has the task code and globals.
        # Don't bother sending the task code and globals again.
        if self.task_job == job:
            task_code = ''
            task_globals = ''
        else:
            self.task_job = job
            task_code = job.code_worker
            task_globals = job.globals
        task_info = (task_offset, task_globals, task_args)
        so_write_task(self.so, (task_info, task_code))
        self.task_time_last = glbl.now
        self.task_offset = task_offset
class Posdo(struct):
    """The server: accepts UV connections and distributes job tasks to them."""

    def __init__(self):
        self.uv_q = []  # idle UVs waiting for work
        self.uvs = {}  # so -> UV
        self.jobs = []  # loaded, not yet completed jobs
        # select lists
        self.iwtd = []
        self.owtd = []
        self.ewtd = []

    def run(self):
        """Run the server loop until ``glbl.done`` is set.

        Each pass loads newly queued jobs, hands tasks to idle UVs, then
        waits up to one second for new connections or UV results.
        The listening socket is closed when the loop ends for any reason.
        """
        # setup listen socket
        sol = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sol.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if hasattr(socket, "SO_REUSEPORT"):
                sol.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sol.bind((glbl.network.host, glbl.network.port))
            sol.listen(3)
            self.iwtd.append(sol)  # add socket to the select input list
            while not glbl.done:
                self._jobs_admit()
                self._jobs_dispatch()
                if glbl.done:
                    continue
                ri, _ro, _rerr = select.select(self.iwtd, self.owtd, self.ewtd, 1)
                glbl.now = time.time()
                for so in ri:
                    if so is sol:  # process UV connection
                        self._uv_accept(sol)
                    else:  # process UV response
                        self._uv_service(so)
        finally:
            if sol in self.iwtd:
                self.iwtd.remove(sol)
            sol.close()

    def _jobs_admit(self):
        """Load and initialise every job waiting in glbl.job_q."""
        while len(glbl.job_q) > 0:
            job = glbl.job_q.pop()
            try:
                job.load(self)
                job.init()
            except Exception as inst:
                # A broken job is reported on the CLI and skipped.
                cli_err(inst)
                continue
            self.jobs.append(job)

    def _jobs_dispatch(self):
        """Hand tasks to idle UVs and retire jobs that have completed."""
        jobs_to_remove = []
        for job in self.jobs:
            if job.tasks_at_max():
                continue
            # Walk a snapshot: UVs are removed from uv_q as they get work,
            # and removing from a list being iterated skips elements.
            for uv in list(self.uv_q):
                if job.tasks_at_max():
                    break
                if job.uv_task(uv):
                    break  # if no more tasks left, then we are done
                self.uv_q.remove(uv)
            if job.done():
                jobs_to_remove.append(job)
        for job in jobs_to_remove:
            cli_ok("Job '%s' complete" % (job.name))
            self.jobs.remove(job)

    def _uv_accept(self, sol):
        """Accept one pending connection on ``sol`` and register it as an idle UV."""
        try:
            new_so, addr = sol.accept()
        except socket.error as inst:
            # No UV exists for a failed accept, so there is nothing to drop.
            wrn('Accept failed: ', inst)
            return
        uv = Uv(new_so, addr)
        self.uvs[new_so] = uv
        self.iwtd.append(new_so)
        info('Connected ', addr)
        self.uv_q.append(uv)  # add to idle list

    def _uv_service(self, so):
        """Process the data available on UV socket ``so``.

        On success the UV goes back to the idle list. A socket error, a
        closed connection, a UV with no job, or an undecodable payload is
        treated as a disconnect.
        """
        uv = self.uvs[so]
        try:
            job = uv.task_job
            job.uv_result_process(uv)
        except (socket.error, ValueError, AttributeError, EOFError,
                zlib.error, pickle.UnpicklingError):
            self._uv_disconnect(uv)
        else:
            self.uv_q.append(uv)  # add to idle list

    def _uv_disconnect(self, uv):
        """Forget ``uv`` everywhere and let it release its socket and task."""
        info('Disconnected ', uv.addr)
        self.uvs.pop(uv.so, 0)
        if uv.so in self.iwtd:
            self.iwtd.remove(uv.so)
        try:
            self.uv_q.remove(uv)  # remove from idle list if there
        except ValueError:
            pass
        uv.drop()
class PosdoAccessor(object):
    """Facade over the server given to a job's control code as ``posdo``.

    It exposes counters, a way to stop the server, and logging helpers
    that prefix every message with the job name.
    """

    def __init__(self, posdo, job_name):
        self._job_name = job_name
        self._posdo = posdo

    def _prefix(self):
        # Label placed in front of every message logged through this accessor.
        return '%s: ' % (self._job_name)

    def jobs_nof(self):
        """Number of jobs currently held by the server."""
        jobs = self._posdo.jobs
        return len(jobs)

    def uvs_nof(self):
        """Number of UVs currently registered with the server."""
        uvs = self._posdo.uvs
        return len(uvs)

    def terminate(self):
        """Set the global done flag."""
        glbl.done = 1

    def dbg(self, *s):
        dbg(self._prefix(), *s)

    def info(self, *s):
        info(self._prefix(), *s)

    def err(self, *s):
        err(self._prefix(), *s)
# Module-wide state shared by the CLI thread and the server thread.
glbl = struct()
glbl.done = False  # set to a true value to make both thread loops stop
glbl.job_q = []  # jobs created by thread_cli, waiting to be loaded by Posdo.run
glbl.now = None  # latest time.time() sample; refreshed by both threads
glbl.network = struct()
glbl.network.port = 0  # placeholder; main() overwrites it from sys.argv[1]
glbl.task_params = struct()
glbl.task_params.time_min_sec = 20  # a task answered faster than this doubles the UV's power
glbl.task_params.time_max_sec = 120  # a task answered slower than this halves the UV's power
def thread_cli():
    """Read commands from stdin until 'exit' is entered or ``glbl.done`` is set.

    Commands:
      exit               -- set glbl.done and stop
      error              -- print a test error line
      <job> [args ...]   -- queue the job stored in '<job>.py' with the
                            whitespace-separated arguments

    When stdin reaches end of file the command loop stops but glbl.done
    is left untouched.
    """
    #readline.parse_and_bind("tab: complete")
    while not glbl.done:
        cli_ok('ok')
        try:
            line = raw_input()
        except EOFError:
            # stdin is closed: no further commands can arrive. Leave
            # glbl.done alone so the server thread is not affected.
            wrn('stdin closed, CLI stopped')
            break
        glbl.now = time.time()
        if len(line) == 0:
            continue
        if line == 'exit':
            glbl.done = True
            break
        if line == 'error':
            cli_err('Manual')
            continue
        line_list = line.split()
        if not line_list:
            # line held only whitespace
            cli_err('Syntax')
            continue
        job_name = line_list[0]
        job_filename = job_name + '.py'
        job_args = line_list[1:]  # XXX wrong parsing for arguments in quotes with spaces
        try:
            job = Job(job_name, job_filename, job_args)
        except Exception as inst:
            cli_err(inst)
            continue
        glbl.job_q.append(job)
def thread_posdo():
    """Thread body for the server: build a Posdo and run its loop."""
    Posdo().run()
def main():
    """Read the listen port from the command line and start both threads.

    The first command-line argument must be the TCP port number. The host
    is set to '' before the CLI thread and the server thread are started.

    Exits with a message (SystemExit) when the port argument is missing
    or is not an integer.
    """
    if len(sys.argv) < 2:
        sys.exit('usage: %s <port>' % sys.argv[0])
    try:
        glbl.network.port = int(sys.argv[1])
    except ValueError:
        sys.exit('invalid port: %r' % sys.argv[1])
    glbl.network.host = ''
    threading.Thread(target=thread_cli).start()
    threading.Thread(target=thread_posdo).start()
# Entry point. There is no __name__ guard, so this runs on import as well
# as on direct execution.
main()