Skip to content

Commit

Permalink
fixes #662. Fixes #663
Browse files Browse the repository at this point in the history
  • Loading branch information
o-smirnov committed Jul 26, 2020
1 parent fc8f59a commit b07bf77
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 10 deletions.
37 changes: 28 additions & 9 deletions stimela/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(self, name, recipe, label=None,
self.recipe = recipe
self.label = label or '{0}({1})'.format(name, id(name))
self.log = recipe.log
self._log_fh = None
self.active = False
self.jtype = jtype # ['docker', 'python', singularity']
self.job = None
Expand Down Expand Up @@ -106,9 +107,9 @@ def setup_job_log(self, log_name=None, loglevel=None):
log_dir = os.path.dirname(self.logfile) or "."
if not os.path.exists(log_dir):
os.mkdir(log_dir)
fh = logging.FileHandler(self.logfile, 'w', delay=True)
fh.setLevel(getattr(logging, loglevel))
self.log.addHandler(fh)
self._log_fh = logging.FileHandler(self.logfile, 'w', delay=True)
self._log_fh.setLevel(getattr(logging, loglevel))
self.log.addHandler(self._log_fh)

self.log.propagate = True # propagate also to main stimela logger

Expand Down Expand Up @@ -423,6 +424,14 @@ def run_job(self):
self.job.start(output_wrangler=self.apply_output_wranglers)
return 0

def close(self):
"""Call this to explicitly clean up after the job"""
if self._log_fh is not None:
self._log_fh.close()

def __del__(self):
self.close()


class Recipe(object):
def __init__(self, name, data=None,
Expand Down Expand Up @@ -473,6 +482,8 @@ def __init__(self, name, data=None,
self.logfile_task = "{0}/log-{1}-{{task}}".format(log_dir or ".", self.name_) \
if logfile_task is None else logfile_task

self._log_fh = None

if logger is not None:
self.log = logger
else:
Expand All @@ -496,10 +507,10 @@ def __init__(self, name, data=None,
self.log.info('creating log directory {0:s}'.format(log_dir))
os.makedirs(log_dir)

fh = logging.FileHandler(logfile, 'w', delay=True)
fh.setLevel(getattr(logging, self.loglevel))
fh.setFormatter(stimela.log_formatter)
self.log.addHandler(fh)
self._log_fh = logging.FileHandler(logfile, 'w', delay=True)
self._log_fh.setLevel(getattr(logging, self.loglevel))
self._log_fh.setFormatter(stimela.log_formatter)
self.log.addHandler(self._log_fh)

self.resume_file = '.last_{}.json'.format(self.name_)
# set to default if not set
Expand Down Expand Up @@ -750,7 +761,15 @@ def run(self, steps=None, resume=False, redo=None):

return 0

def __del__(self):
"""Failsafe"""
def close(self):
"""Call this to explicitly close the recipe and clean up. Don't call run() after close()!"""
for job in self.jobs:
job.close()
if os.path.exists(self.workdir):
shutil.rmtree(self.workdir)
if self._log_fh is not None:
self._log_fh.close()

def __del__(self):
"""Failsafe"""
self.close()
37 changes: 36 additions & 1 deletion stimela/utils/xrun_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def global_logger():
log = logging.getLogger()
return log

class Poller(object):
class SelectPoller(object):
"""Poller class. Poor man's select.poll(). Damn you OS/X and your select.poll will-you-won'y-you bollocks"""
def __init__ (self, log):
self.fdlabels = {}
Expand Down Expand Up @@ -70,6 +70,41 @@ def unregister_file(self, fobj):
def __contains__(self, fobj):
return fobj.fileno() in self.fdlabels

class Poller(object):
"""Poller class. Wraps select.poll()."""
def __init__ (self, log):
self.fdlabels = {}
self.log = log
self.poll = select.poll()

def register_file(self, fobj, label):
self.fdlabels[fobj.fileno()] = label, fobj
self.poll.register(fobj.fileno(), select.POLLIN)

def register_process(self, po, label_stdout='stdout', label_stderr='stderr'):
self.fdlabels[po.stdout.fileno()] = label_stdout, po.stdout
self.fdlabels[po.stderr.fileno()] = label_stderr, po.stderr
self.poll.register(po.stdout.fileno(), select.POLLIN)
self.poll.register(po.stderr.fileno(), select.POLLIN)

def poll(self, timeout=5, verbose=False):
try:
to_read = [fd for (fd, _) in self.poll.poll(timeout)]
if verbose:
self.log.debug("poll(): ready to read: {}".format(to_read))
return [self.fdlabels[fd] for fd in to_read]
except Exception:
if verbose:
self.log.debug("poll() exception: {}".format(traceback.format_exc()))
raise

def unregister_file(self, fobj):
if fobj.fileno() in self.fdlabels:
self.poll.unregister(fobj.fileno())

def __contains__(self, fobj):
return fobj.fileno() in self.fdlabels


def _remove_ctrls(msg):
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
Expand Down

0 comments on commit b07bf77

Please sign in to comment.