Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give Stage the nofail behavior option #220

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,8 +1290,6 @@ def proc_wrapup(i):
sleeptime = min((sleeptime + 0.25) * 3, 60 / len(processes))

# All jobs are done, print a final closing and job info
stop_time = time.time()
proc_message = "Command completed. {info}"
info = (
"Elapsed time: "
+ str(datetime.timedelta(seconds=self.time_elapsed(start_time)))
Expand All @@ -1308,7 +1306,7 @@ def proc_wrapup(i):

info += "\n" # finish out the
self.info("</pre>")
self.info(proc_message.format(info=info))
self.info("Command completed. {info}".format(info=info))

for rc in returncodes:
if rc != 0:
Expand Down
5 changes: 4 additions & 1 deletion pypiper/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,10 @@ def run(self, start_point=None, stop_before=None, stop_after=None):

print(f"Running stage: {getattr(stage, 'name', str(stage))}")

stage.run()
try:
stage.run()
except Exception as e:
self.manager._triage_error(e, nofail=stage.nofail)
self.executed.append(stage)
self.checkpoint(stage)

Expand Down
14 changes: 13 additions & 1 deletion pypiper/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ class Stage(object):
collection of commands that is checkpointed.
"""

def __init__(self, func, f_args=None, f_kwargs=None, name=None, checkpoint=True):
def __init__(
self,
func,
f_args=None,
f_kwargs=None,
name=None,
checkpoint=True,
*,
nofail=False
):
"""
A function, perhaps with arguments, defines the stage.

Expand All @@ -26,6 +35,8 @@ def __init__(self, func, f_args=None, f_kwargs=None, name=None, checkpoint=True)
:param dict f_kwargs: Keyword arguments for func
:param str name: name for the phase/stage
:param callable func: Object that defines how the stage will execute.
:param bool nofail: Allow a failure of this stage to not fail the pipeline
in which it's running
"""
if isinstance(func, Stage):
raise TypeError("Cannot create Stage from Stage")
Expand All @@ -35,6 +46,7 @@ def __init__(self, func, f_args=None, f_kwargs=None, name=None, checkpoint=True)
self.f_kwargs = f_kwargs or dict()
self.name = name or func.__name__
self.checkpoint = checkpoint
self.nofail = nofail

@property
def checkpoint_name(self):
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements-dev-extra.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
black
Loading