Skip to content

Commit

Permalink
separate default tasks into threadpool and htex
Browse files (browse the repository at this point in the history)
  • Loading branch information
svandenhaute committed Oct 29, 2023
1 parent bd314ae commit 749bd66
Show file tree
Hide file tree
Showing 18 changed files with 97 additions and 90 deletions.
13 changes: 4 additions & 9 deletions configs/vsc_hortense.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
---
container:
engine: "apptainer"
uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
ModelEvaluation:
cores_per_worker: 1
max_walltime: 0.3
simulation_engine: 'openmm'
apptainer:
uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
SlurmProvider:
partition: "cpu_rome"
account: "2022_069"
Expand All @@ -17,10 +17,7 @@ ModelEvaluation:
scheduler_options: "#SBATCH --clusters=dodrio\n"
ModelTraining:
cores_per_worker: 12
max_walltime: 1
gpu: true
apptainer:
uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
SlurmProvider:
partition: "gpu_rome_a100"
account: "2022_069"
Expand All @@ -32,9 +29,7 @@ ModelTraining:
exclusive: false
scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=1\n"
ReferenceEvaluation:
max_walltime: 0.3
apptainer:
uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
max_walltime: 20
SlurmProvider:
partition: "cpu_rome"
account: "2022_069"
Expand Down
8 changes: 5 additions & 3 deletions psiflow/committee.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def _compute_disagreements(
return disagreements


compute_disagreements = python_app(_compute_disagreements, executors=["Default"])
compute_disagreements = python_app(
_compute_disagreements, executors=["default_threads"]
)


# expose outside filter app to reproduce filtering behavior elsewhere
Expand All @@ -64,7 +66,7 @@ def _filter_disagreements(disagreements: np.ndarray, nstates: int):
return indices


filter_disagreements = python_app(_filter_disagreements, executors=["Default"]) # fmt: skip
filter_disagreements = python_app(_filter_disagreements, executors=["default_threads"]) # fmt: skip


@typeguard.typechecked
Expand All @@ -86,7 +88,7 @@ def _extract_highest(
)


extract_highest = python_app(_extract_highest, executors=["Default"])
extract_highest = python_app(_extract_highest, executors=["default_threads"])


@typeguard.typechecked
Expand Down
30 changes: 16 additions & 14 deletions psiflow/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ def _canonical_orientation(
write_extxyz(f, data)


canonical_orientation = python_app(_canonical_orientation, executors=["Default"])
canonical_orientation = python_app(
_canonical_orientation, executors=["default_threads"]
)


@typeguard.typechecked
Expand All @@ -180,7 +182,7 @@ def reset_atoms(
return _atoms


app_reset_atoms = python_app(reset_atoms, executors=["Default"])
app_reset_atoms = python_app(reset_atoms, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -204,7 +206,7 @@ def write_dataset(
return _data


app_write_dataset = python_app(write_dataset, executors=["Default"])
app_write_dataset = python_app(write_dataset, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -214,7 +216,7 @@ def _write_atoms(atoms: FlowAtoms, outputs=[]):
write(outputs[0].filepath, atoms)


write_atoms = python_app(_write_atoms, executors=["Default"])
write_atoms = python_app(_write_atoms, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -248,7 +250,7 @@ def read_dataset(
return data


app_read_dataset = python_app(read_dataset, executors=["Default"])
app_read_dataset = python_app(read_dataset, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -267,7 +269,7 @@ def reset_dataset(
write_extxyz(f, data)


app_reset_dataset = python_app(reset_dataset, executors=["Default"])
app_reset_dataset = python_app(reset_dataset, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -278,7 +280,7 @@ def join_dataset(inputs: List[File] = [], outputs: List[File] = []) -> None:
write_dataset(data, outputs=[outputs[0]])


app_join_dataset = python_app(join_dataset, executors=["Default"])
app_join_dataset = python_app(join_dataset, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -287,7 +289,7 @@ def get_length_dataset(inputs: List[File] = []) -> int:
return len(data)


app_length_dataset = python_app(get_length_dataset, executors=["Default"])
app_length_dataset = python_app(get_length_dataset, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -311,7 +313,7 @@ def _get_indices(
return indices


get_indices = python_app(_get_indices, executors=["Default"])
get_indices = python_app(_get_indices, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -372,7 +374,7 @@ def compute_errors(
return errors[outer_mask]


app_compute_errors = python_app(compute_errors, executors=["Default"])
app_compute_errors = python_app(compute_errors, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -414,7 +416,7 @@ def apply_offset(
write_dataset(data, outputs=[outputs[0]])


app_apply_offset = python_app(apply_offset, executors=["Default"])
app_apply_offset = python_app(apply_offset, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -423,7 +425,7 @@ def get_elements(inputs=[]) -> set[str]:
return set([e for atoms in data for e in atoms.elements])


app_get_elements = python_app(get_elements, executors=["Default"])
app_get_elements = python_app(get_elements, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -459,15 +461,15 @@ def assign_identifiers(
return identifier


app_assign_identifiers = python_app(assign_identifiers, executors=["Default"])
app_assign_identifiers = python_app(assign_identifiers, executors=["default_threads"])


@typeguard.typechecked
class Dataset:
"""Container to represent a dataset of atomic structures
Args:
context: an `ExecutionContext` instance with a 'Default' executor.
context: an `ExecutionContext` instance with a 'default_threads' executor.
atoms_list: a list of `Atoms` instances which represent the dataset.
data_future: a `parsl.app.futures.DataFuture` instance that points
to an `.xyz` file.
Expand Down
44 changes: 25 additions & 19 deletions psiflow/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class ExecutionContextLoader:
@staticmethod
def parse_config(yaml_dict: dict):
definitions = []

for name in ["ModelEvaluation", "ModelTraining", "ReferenceEvaluation"]:
if name in yaml_dict:
_dict = yaml_dict.pop(name)
Expand All @@ -218,18 +219,11 @@ def parse_config(yaml_dict: dict):
s = _dict["mpi_command"]
_dict["mpi_command"] = lambda x, s=s: s.format(x)

# check if container is requested
container_kwargs = None
if "apptainer" in _dict:
container_kwargs = _dict.pop("apptainer")
container_kwargs["apptainer_or_singularity"] = "apptainer"
elif "singularity" in _dict:
container_kwargs = _dict.pop("singularity")
container_kwargs["apptainer_or_singularity"] = "singularity"
if container_kwargs is not None:
if "container" in yaml_dict:
assert not _dict["use_threadpool"] # not possible with container
container_kwargs["enable_gpu"] = _dict["gpu"]
launcher = ContainerizedLauncher(**container_kwargs)
launcher = ContainerizedLauncher(
**yaml_dict["container"], enable_gpu=_dict["gpu"]
)
else:
launcher = SimpleLauncher()

Expand Down Expand Up @@ -321,7 +315,7 @@ def load(
format_string="%(levelname)s - %(name)s - %(message)s",
)

# create parsl executors and config
# create main parsl executors
executors = []
for definition in definitions:
if not definition.use_threadpool:
Expand All @@ -339,16 +333,28 @@ def load(
label=definition.name(),
)
executors.append(executor)
executors.append(
ThreadPoolExecutor(
max_threads=psiflow_config.pop("default_threads"),
working_dir=str(path),
label="Default",
)

# create default executors
if "container" in psiflow_config:
launcher = ContainerizedLauncher(**psiflow_config["container"])
else:
launcher = SimpleLauncher()
htex = HighThroughputExecutor(
label="default_htex",
address=psiflow_config.pop("htex_address"),
working_dir=str(path / "default_htex"),
cores_per_worker=1,
provider=LocalProvider(launcher=launcher), # noqa: F405
)
executors.append(htex)
threadpool = ThreadPoolExecutor(
label="default_threads",
max_threads=psiflow_config.pop("default_threads"),
working_dir=str(path),
)
executors.append(threadpool)

# remove additional kwargs
psiflow_config.pop("htex_address")
config = Config(
executors=executors,
run_dir=str(path),
Expand Down
14 changes: 7 additions & 7 deletions psiflow/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def _trace_identifier(
return identifier_traces


trace_identifier = python_app(_trace_identifier, executors=["Default"])
trace_identifier = python_app(_trace_identifier, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -67,7 +67,7 @@ def _save_walker_logs(data: dict[str, list], path: Path) -> str:
return s


save_walker_logs = python_app(_save_walker_logs, executors=["Default"])
save_walker_logs = python_app(_save_walker_logs, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -96,7 +96,7 @@ def _save_dataset_log(data: dict[str, list], path: Path) -> str:
return s


save_dataset_log = python_app(_save_dataset_log, executors=["Default"])
save_dataset_log = python_app(_save_dataset_log, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -136,7 +136,7 @@ def _log_walker(
return data


log_walker = python_app(_log_walker, executors=["Default"])
log_walker = python_app(_log_walker, executors=["default_threads"])


@typeguard.typechecked
Expand All @@ -151,7 +151,7 @@ def _gather_walker_logs(*walker_data: dict) -> dict[str, list]:
return data


gather_walker_logs = python_app(_gather_walker_logs, executors=["Default"])
gather_walker_logs = python_app(_gather_walker_logs, executors=["default_threads"])


@typeguard.typechecked
Expand Down Expand Up @@ -237,7 +237,7 @@ def _log_dataset(inputs: list[File] = []) -> dict[str, list]:
return data


log_dataset = python_app(_log_dataset, executors=["Default"])
log_dataset = python_app(_log_dataset, executors=["default_threads"])


def fix_plotly_layout(figure):
Expand Down Expand Up @@ -347,7 +347,7 @@ def _to_wandb(
wandb.finish()


to_wandb = python_app(_to_wandb, executors=["Default"])
to_wandb = python_app(_to_wandb, executors=["default_threads"])


@typeguard.typechecked
Expand Down
6 changes: 3 additions & 3 deletions psiflow/parsl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ class ContainerizedLauncher(Launcher):
def __init__(
self,
uri: str,
apptainer_or_singularity: str = "apptainer",
engine: str = "apptainer", # or singularity
addopts: str = ADDOPTS,
entrypoint: str = ENTRYPOINT,
enable_gpu: Optional[bool] = False,
) -> None:
super().__init__(debug=True)
self.uri = uri # required by Parsl parent class to assign attributes
self.apptainer_or_singularity = apptainer_or_singularity
self.engine = engine
self.addopts = addopts
self.entrypoint = entrypoint
self.enable_gpu = enable_gpu

self.launch_command = ""
self.launch_command += apptainer_or_singularity
self.launch_command += engine
self.launch_command += " exec "
self.launch_command += addopts
self.launch_command += " --bind {}".format(
Expand Down
2 changes: 1 addition & 1 deletion psiflow/reference/_cp2k.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def create_apps(cls):
)
singlepoint_post = python_app(
cp2k_singlepoint_post,
executors=["Default"],
executors=["default_threads"],
cache=False,
)

Expand Down
2 changes: 1 addition & 1 deletion psiflow/reference/_emt.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def parameters(self):

@classmethod
def create_apps(cls) -> None:
app_evaluate_single = python_app(evaluate_emt, executors=["Default"])
app_evaluate_single = python_app(evaluate_emt, executors=["default_threads"])
context = psiflow.context()
context.register_app(cls, "evaluate_single", app_evaluate_single)
# see https://stackoverflow.com/questions/1817183/using-super-with-a-class-method
Expand Down
2 changes: 1 addition & 1 deletion psiflow/reference/_nwchem.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def create_apps(cls):
)
singlepoint_post = python_app(
nwchem_singlepoint_post,
executors=["Default"],
executors=["default_threads"],
)

@join_app
Expand Down
2 changes: 1 addition & 1 deletion psiflow/reference/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
logger = logging.getLogger(__name__) # logging per module


@python_app(executors=["Default"])
@python_app(executors=["default_threads"])
def extract_energy(state):
if state.reference_status:
return state.info["energy"]
Expand Down
Loading

0 comments on commit 749bd66

Please sign in to comment.