From 9787f6730d30b602eb77d14addee0affe982578f Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Wed, 6 Apr 2016 17:05:58 +0200 Subject: [PATCH 1/2] fixes #79, I think --- Pyxis/Commands.py | 60 +++++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/Pyxis/Commands.py b/Pyxis/Commands.py index c0c50c5..87417c4 100644 --- a/Pyxis/Commands.py +++ b/Pyxis/Commands.py @@ -9,6 +9,8 @@ import itertools import math import fcntl +import multiprocessing +import Queue import Pyxis import Pyxis.Internals @@ -271,36 +273,37 @@ def _restore (): else: # else split varlist into forked subprocesses nforks = min(nforks,len(varlist)); - vpf = len(varlist)/nforks; + # create a queue for all variable values + varqueue = multiprocessing.Queue(len(varlist)) + for x in varlist: + varqueue.put(x) # distribute N values per each fork - subvals_list = [ varlist[i*vpf:(i+1)*vpf] for i in range(nforks) ]; - # if something is left over, assign to first few forks - for i in range(nforks*vpf,len(varlist)): - subvals_list[i-nforks*vpf].append(varlist[i]); - # how many vars per fork? -# print [ len(sv) for sv in subvals_list ]; -# print subvals_list; - _verbose(1,"splitting into %d jobs, up to %d %s's per job, staggered by %ds"%(len(subvals_list),len(subvals_list[0]),varname,stagger)); + _verbose(1,"splitting into %d jobs by %s, staggered by %ds"%(nforks,varname,stagger)); Pyxis.Internals.flush_log(); forked_pids = {}; try: - for job_id,subvals in enumerate(subvals_list): + for job_id in range(nforks): if job_id and stagger: time.sleep(stagger); # subvals is range of values to be iterated over by this subjob - subval_str = ",".join(map(str,subvals)); pid = os.fork(); if not pid: - # child fork: run commands + # child fork: run commands while something is on queue _subprocess_id = job_id; - _verbose(1,"started job %d for %s"%(job_id,", ".join(map(str,subvals))),sync=True); + _verbose(1,"started job %d"%job_id,sync=True); try: - fail_list = []; - for value in subvals: + fail_list = [] + success_list = [] + while True: + try: + value = varqueue.get(False) + except Queue.Empty: + break _verbose(1,"per-loop, setting %s=%s"%(varname,value),sync=True); assign(vname,value,namespace=namespace,interpolate=False); try: Pyxis.Internals.run(*commands); + success_list.append(value) except (Exception,SystemExit,KeyboardInterrupt) as exc: if persist: _warn("exception raised for %s=%s:\n"%(vname,value), @@ -309,13 +312,18 @@ def _restore (): fail_list.append((value,str(exc))); else: raise; + # any successes? + if success_list: + _verbose(1,"job #%d (pid %d): per-loop succeeded for %s"%(_subprocess_id,pid, + ", ".join([str(f) for f in success_list])),sync=True) # any fails? if fail_list: _restore(); - _abort("per-loop failed for %s"%(", ".join([str(f[0]) for f in fail_list])),sync=True); + _abort("job #%d (pid %d): per-loop failed for %s"%(_subprocess_id,pid, + ", ".join([str(f[0]) for f in fail_list])),sync=True) except: traceback.print_exc(); - _verbose(2,"job #%d (pid %d: %s=%s) exiting with error code 1"%(_subprocess_id,os.getpid(),varname,value),sync=True); + _verbose(1,"job #%d (pid %d) aborted at %s=%s, exiting with error code 1"%(_subprocess_id,pid,varname,value),sync=True); _restore(); _verbose(2,"logfile is",Pyxis.Context.get('LOG'),sync=True); _error("per-loop failed for %s"%value,sync=True); @@ -323,22 +331,22 @@ def _restore (): _verbose(2,"job #%d (pid %d) exiting normally"%(_subprocess_id,os.getpid()),sync=True); sys.exit(0); else: # parent pid: append to list - _verbose(2,"launched job #%d (%s=%s) with pid %d"%(job_id,varname,subval_str,pid),sync=True); - forked_pids[pid] = job_id,subval_str; + _verbose(2,"launched job #%d with pid %d"%(job_id,pid),sync=True); + forked_pids[pid] = job_id njobs = len(forked_pids); _verbose(1,"%d jobs launched, waiting for finish"%len(forked_pids),sync=True); failed = []; while forked_pids: pid,status = os.waitpid(-1,0); if pid in forked_pids: - job_id,subval_str = forked_pids.pop(pid); + job_id = forked_pids.pop(pid); status >>= 8; if status: - failed.append((job_id,subval_str)); + failed.append(job_id); # success = False; - _error("job #%d (%s=%s) exited with error status %d, waiting for %d more jobs to complete"%(job_id,varname,subval_str,status,len(forked_pids)),sync=True); + _error("job #%d exited with error status %d, waiting for %d more jobs to complete"%(job_id,status,len(forked_pids)),sync=True); else: - _verbose(1,"job #%d (%s=%s) finished, waiting for %d more jobs to complete"%(job_id,varname,subval_str,len(forked_pids)),sync=True); + _verbose(1,"job #%d finished, waiting for %d more jobs to complete"%(job_id,len(forked_pids)),sync=True); if failed: _abort("%d of %d jobs have failed"%(len(failed),njobs),sync=True); else: @@ -353,9 +361,9 @@ def _restore (): while forked_pids: pid,status = os.waitpid(-1,0); if pid in forked_pids: - job_id,subval_str = forked_pids.pop(pid); - _verbose(1,"job #%d (%s=%s) exited with error status %d, waiting for %d more"% - (job_id,varname,subval_str,status>>8,len(forked_pids)),sync=True); + job_id = forked_pids.pop(pid); + _verbose(1,"job #%d exited with error status %d, waiting for %d more"% + (job_id,status>>8,len(forked_pids)),sync=True); raise; finally: # note that children also execute this block with sys.exit() From 2c015cc24d4e3c5c575bb24361cb4120e7721076 Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Wed, 15 Jun 2016 15:30:21 +0200 Subject: [PATCH 2/2] fixes #80 --- Pyxis/bin/pyxis | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/Pyxis/bin/pyxis b/Pyxis/bin/pyxis index c0b83f7..888576c 100755 --- a/Pyxis/bin/pyxis +++ b/Pyxis/bin/pyxis @@ -270,10 +270,15 @@ if __name__ == "__main__": args.append("-d"); if SPAWN_SCREEN_LOG: args.append("-L"); - args += sys.argv; - args.append("--running-in-screen"); - if PAUSE_ON_EXIT and "--pause-on-exit" not in args: - args.append("--pause-on-exit"); + args += [ "bash", "-ci" ] + pyxis_args = sys.argv + pyxis_args.append("--running-in-screen"); + if PAUSE_ON_EXIT and "--pause-on-exit" not in pyxis_args: + pyxis_args.append("--pause-on-exit"); + # now turn the pyxis args list into a something that bash -c will process + # quote the single quotes in arguments + pyxis_args = [ re.sub(r'(["\'$` ])',r'\\\1',x) for x in pyxis_args ] + args.append(' '.join(pyxis_args)) os.execv(screenpath,args); Pyxis.Internals.saveconf();