diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index cd8c61b84b..56d711de1c 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -1,5 +1,6 @@ """Handle execution backends.""" import asyncio +import time from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker, DaskWorker from .core import is_workflow from .helpers import get_open_loop, load_and_run_async @@ -150,15 +151,31 @@ async def _run_workflow(self, wf, rerun=False): The computed workflow """ + for nd in wf.graph.nodes: + if nd.allow_cache_override: + nd.cache_dir = wf.cache_dir + # creating a copy of the graph that will be modified # the copy contains new lists with original runnable objects graph_copy = wf.graph.copy() # keep track of pending futures task_futures = set() tasks, tasks_follow_errored = get_runnable_tasks(graph_copy) - while tasks or len(task_futures): + while tasks or task_futures or graph_copy.nodes: if not tasks and not task_futures: - raise Exception("Nothing queued or todo - something went wrong") + # it's possible that task_futures is empty, but not able to get any + # tasks from graph_copy (using get_runnable_tasks) + # this might be related to some delays saving the files + # so try to get_runnable_tasks for another minut + ii = 0 + while not tasks and graph_copy.nodes: + tasks, follow_err = get_runnable_tasks(graph_copy) + ii += 1 + time.sleep(1) + if ii > 60: + raise Exception( + "graph is not empty, but not able to get more tasks - something is wrong (e.g. with the filesystem)" + ) for task in tasks: # grab inputs if needed logger.debug(f"Retrieving inputs for {task}") diff --git a/pydra/engine/tests/test_boutiques.py b/pydra/engine/tests/test_boutiques.py index b2ea022b77..4f6665bac3 100644 --- a/pydra/engine/tests/test_boutiques.py +++ b/pydra/engine/tests/test_boutiques.py @@ -26,11 +26,12 @@ "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] ) @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_boutiques_1(maskfile, plugin, results_function): +def test_boutiques_1(maskfile, plugin, results_function, tmpdir): """ simple task to run fsl.bet using BoshTask""" btask = BoshTask(name="NA", zenodo_id="1482743") btask.inputs.infile = Infile btask.inputs.maskfile = maskfile + btask.cache_dir = tmpdir res = results_function(btask, plugin) assert res.output.return_code == 0 @@ -97,11 +98,12 @@ def test_boutiques_spec_2(): @pytest.mark.parametrize( "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] ) -def test_boutiques_wf_1(maskfile, plugin): +def test_boutiques_wf_1(maskfile, plugin, tmpdir): """ wf with one task that runs fsl.bet using BoshTask""" wf = Workflow(name="wf", input_spec=["maskfile", "infile"]) wf.inputs.maskfile = maskfile wf.inputs.infile = Infile + wf.cache_dir = tmpdir wf.add( BoshTask( @@ -128,11 +130,12 @@ def test_boutiques_wf_1(maskfile, plugin): @pytest.mark.parametrize( "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] ) -def test_boutiques_wf_2(maskfile, plugin): +def test_boutiques_wf_2(maskfile, plugin, tmpdir): """ wf with two BoshTasks (fsl.bet and fsl.stats) and one ShellTask""" wf = Workflow(name="wf", input_spec=["maskfile", "infile"]) wf.inputs.maskfile = maskfile wf.inputs.infile = Infile + wf.cache_dir = tmpdir wf.add( BoshTask( diff --git a/pydra/engine/tests/test_node_task.py b/pydra/engine/tests/test_node_task.py index 518cf95a02..34ec3c11ca 100644 --- a/pydra/engine/tests/test_node_task.py +++ b/pydra/engine/tests/test_node_task.py @@ -365,9 +365,10 @@ def test_odir_init(): @pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_1(plugin_dask_opt): +def test_task_nostate_1(plugin_dask_opt, tmpdir): """ task without splitter""" nn = fun_addtwo(name="NA", a=3) + nn.cache_dir = tmpdir assert np.allclose(nn.inputs.a, [3]) assert nn.state is None @@ -405,9 +406,10 @@ def test_task_nostate_1_call(): @pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_1_call_subm(plugin_dask_opt): +def test_task_nostate_1_call_subm(plugin_dask_opt, tmpdir): """ task without splitter""" nn = fun_addtwo(name="NA", a=3) + nn.cache_dir = tmpdir assert np.allclose(nn.inputs.a, [3]) assert nn.state is None @@ -422,9 +424,10 @@ def test_task_nostate_1_call_subm(plugin_dask_opt): @pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_1_call_plug(plugin_dask_opt): +def test_task_nostate_1_call_plug(plugin_dask_opt, tmpdir): """ task without splitter""" nn = fun_addtwo(name="NA", a=3) + nn.cache_dir = tmpdir assert np.allclose(nn.inputs.a, [3]) assert nn.state is None @@ -450,9 +453,10 @@ def test_task_nostate_1_call_updateinp(): assert nn.output_dir.exists() -def test_task_nostate_2(plugin): +def test_task_nostate_2(plugin, tmpdir): """ task with a list as an input, but no splitter""" nn = moment(name="NA", n=3, lst=[2, 3, 4]) + nn.cache_dir = tmpdir assert np.allclose(nn.inputs.n, [3]) assert np.allclose(nn.inputs.lst, [2, 3, 4]) assert nn.state is None @@ -467,9 +471,10 @@ def test_task_nostate_2(plugin): assert nn.output_dir.exists() -def test_task_nostate_3(plugin): +def test_task_nostate_3(plugin, tmpdir): """ task with a dictionary as an input""" nn = fun_dict(name="NA", d={"a": "ala", "b": "bala"}) + nn.cache_dir = tmpdir assert nn.inputs.d == {"a": "ala", "b": "bala"} with Submitter(plugin=plugin) as sub: @@ -489,6 +494,7 @@ def test_task_nostate_4(plugin, tmpdir): f.write("hello from pydra\n") nn = fun_file(name="NA", filename=file1) + nn.cache_dir = tmpdir with Submitter(plugin) as sub: sub(nn) @@ -719,13 +725,14 @@ def test_task_nostate_cachelocations_updated(plugin, tmpdir): @pytest.mark.flaky(reruns=2) # when dask @pytest.mark.parametrize("input_type", ["list", "array"]) -def test_task_state_1(plugin_dask_opt, input_type): +def test_task_state_1(plugin_dask_opt, input_type, tmpdir): """ task with the simplest splitter""" a_in = [3, 5] if input_type == "array": a_in = np.array(a_in) nn = fun_addtwo(name="NA").split(splitter="a", a=a_in) + nn.cache_dir = tmpdir assert nn.state.splitter == "NA.a" assert nn.state.splitter_rpn == ["NA.a"] @@ -761,11 +768,12 @@ def test_task_state_1(plugin_dask_opt, input_type): assert odir.exists() -def test_task_state_1a(plugin): +def test_task_state_1a(plugin, tmpdir): """ task with the simplest splitter (inputs set separately)""" nn = fun_addtwo(name="NA") nn.split(splitter="a") nn.inputs.a = [3, 5] + nn.cache_dir = tmpdir assert nn.state.splitter == "NA.a" assert nn.state.splitter_rpn == ["NA.a"] @@ -781,11 +789,12 @@ def test_task_state_1a(plugin): assert results[i].output.out == res[1] -def test_task_state_singl_1(plugin): +def test_task_state_singl_1(plugin, tmpdir): """ Tasks with two inputs and a splitter (no combiner) one input is a single value, the other is in the splitter and combiner """ nn = fun_addvar(name="NA").split(splitter="a", a=[3, 5], b=10) + nn.cache_dir = tmpdir assert nn.inputs.a == [3, 5] assert nn.inputs.b == 10 @@ -839,7 +848,14 @@ def test_task_state_singl_1(plugin): ) @pytest.mark.parametrize("input_type", ["list", "array", "mixed"]) def test_task_state_2( - plugin, splitter, state_splitter, state_rpn, expected, expected_ind, input_type + plugin, + splitter, + state_splitter, + state_rpn, + expected, + expected_ind, + input_type, + tmpdir, ): """ Tasks with two inputs and a splitter (no combiner)""" a_in, b_in = [3, 5], [10, 20] @@ -848,6 +864,8 @@ def test_task_state_2( elif input_type == "mixed": a_in = np.array(a_in) nn = fun_addvar(name="NA").split(splitter=splitter, a=a_in, b=b_in) + nn.cache_dir = tmpdir + assert (nn.inputs.a == np.array([3, 5])).all() assert (nn.inputs.b == np.array([10, 20])).all() assert nn.state.splitter == state_splitter @@ -883,9 +901,10 @@ def test_task_state_2( assert odir.exists() -def test_task_state_3(plugin): +def test_task_state_3(plugin, tmpdir): """ task with the simplest splitter, the input is an empty list""" nn = fun_addtwo(name="NA").split(splitter="a", a=[]) + nn.cache_dir = tmpdir assert nn.state.splitter == "NA.a" assert nn.state.splitter_rpn == ["NA.a"] @@ -904,12 +923,14 @@ def test_task_state_3(plugin): @pytest.mark.parametrize("input_type", ["list", "array"]) -def test_task_state_4(plugin, input_type): +def test_task_state_4(plugin, input_type, tmpdir): """ task with a list as an input, and a simple splitter """ lst_in = [[2, 3, 4], [1, 2, 3]] if input_type == "array": lst_in = np.array(lst_in) nn = moment(name="NA", n=3, lst=lst_in).split(splitter="lst") + nn.cache_dir = tmpdir + assert np.allclose(nn.inputs.n, 3) assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]]) assert nn.state.splitter == "NA.lst" @@ -935,9 +956,11 @@ def test_task_state_4(plugin, input_type): assert odir.exists() -def test_task_state_4a(plugin): +def test_task_state_4a(plugin, tmpdir): """ task with a tuple as an input, and a simple splitter """ nn = moment(name="NA", n=3, lst=[(2, 3, 4), (1, 2, 3)]).split(splitter="lst") + nn.cache_dir = tmpdir + assert np.allclose(nn.inputs.n, 3) assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]]) assert nn.state.splitter == "NA.lst" @@ -955,11 +978,13 @@ def test_task_state_4a(plugin): assert odir.exists() -def test_task_state_5(plugin): +def test_task_state_5(plugin, tmpdir): """ task with a list as an input, and the variable is part of the scalar splitter""" nn = moment(name="NA", n=[1, 3], lst=[[2, 3, 4], [1, 2, 3]]).split( splitter=("n", "lst") ) + nn.cache_dir = tmpdir + assert np.allclose(nn.inputs.n, [1, 3]) assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]]) assert nn.state.splitter == ("NA.n", "NA.lst") @@ -977,13 +1002,15 @@ def test_task_state_5(plugin): assert odir.exists() -def test_task_state_5_exception(plugin): +def test_task_state_5_exception(plugin, tmpdir): """ task with a list as an input, and the variable is part of the scalar splitter the shapes are not matching, so exception should be raised """ nn = moment(name="NA", n=[1, 3, 3], lst=[[2, 3, 4], [1, 2, 3]]).split( splitter=("n", "lst") ) + nn.cache_dir = tmpdir + assert np.allclose(nn.inputs.n, [1, 3, 3]) assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]]) assert nn.state.splitter == ("NA.n", "NA.lst") @@ -994,11 +1021,13 @@ def test_task_state_5_exception(plugin): assert "shape" in str(excinfo.value) -def test_task_state_6(plugin): +def test_task_state_6(plugin, tmpdir): """ ask with a list as an input, and the variable is part of the outer splitter """ nn = moment(name="NA", n=[1, 3], lst=[[2, 3, 4], [1, 2, 3]]).split( splitter=["n", "lst"] ) + nn.cache_dir = tmpdir + assert np.allclose(nn.inputs.n, [1, 3]) assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]]) assert nn.state.splitter == ["NA.n", "NA.lst"] @@ -1016,11 +1045,13 @@ def test_task_state_6(plugin): assert odir.exists() -def test_task_state_6a(plugin): +def test_task_state_6a(plugin, tmpdir): """ ask with a tuple as an input, and the variable is part of the outer splitter """ nn = moment(name="NA", n=[1, 3], lst=[(2, 3, 4), (1, 2, 3)]).split( splitter=["n", "lst"] ) + nn.cache_dir = tmpdir + assert np.allclose(nn.inputs.n, [1, 3]) assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]]) assert nn.state.splitter == ["NA.n", "NA.lst"] @@ -1039,9 +1070,10 @@ def test_task_state_6a(plugin): @pytest.mark.flaky(reruns=2) # when dask -def test_task_state_comb_1(plugin_dask_opt): +def test_task_state_comb_1(plugin_dask_opt, tmpdir): """ task with the simplest splitter and combiner""" nn = fun_addtwo(name="NA").split(a=[3, 5], splitter="a").combine(combiner="a") + nn.cache_dir = tmpdir assert (nn.inputs.a == np.array([3, 5])).all() @@ -1173,6 +1205,7 @@ def test_task_state_comb_2( state_rpn_final, expected, expected_val, + tmpdir, ): """ Tasks with scalar and outer splitters and partial or full combiners""" nn = ( @@ -1180,6 +1213,7 @@ def test_task_state_comb_2( .split(a=[3, 5], b=[10, 20], splitter=splitter) .combine(combiner=combiner) ) + nn.cache_dir = tmpdir assert (nn.inputs.a == np.array([3, 5])).all() @@ -1219,11 +1253,12 @@ def test_task_state_comb_2( assert odir.exists() -def test_task_state_comb_singl_1(plugin): +def test_task_state_comb_singl_1(plugin, tmpdir): """ Tasks with two inputs; one input is a single value, the other is in the splitter and combiner """ nn = fun_addvar(name="NA").split(splitter="a", a=[3, 5], b=10).combine(combiner="a") + nn.cache_dir = tmpdir assert nn.inputs.a == [3, 5] assert nn.inputs.b == 10 @@ -1248,9 +1283,10 @@ def test_task_state_comb_singl_1(plugin): assert odir.exists() -def test_task_state_comb_3(plugin): +def test_task_state_comb_3(plugin, tmpdir): """ task with the simplest splitter, the input is an empty list""" nn = fun_addtwo(name="NA").split(splitter="a", a=[]).combine(combiner=["a"]) + nn.cache_dir = tmpdir assert nn.state.splitter == "NA.a" assert nn.state.splitter_rpn == ["NA.a"] diff --git a/pydra/engine/tests/test_numpy_examples.py b/pydra/engine/tests/test_numpy_examples.py index 35b8972319..572d8707a2 100644 --- a/pydra/engine/tests/test_numpy_examples.py +++ b/pydra/engine/tests/test_numpy_examples.py @@ -17,12 +17,13 @@ def arrayout(val): return np.array([val, val]) -def test_multiout(plugin): +def test_multiout(plugin, tmpdir): """ testing a simple function that returns a numpy array""" wf = Workflow("wf", input_spec=["val"], val=2) wf.add(arrayout(name="mo", val=wf.lzin.val)) wf.set_output([("array", wf.mo.lzout.b)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin, n_procs=2) as sub: sub(runnable=wf) @@ -33,13 +34,14 @@ def test_multiout(plugin): assert np.array_equal(results[1].output.array, np.array([2, 2])) -def test_multiout_st(plugin): +def test_multiout_st(plugin, tmpdir): """ testing a simple function that returns a numpy array, adding splitter""" wf = Workflow("wf", input_spec=["val"], val=[0, 1, 2]) wf.add(arrayout(name="mo", val=wf.lzin.val)) wf.mo.split("val").combine("val") wf.set_output([("array", wf.mo.lzout.b)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin, n_procs=2) as sub: sub(runnable=wf) diff --git a/pydra/engine/tests/test_shelltask.py b/pydra/engine/tests/test_shelltask.py index ba95c6af81..bbcf637cb7 100644 --- a/pydra/engine/tests/test_shelltask.py +++ b/pydra/engine/tests/test_shelltask.py @@ -17,10 +17,10 @@ @pytest.mark.flaky(reruns=2) # when dask @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_1(plugin_dask_opt, results_function): +def test_shell_cmd_1(plugin_dask_opt, results_function, tmpdir): """ simple command, no arguments """ cmd = ["pwd"] - shelly = ShellCommandTask(name="shelly", executable=cmd) + shelly = ShellCommandTask(name="shelly", executable=cmd, cache_dir=tmpdir) assert shelly.cmdline == " ".join(cmd) res = results_function(shelly, plugin=plugin_dask_opt) @@ -30,12 +30,13 @@ def test_shell_cmd_1(plugin_dask_opt, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_1_strip(plugin, results_function): +def test_shell_cmd_1_strip(plugin, results_function, tmpdir): """ simple command, no arguments strip option to remove \n at the end os stdout """ cmd = ["pwd"] shelly = ShellCommandTask(name="shelly", executable=cmd, strip=True) + shelly.cache_dir = tmpdir assert shelly.cmdline == " ".join(cmd) res = results_function(shelly, plugin) @@ -45,10 +46,11 @@ def test_shell_cmd_1_strip(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_2(plugin, results_function): +def test_shell_cmd_2(plugin, results_function, tmpdir): """ a command with arguments, cmd and args given as executable """ cmd = ["echo", "hail", "pydra"] shelly = ShellCommandTask(name="shelly", executable=cmd) + shelly.cache_dir = tmpdir assert shelly.cmdline == " ".join(cmd) res = results_function(shelly, plugin) @@ -58,12 +60,13 @@ def test_shell_cmd_2(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_2a(plugin, results_function): +def test_shell_cmd_2a(plugin, results_function, tmpdir): """ a command with arguments, using executable and args """ cmd_exec = "echo" cmd_args = ["hail", "pydra"] # separate command into exec + args shelly = ShellCommandTask(name="shelly", executable=cmd_exec, args=cmd_args) + shelly.cache_dir = tmpdir assert shelly.inputs.executable == "echo" assert shelly.cmdline == "echo " + " ".join(cmd_args) @@ -74,12 +77,13 @@ def test_shell_cmd_2a(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_2b(plugin, results_function): +def test_shell_cmd_2b(plugin, results_function, tmpdir): """ a command with arguments, using strings executable and args """ cmd_exec = "echo" cmd_args = "pydra" # separate command into exec + args shelly = ShellCommandTask(name="shelly", executable=cmd_exec, args=cmd_args) + shelly.cache_dir = tmpdir assert shelly.inputs.executable == "echo" assert shelly.cmdline == "echo pydra" @@ -93,7 +97,7 @@ def test_shell_cmd_2b(plugin, results_function): @pytest.mark.flaky(reruns=2) -def test_shell_cmd_3(plugin_dask_opt): +def test_shell_cmd_3(plugin_dask_opt, tmpdir): """ commands without arguments splitter = executable """ @@ -101,6 +105,8 @@ def test_shell_cmd_3(plugin_dask_opt): # all args given as executable shelly = ShellCommandTask(name="shelly", executable=cmd).split("executable") + shelly.cache_dir = tmpdir + assert shelly.cmdline == ["pwd", "whoami"] res = shelly(plugin=plugin_dask_opt) assert Path(res[0].output.stdout.rstrip()) == shelly.output_dir[0] @@ -113,7 +119,7 @@ def test_shell_cmd_3(plugin_dask_opt): assert res[0].output.stderr == res[1].output.stderr == "" -def test_shell_cmd_4(plugin): +def test_shell_cmd_4(plugin, tmpdir): """ a command with arguments, using executable and args splitter=args """ @@ -123,6 +129,8 @@ def test_shell_cmd_4(plugin): shelly = ShellCommandTask(name="shelly", executable=cmd_exec, args=cmd_args).split( splitter="args" ) + shelly.cache_dir = tmpdir + assert shelly.inputs.executable == "echo" assert shelly.inputs.args == ["nipype", "pydra"] assert shelly.cmdline == ["echo nipype", "echo pydra"] @@ -135,7 +143,7 @@ def test_shell_cmd_4(plugin): assert res[0].output.stderr == res[1].output.stderr == "" -def test_shell_cmd_5(plugin): +def test_shell_cmd_5(plugin, tmpdir): """ a command with arguments using splitter and combiner for args """ @@ -147,6 +155,8 @@ def test_shell_cmd_5(plugin): .split(splitter="args") .combine("args") ) + shelly.cache_dir = tmpdir + assert shelly.inputs.executable == "echo" assert shelly.inputs.args == ["nipype", "pydra"] assert shelly.cmdline == ["echo nipype", "echo pydra"] @@ -156,7 +166,7 @@ def test_shell_cmd_5(plugin): assert res[1].output.stdout == "pydra\n" -def test_shell_cmd_6(plugin): +def test_shell_cmd_6(plugin, tmpdir): """ a command with arguments, outer splitter for executable and args """ @@ -166,6 +176,8 @@ def test_shell_cmd_6(plugin): shelly = ShellCommandTask(name="shelly", executable=cmd_exec, args=cmd_args).split( splitter=["executable", "args"] ) + shelly.cache_dir = tmpdir + assert shelly.inputs.executable == ["echo", ["echo", "-n"]] assert shelly.inputs.args == ["nipype", "pydra"] assert shelly.cmdline == [ @@ -197,7 +209,7 @@ def test_shell_cmd_6(plugin): ) -def test_shell_cmd_7(plugin): +def test_shell_cmd_7(plugin, tmpdir): """ a command with arguments, outer splitter for executable and args, and combiner=args """ @@ -209,6 +221,8 @@ def test_shell_cmd_7(plugin): .split(splitter=["executable", "args"]) .combine("args") ) + shelly.cache_dir = tmpdir + assert shelly.inputs.executable == ["echo", ["echo", "-n"]] assert shelly.inputs.args == ["nipype", "pydra"] @@ -224,7 +238,7 @@ def test_shell_cmd_7(plugin): # tests with workflows -def test_wf_shell_cmd_1(plugin): +def test_wf_shell_cmd_1(plugin, tmpdir): """ a workflow with two connected commands""" wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) wf.inputs.cmd1 = "pwd" @@ -237,6 +251,7 @@ def test_wf_shell_cmd_1(plugin): ) wf.set_output([("out", wf.shelly_ls.lzout.stdout)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: wf(submitter=sub) @@ -250,7 +265,7 @@ def test_wf_shell_cmd_1(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_1(plugin, results_function, use_validator): +def test_shell_cmd_inputspec_1(plugin, results_function, use_validator, tmpdir): """ a command with executable, args and one command opt, using a customized input_spec to add the opt to the command in the right place that is specified in metadata["cmd_pos"] @@ -279,6 +294,7 @@ def test_shell_cmd_inputspec_1(plugin, results_function, use_validator): args=cmd_args, opt_n=cmd_opt, input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec assert shelly.inputs.args == cmd_args @@ -289,7 +305,7 @@ def test_shell_cmd_inputspec_1(plugin, results_function, use_validator): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_2(plugin, results_function, use_validator): +def test_shell_cmd_inputspec_2(plugin, results_function, use_validator, tmpdir): """ a command with executable, args and two command options, using a customized input_spec to add the opt to the command in the right place that is specified in metadata["cmd_pos"] @@ -327,6 +343,7 @@ def test_shell_cmd_inputspec_2(plugin, results_function, use_validator): opt_n=cmd_opt, opt_hello=cmd_opt_hello, input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec assert shelly.inputs.args == cmd_args @@ -336,7 +353,7 @@ def test_shell_cmd_inputspec_2(plugin, results_function, use_validator): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_3(plugin, results_function): +def test_shell_cmd_inputspec_3(plugin, results_function, tmpdir): """ mandatory field added to fields, value provided """ cmd_exec = "echo" hello = "HELLO" @@ -361,7 +378,11 @@ def test_shell_cmd_inputspec_3(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, text=hello, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + text=hello, + input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec assert shelly.cmdline == "echo HELLO" @@ -370,7 +391,7 @@ def test_shell_cmd_inputspec_3(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_3a(plugin, results_function): +def test_shell_cmd_inputspec_3a(plugin, results_function, tmpdir): """ mandatory field added to fields, value provided using shorter syntax for input spec (no attr.ib) """ @@ -390,7 +411,11 @@ def test_shell_cmd_inputspec_3a(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, text=hello, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + text=hello, + input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec assert shelly.cmdline == "echo HELLO" @@ -399,7 +424,7 @@ def test_shell_cmd_inputspec_3a(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_3b(plugin, results_function): +def test_shell_cmd_inputspec_3b(plugin, results_function, tmpdir): """ mandatory field added to fields, value provided after init""" cmd_exec = "echo" hello = "HELLO" @@ -424,16 +449,17 @@ def test_shell_cmd_inputspec_3b(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, input_spec=my_input_spec + name="shelly", executable=cmd_exec, input_spec=my_input_spec, cache_dir=tmpdir ) shelly.inputs.text = hello + assert shelly.inputs.executable == cmd_exec assert shelly.cmdline == "echo HELLO" res = results_function(shelly, plugin) assert res.output.stdout == "HELLO\n" -def test_shell_cmd_inputspec_3c_exception(plugin): +def test_shell_cmd_inputspec_3c_exception(plugin, tmpdir): """ mandatory field added to fields, value is not provided, so exception is raised """ cmd_exec = "echo" my_input_spec = SpecInfo( @@ -456,15 +482,16 @@ def test_shell_cmd_inputspec_3c_exception(plugin): ) shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, input_spec=my_input_spec + name="shelly", executable=cmd_exec, input_spec=my_input_spec, cache_dir=tmpdir ) + with pytest.raises(Exception) as excinfo: shelly() assert "mandatory" in str(excinfo.value) @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_3c(plugin, results_function): +def test_shell_cmd_inputspec_3c(plugin, results_function, tmpdir): """ mandatory=False, so tasks runs fine even without the value """ cmd_exec = "echo" my_input_spec = SpecInfo( @@ -489,8 +516,9 @@ def test_shell_cmd_inputspec_3c(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, input_spec=my_input_spec + name="shelly", executable=cmd_exec, input_spec=my_input_spec, cache_dir=tmpdir ) + assert shelly.inputs.executable == cmd_exec assert shelly.cmdline == "echo" res = results_function(shelly, plugin) @@ -498,7 +526,7 @@ def test_shell_cmd_inputspec_3c(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_4(plugin, results_function): +def test_shell_cmd_inputspec_4(plugin, results_function, tmpdir): """ mandatory field added to fields, value provided """ cmd_exec = "echo" my_input_spec = SpecInfo( @@ -518,7 +546,7 @@ def test_shell_cmd_inputspec_4(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, input_spec=my_input_spec + name="shelly", executable=cmd_exec, input_spec=my_input_spec, cache_dir=tmpdir ) assert shelly.inputs.executable == cmd_exec @@ -529,7 +557,7 @@ def test_shell_cmd_inputspec_4(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_4a(plugin, results_function): +def test_shell_cmd_inputspec_4a(plugin, results_function, tmpdir): """ mandatory field added to fields, value provided using shorter syntax for input spec (no attr.ib) """ @@ -544,7 +572,7 @@ def test_shell_cmd_inputspec_4a(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, input_spec=my_input_spec + name="shelly", executable=cmd_exec, input_spec=my_input_spec, cache_dir=tmpdir ) assert shelly.inputs.executable == cmd_exec @@ -555,7 +583,7 @@ def test_shell_cmd_inputspec_4a(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_4b(plugin, results_function): +def test_shell_cmd_inputspec_4b(plugin, results_function, tmpdir): """ mandatory field added to fields, value provided """ cmd_exec = "echo" my_input_spec = SpecInfo( @@ -575,7 +603,7 @@ def test_shell_cmd_inputspec_4b(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, input_spec=my_input_spec + name="shelly", executable=cmd_exec, input_spec=my_input_spec, cache_dir=tmpdir ) assert shelly.inputs.executable == cmd_exec @@ -654,7 +682,7 @@ def test_shell_cmd_inputspec_4d_exception(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_5_nosubm(plugin, results_function): +def test_shell_cmd_inputspec_5_nosubm(plugin, results_function, tmpdir): """ checking xor in metadata: task should work fine, since only one option is True""" cmd_exec = "ls" cmd_t = True @@ -691,14 +719,18 @@ def test_shell_cmd_inputspec_5_nosubm(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, opt_t=cmd_t, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + opt_t=cmd_t, + input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec assert shelly.cmdline == "ls -t" res = results_function(shelly, plugin) -def test_shell_cmd_inputspec_5a_exception(plugin): +def test_shell_cmd_inputspec_5a_exception(plugin, tmpdir): """ checking xor in metadata: both options are True, so the task raises exception""" cmd_exec = "ls" cmd_t = True @@ -740,6 +772,7 @@ def test_shell_cmd_inputspec_5a_exception(plugin): opt_t=cmd_t, opt_S=cmd_S, input_spec=my_input_spec, + cache_dir=tmpdir, ) with pytest.raises(Exception) as excinfo: shelly() @@ -747,7 +780,7 @@ def test_shell_cmd_inputspec_5a_exception(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_6(plugin, results_function): +def test_shell_cmd_inputspec_6(plugin, results_function, tmpdir): """ checking requires in metadata: the required field is set in the init, so the task works fine """ @@ -787,6 +820,7 @@ def test_shell_cmd_inputspec_6(plugin, results_function): opt_t=cmd_t, opt_l=cmd_l, input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec assert shelly.cmdline == "ls -l -t" @@ -834,7 +868,7 @@ def test_shell_cmd_inputspec_6a_exception(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_6b(plugin, results_function): +def test_shell_cmd_inputspec_6b(plugin, results_function, tmpdir): """ checking requires in metadata: the required field set after the init """ @@ -874,6 +908,7 @@ def test_shell_cmd_inputspec_6b(plugin, results_function): opt_t=cmd_t, # opt_l=cmd_l, input_spec=my_input_spec, + cache_dir=tmpdir, ) shelly.inputs.opt_l = cmd_l assert shelly.inputs.executable == cmd_exec @@ -882,7 +917,7 @@ def test_shell_cmd_inputspec_6b(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_7(plugin, results_function): +def test_shell_cmd_inputspec_7(plugin, results_function, tmpdir): """ providing output name using input_spec, using name_tamplate in metadata @@ -908,7 +943,11 @@ def test_shell_cmd_inputspec_7(plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, args=args, input_spec=my_input_spec + name="shelly", + executable=cmd, + args=args, + input_spec=my_input_spec, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -920,7 +959,7 @@ def test_shell_cmd_inputspec_7(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_7a(plugin, results_function): +def test_shell_cmd_inputspec_7a(plugin, results_function, tmpdir): """ providing output name using input_spec, using name_tamplate in metadata @@ -948,7 +987,11 @@ def test_shell_cmd_inputspec_7a(plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, args=args, input_spec=my_input_spec + name="shelly", + executable=cmd, + args=args, + input_spec=my_input_spec, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -960,7 +1003,7 @@ def test_shell_cmd_inputspec_7a(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_7b(plugin, results_function): +def test_shell_cmd_inputspec_7b(plugin, results_function, tmpdir): """ providing new file and output name using input_spec, using name_template in metadata @@ -996,6 +1039,7 @@ def test_shell_cmd_inputspec_7b(plugin, results_function): executable=cmd, newfile="newfile_tmp.txt", input_spec=my_input_spec, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1052,6 +1096,7 @@ def test_shell_cmd_inputspec_8(plugin, results_function, tmpdir): newfile="newfile_tmp.txt", time="02121010", input_spec=my_input_spec, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1108,6 +1153,7 @@ def test_shell_cmd_inputspec_8a(plugin, results_function, tmpdir): newfile="newfile_tmp.txt", time="02121010", input_spec=my_input_spec, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1151,7 +1197,11 @@ def test_shell_cmd_inputspec_9(tmpdir, plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, file_orig=file + name="shelly", + executable=cmd, + input_spec=my_input_spec, + file_orig=file, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1199,7 +1249,11 @@ def test_shell_cmd_inputspec_9a(tmpdir, plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, file_orig=file + name="shelly", + executable=cmd, + input_spec=my_input_spec, + file_orig=file, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1246,7 +1300,11 @@ def test_shell_cmd_inputspec_9b(tmpdir, plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, file_orig=file + name="shelly", + executable=cmd, + input_spec=my_input_spec, + file_orig=file, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1291,7 +1349,11 @@ def test_shell_cmd_inputspec_10(plugin, results_function, tmpdir): ) shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, files=files_list, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + files=files_list, + input_spec=my_input_spec, + cache_dir=tmpdir, ) assert shelly.inputs.executable == cmd_exec @@ -1337,6 +1399,7 @@ def test_shell_cmd_inputspec_10_err(tmpdir): shelly = ShellCommandTask( name="shelly", executable=cmd_exec, files=file_2, input_spec=my_input_spec ) + shelly.cache_dir = tmpdir with pytest.raises(AttributeError) as e: res = shelly() @@ -1385,7 +1448,11 @@ def test_shell_cmd_inputspec_copyfile_1(plugin, results_function, tmpdir): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, orig_file=str(file) + name="shelly", + executable=cmd, + input_spec=my_input_spec, + orig_file=str(file), + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1443,7 +1510,11 @@ def test_shell_cmd_inputspec_copyfile_1a(plugin, results_function, tmpdir): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, orig_file=str(file) + name="shelly", + executable=cmd, + input_spec=my_input_spec, + orig_file=str(file), + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1515,7 +1586,11 @@ def test_shell_cmd_inputspec_copyfile_1b(plugin, results_function, tmpdir): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, orig_file=str(file) + name="shelly", + executable=cmd, + input_spec=my_input_spec, + orig_file=str(file), + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -1528,7 +1603,7 @@ def test_shell_cmd_inputspec_copyfile_1b(plugin, results_function, tmpdir): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_state_1(plugin, results_function): +def test_shell_cmd_inputspec_state_1(plugin, results_function, tmpdir): """ adding state to the input from input_spec """ cmd_exec = "echo" hello = ["HELLO", "hi"] @@ -1553,7 +1628,11 @@ def test_shell_cmd_inputspec_state_1(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, text=hello, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + text=hello, + input_spec=my_input_spec, + cache_dir=tmpdir, ).split("text") assert shelly.inputs.executable == cmd_exec # todo: this doesn't work when state @@ -1608,7 +1687,7 @@ def test_shell_cmd_inputspec_typeval_2(use_validator): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_state_1a(plugin, results_function): +def test_shell_cmd_inputspec_state_1a(plugin, results_function, tmpdir): """ adding state to the input from input_spec using shorter syntax for input_spec (without default) """ @@ -1628,7 +1707,11 @@ def test_shell_cmd_inputspec_state_1a(plugin, results_function): # separate command into exec + args shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, text=hello, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + text=hello, + input_spec=my_input_spec, + cache_dir=tmpdir, ).split("text") assert shelly.inputs.executable == cmd_exec @@ -1638,7 +1721,7 @@ def test_shell_cmd_inputspec_state_1a(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_inputspec_state_2(plugin, results_function): +def test_shell_cmd_inputspec_state_2(plugin, results_function, tmpdir): """ adding splitter to input tha is used in the output_file_tamplate """ @@ -1663,7 +1746,11 @@ def test_shell_cmd_inputspec_state_2(plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, args=args, input_spec=my_input_spec + name="shelly", + executable=cmd, + args=args, + input_spec=my_input_spec, + cache_dir=tmpdir, ).split("args") res = results_function(shelly, plugin) @@ -1707,7 +1794,11 @@ def test_shell_cmd_inputspec_state_3(plugin, results_function, tmpdir): ) shelly = ShellCommandTask( - name="shelly", executable=cmd_exec, file=files, input_spec=my_input_spec + name="shelly", + executable=cmd_exec, + file=files, + input_spec=my_input_spec, + cache_dir=tmpdir, ).split("file") assert shelly.inputs.executable == cmd_exec @@ -1764,7 +1855,11 @@ def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmpdir): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, input_spec=my_input_spec, orig_file=files + name="shelly", + executable=cmd, + input_spec=my_input_spec, + orig_file=files, + cache_dir=tmpdir, ).split("orig_file") txt_l = ["from pydra", "world"] @@ -1785,7 +1880,7 @@ def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmpdir): @pytest.mark.flaky(reruns=2) # when dask -def test_wf_shell_cmd_2(plugin_dask_opt): +def test_wf_shell_cmd_2(plugin_dask_opt, tmpdir): """ a workflow with input with defined output_file_template (str) that requires wf.lzin """ @@ -1793,6 +1888,7 @@ def test_wf_shell_cmd_2(plugin_dask_opt): wf.inputs.cmd = "touch" wf.inputs.args = "newfile.txt" + wf.cache_dir = tmpdir my_input_spec = SpecInfo( name="Input", @@ -1831,7 +1927,7 @@ def test_wf_shell_cmd_2(plugin_dask_opt): assert res.output.out_f.parent == wf.output_dir -def test_wf_shell_cmd_2a(plugin): +def test_wf_shell_cmd_2a(plugin, tmpdir): """ a workflow with input with defined output_file_template (tuple) that requires wf.lzin """ @@ -1839,6 +1935,7 @@ def test_wf_shell_cmd_2a(plugin): wf.inputs.cmd = "touch" wf.inputs.args = "newfile.txt" + wf.cache_dir = tmpdir my_input_spec = SpecInfo( name="Input", @@ -1876,7 +1973,7 @@ def test_wf_shell_cmd_2a(plugin): assert res.output.out_f.exists() -def test_wf_shell_cmd_3(plugin): +def test_wf_shell_cmd_3(plugin, tmpdir): """ a workflow with 2 tasks, first one has input with output_file_template (str, uses wf.lzin), that is passed to the second task @@ -1886,6 +1983,7 @@ def test_wf_shell_cmd_3(plugin): wf.inputs.cmd1 = "touch" wf.inputs.cmd2 = "cp" wf.inputs.args = "newfile.txt" + wf.cache_dir = tmpdir my_input_spec1 = SpecInfo( name="Input", @@ -1972,7 +2070,7 @@ def test_wf_shell_cmd_3(plugin): assert res.output.cp_file.parent == wf.output_dir -def test_wf_shell_cmd_3a(plugin): +def test_wf_shell_cmd_3a(plugin, tmpdir): """ a workflow with 2 tasks, first one has input with output_file_template (str, uses wf.lzin), that is passed to the second task @@ -1982,6 +2080,7 @@ def test_wf_shell_cmd_3a(plugin): wf.inputs.cmd1 = "touch" wf.inputs.cmd2 = "cp" wf.inputs.args = "newfile.txt" + wf.cache_dir = tmpdir my_input_spec1 = SpecInfo( name="Input", @@ -2163,7 +2262,7 @@ def test_wf_shell_cmd_state_1(plugin): assert res.output.cp_file.parent == wf.output_dir[i] -def test_wf_shell_cmd_ndst_1(plugin): +def test_wf_shell_cmd_ndst_1(plugin, tmpdir): """ a workflow with 2 tasks and a splitter on the node level, first one has input with output_file_template (str, uses wf.lzin), that is passed to the second task @@ -2173,6 +2272,7 @@ def test_wf_shell_cmd_ndst_1(plugin): wf.inputs.cmd1 = "touch" wf.inputs.cmd2 = "cp" wf.inputs.args = ["newfile_1.txt", "newfile_2.txt"] + wf.cache_dir = tmpdir my_input_spec1 = SpecInfo( name="Input", @@ -2261,7 +2361,7 @@ def test_wf_shell_cmd_ndst_1(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_outputspec_1(plugin, results_function): +def test_shell_cmd_outputspec_1(plugin, results_function, tmpdir): """ customised output_spec, adding files to the output, providing specific pathname """ @@ -2271,7 +2371,9 @@ def test_shell_cmd_outputspec_1(plugin, results_function): fields=[("newfile", File, "newfile_tmp.txt")], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) res = results_function(shelly, plugin) assert res.output.stdout == "" @@ -2279,7 +2381,7 @@ def test_shell_cmd_outputspec_1(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_outputspec_1a(plugin, results_function): +def test_shell_cmd_outputspec_1a(plugin, results_function, tmpdir): """ customised output_spec, adding files to the output, providing specific pathname """ @@ -2289,14 +2391,16 @@ def test_shell_cmd_outputspec_1a(plugin, results_function): fields=[("newfile", attr.ib(type=File, default="newfile_tmp.txt"))], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) res = results_function(shelly, plugin) assert res.output.stdout == "" assert res.output.newfile.exists() -def test_shell_cmd_outputspec_1b_exception(plugin): +def test_shell_cmd_outputspec_1b_exception(plugin, tmpdir): """ customised output_spec, adding files to the output, providing specific pathname """ @@ -2306,7 +2410,9 @@ def test_shell_cmd_outputspec_1b_exception(plugin): fields=[("newfile", File, "newfile_tmp_.txt")], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) with pytest.raises(Exception) as exinfo: with Submitter(plugin=plugin) as sub: @@ -2315,7 +2421,7 @@ def test_shell_cmd_outputspec_1b_exception(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_outputspec_2(plugin, results_function): +def test_shell_cmd_outputspec_2(plugin, results_function, tmpdir): """ customised output_spec, adding files to the output, using a wildcard in default @@ -2326,14 +2432,16 @@ def test_shell_cmd_outputspec_2(plugin, results_function): fields=[("newfile", File, "newfile_*.txt")], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) res = results_function(shelly, plugin) assert res.output.stdout == "" assert res.output.newfile.exists() -def test_shell_cmd_outputspec_2a_exception(plugin): +def test_shell_cmd_outputspec_2a_exception(plugin, tmpdir): """ customised output_spec, adding files to the output, using a wildcard in default @@ -2344,7 +2452,9 @@ def test_shell_cmd_outputspec_2a_exception(plugin): fields=[("newfile", File, "newfile_*K.txt")], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) with pytest.raises(Exception) as excinfo: with Submitter(plugin=plugin) as sub: @@ -2353,7 +2463,7 @@ def test_shell_cmd_outputspec_2a_exception(plugin): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_outputspec_3(plugin, results_function): +def test_shell_cmd_outputspec_3(plugin, results_function, tmpdir): """ customised output_spec, adding files to the output, using a wildcard in default, should collect two files @@ -2364,7 +2474,9 @@ def test_shell_cmd_outputspec_3(plugin, results_function): fields=[("newfile", File, "newfile_*.txt")], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) res = results_function(shelly, plugin) assert res.output.stdout == "" @@ -2374,7 +2486,7 @@ def test_shell_cmd_outputspec_3(plugin, results_function): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_outputspec_4(plugin, results_function): +def test_shell_cmd_outputspec_4(plugin, results_function, tmpdir): """ customised output_spec, adding files to the output, using a function to collect output, the function is saved in the field metadata @@ -2390,7 +2502,9 @@ def gather_output(keyname, output_dir): fields=[("newfile", attr.ib(type=File, metadata={"callable": gather_output}))], bases=(ShellOutSpec,), ) - shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec) + shelly = ShellCommandTask( + name="shelly", executable=cmd, output_spec=my_output_spec, cache_dir=tmpdir + ) res = results_function(shelly, plugin) assert res.output.stdout == "" @@ -2400,7 +2514,7 @@ def gather_output(keyname, output_dir): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_outputspec_5(plugin, results_function): +def test_shell_cmd_outputspec_5(plugin, results_function, tmpdir): """ providing output name by providing output_file_template (similar to the previous example, but not touching input_spec) @@ -2426,7 +2540,11 @@ def test_shell_cmd_outputspec_5(plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, args=args, output_spec=my_output_spec + name="shelly", + executable=cmd, + args=args, + output_spec=my_output_spec, + cache_dir=tmpdir, ) res = results_function(shelly, plugin) @@ -2464,7 +2582,7 @@ def test_shell_cmd_outputspec_5a(): @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_state_outputspec_1(plugin, results_function): +def test_shell_cmd_state_outputspec_1(plugin, results_function, tmpdir): """ providing output name by providing output_file_template splitter for a field that is used in the template @@ -2490,7 +2608,11 @@ def test_shell_cmd_state_outputspec_1(plugin, results_function): ) shelly = ShellCommandTask( - name="shelly", executable=cmd, args=args, output_spec=my_output_spec + name="shelly", + executable=cmd, + args=args, + output_spec=my_output_spec, + cache_dir=tmpdir, ).split("args") res = results_function(shelly, plugin) @@ -2502,7 +2624,7 @@ def test_shell_cmd_state_outputspec_1(plugin, results_function): # customised output_spec for tasks in workflows -def test_shell_cmd_outputspec_wf_1(plugin): +def test_shell_cmd_outputspec_wf_1(plugin, tmpdir): """ customised output_spec for tasks within a Workflow, adding files to the output, providing specific pathname @@ -2511,6 +2633,7 @@ def test_shell_cmd_outputspec_wf_1(plugin): cmd = ["touch", "newfile_tmp.txt"] wf = Workflow(name="wf", input_spec=["cmd"]) wf.inputs.cmd = cmd + wf.cache_dir = tmpdir my_output_spec = SpecInfo( name="Output", diff --git a/pydra/engine/tests/test_workflow.py b/pydra/engine/tests/test_workflow.py index 085f656c33..ecf84e5091 100644 --- a/pydra/engine/tests/test_workflow.py +++ b/pydra/engine/tests/test_workflow.py @@ -48,9 +48,9 @@ def test_wf_name_conflict2(): assert "Another task named task_name is already added" in str(excinfo.value) -def test_wf_no_output(plugin): +def test_wf_no_output(plugin, tmpdir): """ Raise error when output isn't set with set_output""" - wf = Workflow(name="wf_1", input_spec=["x"]) + wf = Workflow(name="wf_1", input_spec=["x"], cache_dir=tmpdir) wf.add(add2(name="add2", x=wf.lzin.x)) wf.inputs.x = 2 @@ -60,12 +60,13 @@ def test_wf_no_output(plugin): assert "Workflow output cannot be None" in str(excinfo.value) -def test_wf_1(plugin): +def test_wf_1(plugin, tmpdir): """ workflow with one task and no splitter""" wf = Workflow(name="wf_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -77,7 +78,7 @@ def test_wf_1(plugin): assert wf.output_dir.exists() -def test_wf_1a_outpastuple(plugin): +def test_wf_1a_outpastuple(plugin, tmpdir): """ workflow with one task and no splitter set_output takes a tuple """ @@ -86,6 +87,7 @@ def test_wf_1a_outpastuple(plugin): wf.set_output(("out", wf.add2.lzout.out)) wf.inputs.x = 2 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -95,12 +97,13 @@ def test_wf_1a_outpastuple(plugin): assert wf.output_dir.exists() -def test_wf_1_call_subm(plugin): +def test_wf_1_call_subm(plugin, tmpdir): """using wf.__call_ with submitter""" wf = Workflow(name="wf_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: wf(submitter=sub) @@ -110,13 +113,14 @@ def test_wf_1_call_subm(plugin): assert wf.output_dir.exists() -def test_wf_1_call_plug(plugin): +def test_wf_1_call_plug(plugin, tmpdir): """using wf.__call_ with plugin""" wf = Workflow(name="wf_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 wf.plugin = plugin + wf.cache_dir = tmpdir wf(plugin=plugin) @@ -125,13 +129,14 @@ def test_wf_1_call_plug(plugin): assert wf.output_dir.exists() -def test_wf_1_call_exception(plugin): +def test_wf_1_call_exception(plugin, tmpdir): """using wf.__call_ with plugin and submitter - should raise an exception""" wf = Workflow(name="wf_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: with pytest.raises(Exception) as e: @@ -139,7 +144,7 @@ def test_wf_1_call_exception(plugin): assert "Specify submitter OR plugin" in str(e.value) -def test_wf_2(plugin): +def test_wf_2(plugin, tmpdir): """ workflow with 2 tasks, no splitter""" wf = Workflow(name="wf_2", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) @@ -148,6 +153,7 @@ def test_wf_2(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -157,7 +163,7 @@ def test_wf_2(plugin): assert 8 == results.output.out -def test_wf_2a(plugin): +def test_wf_2a(plugin, tmpdir): """ workflow with 2 tasks, no splitter creating add2_task first (before calling add method), """ @@ -170,6 +176,7 @@ def test_wf_2a(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -179,7 +186,7 @@ def test_wf_2a(plugin): assert wf.output_dir.exists() -def test_wf_2b(plugin): +def test_wf_2b(plugin, tmpdir): """ workflow with 2 tasks, no splitter creating add2_task first (before calling add method), adding inputs.x after add method @@ -193,6 +200,7 @@ def test_wf_2b(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -203,7 +211,7 @@ def test_wf_2b(plugin): assert wf.output_dir.exists() -def test_wf_2c_multoutp(plugin): +def test_wf_2c_multoutp(plugin, tmpdir): """ workflow with 2 tasks, no splitter setting multiple outputs for the workflow """ @@ -217,6 +225,7 @@ def test_wf_2c_multoutp(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -228,7 +237,7 @@ def test_wf_2c_multoutp(plugin): assert wf.output_dir.exists() -def test_wf_2d_outpasdict(plugin): +def test_wf_2d_outpasdict(plugin, tmpdir): """ workflow with 2 tasks, no splitter setting multiple outputs using a dictionary """ @@ -242,6 +251,7 @@ def test_wf_2d_outpasdict(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -254,7 +264,7 @@ def test_wf_2d_outpasdict(plugin): @pytest.mark.flaky(reruns=3) # when dask -def test_wf_3(plugin_dask_opt): +def test_wf_3(plugin_dask_opt, tmpdir): """ testing None value for an input""" wf = Workflow(name="wf_3", input_spec=["x", "y"]) wf.add(fun_addvar_none(name="addvar", a=wf.lzin.x, b=wf.lzin.y)) @@ -262,6 +272,7 @@ def test_wf_3(plugin_dask_opt): wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 wf.inputs.y = None + wf.cache_dir = tmpdir with Submitter(plugin=plugin_dask_opt) as sub: sub(wf) @@ -272,7 +283,7 @@ def test_wf_3(plugin_dask_opt): @pytest.mark.xfail(reason="the task error doesn't propagate") -def test_wf_3a_exception(plugin): +def test_wf_3a_exception(plugin, tmpdir): """ testinh wf without set input, attr.NOTHING should be set and the function should raise an exception """ @@ -283,6 +294,7 @@ def test_wf_3a_exception(plugin): wf.inputs.x = 2 wf.inputs.y = attr.NOTHING wf.plugin = plugin + wf.cache_dir = tmpdir with pytest.raises(TypeError) as excinfo: with Submitter(plugin=plugin) as sub: @@ -290,7 +302,7 @@ def test_wf_3a_exception(plugin): assert "unsupported" in str(excinfo.value) -def test_wf_4(plugin): +def test_wf_4(plugin, tmpdir): """wf with a task that doesn't set one input and use the function default value""" wf = Workflow(name="wf_4", input_spec=["x", "y"]) wf.add(fun_addvar_default(name="addvar", a=wf.lzin.x)) @@ -298,6 +310,7 @@ def test_wf_4(plugin): wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -307,7 +320,7 @@ def test_wf_4(plugin): assert 5 == results.output.out -def test_wf_4a(plugin): +def test_wf_4a(plugin, tmpdir): """ wf with a task that doesn't set one input, the unset input is send to the task input, so the task should use the function default value @@ -318,6 +331,7 @@ def test_wf_4a(plugin): wf.set_output([("out", wf.add2.lzout.out)]) wf.inputs.x = 2 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -327,13 +341,14 @@ def test_wf_4a(plugin): assert 5 == results.output.out -def test_wf_5(plugin): +def test_wf_5(plugin, tmpdir): """ wf with two outputs connected to the task outputs one set_output """ wf = Workflow(name="wf_5", input_spec=["x", "y"], x=3, y=2) wf.add(fun_addsubvar(name="addsub", a=wf.lzin.x, b=wf.lzin.y)) wf.set_output([("out_sum", wf.addsub.lzout.sum), ("out_sub", wf.addsub.lzout.sub)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -343,7 +358,7 @@ def test_wf_5(plugin): assert 1 == results.output.out_sub -def test_wf_5a(plugin): +def test_wf_5a(plugin, tmpdir): """ wf with two outputs connected to the task outputs, set_output set twice """ @@ -351,6 +366,7 @@ def test_wf_5a(plugin): wf.add(fun_addsubvar(name="addsub", a=wf.lzin.x, b=wf.lzin.y)) wf.set_output([("out_sum", wf.addsub.lzout.sum)]) wf.set_output([("out_sub", wf.addsub.lzout.sub)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -360,18 +376,19 @@ def test_wf_5a(plugin): assert 1 == results.output.out_sub -def test_wf_5b_exception(): +def test_wf_5b_exception(tmpdir): """ set_output used twice with the same name - exception should be raised """ wf = Workflow(name="wf_5", input_spec=["x", "y"], x=3, y=2) wf.add(fun_addsubvar(name="addsub", a=wf.lzin.x, b=wf.lzin.y)) wf.set_output([("out", wf.addsub.lzout.sum)]) + wf.cache_dir = tmpdir with pytest.raises(Exception) as excinfo: wf.set_output([("out", wf.addsub.lzout.sub)]) assert "is already set" in str(excinfo.value) -def test_wf_6(plugin): +def test_wf_6(plugin, tmpdir): """ wf with two tasks and two outputs connected to both tasks, one set_output """ @@ -379,6 +396,7 @@ def test_wf_6(plugin): wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) wf.add(add2(name="add2", x=wf.mult.lzout.out)) wf.set_output([("out1", wf.mult.lzout.out), ("out2", wf.add2.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -389,7 +407,7 @@ def test_wf_6(plugin): assert 8 == results.output.out2 -def test_wf_6a(plugin): +def test_wf_6a(plugin, tmpdir): """ wf with two tasks and two outputs connected to both tasks, set_output used twice """ @@ -398,6 +416,7 @@ def test_wf_6a(plugin): wf.add(add2(name="add2", x=wf.mult.lzout.out)) wf.set_output([("out1", wf.mult.lzout.out)]) wf.set_output([("out2", wf.add2.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -408,7 +427,7 @@ def test_wf_6a(plugin): assert 8 == results.output.out2 -def test_wf_st_1(plugin): +def test_wf_st_1(plugin, tmpdir): """ Workflow with one task, a splitter for the workflow""" wf = Workflow(name="wf_spl_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) @@ -417,6 +436,7 @@ def test_wf_st_1(plugin): wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -433,7 +453,7 @@ def test_wf_st_1(plugin): assert odir.exists() -def test_wf_st_1_call_subm(plugin): +def test_wf_st_1_call_subm(plugin, tmpdir): """ Workflow with one task, a splitter for the workflow""" wf = Workflow(name="wf_spl_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) @@ -442,6 +462,7 @@ def test_wf_st_1_call_subm(plugin): wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: wf(submitter=sub) @@ -456,7 +477,7 @@ def test_wf_st_1_call_subm(plugin): assert odir.exists() -def test_wf_st_1_call_plug(plugin): +def test_wf_st_1_call_plug(plugin, tmpdir): """ Workflow with one task, a splitter for the workflow""" wf = Workflow(name="wf_spl_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) @@ -465,6 +486,7 @@ def test_wf_st_1_call_plug(plugin): wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir wf(plugin=plugin) @@ -478,7 +500,7 @@ def test_wf_st_1_call_plug(plugin): assert odir.exists() -def test_wf_st_noinput_1(plugin): +def test_wf_st_noinput_1(plugin, tmpdir): """ Workflow with one task, a splitter for the workflow""" wf = Workflow(name="wf_spl_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) @@ -487,6 +509,7 @@ def test_wf_st_noinput_1(plugin): wf.inputs.x = [] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -499,13 +522,14 @@ def test_wf_st_noinput_1(plugin): assert wf.output_dir == [] -def test_wf_ndst_1(plugin): +def test_wf_ndst_1(plugin, tmpdir): """ workflow with one task, a splitter on the task level""" wf = Workflow(name="wf_spl_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x).split("x")) wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -518,7 +542,7 @@ def test_wf_ndst_1(plugin): assert wf.output_dir.exists() -def test_wf_ndst_updatespl_1(plugin): +def test_wf_ndst_updatespl_1(plugin, tmpdir): """ workflow with one task, a splitter on the task level is added *after* calling add """ @@ -527,6 +551,7 @@ def test_wf_ndst_updatespl_1(plugin): wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir wf.add2.split("x") with Submitter(plugin=plugin) as sub: @@ -540,7 +565,7 @@ def test_wf_ndst_updatespl_1(plugin): assert wf.output_dir.exists() -def test_wf_ndst_updatespl_1a(plugin): +def test_wf_ndst_updatespl_1a(plugin, tmpdir): """ workflow with one task (initialize before calling add), a splitter on the task level is added *after* calling add """ @@ -551,6 +576,7 @@ def test_wf_ndst_updatespl_1a(plugin): wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -563,7 +589,7 @@ def test_wf_ndst_updatespl_1a(plugin): assert wf.output_dir.exists() -def test_wf_ndst_updateinp_1(plugin): +def test_wf_ndst_updateinp_1(plugin, tmpdir): """ workflow with one task, a splitter on the task level, updating input of the task after calling add @@ -576,6 +602,7 @@ def test_wf_ndst_updateinp_1(plugin): wf.plugin = plugin wf.add2.split("x") wf.add2.inputs.x = wf.lzin.y + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -587,13 +614,14 @@ def test_wf_ndst_updateinp_1(plugin): assert wf.output_dir.exists() -def test_wf_ndst_noinput_1(plugin): +def test_wf_ndst_noinput_1(plugin, tmpdir): """ workflow with one task, a splitter on the task level""" wf = Workflow(name="wf_spl_1", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x).split("x")) wf.inputs.x = [] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -606,7 +634,7 @@ def test_wf_ndst_noinput_1(plugin): assert wf.output_dir.exists() -def test_wf_st_2(plugin): +def test_wf_st_2(plugin, tmpdir): """ workflow with one task, splitters and combiner for workflow""" wf = Workflow(name="wf_st_2", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x)) @@ -615,6 +643,7 @@ def test_wf_st_2(plugin): wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -629,13 +658,14 @@ def test_wf_st_2(plugin): assert odir.exists() -def test_wf_ndst_2(plugin): +def test_wf_ndst_2(plugin, tmpdir): """ workflow with one task, splitters and combiner on the task level""" wf = Workflow(name="wf_ndst_2", input_spec=["x"]) wf.add(add2(name="add2", x=wf.lzin.x).split("x").combine(combiner="x")) wf.inputs.x = [1, 2] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -649,7 +679,7 @@ def test_wf_ndst_2(plugin): # workflows with structures A -> B -def test_wf_st_3(plugin): +def test_wf_st_3(plugin, tmpdir): """ workflow with 2 tasks, splitter on wf level""" wf = Workflow(name="wfst_3", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) @@ -659,6 +689,7 @@ def test_wf_st_3(plugin): wf.split(("x", "y")) wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -696,7 +727,7 @@ def test_wf_st_3(plugin): assert odir.exists() -def test_wf_ndst_3(plugin): +def test_wf_ndst_3(plugin, tmpdir): """Test workflow with 2 tasks, splitter on a task level""" wf = Workflow(name="wf_ndst_3", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split(("x", "y"))) @@ -705,6 +736,7 @@ def test_wf_ndst_3(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -716,7 +748,7 @@ def test_wf_ndst_3(plugin): assert wf.output_dir.exists() -def test_wf_st_4(plugin): +def test_wf_st_4(plugin, tmpdir): """ workflow with two tasks, scalar splitter and combiner for the workflow""" wf = Workflow(name="wf_st_4", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) @@ -726,6 +758,7 @@ def test_wf_st_4(plugin): wf.combine("x") wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -742,7 +775,7 @@ def test_wf_st_4(plugin): assert odir.exists() -def test_wf_ndst_4(plugin): +def test_wf_ndst_4(plugin, tmpdir): """ workflow with two tasks, scalar splitter and combiner on tasks level""" wf = Workflow(name="wf_ndst_4", input_spec=["a", "b"]) wf.add(multiply(name="mult", x=wf.lzin.a, y=wf.lzin.b).split(("x", "y"))) @@ -750,6 +783,7 @@ def test_wf_ndst_4(plugin): wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir wf.inputs.a = [1, 2] wf.inputs.b = [11, 12] @@ -765,7 +799,7 @@ def test_wf_ndst_4(plugin): assert wf.output_dir.exists() -def test_wf_st_5(plugin): +def test_wf_st_5(plugin, tmpdir): """ workflow with two tasks, outer splitter and no combiner""" wf = Workflow(name="wf_st_5", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) @@ -773,6 +807,7 @@ def test_wf_st_5(plugin): wf.split(["x", "y"], x=[1, 2], y=[11, 12]) wf.set_output([("out", wf.add2.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -788,7 +823,7 @@ def test_wf_st_5(plugin): assert odir.exists() -def test_wf_ndst_5(plugin): +def test_wf_ndst_5(plugin, tmpdir): """ workflow with two tasks, outer splitter on tasks level and no combiner""" wf = Workflow(name="wf_ndst_5", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split(["x", "y"])) @@ -796,6 +831,7 @@ def test_wf_ndst_5(plugin): wf.inputs.x = [1, 2] wf.inputs.y = [11, 12] wf.set_output([("out", wf.add2.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -809,7 +845,7 @@ def test_wf_ndst_5(plugin): assert wf.output_dir.exists() -def test_wf_st_6(plugin): +def test_wf_st_6(plugin, tmpdir): """ workflow with two tasks, outer splitter and combiner for the workflow""" wf = Workflow(name="wf_st_6", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) @@ -819,6 +855,7 @@ def test_wf_st_6(plugin): wf.combine("x") wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -836,7 +873,7 @@ def test_wf_st_6(plugin): assert odir.exists() -def test_wf_ndst_6(plugin): +def test_wf_ndst_6(plugin, tmpdir): """ workflow with two tasks, outer splitter and combiner on tasks level""" wf = Workflow(name="wf_ndst_6", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split(["x", "y"])) @@ -845,6 +882,7 @@ def test_wf_ndst_6(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -857,7 +895,7 @@ def test_wf_ndst_6(plugin): assert wf.output_dir.exists() -def test_wf_ndst_7(plugin): +def test_wf_ndst_7(plugin, tmpdir): """ workflow with two tasks, outer splitter and (full) combiner for first node only""" wf = Workflow(name="wf_ndst_6", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split("x").combine("x")) @@ -866,6 +904,7 @@ def test_wf_ndst_7(plugin): wf.inputs.y = 11 wf.set_output([("out", wf.iden.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -877,7 +916,7 @@ def test_wf_ndst_7(plugin): assert wf.output_dir.exists() -def test_wf_ndst_8(plugin): +def test_wf_ndst_8(plugin, tmpdir): """ workflow with two tasks, outer splitter and (partial) combiner for first task only""" wf = Workflow(name="wf_ndst_6", input_spec=["x", "y"]) wf.add( @@ -888,6 +927,7 @@ def test_wf_ndst_8(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.iden.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -900,7 +940,7 @@ def test_wf_ndst_8(plugin): assert wf.output_dir.exists() -def test_wf_ndst_9(plugin): +def test_wf_ndst_9(plugin, tmpdir): """ workflow with two tasks, outer splitter and (full) combiner for first task only""" wf = Workflow(name="wf_ndst_6", input_spec=["x", "y"]) wf.add( @@ -913,6 +953,7 @@ def test_wf_ndst_9(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.iden.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -927,7 +968,7 @@ def test_wf_ndst_9(plugin): # workflows with structures A -> B -> C -def test_wf_3sernd_ndst_1(plugin): +def test_wf_3sernd_ndst_1(plugin, tmpdir): """ workflow with three "serial" tasks, checking if the splitter is propagating""" wf = Workflow(name="wf_3sernd_ndst_1", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split(["x", "y"])) @@ -937,6 +978,7 @@ def test_wf_3sernd_ndst_1(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.add2_2nd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -954,7 +996,7 @@ def test_wf_3sernd_ndst_1(plugin): @pytest.mark.flaky(reruns=3) # when dask -def test_wf_3nd_st_1(plugin_dask_opt): +def test_wf_3nd_st_1(plugin_dask_opt, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter on the workflow level """ @@ -965,6 +1007,7 @@ def test_wf_3nd_st_1(plugin_dask_opt): wf.split(["x", "y"], x=[1, 2, 3], y=[11, 12]) wf.set_output([("out", wf.mult.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin_dask_opt) as sub: sub(wf) @@ -981,7 +1024,7 @@ def test_wf_3nd_st_1(plugin_dask_opt): @pytest.mark.flaky(reruns=3) # when dask -def test_wf_3nd_ndst_1(plugin_dask_opt): +def test_wf_3nd_ndst_1(plugin_dask_opt, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter on the tasks levels """ @@ -992,6 +1035,7 @@ def test_wf_3nd_ndst_1(plugin_dask_opt): wf.inputs.x = [1, 2, 3] wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin_dask_opt) as sub: sub(wf) @@ -1003,7 +1047,7 @@ def test_wf_3nd_ndst_1(plugin_dask_opt): assert wf.output_dir.exists() -def test_wf_3nd_st_2(plugin): +def test_wf_3nd_st_2(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter and partial combiner on the workflow level """ @@ -1015,6 +1059,7 @@ def test_wf_3nd_st_2(plugin): wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1033,7 +1078,7 @@ def test_wf_3nd_st_2(plugin): assert odir.exists() -def test_wf_3nd_ndst_2(plugin): +def test_wf_3nd_ndst_2(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter and partial combiner on the tasks levels """ @@ -1049,6 +1094,7 @@ def test_wf_3nd_ndst_2(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1061,7 +1107,7 @@ def test_wf_3nd_ndst_2(plugin): assert wf.output_dir.exists() -def test_wf_3nd_st_3(plugin): +def test_wf_3nd_st_3(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter and partial combiner (from the second task) on the workflow level """ @@ -1072,6 +1118,7 @@ def test_wf_3nd_st_3(plugin): wf.split(["x", "y"], x=[1, 2, 3], y=[11, 12]).combine("y") wf.set_output([("out", wf.mult.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1090,7 +1137,7 @@ def test_wf_3nd_st_3(plugin): assert odir.exists() -def test_wf_3nd_ndst_3(plugin): +def test_wf_3nd_ndst_3(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter and partial combiner (from the second task) on the tasks levels """ @@ -1105,6 +1152,7 @@ def test_wf_3nd_ndst_3(plugin): wf.inputs.x = [1, 2, 3] wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1118,7 +1166,7 @@ def test_wf_3nd_ndst_3(plugin): assert wf.output_dir.exists() -def test_wf_3nd_st_4(plugin): +def test_wf_3nd_st_4(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter and full combiner on the workflow level """ @@ -1129,6 +1177,7 @@ def test_wf_3nd_st_4(plugin): wf.split(["x", "y"], x=[1, 2, 3], y=[11, 12]).combine(["x", "y"]) wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1147,7 +1196,7 @@ def test_wf_3nd_st_4(plugin): assert odir.exists() -def test_wf_3nd_ndst_4(plugin): +def test_wf_3nd_ndst_4(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter and full combiner on the tasks levels """ @@ -1163,6 +1212,7 @@ def test_wf_3nd_ndst_4(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1175,7 +1225,7 @@ def test_wf_3nd_ndst_4(plugin): assert wf.output_dir.exists() -def test_wf_3nd_st_5(plugin): +def test_wf_3nd_st_5(plugin, tmpdir): """ workflow with three tasks (A->C, B->C) and three fields in the splitter, splitter and partial combiner (from the second task) on the workflow level """ @@ -1191,6 +1241,7 @@ def test_wf_3nd_st_5(plugin): wf.set_output([("out", wf.addvar.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1212,7 +1263,7 @@ def test_wf_3nd_st_5(plugin): assert odir.exists() -def test_wf_3nd_ndst_5(plugin): +def test_wf_3nd_ndst_5(plugin, tmpdir): """ workflow with three tasks (A->C, B->C) and three fields in the splitter, all tasks have splitters and the last one has a partial combiner (from the 2nd) """ @@ -1232,6 +1283,7 @@ def test_wf_3nd_ndst_5(plugin): wf.set_output([("out", wf.addvar.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1247,7 +1299,7 @@ def test_wf_3nd_ndst_5(plugin): assert wf.output_dir.exists() -def test_wf_3nd_ndst_6(plugin): +def test_wf_3nd_ndst_6(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, the third one uses scalar splitter from the previous ones and a combiner """ @@ -1263,6 +1315,7 @@ def test_wf_3nd_ndst_6(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1276,7 +1329,7 @@ def test_wf_3nd_ndst_6(plugin): # workflows with Left and Right part in splitters A -> B (L&R parts of the splitter) -def test_wf_ndstLR_1(plugin): +def test_wf_ndstLR_1(plugin, tmpdir): """ Test workflow with 2 tasks, splitters on tasks levels The second task has its own simple splitter and the Left part from the first task should be added @@ -1288,6 +1341,7 @@ def test_wf_ndstLR_1(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1304,7 +1358,7 @@ def test_wf_ndstLR_1(plugin): assert wf.output_dir.exists() -def test_wf_ndstLR_1a(plugin): +def test_wf_ndstLR_1a(plugin, tmpdir): """ Test workflow with 2 tasks, splitters on tasks levels The second task has splitter that has Left part (from previous state) and the Right part (it's onw splitter) @@ -1318,6 +1372,7 @@ def test_wf_ndstLR_1a(plugin): wf.inputs.y = [11, 12] wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1334,7 +1389,7 @@ def test_wf_ndstLR_1a(plugin): assert wf.output_dir.exists() -def test_wf_ndstLR_2(plugin): +def test_wf_ndstLR_2(plugin, tmpdir): """ Test workflow with 2 tasks, splitters on tasks levels The second task has its own outer splitter and the Left part from the first task should be added @@ -1351,6 +1406,7 @@ def test_wf_ndstLR_2(plugin): wf.inputs.z = [100, 200] wf.set_output([("out", wf.addvar.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1383,7 +1439,7 @@ def test_wf_ndstLR_2(plugin): assert wf.output_dir.exists() -def test_wf_ndstLR_2a(plugin): +def test_wf_ndstLR_2a(plugin, tmpdir): """ Test workflow with 2 tasks, splitters on tasks levels The second task has splitter that has Left part (from previous state) and the Right part (it's onw outer splitter) @@ -1400,6 +1456,7 @@ def test_wf_ndstLR_2a(plugin): wf.inputs.z = [100, 200] wf.set_output([("out", wf.addvar.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1435,7 +1492,7 @@ def test_wf_ndstLR_2a(plugin): # workflows with inner splitters A -> B (inner spl) -def test_wf_ndstinner_1(plugin): +def test_wf_ndstinner_1(plugin, tmpdir): """ workflow with 2 tasks, the second task has inner splitter """ @@ -1445,6 +1502,7 @@ def test_wf_ndstinner_1(plugin): wf.inputs.x = 1 wf.set_output([("out_list", wf.list.lzout.out), ("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1459,7 +1517,7 @@ def test_wf_ndstinner_1(plugin): assert wf.output_dir.exists() -def test_wf_ndstinner_2(plugin): +def test_wf_ndstinner_2(plugin, tmpdir): """ workflow with 2 tasks, the second task has two inputs and inner splitter from one of the input """ @@ -1470,6 +1528,7 @@ def test_wf_ndstinner_2(plugin): wf.inputs.y = 10 wf.set_output([("out_list", wf.list.lzout.out), ("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1484,7 +1543,7 @@ def test_wf_ndstinner_2(plugin): assert wf.output_dir.exists() -def test_wf_ndstinner_3(plugin): +def test_wf_ndstinner_3(plugin, tmpdir): """ workflow with 2 tasks, the second task has two inputs and outer splitter that includes an inner field """ @@ -1495,6 +1554,7 @@ def test_wf_ndstinner_3(plugin): wf.inputs.y = [10, 100] wf.set_output([("out_list", wf.list.lzout.out), ("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1509,7 +1569,7 @@ def test_wf_ndstinner_3(plugin): assert wf.output_dir.exists() -def test_wf_ndstinner_4(plugin): +def test_wf_ndstinner_4(plugin, tmpdir): """ workflow with 3 tasks, the second task has two inputs and inner splitter from one of the input, the third task has no its own splitter @@ -1522,6 +1582,7 @@ def test_wf_ndstinner_4(plugin): wf.inputs.y = 10 wf.set_output([("out_list", wf.list.lzout.out), ("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1541,7 +1602,7 @@ def test_wf_ndstinner_4(plugin): # workflow that have some single values as the input -def test_wf_st_singl_1(plugin): +def test_wf_st_singl_1(plugin, tmpdir): """ workflow with two tasks, only one input is in the splitter and combiner""" wf = Workflow(name="wf_st_5", input_spec=["x", "y"]) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) @@ -1551,6 +1612,7 @@ def test_wf_st_singl_1(plugin): wf.combine("x") wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1564,7 +1626,7 @@ def test_wf_st_singl_1(plugin): assert odir.exists() -def test_wf_ndst_singl_1(plugin): +def test_wf_ndst_singl_1(plugin, tmpdir): """ workflow with two tasks, outer splitter and combiner on tasks level; only one input is part of the splitter, the other is a single value """ @@ -1575,6 +1637,7 @@ def test_wf_ndst_singl_1(plugin): wf.inputs.y = 11 wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1585,7 +1648,7 @@ def test_wf_ndst_singl_1(plugin): assert wf.output_dir.exists() -def test_wf_st_singl_2(plugin): +def test_wf_st_singl_2(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter on the workflow level only one input is part of the splitter, the other is a single value @@ -1598,6 +1661,7 @@ def test_wf_st_singl_2(plugin): wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1613,7 +1677,7 @@ def test_wf_st_singl_2(plugin): assert odir.exists() -def test_wf_ndst_singl_2(plugin): +def test_wf_ndst_singl_2(plugin, tmpdir): """ workflow with three tasks, third one connected to two previous tasks, splitter on the tasks levels only one input is part of the splitter, the other is a single value @@ -1626,6 +1690,7 @@ def test_wf_ndst_singl_2(plugin): wf.inputs.y = 11 wf.set_output([("out", wf.mult.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1640,7 +1705,7 @@ def test_wf_ndst_singl_2(plugin): # workflows with structures wf(A) -def test_wfasnd_1(plugin): +def test_wfasnd_1(plugin, tmpdir): """ workflow as a node workflow-node with one task and no splitter """ @@ -1653,6 +1718,7 @@ def test_wfasnd_1(plugin): wf.add(wfnd) wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1663,7 +1729,7 @@ def test_wfasnd_1(plugin): assert wf.output_dir.exists() -def test_wfasnd_wfinp_1(plugin): +def test_wfasnd_wfinp_1(plugin, tmpdir): """ workflow as a node workflow-node with one task and no splitter input set for the main workflow @@ -1677,6 +1743,7 @@ def test_wfasnd_wfinp_1(plugin): wf.inputs.x = 2 wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -1689,7 +1756,7 @@ def test_wfasnd_wfinp_1(plugin): assert wf.output_dir.exists() -def test_wfasnd_wfndupdate(plugin): +def test_wfasnd_wfndupdate(plugin, tmpdir): """ workflow as a node workflow-node with one task and no splitter wfasnode input is updated to use the main workflow input @@ -1704,6 +1771,7 @@ def test_wfasnd_wfndupdate(plugin): wf.add(wfnd) wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1713,7 +1781,7 @@ def test_wfasnd_wfndupdate(plugin): assert wf.output_dir.exists() -def test_wfasnd_wfndupdate_rerun(plugin): +def test_wfasnd_wfndupdate_rerun(plugin, tmpdir): """ workflow as a node workflow-node with one task and no splitter wfasnode is run first and later is @@ -1723,6 +1791,7 @@ def test_wfasnd_wfndupdate_rerun(plugin): wfnd = Workflow(name="wfnd", input_spec=["x"], x=2) wfnd.add(add2(name="add2", x=wfnd.lzin.x)) wfnd.set_output([("out", wfnd.add2.lzout.out)]) + wfnd.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wfnd) @@ -1734,6 +1803,7 @@ def test_wfasnd_wfndupdate_rerun(plugin): wf.wfnd.inputs.x = wf.lzin.x wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1748,6 +1818,7 @@ def test_wfasnd_wfndupdate_rerun(plugin): wf_o.add(wf) wf_o.set_output([("out", wf_o.wf.lzout.out)]) wf_o.plugin = plugin + wf_o.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf_o) @@ -1757,7 +1828,7 @@ def test_wfasnd_wfndupdate_rerun(plugin): assert wf_o.output_dir.exists() -def test_wfasnd_st_1(plugin): +def test_wfasnd_st_1(plugin, tmpdir): """ workflow as a node workflow-node with one task, splitter for wfnd @@ -1772,6 +1843,7 @@ def test_wfasnd_st_1(plugin): wf.add(wfnd) wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir checksum_before = wf.checksum with Submitter(plugin=plugin) as sub: @@ -1784,7 +1856,7 @@ def test_wfasnd_st_1(plugin): assert wf.output_dir.exists() -def test_wfasnd_st_updatespl_1(plugin): +def test_wfasnd_st_updatespl_1(plugin, tmpdir): """ workflow as a node workflow-node with one task, splitter for wfnd is set after add @@ -1799,6 +1871,7 @@ def test_wfasnd_st_updatespl_1(plugin): wfnd.split("x") wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1809,7 +1882,7 @@ def test_wfasnd_st_updatespl_1(plugin): assert wf.output_dir.exists() -def test_wfasnd_ndst_1(plugin): +def test_wfasnd_ndst_1(plugin, tmpdir): """ workflow as a node workflow-node with one task, splitter for node @@ -1825,6 +1898,7 @@ def test_wfasnd_ndst_1(plugin): wf.add(wfnd) wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1835,7 +1909,7 @@ def test_wfasnd_ndst_1(plugin): assert wf.output_dir.exists() -def test_wfasnd_ndst_updatespl_1(plugin): +def test_wfasnd_ndst_updatespl_1(plugin, tmpdir): """ workflow as a node workflow-node with one task, splitter for node added after add @@ -1852,6 +1926,7 @@ def test_wfasnd_ndst_updatespl_1(plugin): wfnd.add2.split("x") wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1862,12 +1937,12 @@ def test_wfasnd_ndst_updatespl_1(plugin): assert wf.output_dir.exists() -def test_wfasnd_wfst_1(plugin): +def test_wfasnd_wfst_1(plugin, tmpdir): """ workflow as a node workflow-node with one task, splitter for the main workflow """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.lzin.x) wfnd.add(add2(name="add2", x=wfnd.lzin.x)) wfnd.set_output([("out", wfnd.add2.lzout.out)]) @@ -1893,7 +1968,7 @@ def test_wfasnd_wfst_1(plugin): # workflows with structures wf(A) -> B -def test_wfasnd_st_2(plugin): +def test_wfasnd_st_2(plugin, tmpdir): """ workflow as a node, the main workflow has two tasks, splitter for wfnd @@ -1910,6 +1985,7 @@ def test_wfasnd_st_2(plugin): wf.add(add2(name="add2", x=wf.wfnd.lzout.out)) wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1920,7 +1996,7 @@ def test_wfasnd_st_2(plugin): assert wf.output_dir.exists() -def test_wfasnd_wfst_2(plugin): +def test_wfasnd_wfst_2(plugin, tmpdir): """ workflow as a node, the main workflow has two tasks, splitter for the main workflow @@ -1937,6 +2013,7 @@ def test_wfasnd_wfst_2(plugin): wf.inputs.y = [1, 10] wf.set_output([("out", wf.add2.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1953,7 +2030,7 @@ def test_wfasnd_wfst_2(plugin): # workflows with structures A -> wf(B) -def test_wfasnd_ndst_3(plugin): +def test_wfasnd_ndst_3(plugin, tmpdir): """ workflow as the second node, the main workflow has two tasks, splitter for the first task @@ -1970,6 +2047,7 @@ def test_wfasnd_ndst_3(plugin): wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -1980,7 +2058,7 @@ def test_wfasnd_ndst_3(plugin): assert wf.output_dir.exists() -def test_wfasnd_wfst_3(plugin): +def test_wfasnd_wfst_3(plugin, tmpdir): """ workflow as the second node, the main workflow has two tasks, splitter for the main workflow @@ -1998,6 +2076,7 @@ def test_wfasnd_wfst_3(plugin): wf.set_output([("out", wf.wfnd.lzout.out)]) wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -2011,6 +2090,91 @@ def test_wfasnd_wfst_3(plugin): assert odir.exists() +# workflows with structures wfns(A->B) + + +def test_wfasnd_4(plugin, tmpdir): + """ workflow as a node + workflow-node with two tasks and no splitter + """ + wfnd = Workflow(name="wfnd", input_spec=["x"]) + wfnd.add(add2(name="add2_1st", x=wfnd.lzin.x)) + wfnd.add(add2(name="add2_2nd", x=wfnd.add2_1st.lzout.out)) + wfnd.set_output([("out", wfnd.add2_2nd.lzout.out)]) + wfnd.inputs.x = 2 + + wf = Workflow(name="wf", input_spec=["x"]) + wf.add(wfnd) + wf.set_output([("out", wf.wfnd.lzout.out)]) + wf.plugin = plugin + wf.cache_dir = tmpdir + + with Submitter(plugin=plugin) as sub: + sub(wf) + + results = wf.result() + assert results.output.out == 6 + # checking the output directory + assert wf.output_dir.exists() + + +def test_wfasnd_ndst_4(plugin, tmpdir): + """ workflow as a node + workflow-node with two tasks, + splitter for node + """ + wfnd = Workflow(name="wfnd", input_spec=["x"]) + wfnd.add(add2(name="add2_1st", x=wfnd.lzin.x).split("x")) + wfnd.add(add2(name="add2_2nd", x=wfnd.add2_1st.lzout.out)) + wfnd.set_output([("out", wfnd.add2_2nd.lzout.out)]) + # TODO: without this the test is failing + wfnd.plugin = plugin + wfnd.inputs.x = [2, 4] + + wf = Workflow(name="wf", input_spec=["x"]) + wf.add(wfnd) + wf.set_output([("out", wf.wfnd.lzout.out)]) + wf.plugin = plugin + wf.cache_dir = tmpdir + + with Submitter(plugin=plugin) as sub: + sub(wf) + + results = wf.result() + assert results.output.out == [6, 8] + # checking the output directory + assert wf.output_dir.exists() + + +def test_wfasnd_wfst_4(plugin, tmpdir): + """ workflow as a node + workflow-node with two tasks, + splitter for the main workflow + """ + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) + wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.lzin.x) + wfnd.add(add2(name="add2_1st", x=wfnd.lzin.x)) + wfnd.add(add2(name="add2_2nd", x=wfnd.add2_1st.lzout.out)) + wfnd.set_output([("out", wfnd.add2_2nd.lzout.out)]) + + wf.add(wfnd) + wf.split("x") + wf.inputs.x = [2, 4] + wf.set_output([("out", wf.wfnd.lzout.out)]) + wf.plugin = plugin + + with Submitter(plugin=plugin) as sub: + sub(wf) + # assert wf.output_dir.exists() + results = wf.result() + assert results[0].output.out == 6 + assert results[1].output.out == 8 + # checking all directories + assert wf.output_dir + for odir in wf.output_dir: + assert odir.exists() + + # Testing caching @@ -3497,7 +3661,7 @@ def test_workflow_combine2(tmpdir): # testing lzout.all to collect all of the results and let FunctionTask deal with it -def test_wf_lzoutall_1(plugin): +def test_wf_lzoutall_1(plugin, tmpdir): """ workflow with 2 tasks, no splitter passing entire result object to add2_sub2_res function by using lzout.all syntax @@ -3509,6 +3673,7 @@ def test_wf_lzoutall_1(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -3518,7 +3683,7 @@ def test_wf_lzoutall_1(plugin): assert 8 == results.output.out -def test_wf_lzoutall_1a(plugin): +def test_wf_lzoutall_1a(plugin, tmpdir): """ workflow with 2 tasks, no splitter passing entire result object to add2_res function by using lzout.all syntax in the node connections and for wf output @@ -3530,6 +3695,7 @@ def test_wf_lzoutall_1a(plugin): wf.inputs.x = 2 wf.inputs.y = 3 wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -3539,7 +3705,7 @@ def test_wf_lzoutall_1a(plugin): assert results.output.out_all == {"out_add": 8, "out_sub": 4} -def test_wf_lzoutall_st_1(plugin): +def test_wf_lzoutall_st_1(plugin, tmpdir): """ workflow with 2 tasks, no splitter passing entire result object to add2_res function by using lzout.all syntax @@ -3551,6 +3717,7 @@ def test_wf_lzoutall_st_1(plugin): wf.inputs.x = [2, 20] wf.inputs.y = [3, 30] wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -3560,7 +3727,7 @@ def test_wf_lzoutall_st_1(plugin): assert results.output.out_add == [8, 62, 62, 602] -def test_wf_lzoutall_st_1a(plugin): +def test_wf_lzoutall_st_1a(plugin, tmpdir): """ workflow with 2 tasks, no splitter passing entire result object to add2_res function by using lzout.all syntax @@ -3572,6 +3739,7 @@ def test_wf_lzoutall_st_1a(plugin): wf.inputs.x = [2, 20] wf.inputs.y = [3, 30] wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -3586,7 +3754,7 @@ def test_wf_lzoutall_st_1a(plugin): ] -def test_wf_lzoutall_st_2(plugin): +def test_wf_lzoutall_st_2(plugin, tmpdir): """ workflow with 2 tasks, no splitter passing entire result object to add2_res function by using lzout.all syntax @@ -3600,6 +3768,7 @@ def test_wf_lzoutall_st_2(plugin): wf.inputs.x = [2, 20] wf.inputs.y = [3, 30] wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -3610,7 +3779,7 @@ def test_wf_lzoutall_st_2(plugin): assert results.output.out_add[1] == [62, 602] -def test_wf_lzoutall_st_2a(plugin): +def test_wf_lzoutall_st_2a(plugin, tmpdir): """ workflow with 2 tasks, no splitter passing entire result object to add2_res function by using lzout.all syntax @@ -3624,6 +3793,7 @@ def test_wf_lzoutall_st_2a(plugin): wf.inputs.x = [2, 20] wf.inputs.y = [3, 30] wf.plugin = plugin + wf.cache_dir = tmpdir with Submitter(plugin=plugin) as sub: sub(wf) @@ -3639,9 +3809,9 @@ def test_wf_lzoutall_st_2a(plugin): # worfklows that have files in the result, the files should be copied to the wf dir -def test_wf_resultfile_1(plugin): +def test_wf_resultfile_1(plugin, tmpdir): """ workflow with a file in the result, file should be copied to the wf dir""" - wf = Workflow(name="wf_file_1", input_spec=["x"]) + wf = Workflow(name="wf_file_1", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_write_file(name="writefile", filename=wf.lzin.x)) wf.inputs.x = "file_1.txt" wf.plugin = plugin @@ -3656,11 +3826,11 @@ def test_wf_resultfile_1(plugin): assert results.output.wf_out == wf.output_dir / "file_1.txt" -def test_wf_resultfile_2(plugin): +def test_wf_resultfile_2(plugin, tmpdir): """ workflow with a list of files in the wf result, all files should be copied to the wf dir """ - wf = Workflow(name="wf_file_1", input_spec=["x"]) + wf = Workflow(name="wf_file_1", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_write_file_list(name="writefile", filename_list=wf.lzin.x)) file_list = ["file_1.txt", "file_2.txt", "file_3.txt"] wf.inputs.x = file_list @@ -3677,11 +3847,11 @@ def test_wf_resultfile_2(plugin): assert file == wf.output_dir / file_list[ii] -def test_wf_resultfile_3(plugin): +def test_wf_resultfile_3(plugin, tmpdir): """ workflow with a dictionaries of files in the wf result, all files should be copied to the wf dir """ - wf = Workflow(name="wf_file_1", input_spec=["x"]) + wf = Workflow(name="wf_file_1", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_write_file_list2dict(name="writefile", filename_list=wf.lzin.x)) file_list = ["file_1.txt", "file_2.txt", "file_3.txt"] wf.inputs.x = file_list @@ -3702,9 +3872,9 @@ def test_wf_resultfile_3(plugin): assert val == wf.output_dir / file_list[ii] -def test_wf_upstream_error1(plugin): +def test_wf_upstream_error1(plugin, tmpdir): """ workflow with two tasks, task2 dependent on an task1 which raised an error""" - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = "hi" # TypeError for adding str and int wf.plugin = plugin @@ -3718,11 +3888,11 @@ def test_wf_upstream_error1(plugin): assert "raised an error" in str(excinfo.value) -def test_wf_upstream_error2(plugin): +def test_wf_upstream_error2(plugin, tmpdir): """ task2 dependent on task1, task1 errors, workflow-level split on task 1 goal - workflow finish running, one output errors but the other doesn't """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = [1, "hi"] # TypeError for adding str and int wf.split("x") # workflow-level split @@ -3737,11 +3907,11 @@ def test_wf_upstream_error2(plugin): assert "raised an error" in str(excinfo.value) -def test_wf_upstream_error3(plugin): +def test_wf_upstream_error3(plugin, tmpdir): """ task2 dependent on task1, task1 errors, task-level split on task 1 goal - workflow finish running, one output errors but the other doesn't """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = [1, "hi"] # TypeError for adding str and int wf.addvar1.split("a") # task-level split @@ -3756,9 +3926,9 @@ def test_wf_upstream_error3(plugin): assert "raised an error" in str(excinfo.value) -def test_wf_upstream_error4(plugin): +def test_wf_upstream_error4(plugin, tmpdir): """ workflow with one task, which raises an error""" - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = "hi" # TypeError for adding str and int wf.plugin = plugin @@ -3771,9 +3941,9 @@ def test_wf_upstream_error4(plugin): assert "addvar1" in str(excinfo.value) -def test_wf_upstream_error5(plugin): +def test_wf_upstream_error5(plugin, tmpdir): """ nested workflow with one task, which raises an error""" - wf_main = Workflow(name="wf_main", input_spec=["x"]) + wf_main = Workflow(name="wf_main", input_spec=["x"], cache_dir=tmpdir) wf = Workflow(name="wf", input_spec=["x"], x=wf_main.lzin.x) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.plugin = plugin @@ -3791,9 +3961,9 @@ def test_wf_upstream_error5(plugin): assert "raised an error" in str(excinfo.value) -def test_wf_upstream_error6(plugin): +def test_wf_upstream_error6(plugin, tmpdir): """ nested workflow with two tasks, the first one raises an error""" - wf_main = Workflow(name="wf_main", input_spec=["x"]) + wf_main = Workflow(name="wf_main", input_spec=["x"], cache_dir=tmpdir) wf = Workflow(name="wf", input_spec=["x"], x=wf_main.lzin.x) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.add(fun_addvar_default(name="addvar2", a=wf.addvar1.lzout.out)) @@ -3812,12 +3982,12 @@ def test_wf_upstream_error6(plugin): assert "raised an error" in str(excinfo.value) -def test_wf_upstream_error7(plugin): +def test_wf_upstream_error7(plugin, tmpdir): """ workflow with three sequential tasks, the first task raises an error the last task is set as the workflow output """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = "hi" # TypeError for adding str and int wf.plugin = plugin @@ -3834,12 +4004,12 @@ def test_wf_upstream_error7(plugin): assert wf.addvar2._errored == wf.addvar3._errored == ["addvar1"] -def test_wf_upstream_error7a(plugin): +def test_wf_upstream_error7a(plugin, tmpdir): """ workflow with three sequential tasks, the first task raises an error the second task is set as the workflow output """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = "hi" # TypeError for adding str and int wf.plugin = plugin @@ -3856,12 +4026,12 @@ def test_wf_upstream_error7a(plugin): assert wf.addvar2._errored == wf.addvar3._errored == ["addvar1"] -def test_wf_upstream_error7b(plugin): +def test_wf_upstream_error7b(plugin, tmpdir): """ workflow with three sequential tasks, the first task raises an error the second and the third tasks are set as the workflow output """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = "hi" # TypeError for adding str and int wf.plugin = plugin @@ -3878,9 +4048,9 @@ def test_wf_upstream_error7b(plugin): assert wf.addvar2._errored == wf.addvar3._errored == ["addvar1"] -def test_wf_upstream_error8(plugin): +def test_wf_upstream_error8(plugin, tmpdir): """ workflow with three tasks, the first one raises an error, so 2 others are removed""" - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = "hi" # TypeError for adding str and int wf.plugin = plugin @@ -3898,13 +4068,13 @@ def test_wf_upstream_error8(plugin): assert wf.addvar2._errored == wf.addtwo._errored == ["addvar1"] -def test_wf_upstream_error9(plugin): +def test_wf_upstream_error9(plugin, tmpdir): """ workflow with five tasks with two "branches", one branch has an error, the second is fine the errored branch is connected to the workflow output """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = 2 wf.add(fun_addvar(name="err", a=wf.addvar1.lzout.out, b="hi")) @@ -3924,14 +4094,14 @@ def test_wf_upstream_error9(plugin): assert wf.follow_err._errored == ["err"] -def test_wf_upstream_error9a(plugin): +def test_wf_upstream_error9a(plugin, tmpdir): """ workflow with five tasks with two "branches", one branch has an error, the second is fine the branch without error is connected to the workflow output so the workflow finished clean """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = 2 wf.add(fun_addvar(name="err", a=wf.addvar1.lzout.out, b="hi")) @@ -3948,13 +4118,13 @@ def test_wf_upstream_error9a(plugin): assert wf.follow_err._errored == ["err"] -def test_wf_upstream_error9b(plugin): +def test_wf_upstream_error9b(plugin, tmpdir): """ workflow with five tasks with two "branches", one branch has an error, the second is fine both branches are connected to the workflow output """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wf.add(fun_addvar_default(name="addvar1", a=wf.lzin.x)) wf.inputs.x = 2 wf.add(fun_addvar(name="err", a=wf.addvar1.lzout.out, b="hi")) @@ -4002,7 +4172,7 @@ def exporting_graphs(wf, name): def test_graph_1(tmpdir): """creating a set of graphs, wf with two nodes""" - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wf.add(multiply(name="mult_1", x=wf.lzin.x, y=wf.lzin.y)) wf.add(multiply(name="mult_2", x=wf.lzin.x, y=wf.lzin.x)) wf.add(add2(name="add2", x=wf.mult_1.lzout.out)) @@ -4043,7 +4213,7 @@ def test_graph_1st(tmpdir): """creating a set of graphs, wf with two nodes some nodes have splitters, should be marked with blue color """ - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wf.add(multiply(name="mult_1", x=wf.lzin.x, y=wf.lzin.y).split("x")) wf.add(multiply(name="mult_2", x=wf.lzin.x, y=wf.lzin.x)) wf.add(add2(name="add2", x=wf.mult_1.lzout.out)) @@ -4084,7 +4254,7 @@ def test_graph_1st_cmb(tmpdir): the first one has a splitter, the second has a combiner, so the third one is stateless first two nodes should be blue and the arrow between them should be blue """ - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split("x")) wf.add(add2(name="add2", x=wf.mult.lzout.out).combine("mult.x")) wf.add(list_sum(name="sum", x=wf.add2.lzout.out)) @@ -4123,7 +4293,7 @@ def test_graph_1st_cmb(tmpdir): def test_graph_2(tmpdir): """creating a graph, wf with one worfklow as a node""" - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.lzin.x) wfnd.add(add2(name="add2", x=wfnd.lzin.x)) wfnd.set_output([("out", wfnd.add2.lzout.out)]) @@ -4157,7 +4327,7 @@ def test_graph_2st(tmpdir): """creating a set of graphs, wf with one worfklow as a node the inner workflow has a state, so should be blue """ - wf = Workflow(name="wf", input_spec=["x"]) + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.lzin.x).split("x") wfnd.add(add2(name="add2", x=wfnd.lzin.x)) wfnd.set_output([("out", wfnd.add2.lzout.out)]) @@ -4191,7 +4361,7 @@ def test_graph_2st(tmpdir): def test_graph_3(tmpdir): """creating a set of graphs, wf with two nodes (one node is a workflow)""" - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.mult.lzout.out) @@ -4233,7 +4403,7 @@ def test_graph_3st(tmpdir): the first node has a state and it should be passed to the second node (blue node and a wfasnd, and blue arrow from the node to the wfasnd) """ - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split("x")) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.mult.lzout.out) @@ -4274,7 +4444,7 @@ def test_graph_4(tmpdir): """creating a set of graphs, wf with two nodes (one node is a workflow with two nodes inside). Connection from the node to the inner workflow. """ - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y)) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.mult.lzout.out) wfnd.add(add2(name="add2_a", x=wfnd.lzin.x)) @@ -4317,7 +4487,7 @@ def test_graph_5(tmpdir): """creating a set of graphs, wf with two nodes (one node is a workflow with two nodes inside). Connection from the inner workflow to the node. """ - wf = Workflow(name="wf", input_spec=["x", "y"]) + wf = Workflow(name="wf", input_spec=["x", "y"], cache_dir=tmpdir) wfnd = Workflow(name="wfnd", input_spec=["x"], x=wf.lzin.x) wfnd.add(add2(name="add2_a", x=wfnd.lzin.x)) wfnd.add(add2(name="add2_b", x=wfnd.add2_a.lzout.out))