Skip to content

Commit

Permalink
Update image_labeling and first example of Fractal integration (ref #64)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcompa committed Jul 4, 2022
1 parent 621b12a commit 8e2998a
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ monitoring.db

output*
tmp_data
LOG_*
slurm-*.out
Local_backup
31 changes: 17 additions & 14 deletions examples/example_uzh_1_well_2x2_sites.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ echo

echo 'Add yokogawa_to_zarr task'
$CMD task add yokogawa_to_zarr zarr zarr well
$CMD task list
echo

#echo 'Add replicate_zarr_structure'
#$CMD task add replicate_zarr_structure zarr zarr plate
$CMD task list
echo
#echo

#echo 'Add illumination_correction'
#$CMD task add illumination_correction zarr zarr well
#$CMD task list
echo 'Add illumination_correction'
$CMD task add illumination_correction zarr zarr well
echo

echo 'Add replicate_zarr_structure_mip'
$CMD task add replicate_zarr_structure_mip zarr zarr plate
$CMD task list
echo

echo 'Add image_labeling'
$CMD task add image_labeling zarr zarr well
echo

echo 'Add maximum_intensity_projection'
Expand All @@ -64,26 +64,29 @@ echo

echo 'Create workflow'
$CMD workflow new mwe-test wftest create_zarr_structure
$CMD workflow list mwe-test
echo

echo 'Add yokogawa_to_zarr task'
$CMD workflow add-task mwe-test wftest yokogawa_to_zarr
$CMD workflow list mwe-test
echo

#echo 'Add illumination_correction'
#$CMD workflow add-task mwe-test wftest illumination_correction
#$CMD workflow list mwe-test
#echo
echo 'Add illumination_correction'
$CMD workflow add-task mwe-test wftest illumination_correction
echo

echo 'Add image_labeling'
$CMD workflow add-task mwe-test wftest image_labeling
echo

echo 'Add replicate_zarr_structure_mip'
$CMD workflow add-task mwe-test wftest replicate_zarr_structure_mip
$CMD workflow list mwe-test
echo

echo 'Add maximum_intensity_projection'
$CMD workflow add-task mwe-test wftest maximum_intensity_projection
echo

echo 'Final list:'
$CMD workflow list mwe-test
echo

Expand Down
2 changes: 1 addition & 1 deletion examples/wf_params_uzh_1_well_2x2_sites.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"dims": [2, 2],
"coarsening_xy": 2,
"coarsening_z": 1,
"num_levels": 4,
"num_levels": 5,
"channel_file": "wf_params_uzh_cardiac_channels.json",
"path_dict_corr": "wf_params_uzh_cardiac_illumination.json"
}
3 changes: 2 additions & 1 deletion fractal/dictionary_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
Institute for Biomedical Research and Pelkmans Lab from the University of
Zurich.
"""

from fractal.tasks.create_zarr_structure import create_zarr_structure
from fractal.tasks.create_zarr_structure_multifov import (
create_zarr_structure_multifov,
)
from fractal.tasks.illumination_correction import illumination_correction
from fractal.tasks.image_labeling import image_labeling
from fractal.tasks.maximum_intensity_projection import (
maximum_intensity_projection,
)
Expand All @@ -36,3 +36,4 @@
dict_tasks["maximum_intensity_projection"] = maximum_intensity_projection
dict_tasks["replicate_zarr_structure"] = replicate_zarr_structure
dict_tasks["illumination_correction"] = illumination_correction
dict_tasks["image_labeling"] = image_labeling
36 changes: 31 additions & 5 deletions fractal/fractal_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def workflow_apply(
path_dict_channels = params["channel_file"]
path_dict_corr = params["path_dict_corr"]
delete_input = params.get("delete_input", False)
labeling_channel = params.get("labeling_channel", "A01_C01")

# FIXME validate tasks somewhere?

Expand All @@ -478,10 +479,27 @@ def workflow_apply(
exclusive=fractal_config.exclusive,
)
htex = define_HighThroughputExecutor(
provider=provider, max_workers=fractal_config.max_workers
provider=provider,
max_workers=fractal_config.max_workers,
label="cpu",
)
provider_gpu = define_SlurmProvider(
nodes_per_block=fractal_config.nodes_per_block,
cores_per_node=fractal_config.cores_per_node,
mem_per_node_GB=fractal_config.mem_per_node_GB,
partition=fractal_config.partition_gpu,
worker_init=fractal_config.worker_init,
max_blocks=fractal_config.max_blocks,
exclusive=fractal_config.exclusive,
)
htex_gpu = define_HighThroughputExecutor(
provider=provider_gpu,
max_workers=fractal_config.max_workers,
label="gpu",
)

monitoring = define_MonitoringHub(workflow_name=workflow_name)
config = Config(executors=[htex], monitoring=monitoring)
config = Config(executors=[htex, htex_gpu], monitoring=monitoring)
# config = Config(executors=[htex])
parsl.clear()
parsl.load(config)
Expand All @@ -490,7 +508,7 @@ def workflow_apply(

debug(dict_tasks)

@parsl.python_app
@parsl.python_app(executors=["cpu"])
def collect_intermediate_results(inputs=[]):
return [x for x in inputs]

Expand All @@ -507,7 +525,7 @@ def collect_intermediate_results(inputs=[]):
num_levels=num_levels,
)

@parsl.python_app
@parsl.python_app(executors=["cpu"])
def app_create_zarr_structure(**kwargs_):
os.environ["OPENBLAS_NUM_THREADS"] = OPENBLAS_NUM_THREADS
import fractal.dictionary_tasks # noqa: F401
Expand Down Expand Up @@ -542,6 +560,7 @@ def app_create_zarr_structure(**kwargs_):
"yokogawa_to_zarr and take a global dict of paths."
)

executor = "cpu"
if task == "yokogawa_to_zarr":
kwargs = dict(
in_path=resources_in[0], # FIXME
Expand Down Expand Up @@ -582,8 +601,15 @@ def app_create_zarr_structure(**kwargs_):
overwrite=True,
# background=background,
)
elif task == "image_labeling":
kwargs = dict(
chl_list=chl_list,
coarsening_xy=coarsening_xy,
labeling_channel=labeling_channel,
)
executor = "gpu"

@python_app
@python_app(executors=[executor])
def app(zarrurl, **kwargs_):
os.environ["OPENBLAS_NUM_THREADS"] = OPENBLAS_NUM_THREADS
import fractal.dictionary_tasks # noqa: F401
Expand Down
6 changes: 3 additions & 3 deletions fractal/fractal_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
Institute for Biomedical Research and Pelkmans Lab from the University of
Zurich.
"""

# Parameters of parsl.executors.HighThroughputExecutor
max_workers = 1 # This is the maximum number of workers per block
max_workers = 2 # This is the maximum number of workers per block

# Parameters of parsl.providers.SlurmProvider
# Note that worker_init is a command which is included at the beginning of
# each SLURM submission scripts
nodes_per_block = 1 # This implies that a block corresponds to a node
max_blocks = 15 # Maximum number of blocks (=nodes) that parsl can use
max_blocks = 24 # Maximum number of blocks (=nodes) that parsl can use
exclusive = False
cores_per_node = 16
mem_per_node_GB = 60
partition = "main"
partition_gpu = "gpu"
worker_init = "source /opt/easybuild/software/Anaconda3/2019.07/"
worker_init += "etc/profile.d/conda.sh\n"
worker_init += "conda activate fractal"
8 changes: 4 additions & 4 deletions fractal/parsl_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
Institute for Biomedical Research and Pelkmans Lab from the University of
Zurich.
"""

from parsl.addresses import address_by_hostname
from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor
Expand Down Expand Up @@ -57,17 +56,18 @@ def define_SlurmProvider(
parallelism=1,
exclusive=exclusive,
)

return slurm


def define_HighThroughputExecutor(provider=None, max_workers=40):
def define_HighThroughputExecutor(provider=None, max_workers=40, label="htex"):

htex = HighThroughputExecutor(
label="htex",
label=label,
address=address_by_hostname(),
# worker_debug=True,
max_workers=max_workers,
cores_per_worker=8,
cores_per_worker=16,
provider=provider,
cpu_affinity="block",
)
Expand Down
22 changes: 22 additions & 0 deletions fractal/tasks/image_labeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,30 @@ def image_labeling(
parser.add_argument(
"-z", "--zarrurl", help="zarr url, at the FOV level", required=True
)
parser.add_argument(
"-C",
"--chl_list",
nargs="+",
help="list of channel names (e.g. A01_C01)",
)
parser.add_argument(
"-cxy",
"--coarsening_xy",
default=2,
type=int,
help="coarsening factor along X and Y (optional, defaults to 2)",
)
parser.add_argument(
"-lc",
"--labeling_channel",
help="name of channel for labeling (e.g. A01_C01)",
)

args = parser.parse_args()
image_labeling(
args.zarrurl,
coarsening_xy=args.coarsening_xy,
chl_list=args.chl_list,
labeling_channel=args.labeling_channel,
# FIXME: more arguments
)

0 comments on commit 8e2998a

Please sign in to comment.