Merge pull request #376 from djarecka/nicolocin-pytest-slurm-tmpdir
fixing issues with workflow on cluster with slurm (closes #369, #368)
djarecka authored Nov 11, 2020
2 parents 2d1a0b3 + 7669b81 commit e168af4
Showing 6 changed files with 584 additions and 233 deletions.
21 changes: 19 additions & 2 deletions pydra/engine/submitter.py
@@ -1,5 +1,6 @@
 """Handle execution backends."""
 import asyncio
+import time
 from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker, DaskWorker
 from .core import is_workflow
 from .helpers import get_open_loop, load_and_run_async
@@ -150,15 +151,31 @@ async def _run_workflow(self, wf, rerun=False):
         The computed workflow
         """
+        for nd in wf.graph.nodes:
+            if nd.allow_cache_override:
+                nd.cache_dir = wf.cache_dir

         # creating a copy of the graph that will be modified
         # the copy contains new lists with original runnable objects
         graph_copy = wf.graph.copy()
         # keep track of pending futures
         task_futures = set()
         tasks, tasks_follow_errored = get_runnable_tasks(graph_copy)
-        while tasks or len(task_futures):
+        while tasks or task_futures or graph_copy.nodes:
             if not tasks and not task_futures:
-                raise Exception("Nothing queued or todo - something went wrong")
+                # it's possible that task_futures is empty but we are not able to
+                # get any tasks from graph_copy (using get_runnable_tasks);
+                # this might be related to some delays in saving the files,
+                # so keep trying get_runnable_tasks for another minute
+                ii = 0
+                while not tasks and graph_copy.nodes:
+                    tasks, follow_err = get_runnable_tasks(graph_copy)
+                    ii += 1
+                    time.sleep(1)
+                    if ii > 60:
+                        raise Exception(
+                            "graph is not empty, but not able to get more tasks "
+                            "- something is wrong (e.g. with the filesystem)"
+                        )
             for task in tasks:
                 # grab inputs if needed
                 logger.debug(f"Retrieving inputs for {task}")
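The polling block added above is a retry-with-timeout pattern: on a shared filesystem (common on SLURM clusters), the result files of a finished task may not yet be visible when the submitter looks for newly runnable tasks, so instead of failing immediately it now re-queries the graph once per second for up to a minute. A minimal standalone sketch of the same pattern; the names poll_runnable, get_items, and work_remains are illustrative, not pydra API:

import time

def poll_runnable(get_items, work_remains, timeout=60, interval=1):
    """Re-query get_items() until it returns something, no work remains,
    or `timeout` seconds pass."""
    waited = 0
    items = get_items()
    while not items and work_remains():
        if waited > timeout:
            raise RuntimeError(
                "work remains but nothing became runnable "
                "(possibly filesystem latency)"
            )
        time.sleep(interval)
        waited += interval
        items = get_items()
    return items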
9 changes: 6 additions & 3 deletions pydra/engine/tests/test_boutiques.py
@@ -26,11 +26,12 @@
     "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
 )
 @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter])
-def test_boutiques_1(maskfile, plugin, results_function):
+def test_boutiques_1(maskfile, plugin, results_function, tmpdir):
     """ simple task to run fsl.bet using BoshTask"""
     btask = BoshTask(name="NA", zenodo_id="1482743")
     btask.inputs.infile = Infile
     btask.inputs.maskfile = maskfile
+    btask.cache_dir = tmpdir
     res = results_function(btask, plugin)

     assert res.output.return_code == 0
@@ -97,11 +98,12 @@ def test_boutiques_spec_2():
 @pytest.mark.parametrize(
     "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
 )
-def test_boutiques_wf_1(maskfile, plugin):
+def test_boutiques_wf_1(maskfile, plugin, tmpdir):
     """ wf with one task that runs fsl.bet using BoshTask"""
     wf = Workflow(name="wf", input_spec=["maskfile", "infile"])
     wf.inputs.maskfile = maskfile
     wf.inputs.infile = Infile
+    wf.cache_dir = tmpdir

     wf.add(
         BoshTask(
@@ -128,11 +130,12 @@ def test_boutiques_wf_1(maskfile, plugin):
 @pytest.mark.parametrize(
     "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
 )
-def test_boutiques_wf_2(maskfile, plugin):
+def test_boutiques_wf_2(maskfile, plugin, tmpdir):
     """ wf with two BoshTasks (fsl.bet and fsl.stats) and one ShellTask"""
     wf = Workflow(name="wf", input_spec=["maskfile", "infile"])
     wf.inputs.maskfile = maskfile
     wf.inputs.infile = Infile
+    wf.cache_dir = tmpdir

     wf.add(
         BoshTask(
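Every test change in this commit follows the same recipe as the three hunks above: accept pytest's built-in tmpdir fixture and point the task's or workflow's cache_dir at it, so each test writes its results into a fresh per-test directory instead of a shared default cache that concurrent SLURM jobs could collide in. A minimal sketch of the recipe on a plain function task, assuming pydra's function-task API as used in these tests (fun_addtwo here is a stand-in, not one of the tested tasks):

import pydra

@pydra.mark.task
def fun_addtwo(a):
    return a + 2

def test_fun_addtwo(tmpdir):
    task = fun_addtwo(name="NA", a=3)
    task.cache_dir = tmpdir  # per-test cache, no cross-test collisions
    with pydra.Submitter(plugin="cf") as sub:
        sub(task)
    assert task.result().output.out == 5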
74 changes: 55 additions & 19 deletions pydra/engine/tests/test_node_task.py
@@ -365,9 +365,10 @@ def test_odir_init():


 @pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_1(plugin_dask_opt):
+def test_task_nostate_1(plugin_dask_opt, tmpdir):
     """ task without splitter"""
     nn = fun_addtwo(name="NA", a=3)
+    nn.cache_dir = tmpdir
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None

@@ -405,9 +406,10 @@ def test_task_nostate_1_call():


 @pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_1_call_subm(plugin_dask_opt):
+def test_task_nostate_1_call_subm(plugin_dask_opt, tmpdir):
     """ task without splitter"""
     nn = fun_addtwo(name="NA", a=3)
+    nn.cache_dir = tmpdir
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None

@@ -422,9 +424,10 @@ def test_task_nostate_1_call_subm(plugin_dask_opt):


 @pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_1_call_plug(plugin_dask_opt):
+def test_task_nostate_1_call_plug(plugin_dask_opt, tmpdir):
     """ task without splitter"""
     nn = fun_addtwo(name="NA", a=3)
+    nn.cache_dir = tmpdir
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None

@@ -450,9 +453,10 @@ def test_task_nostate_1_call_updateinp():
     assert nn.output_dir.exists()


-def test_task_nostate_2(plugin):
+def test_task_nostate_2(plugin, tmpdir):
     """ task with a list as an input, but no splitter"""
     nn = moment(name="NA", n=3, lst=[2, 3, 4])
+    nn.cache_dir = tmpdir
     assert np.allclose(nn.inputs.n, [3])
     assert np.allclose(nn.inputs.lst, [2, 3, 4])
     assert nn.state is None
@@ -467,9 +471,10 @@ def test_task_nostate_2(plugin):
     assert nn.output_dir.exists()


-def test_task_nostate_3(plugin):
+def test_task_nostate_3(plugin, tmpdir):
     """ task with a dictionary as an input"""
     nn = fun_dict(name="NA", d={"a": "ala", "b": "bala"})
+    nn.cache_dir = tmpdir
     assert nn.inputs.d == {"a": "ala", "b": "bala"}

     with Submitter(plugin=plugin) as sub:
@@ -489,6 +494,7 @@ def test_task_nostate_4(plugin, tmpdir):
         f.write("hello from pydra\n")

     nn = fun_file(name="NA", filename=file1)
+    nn.cache_dir = tmpdir

     with Submitter(plugin) as sub:
         sub(nn)
@@ -719,13 +725,14 @@ def test_task_nostate_cachelocations_updated(plugin, tmpdir):

 @pytest.mark.flaky(reruns=2)  # when dask
 @pytest.mark.parametrize("input_type", ["list", "array"])
-def test_task_state_1(plugin_dask_opt, input_type):
+def test_task_state_1(plugin_dask_opt, input_type, tmpdir):
     """ task with the simplest splitter"""
     a_in = [3, 5]
     if input_type == "array":
         a_in = np.array(a_in)

     nn = fun_addtwo(name="NA").split(splitter="a", a=a_in)
+    nn.cache_dir = tmpdir

     assert nn.state.splitter == "NA.a"
     assert nn.state.splitter_rpn == ["NA.a"]
@@ -761,11 +768,12 @@ def test_task_state_1(plugin_dask_opt, input_type):
     assert odir.exists()


-def test_task_state_1a(plugin):
+def test_task_state_1a(plugin, tmpdir):
     """ task with the simplest splitter (inputs set separately)"""
     nn = fun_addtwo(name="NA")
     nn.split(splitter="a")
     nn.inputs.a = [3, 5]
+    nn.cache_dir = tmpdir

     assert nn.state.splitter == "NA.a"
     assert nn.state.splitter_rpn == ["NA.a"]
@@ -781,11 +789,12 @@ def test_task_state_1a(plugin):
         assert results[i].output.out == res[1]


-def test_task_state_singl_1(plugin):
+def test_task_state_singl_1(plugin, tmpdir):
     """ Tasks with two inputs and a splitter (no combiner)
     one input is a single value, the other is in the splitter and combiner
     """
     nn = fun_addvar(name="NA").split(splitter="a", a=[3, 5], b=10)
+    nn.cache_dir = tmpdir

     assert nn.inputs.a == [3, 5]
     assert nn.inputs.b == 10
@@ -839,7 +848,14 @@ def test_task_state_singl_1(plugin):
 )
 @pytest.mark.parametrize("input_type", ["list", "array", "mixed"])
 def test_task_state_2(
-    plugin, splitter, state_splitter, state_rpn, expected, expected_ind, input_type
+    plugin,
+    splitter,
+    state_splitter,
+    state_rpn,
+    expected,
+    expected_ind,
+    input_type,
+    tmpdir,
 ):
     """ Tasks with two inputs and a splitter (no combiner)"""
     a_in, b_in = [3, 5], [10, 20]
@@ -848,6 +864,8 @@ def test_task_state_2(
     elif input_type == "mixed":
         a_in = np.array(a_in)
     nn = fun_addvar(name="NA").split(splitter=splitter, a=a_in, b=b_in)
+    nn.cache_dir = tmpdir
+
     assert (nn.inputs.a == np.array([3, 5])).all()
     assert (nn.inputs.b == np.array([10, 20])).all()
     assert nn.state.splitter == state_splitter
@@ -883,9 +901,10 @@ def test_task_state_2(
     assert odir.exists()


-def test_task_state_3(plugin):
+def test_task_state_3(plugin, tmpdir):
     """ task with the simplest splitter, the input is an empty list"""
     nn = fun_addtwo(name="NA").split(splitter="a", a=[])
+    nn.cache_dir = tmpdir

     assert nn.state.splitter == "NA.a"
     assert nn.state.splitter_rpn == ["NA.a"]
@@ -904,12 +923,14 @@ def test_task_state_3(plugin):


 @pytest.mark.parametrize("input_type", ["list", "array"])
-def test_task_state_4(plugin, input_type):
+def test_task_state_4(plugin, input_type, tmpdir):
     """ task with a list as an input, and a simple splitter """
     lst_in = [[2, 3, 4], [1, 2, 3]]
     if input_type == "array":
         lst_in = np.array(lst_in)
     nn = moment(name="NA", n=3, lst=lst_in).split(splitter="lst")
+    nn.cache_dir = tmpdir
+
     assert np.allclose(nn.inputs.n, 3)
     assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
     assert nn.state.splitter == "NA.lst"
@@ -935,9 +956,11 @@ def test_task_state_4(plugin, input_type):
     assert odir.exists()


-def test_task_state_4a(plugin):
+def test_task_state_4a(plugin, tmpdir):
     """ task with a tuple as an input, and a simple splitter """
     nn = moment(name="NA", n=3, lst=[(2, 3, 4), (1, 2, 3)]).split(splitter="lst")
+    nn.cache_dir = tmpdir
+
     assert np.allclose(nn.inputs.n, 3)
     assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
     assert nn.state.splitter == "NA.lst"
@@ -955,11 +978,13 @@ def test_task_state_4a(plugin):
     assert odir.exists()


-def test_task_state_5(plugin):
+def test_task_state_5(plugin, tmpdir):
     """ task with a list as an input, and the variable is part of the scalar splitter"""
     nn = moment(name="NA", n=[1, 3], lst=[[2, 3, 4], [1, 2, 3]]).split(
         splitter=("n", "lst")
     )
+    nn.cache_dir = tmpdir
+
     assert np.allclose(nn.inputs.n, [1, 3])
     assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
     assert nn.state.splitter == ("NA.n", "NA.lst")
@@ -977,13 +1002,15 @@ def test_task_state_5(plugin):
     assert odir.exists()


-def test_task_state_5_exception(plugin):
+def test_task_state_5_exception(plugin, tmpdir):
     """ task with a list as an input, and the variable is part of the scalar splitter
     the shapes are not matching, so exception should be raised
     """
     nn = moment(name="NA", n=[1, 3, 3], lst=[[2, 3, 4], [1, 2, 3]]).split(
         splitter=("n", "lst")
     )
+    nn.cache_dir = tmpdir
+
     assert np.allclose(nn.inputs.n, [1, 3, 3])
     assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
     assert nn.state.splitter == ("NA.n", "NA.lst")
@@ -994,11 +1021,13 @@ def test_task_state_5_exception(plugin):
     assert "shape" in str(excinfo.value)


-def test_task_state_6(plugin):
+def test_task_state_6(plugin, tmpdir):
     """ task with a list as an input, and the variable is part of the outer splitter """
     nn = moment(name="NA", n=[1, 3], lst=[[2, 3, 4], [1, 2, 3]]).split(
         splitter=["n", "lst"]
     )
+    nn.cache_dir = tmpdir
+
     assert np.allclose(nn.inputs.n, [1, 3])
     assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
     assert nn.state.splitter == ["NA.n", "NA.lst"]
@@ -1016,11 +1045,13 @@ def test_task_state_6(plugin):
     assert odir.exists()


-def test_task_state_6a(plugin):
+def test_task_state_6a(plugin, tmpdir):
     """ task with a tuple as an input, and the variable is part of the outer splitter """
     nn = moment(name="NA", n=[1, 3], lst=[(2, 3, 4), (1, 2, 3)]).split(
         splitter=["n", "lst"]
     )
+    nn.cache_dir = tmpdir
+
     assert np.allclose(nn.inputs.n, [1, 3])
     assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
     assert nn.state.splitter == ["NA.n", "NA.lst"]
@@ -1039,9 +1070,10 @@ def test_task_state_6a(plugin):


 @pytest.mark.flaky(reruns=2)  # when dask
-def test_task_state_comb_1(plugin_dask_opt):
+def test_task_state_comb_1(plugin_dask_opt, tmpdir):
     """ task with the simplest splitter and combiner"""
     nn = fun_addtwo(name="NA").split(a=[3, 5], splitter="a").combine(combiner="a")
+    nn.cache_dir = tmpdir

     assert (nn.inputs.a == np.array([3, 5])).all()

@@ -1173,13 +1205,15 @@ def test_task_state_comb_2(
     state_rpn_final,
     expected,
     expected_val,
+    tmpdir,
 ):
     """ Tasks with scalar and outer splitters and partial or full combiners"""
     nn = (
         fun_addvar(name="NA")
         .split(a=[3, 5], b=[10, 20], splitter=splitter)
         .combine(combiner=combiner)
     )
+    nn.cache_dir = tmpdir

     assert (nn.inputs.a == np.array([3, 5])).all()

@@ -1219,11 +1253,12 @@ def test_task_state_comb_2(
     assert odir.exists()


-def test_task_state_comb_singl_1(plugin):
+def test_task_state_comb_singl_1(plugin, tmpdir):
     """ Tasks with two inputs;
     one input is a single value, the other is in the splitter and combiner
     """
     nn = fun_addvar(name="NA").split(splitter="a", a=[3, 5], b=10).combine(combiner="a")
+    nn.cache_dir = tmpdir

     assert nn.inputs.a == [3, 5]
     assert nn.inputs.b == 10
@@ -1248,9 +1283,10 @@ def test_task_state_comb_singl_1(plugin):
     assert odir.exists()


-def test_task_state_comb_3(plugin):
+def test_task_state_comb_3(plugin, tmpdir):
     """ task with the simplest splitter, the input is an empty list"""
     nn = fun_addtwo(name="NA").split(splitter="a", a=[]).combine(combiner=["a"])
+    nn.cache_dir = tmpdir

     assert nn.state.splitter == "NA.a"
     assert nn.state.splitter_rpn == ["NA.a"]
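The workflow-level tests (test_boutiques_wf_1/2 above) set cache_dir only on the workflow itself; they rely on the first submitter.py hunk, where _run_workflow now pushes wf.cache_dir down to every node whose allow_cache_override flag permits it. A toy sketch of that propagation rule; the Node class is illustrative, and only the attribute names come from the diff:

from pathlib import Path

class Node:
    def __init__(self, name, allow_cache_override=True):
        self.name = name
        self.allow_cache_override = allow_cache_override
        self.cache_dir = None  # None: fall back to the default cache location

def propagate_cache_dir(wf_cache_dir: Path, nodes):
    # mirrors the loop added to Submitter._run_workflow
    for nd in nodes:
        if nd.allow_cache_override:
            nd.cache_dir = wf_cache_dir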
(The remaining changed files are not rendered in this view.)
