Skip to content

Commit

Permalink
Merge pull request #66 from fractal-analytics-platform/34_parsl_config
Browse files Browse the repository at this point in the history
First attempt towards pelkmanslab parsl/slurm config - ref #34
  • Loading branch information
tcompa authored Sep 22, 2022
2 parents 985f797 + 7d8f8de commit 8520b55
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 258 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ nohup.out

file:cachedb*
parsl_executors.log
fractal.log
25 changes: 24 additions & 1 deletion fractal_server/app/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,23 @@
from .runner_utils import get_unique_executor
from .runner_utils import load_parsl_config

from ... import __VERSION__

# from .runner_utils import shutdown_executors

from logging import FileHandler
from logging import Formatter
from logging import getLogger


# Module-level logger for the runner: INFO-and-above records are appended to
# "fractal.log" as "<timestamp>; <level>; <message>".
logger = getLogger(__name__)
logger.setLevel("INFO")

handler = FileHandler(filename="fractal.log", mode="a")
formatter = Formatter(fmt="%(asctime)s; %(levelname)s; %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)



def _task_fun(
*,
Expand Down Expand Up @@ -169,6 +184,7 @@ def _atomic_task_factory(
task_executor = get_unique_executor(
workflow_id=workflow_id, task_executor=task.executor
)
logger.info(f"Starting \"{task.name}\" task on \"{task_executor}\" executor.")

parall_level = task.parallelization_level
if metadata and parall_level:
Expand Down Expand Up @@ -232,7 +248,7 @@ def _process_workflow(
this_metadata = deepcopy(metadata)

workflow_id = task.id
load_parsl_config(workflow_id=workflow_id)
load_parsl_config(workflow_id=workflow_id, logger=logger)

apps: List[PythonApp] = []

Expand Down Expand Up @@ -355,9 +371,16 @@ async def submit_workflow(
resource with the same name.
"""


input_paths = input_dataset.paths
output_path = output_dataset.paths[0]

logger.info("*" * 80)
logger.info(f"fractal_server.__VERSION__: {__VERSION__}")
logger.info(f"Start workflow {workflow.name}")
logger.info(f"{input_paths=}")
logger.info(f"{output_path=}")

final_metadata = _process_workflow(
task=workflow,
input_paths=input_paths,
Expand Down
101 changes: 55 additions & 46 deletions fractal_server/app/runner/runner_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
import asyncio
from functools import partial
from functools import wraps
from logging import FileHandler
from logging import Formatter
from logging import getLogger
from typing import Callable

import logging

import parsl
from parsl.addresses import address_by_hostname
from parsl.channels import LocalChannel
Expand All @@ -36,15 +35,6 @@
from ...config import settings


# Logger dedicated to this module; records at INFO level or above are appended
# to "parsl_executors.log" as "<timestamp>; <level>; <message>".
handler = FileHandler(filename="parsl_executors.log", mode="a")
formatter = Formatter(fmt="%(asctime)s; %(levelname)s; %(message)s")
handler.setFormatter(formatter)

logger = getLogger(__name__)
logger.setLevel("INFO")
logger.addHandler(handler)


def add_prefix(*, workflow_id: int, executor_label: str) -> str:
    """
    Build a workflow-scoped executor label.

    Both arguments are keyword-only.

    Args:
        workflow_id: Identifier placed at the start of the label.
        executor_label: Executor label placed at the end (e.g. "cpu-low").

    Returns:
        The string ``"<workflow_id>___<executor_label>"``, i.e. the two
        values joined by three underscores.
    """
    return f"{workflow_id}___{executor_label}"

Expand Down Expand Up @@ -74,12 +64,17 @@ def load_parsl_config(
*,
workflow_id: int,
enable_monitoring: bool = True,
logger: logging.Logger = None,
):

config = settings.PARSL_CONFIG
logger.info(f"settings.PARSL_CONFIG: {config}")
if logger is None:
logger = logging.getLogger("logs")

logger.info(f"{settings.PARSL_CONFIG=}")
logger.info(f"{settings.PARSL_DEFAULT_EXECUTOR=}")

allowed_configs = ["local", "pelkmanslab", "fmi", "custom"]
config = settings.PARSL_CONFIG
if config not in allowed_configs:
raise ValueError(f"{config=} not in {allowed_configs=}")
if config == "custom":
Expand All @@ -98,7 +93,7 @@ def load_parsl_config(

# Define two identical (apart from the label) executors
htex = HighThroughputExecutor(
label=add_prefix(workflow_id=workflow_id, executor_label="cpu"),
label=add_prefix(workflow_id=workflow_id, executor_label="cpu-low"),
provider=prov,
address=address_by_hostname(),
)
Expand All @@ -111,21 +106,40 @@ def load_parsl_config(

elif config == "pelkmanslab":

# Define a cpu provider
provider_args = dict(
# Define providers
common_args = dict(
partition="main",
launcher=SrunLauncher(debug=False),
channel=LocalChannel(),
nodes_per_block=1,
init_blocks=1,
min_blocks=0,
max_blocks=4,
walltime="10:00:00",
max_blocks=100,
parallelism=1,
exclusive=False,
walltime="20:00:00",
)
prov_cpu_low = SlurmProvider(
launcher=SrunLauncher(debug=False),
channel=LocalChannel(),
cores_per_node=1,
mem_per_node=7,
**common_args,
)
prov_cpu_mid = SlurmProvider(
launcher=SrunLauncher(debug=False),
channel=LocalChannel(),
cores_per_node=4,
mem_per_node=15,
**common_args,
)
prov_cpu_high = SlurmProvider(
launcher=SrunLauncher(debug=False),
channel=LocalChannel(),
cores_per_node=16,
mem_per_node=63,
**common_args,
)
prov_slurm_cpu = SlurmProvider(**provider_args)

# Define a gpu provider
provider_args = dict(
prov_gpu = SlurmProvider(
partition="gpu",
launcher=SrunLauncher(debug=False),
channel=LocalChannel(),
Expand All @@ -135,29 +149,24 @@ def load_parsl_config(
max_blocks=1,
walltime="10:00:00",
)
prov_slurm_gpu = SlurmProvider(**provider_args)

# Define executors
htex_slurm_cpu = HighThroughputExecutor(
label=add_prefix(workflow_id=workflow_id, executor_label="cpu"),
provider=prov_slurm_cpu,
address=address_by_hostname(),
cpu_affinity="block",
)
htex_slurm_cpu_2 = HighThroughputExecutor(
label=add_prefix(workflow_id=workflow_id, executor_label="cpu-2"),
provider=prov_slurm_cpu,
address=address_by_hostname(),
cpu_affinity="block",
)
htex_slurm_gpu = HighThroughputExecutor(
label=add_prefix(workflow_id=workflow_id, executor_label="gpu"),
provider=prov_slurm_gpu,
address=address_by_hostname(),
cpu_affinity="block",
)

executors = [htex_slurm_cpu, htex_slurm_cpu_2, htex_slurm_gpu]
providers = [prov_cpu_low, prov_cpu_mid, prov_cpu_high, prov_gpu]
labels = ["cpu-low", "cpu-mid", "cpu-high", "gpu"]
# FIXME
list_mem_per_worker = [7, 15, 63, 63] # FIXME
executors = []
for provider, label in zip(providers, labels):
executors.append(
HighThroughputExecutor(
label=add_prefix(workflow_id=workflow_id, executor_label=label),
provider=provider,
mem_per_worker=list_mem_per_worker[labels.index(label)], # FIXME
max_workers=100,
address=address_by_hostname(),
cpu_affinity="block",
)
)

elif config == "fmi":

Expand All @@ -179,7 +188,7 @@ def load_parsl_config(

# Define executors
htex_slurm_cpu = HighThroughputExecutor(
label=add_prefix(workflow_id=workflow_id, executor_label="cpu"),
label=add_prefix(workflow_id=workflow_id, executor_label="cpu-low"),
provider=prov_slurm_cpu,
address=address_by_hostname(),
cpu_affinity="block",
Expand Down
2 changes: 1 addition & 1 deletion fractal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def DB_ECHO(self):
###########################################################################
DATA_DIR_ROOT: str = fail_getenv("DATA_DIR_ROOT")
PARSL_CONFIG: str = getenv("PARSL_CONFIG", "local")
PARSL_DEFAULT_EXECUTOR: str = getenv("PARSL_DEFAULT_EXECUTOR", "cpu")
PARSL_DEFAULT_EXECUTOR: str = getenv("PARSL_DEFAULT_EXECUTOR", "cpu-low")

@root_validator(pre=True)
def collect_oauth_clients(cls, values):
Expand Down
Loading

0 comments on commit 8520b55

Please sign in to comment.