Skip to content

Commit

Permalink
Merge pull request #85 from ska-sa/issue-79
Browse files Browse the repository at this point in the history
Yes it works.
  • Loading branch information
o-smirnov authored Feb 17, 2017
2 parents fd6aac5 + 33d300e commit f57c0f8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
60 changes: 34 additions & 26 deletions Pyxis/Commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import itertools
import math
import fcntl
import multiprocessing
import Queue

import Pyxis
import Pyxis.Internals
Expand Down Expand Up @@ -271,36 +273,37 @@ def _restore ():
else:
# else split varlist into forked subprocesses
nforks = min(nforks,len(varlist));
vpf = len(varlist)/nforks;
# create a queue for all variable values
varqueue = multiprocessing.Queue(len(varlist))
for x in varlist:
varqueue.put(x)
# distribute N values per each fork
subvals_list = [ varlist[i*vpf:(i+1)*vpf] for i in range(nforks) ];
# if something is left over, assign to first few forks
for i in range(nforks*vpf,len(varlist)):
subvals_list[i-nforks*vpf].append(varlist[i]);
# how many vars per fork?
# print [ len(sv) for sv in subvals_list ];
# print subvals_list;
_verbose(1,"splitting into %d jobs, up to %d %s's per job, staggered by %ds"%(len(subvals_list),len(subvals_list[0]),varname,stagger));
_verbose(1,"splitting into %d jobs by %s, staggered by %ds"%(nforks,varname,stagger));
Pyxis.Internals.flush_log();
forked_pids = {};
try:
for job_id,subvals in enumerate(subvals_list):
for job_id in range(nforks):
if job_id and stagger:
time.sleep(stagger);
# subvals is range of values to be iterated over by this subjob
subval_str = ",".join(map(str,subvals));
pid = os.fork();
if not pid:
# child fork: run commands
# child fork: run commands while something is on queue
_subprocess_id = job_id;
_verbose(1,"started job %d for %s"%(job_id,", ".join(map(str,subvals))),sync=True);
_verbose(1,"started job %d"%job_id,sync=True);
try:
fail_list = [];
for value in subvals:
fail_list = []
success_list = []
while True:
try:
value = varqueue.get(False)
except Queue.Empty:
break
_verbose(1,"per-loop, setting %s=%s"%(varname,value),sync=True);
assign(vname,value,namespace=namespace,interpolate=False);
try:
Pyxis.Internals.run(*commands);
success_list.append(value)
except (Exception,SystemExit,KeyboardInterrupt) as exc:
if persist:
_warn("exception raised for %s=%s:\n"%(vname,value),
Expand All @@ -309,36 +312,41 @@ def _restore ():
fail_list.append((value,str(exc)));
else:
raise;
# any successes?
if success_list:
_verbose(1,"job #%d (pid %d): per-loop succeeded for %s"%(_subprocess_id,pid,
", ".join([str(f) for f in success_list])),sync=True)
# any fails?
if fail_list:
_restore();
_abort("per-loop failed for %s"%(", ".join([str(f[0]) for f in fail_list])),sync=True);
_abort("job #%d (pid %d): per-loop failed for %s"%(_subprocess_id,pid,
", ".join([str(f[0]) for f in fail_list])),sync=True)
except:
traceback.print_exc();
_verbose(2,"job #%d (pid %d: %s=%s) exiting with error code 1"%(_subprocess_id,os.getpid(),varname,value),sync=True);
_verbose(1,"job #%d (pid %d) aborted at %s=%s, exiting with error code 1"%(_subprocess_id,pid,varname,value),sync=True);
_restore();
_verbose(2,"logfile is",Pyxis.Context.get('LOG'),sync=True);
_error("per-loop failed for %s"%value,sync=True);
sys.exit(1);
_verbose(2,"job #%d (pid %d) exiting normally"%(_subprocess_id,os.getpid()),sync=True);
sys.exit(0);
else: # parent pid: append to list
_verbose(2,"launched job #%d (%s=%s) with pid %d"%(job_id,varname,subval_str,pid),sync=True);
forked_pids[pid] = job_id,subval_str;
_verbose(2,"launched job #%d with pid %d"%(job_id,pid),sync=True);
forked_pids[pid] = job_id
njobs = len(forked_pids);
_verbose(1,"%d jobs launched, waiting for finish"%len(forked_pids),sync=True);
failed = [];
while forked_pids:
pid,status = os.waitpid(-1,0);
if pid in forked_pids:
job_id,subval_str = forked_pids.pop(pid);
job_id = forked_pids.pop(pid);
status >>= 8;
if status:
failed.append((job_id,subval_str));
failed.append(job_id);
# success = False;
_error("job #%d (%s=%s) exited with error status %d, waiting for %d more jobs to complete"%(job_id,varname,subval_str,status,len(forked_pids)),sync=True);
_error("job #%d exited with error status %d, waiting for %d more jobs to complete"%(job_id,status,len(forked_pids)),sync=True);
else:
_verbose(1,"job #%d (%s=%s) finished, waiting for %d more jobs to complete"%(job_id,varname,subval_str,len(forked_pids)),sync=True);
_verbose(1,"job #%d finished, waiting for %d more jobs to complete"%(job_id,len(forked_pids)),sync=True);
if failed:
_abort("%d of %d jobs have failed"%(len(failed),njobs),sync=True);
else:
Expand All @@ -353,9 +361,9 @@ def _restore ():
while forked_pids:
pid,status = os.waitpid(-1,0);
if pid in forked_pids:
job_id,subval_str = forked_pids.pop(pid);
_verbose(1,"job #%d (%s=%s) exited with error status %d, waiting for %d more"%
(job_id,varname,subval_str,status>>8,len(forked_pids)),sync=True);
job_id = forked_pids.pop(pid);
_verbose(1,"job #%d exited with error status %d, waiting for %d more"%
(job_id,status>>8,len(forked_pids)),sync=True);
raise;
finally:
# note that children also execute this block with sys.exit()
Expand Down
13 changes: 9 additions & 4 deletions Pyxis/bin/pyxis
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,15 @@ if __name__ == "__main__":
args.append("-d");
if SPAWN_SCREEN_LOG:
args.append("-L");
args += sys.argv;
args.append("--running-in-screen");
if PAUSE_ON_EXIT and "--pause-on-exit" not in args:
args.append("--pause-on-exit");
args += [ "bash", "-ci" ]
pyxis_args = sys.argv
pyxis_args.append("--running-in-screen");
if PAUSE_ON_EXIT and "--pause-on-exit" not in pyxis_args:
pyxis_args.append("--pause-on-exit");
# now turn the pyxis args list into a single string that bash -c will process:
# backslash-escape double quotes, single quotes, $, backticks and spaces in each argument
pyxis_args = [ re.sub(r'(["\'$` ])',r'\\\1',x) for x in pyxis_args ]
args.append(' '.join(pyxis_args))
os.execv(screenpath,args);

Pyxis.Internals.saveconf();
Expand Down

0 comments on commit f57c0f8

Please sign in to comment.