diff --git a/configs/vsc_hortense.yaml b/configs/vsc_hortense.yaml
index 3f7118e..d78abf2 100644
--- a/configs/vsc_hortense.yaml
+++ b/configs/vsc_hortense.yaml
@@ -1,10 +1,10 @@
 ---
+container:
+  engine: "apptainer"
+  uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
 ModelEvaluation:
   cores_per_worker: 1
-  max_walltime: 0.3
   simulation_engine: 'openmm'
-  apptainer:
-    uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
   SlurmProvider:
     partition: "cpu_rome"
     account: "2022_069"
@@ -17,10 +17,7 @@ ModelEvaluation:
     scheduler_options: "#SBATCH --clusters=dodrio\n"
 ModelTraining:
   cores_per_worker: 12
-  max_walltime: 1
   gpu: true
-  apptainer:
-    uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
   SlurmProvider:
     partition: "gpu_rome_a100"
     account: "2022_069"
@@ -32,9 +29,7 @@ ModelTraining:
     exclusive: false
     scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=1\n"
 ReferenceEvaluation:
-  max_walltime: 0.3
-  apptainer:
-    uri: "oras://ghcr.io/molmod/psiflow:2.0.0-cuda"
+  max_walltime: 20
   SlurmProvider:
     partition: "cpu_rome"
     account: "2022_069"
diff --git a/psiflow/committee.py b/psiflow/committee.py
index 4ac562f..1d33c3d 100644
--- a/psiflow/committee.py
+++ b/psiflow/committee.py
@@ -51,7 +51,9 @@ def _compute_disagreements(
     return disagreements


-compute_disagreements = python_app(_compute_disagreements, executors=["Default"])
+compute_disagreements = python_app(
+    _compute_disagreements, executors=["default_threads"]
+)


 # expose outside filter app to reproduce filtering behavior elsewhere
@@ -64,7 +66,7 @@ def _filter_disagreements(disagreements: np.ndarray, nstates: int):
     return indices


-filter_disagreements = python_app(_filter_disagreements, executors=["Default"])  # fmt: skip
+filter_disagreements = python_app(_filter_disagreements, executors=["default_threads"])  # fmt: skip


 @typeguard.typechecked
@@ -86,7 +88,7 @@ def _extract_highest(
     )


-extract_highest = python_app(_extract_highest, executors=["Default"])
+extract_highest = python_app(_extract_highest, executors=["default_threads"])


 @typeguard.typechecked
diff --git a/psiflow/data.py b/psiflow/data.py
index 3401863..05fb6b7 100644
--- a/psiflow/data.py
+++ b/psiflow/data.py
@@ -164,7 +164,9 @@ def _canonical_orientation(
         write_extxyz(f, data)


-canonical_orientation = python_app(_canonical_orientation, executors=["Default"])
+canonical_orientation = python_app(
+    _canonical_orientation, executors=["default_threads"]
+)


 @typeguard.typechecked
@@ -180,7 +182,7 @@ def reset_atoms(
     return _atoms


-app_reset_atoms = python_app(reset_atoms, executors=["Default"])
+app_reset_atoms = python_app(reset_atoms, executors=["default_threads"])


 @typeguard.typechecked
@@ -204,7 +206,7 @@ def write_dataset(
     return _data


-app_write_dataset = python_app(write_dataset, executors=["Default"])
+app_write_dataset = python_app(write_dataset, executors=["default_threads"])


 @typeguard.typechecked
@@ -214,7 +216,7 @@ def _write_atoms(atoms: FlowAtoms, outputs=[]):
     write(outputs[0].filepath, atoms)


-write_atoms = python_app(_write_atoms, executors=["Default"])
+write_atoms = python_app(_write_atoms, executors=["default_threads"])


 @typeguard.typechecked
@@ -248,7 +250,7 @@ def read_dataset(
     return data


-app_read_dataset = python_app(read_dataset, executors=["Default"])
+app_read_dataset = python_app(read_dataset, executors=["default_threads"])


 @typeguard.typechecked
@@ -267,7 +269,7 @@ def reset_dataset(
         write_extxyz(f, data)


-app_reset_dataset = python_app(reset_dataset, executors=["Default"])
+app_reset_dataset = python_app(reset_dataset, executors=["default_threads"])


 @typeguard.typechecked
@@ -278,7 +280,7 @@ def join_dataset(inputs: List[File] = [], outputs: List[File] = []) -> None:
     write_dataset(data, outputs=[outputs[0]])


-app_join_dataset = python_app(join_dataset, executors=["Default"])
+app_join_dataset = python_app(join_dataset, executors=["default_threads"])


 @typeguard.typechecked
@@ -287,7 +289,7 @@ def get_length_dataset(inputs: List[File] = []) -> int:
     return len(data)


-app_length_dataset = python_app(get_length_dataset, executors=["Default"])
+app_length_dataset = python_app(get_length_dataset, executors=["default_threads"])


 @typeguard.typechecked
@@ -311,7 +313,7 @@ def _get_indices(
     return indices


-get_indices = python_app(_get_indices, executors=["Default"])
+get_indices = python_app(_get_indices, executors=["default_threads"])


 @typeguard.typechecked
@@ -372,7 +374,7 @@ def compute_errors(
     return errors[outer_mask]


-app_compute_errors = python_app(compute_errors, executors=["Default"])
+app_compute_errors = python_app(compute_errors, executors=["default_threads"])


 @typeguard.typechecked
@@ -414,7 +416,7 @@ def apply_offset(
     write_dataset(data, outputs=[outputs[0]])


-app_apply_offset = python_app(apply_offset, executors=["Default"])
+app_apply_offset = python_app(apply_offset, executors=["default_threads"])


 @typeguard.typechecked
@@ -423,7 +425,7 @@ def get_elements(inputs=[]) -> set[str]:
     return set([e for atoms in data for e in atoms.elements])


-app_get_elements = python_app(get_elements, executors=["Default"])
+app_get_elements = python_app(get_elements, executors=["default_threads"])


 @typeguard.typechecked
@@ -459,7 +461,7 @@ def assign_identifiers(
     return identifier


-app_assign_identifiers = python_app(assign_identifiers, executors=["Default"])
+app_assign_identifiers = python_app(assign_identifiers, executors=["default_threads"])


 @typeguard.typechecked
@@ -467,7 +469,7 @@ class Dataset:
     """Container to represent a dataset of atomic structures

     Args:
-        context: an `ExecutionContext` instance with a 'Default' executor.
+        context: an `ExecutionContext` instance with a 'default_threads' executor.
         atoms_list: a list of `Atoms` instances which represent the dataset.
         data_future: a `parsl.app.futures.DataFuture` instance that points
             to an `.xyz` file.
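Note: the bulk of this patch renames the `"Default"` executor label to `"default_threads"` (and, for bias evaluation further down, `"default_htex"`). The string passed to `executors=[...]` in `python_app` has to match the `label` of an executor registered in the parsl `Config`, which is why the executor definition in `execution.py` (next file) and every app definition change together. A minimal, self-contained sketch of that relationship, using plain parsl and a hypothetical `double` app (not code from this repository):

```python
import parsl
from parsl.app.app import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# register an executor whose label matches the one referenced by the apps
config = Config(
    executors=[ThreadPoolExecutor(label="default_threads", max_threads=4)],
)
parsl.load(config)


@python_app(executors=["default_threads"])  # must match a label in the Config
def double(x: int) -> int:
    return 2 * x


assert double(3).result() == 6
```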
diff --git a/psiflow/execution.py b/psiflow/execution.py
index 7bc84b0..befd261 100644
--- a/psiflow/execution.py
+++ b/psiflow/execution.py
@@ -193,6 +193,7 @@ class ExecutionContextLoader:
     @staticmethod
     def parse_config(yaml_dict: dict):
         definitions = []
+
         for name in ["ModelEvaluation", "ModelTraining", "ReferenceEvaluation"]:
             if name in yaml_dict:
                 _dict = yaml_dict.pop(name)
@@ -218,18 +219,11 @@ def parse_config(yaml_dict: dict):
                     s = _dict["mpi_command"]
                     _dict["mpi_command"] = lambda x, s=s: s.format(x)

-                # check if container is requested
-                container_kwargs = None
-                if "apptainer" in _dict:
-                    container_kwargs = _dict.pop("apptainer")
-                    container_kwargs["apptainer_or_singularity"] = "apptainer"
-                elif "singularity" in _dict:
-                    container_kwargs = _dict.pop("singularity")
-                    container_kwargs["apptainer_or_singularity"] = "singularity"
-                if container_kwargs is not None:
+                if "container" in yaml_dict:
                     assert not _dict["use_threadpool"]  # not possible with container
-                    container_kwargs["enable_gpu"] = _dict["gpu"]
-                    launcher = ContainerizedLauncher(**container_kwargs)
+                    launcher = ContainerizedLauncher(
+                        **yaml_dict["container"], enable_gpu=_dict["gpu"]
+                    )
                 else:
                     launcher = SimpleLauncher()

@@ -321,7 +315,7 @@ def load(
             format_string="%(levelname)s - %(name)s - %(message)s",
         )

-        # create parsl executors and config
+        # create main parsl executors
         executors = []
         for definition in definitions:
             if not definition.use_threadpool:
@@ -339,16 +333,28 @@ def load(
                 label=definition.name(),
             )
             executors.append(executor)
-        executors.append(
-            ThreadPoolExecutor(
-                max_threads=psiflow_config.pop("default_threads"),
-                working_dir=str(path),
-                label="Default",
-            )
+
+        # create default executors
+        if "container" in psiflow_config:
+            launcher = ContainerizedLauncher(**psiflow_config["container"])
+        else:
+            launcher = SimpleLauncher()
+        htex = HighThroughputExecutor(
+            label="default_htex",
+            address=psiflow_config.pop("htex_address"),
+            working_dir=str(path / "default_htex"),
+            cores_per_worker=1,
+            provider=LocalProvider(launcher=launcher),  # noqa: F405
+        )
+        executors.append(htex)
+        threadpool = ThreadPoolExecutor(
+            label="default_threads",
+            max_threads=psiflow_config.pop("default_threads"),
+            working_dir=str(path),
         )
+        executors.append(threadpool)

         # remove additional kwargs
-        psiflow_config.pop("htex_address")
         config = Config(
             executors=executors,
             run_dir=str(path),
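Note: with the `parse_config`/`load` changes above, the container settings are read once from a top-level `container` mapping instead of per-section `apptainer:`/`singularity:` blocks. A rough sketch of how that mapping (keys as in `configs/vsc_hortense.yaml`, matching the renamed `engine` parameter shown in `psiflow/parsl_utils.py` below) reaches the launcher; the dict literal here is illustrative only, not an additional config file:

```python
from psiflow.parsl_utils import ContainerizedLauncher

yaml_dict = {
    "container": {
        "engine": "apptainer",  # or "singularity"
        "uri": "oras://ghcr.io/molmod/psiflow:2.0.0-cuda",
    },
}

# per-section launchers forward that section's GPU flag ...
launcher = ContainerizedLauncher(**yaml_dict["container"], enable_gpu=True)

# ... while the default_htex executor reuses the same mapping as-is
default_launcher = ContainerizedLauncher(**yaml_dict["container"])
```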
diff --git a/psiflow/metrics.py b/psiflow/metrics.py
index 83eeb2d..9164deb 100644
--- a/psiflow/metrics.py
+++ b/psiflow/metrics.py
@@ -33,7 +33,7 @@ def _trace_identifier(
     return identifier_traces


-trace_identifier = python_app(_trace_identifier, executors=["Default"])
+trace_identifier = python_app(_trace_identifier, executors=["default_threads"])


 @typeguard.typechecked
@@ -67,7 +67,7 @@ def _save_walker_logs(data: dict[str, list], path: Path) -> str:
     return s


-save_walker_logs = python_app(_save_walker_logs, executors=["Default"])
+save_walker_logs = python_app(_save_walker_logs, executors=["default_threads"])


 @typeguard.typechecked
@@ -96,7 +96,7 @@ def _save_dataset_log(data: dict[str, list], path: Path) -> str:
     return s


-save_dataset_log = python_app(_save_dataset_log, executors=["Default"])
+save_dataset_log = python_app(_save_dataset_log, executors=["default_threads"])


 @typeguard.typechecked
@@ -136,7 +136,7 @@ def _log_walker(
     return data


-log_walker = python_app(_log_walker, executors=["Default"])
+log_walker = python_app(_log_walker, executors=["default_threads"])


 @typeguard.typechecked
@@ -151,7 +151,7 @@ def _gather_walker_logs(*walker_data: dict) -> dict[str, list]:
     return data


-gather_walker_logs = python_app(_gather_walker_logs, executors=["Default"])
+gather_walker_logs = python_app(_gather_walker_logs, executors=["default_threads"])


 @typeguard.typechecked
@@ -237,7 +237,7 @@ def _log_dataset(inputs: list[File] = []) -> dict[str, list]:
     return data


-log_dataset = python_app(_log_dataset, executors=["Default"])
+log_dataset = python_app(_log_dataset, executors=["default_threads"])


 def fix_plotly_layout(figure):
@@ -347,7 +347,7 @@ def _to_wandb(
     wandb.finish()


-to_wandb = python_app(_to_wandb, executors=["Default"])
+to_wandb = python_app(_to_wandb, executors=["default_threads"])


 @typeguard.typechecked
diff --git a/psiflow/parsl_utils.py b/psiflow/parsl_utils.py
index 77ad0c2..a75b315 100644
--- a/psiflow/parsl_utils.py
+++ b/psiflow/parsl_utils.py
@@ -24,20 +24,20 @@ class ContainerizedLauncher(Launcher):
     def __init__(
         self,
         uri: str,
-        apptainer_or_singularity: str = "apptainer",
+        engine: str = "apptainer",  # or singularity
         addopts: str = ADDOPTS,
         entrypoint: str = ENTRYPOINT,
         enable_gpu: Optional[bool] = False,
     ) -> None:
         super().__init__(debug=True)
         self.uri = uri  # required by Parsl parent class to assign attributes
-        self.apptainer_or_singularity = apptainer_or_singularity
+        self.engine = engine
         self.addopts = addopts
         self.entrypoint = entrypoint
         self.enable_gpu = enable_gpu

         self.launch_command = ""
-        self.launch_command += apptainer_or_singularity
+        self.launch_command += engine
         self.launch_command += " exec "
         self.launch_command += addopts
         self.launch_command += " --bind {}".format(
diff --git a/psiflow/reference/_cp2k.py b/psiflow/reference/_cp2k.py
index 85170f5..4e2ed74 100644
--- a/psiflow/reference/_cp2k.py
+++ b/psiflow/reference/_cp2k.py
@@ -334,7 +334,7 @@ def create_apps(cls):
         )
         singlepoint_post = python_app(
             cp2k_singlepoint_post,
-            executors=["Default"],
+            executors=["default_threads"],
             cache=False,
         )

diff --git a/psiflow/reference/_emt.py b/psiflow/reference/_emt.py
index 4d61416..99d3130 100644
--- a/psiflow/reference/_emt.py
+++ b/psiflow/reference/_emt.py
@@ -49,7 +49,7 @@ def parameters(self):

     @classmethod
     def create_apps(cls) -> None:
-        app_evaluate_single = python_app(evaluate_emt, executors=["Default"])
+        app_evaluate_single = python_app(evaluate_emt, executors=["default_threads"])
         context = psiflow.context()
         context.register_app(cls, "evaluate_single", app_evaluate_single)
         # see https://stackoverflow.com/questions/1817183/using-super-with-a-class-method
diff --git a/psiflow/reference/_nwchem.py b/psiflow/reference/_nwchem.py
index daeed73..cb5f6b7 100644
--- a/psiflow/reference/_nwchem.py
+++ b/psiflow/reference/_nwchem.py
@@ -227,7 +227,7 @@ def create_apps(cls):
         )
         singlepoint_post = python_app(
             nwchem_singlepoint_post,
-            executors=["Default"],
+            executors=["default_threads"],
         )

     @join_app
diff --git a/psiflow/reference/base.py b/psiflow/reference/base.py
index 3b1011a..745a9b1 100644
--- a/psiflow/reference/base.py
+++ b/psiflow/reference/base.py
@@ -20,7 +20,7 @@
 logger = logging.getLogger(__name__)  # logging per module


-@python_app(executors=["Default"])
+@python_app(executors=["default_threads"])
 def extract_energy(state):
     if state.reference_status:
         return state.info["energy"]
diff --git a/psiflow/sampling.py b/psiflow/sampling.py
index d258c22..05a755b 100644
--- a/psiflow/sampling.py
+++ b/psiflow/sampling.py
@@ -69,7 +69,7 @@ def _assign_identifier(state: FlowAtoms, identifier: int):
     return state, identifier


-assign_identifier = python_app(_assign_identifier, executors=["Default"])
+assign_identifier = python_app(_assign_identifier, executors=["default_threads"])


 @join_app
@@ -141,7 +141,7 @@ def _compute_error(
     return None, None


-compute_error = python_app(_compute_error, executors=["Default"])
+compute_error = python_app(_compute_error, executors=["default_threads"])


 @typeguard.typechecked
@@ -158,7 +158,7 @@ def _check_error(
     return False


-check_error = python_app(_check_error, executors=["Default"])
+check_error = python_app(_check_error, executors=["default_threads"])


 @typeguard.typechecked
@@ -224,7 +224,7 @@ def _reset_condition(
     return False


-reset_condition = python_app(_reset_condition, executors=["Default"])
+reset_condition = python_app(_reset_condition, executors=["default_threads"])


 @join_app
diff --git a/psiflow/utils.py b/psiflow/utils.py
index 7b70b92..726530c 100644
--- a/psiflow/utils.py
+++ b/psiflow/utils.py
@@ -52,7 +52,7 @@ def _sum_integers(a: int, b: int) -> int:
     return a + b


-sum_integers = python_app(_sum_integers, executors=["Default"])
+sum_integers = python_app(_sum_integers, executors=["default_threads"])


 @typeguard.typechecked
@@ -60,7 +60,7 @@ def _combine_futures(inputs: List[Any]) -> List[Any]:
     return list(inputs)


-combine_futures = python_app(_combine_futures, executors=["Default"])
+combine_futures = python_app(_combine_futures, executors=["default_threads"])


 @typeguard.typechecked
@@ -98,7 +98,7 @@ def _copy_data_future(inputs: List[File] = [], outputs: List[File] = []) -> None
     pass


-copy_data_future = python_app(_copy_data_future, executors=["Default"])
+copy_data_future = python_app(_copy_data_future, executors=["default_threads"])


 @typeguard.typechecked
@@ -108,14 +108,14 @@ def _copy_app_future(future: Any) -> Any:
     return deepcopy(future)


-copy_app_future = python_app(_copy_app_future, executors=["Default"])
+copy_app_future = python_app(_copy_app_future, executors=["default_threads"])


 def _pack(*args):
     return args


-pack = python_app(_pack, executors=["Default"])
+pack = python_app(_pack, executors=["default_threads"])


 @typeguard.typechecked
@@ -124,7 +124,7 @@ def _unpack_i(result: Union[np.ndarray, List, Tuple], i: int) -> Any:
     return result[i]


-unpack_i = python_app(_unpack_i, executors=["Default"])
+unpack_i = python_app(_unpack_i, executors=["default_threads"])


 @typeguard.typechecked
@@ -154,7 +154,7 @@ def _make_dict_safe(arg):
         yaml.dump(input_dict, f, default_flow_style=False)


-save_yaml = python_app(_save_yaml, executors=["Default"])
+save_yaml = python_app(_save_yaml, executors=["default_threads"])


 @typeguard.typechecked
@@ -166,7 +166,7 @@ def _read_yaml(inputs: List[File] = [], outputs: List[File] = []) -> dict:
     return config_dict


-read_yaml = python_app(_read_yaml, executors=["Default"])
+read_yaml = python_app(_read_yaml, executors=["default_threads"])


 @typeguard.typechecked
@@ -175,7 +175,7 @@ def _save_txt(data: str, outputs: List[File] = []) -> None:
         f.write(data)


-save_txt = python_app(_save_txt, executors=["Default"])
+save_txt = python_app(_save_txt, executors=["default_threads"])


 @typeguard.typechecked
@@ -196,7 +196,9 @@ def _app_train_valid_indices(
     return [int(i) for i in train_list], [int(i) for i in valid_list]


-app_train_valid_indices = python_app(_app_train_valid_indices, executors=["Default"])
+app_train_valid_indices = python_app(
+    _app_train_valid_indices, executors=["default_threads"]
+)


 @typeguard.typechecked
@@ -429,7 +431,7 @@ def _check_distances(state: Atoms, threshold: float):
     return NullState


-check_distances = python_app(_check_distances, executors=["Default"])
+check_distances = python_app(_check_distances, executors=["default_threads"])


 @typeguard.typechecked
diff --git a/psiflow/walkers/base.py b/psiflow/walkers/base.py
index e458347..9190753 100644
--- a/psiflow/walkers/base.py
+++ b/psiflow/walkers/base.py
@@ -31,7 +31,7 @@ def _conditioned_reset(
     return deepcopy(state), counter


-conditioned_reset = python_app(_conditioned_reset, executors=["Default"])
+conditioned_reset = python_app(_conditioned_reset, executors=["default_threads"])


 @typeguard.typechecked
@@ -39,7 +39,7 @@ def _is_reset(counter: int) -> bool:
     return counter == 0


-is_reset = python_app(_is_reset, executors=["Default"])
+is_reset = python_app(_is_reset, executors=["default_threads"])


 @typeguard.typechecked
diff --git a/psiflow/walkers/bias.py b/psiflow/walkers/bias.py
index d5c08f5..990e273 100644
--- a/psiflow/walkers/bias.py
+++ b/psiflow/walkers/bias.py
@@ -198,7 +198,7 @@ def evaluate_bias(
     return values


-app_evaluate = python_app(evaluate_bias, executors=["Default"])
+app_evaluate = python_app(evaluate_bias, executors=["default_htex"])


 @typeguard.typechecked
@@ -215,7 +215,7 @@ def _gather_partitions(
     return final


-gather_partitions = python_app(_gather_partitions, executors=["Default"])
+gather_partitions = python_app(_gather_partitions, executors=["default_threads"])


 @join_app
@@ -282,7 +282,7 @@ def _reset_mtd(
             f.write(content)


-reset_mtd = python_app(_reset_mtd, executors=["Default"])
+reset_mtd = python_app(_reset_mtd, executors=["default_threads"])


 @typeguard.typechecked
@@ -314,7 +314,7 @@ def extract_grid(
     return to_extract


-app_extract_grid = python_app(extract_grid, executors=["Default"])
+app_extract_grid = python_app(extract_grid, executors=["default_threads"])


 @typeguard.typechecked
@@ -333,7 +333,7 @@ def extract_between(
     return [int(index) for index in indices]


-app_extract_between = python_app(extract_between, executors=["Default"])
+app_extract_between = python_app(extract_between, executors=["default_threads"])


 @typeguard.typechecked
@@ -342,7 +342,7 @@ def extract_column(array: np.ndarray, index: int) -> np.ndarray:
     return array[:, index].reshape(-1, 1)  # maintain shape


-app_extract_column = python_app(extract_column, executors=["Default"])
+app_extract_column = python_app(extract_column, executors=["default_threads"])


 @typeguard.typechecked
@@ -363,7 +363,7 @@ def insert_cv_values(
     return state


-app_insert_cv_values = python_app(insert_cv_values, executors=["Default"])
+app_insert_cv_values = python_app(insert_cv_values, executors=["default_threads"])


 @typeguard.typechecked
@@ -380,7 +380,9 @@ def insert_cv_values_data(
     write_dataset(data, outputs=[outputs[0]])


-app_insert_cv_values_data = python_app(insert_cv_values_data, executors=["Default"])
+app_insert_cv_values_data = python_app(
+    insert_cv_values_data, executors=["default_threads"]
+)


 @typeguard.typechecked
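Note: in `walkers/bias.py` above, `evaluate_bias` is the one app that moves to the new `default_htex` executor rather than the renamed thread pool; unlike `ThreadPoolExecutor`, HTEX runs app bodies in separate worker processes. For orientation only, a stripped-down version of the executor added in `execution.py` (the real one also sets the interchange address, working directory, and, when a `container` block is present, the containerized launcher):

```python
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider

# simplified sketch; see the execution.py hunk above for the actual arguments
htex = HighThroughputExecutor(
    label="default_htex",
    cores_per_worker=1,
    provider=LocalProvider(launcher=SimpleLauncher()),
)
```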
diff --git a/psiflow/walkers/dynamic.py b/psiflow/walkers/dynamic.py
index 6aeaecf..6976084 100644
--- a/psiflow/walkers/dynamic.py
+++ b/psiflow/walkers/dynamic.py
@@ -84,7 +84,7 @@ def molecular_dynamics_yaff(
     return " ".join(command_list)


-@python_app(executors=["Default"])
+@python_app(executors=["default_threads"])
 def molecular_dynamics_yaff_post(
     inputs: list[File] = [],
     outputs: list[File] = [],
@@ -167,7 +167,7 @@ def molecular_dynamics_openmm(
     return " ".join(command_list)


-@python_app(executors=["Default"])
+@python_app(executors=["default_threads"])
 def molecular_dynamics_openmm_post(
     inputs: list[File] = [],
     outputs: list[File] = [],
diff --git a/psiflow/walkers/random.py b/psiflow/walkers/random.py
index 9dc1272..bef8e45 100644
--- a/psiflow/walkers/random.py
+++ b/psiflow/walkers/random.py
@@ -53,7 +53,7 @@ def random_perturbation(
     return state, 1, False


-app_random_perturbation = python_app(random_perturbation, executors=["Default"])
+app_random_perturbation = python_app(random_perturbation, executors=["default_threads"])


 @typeguard.typechecked
diff --git a/pyproject.toml b/pyproject.toml
index e05368c..a5fca47 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
     "wandb>=0.14.0",
     "pymatgen>=2023.3.23",
     "parsl",
+    "prettytable",
 ]

 [project.optional-dependencies]
diff --git a/tests/test_models.py b/tests/test_models.py
index d0577e1..5ad52c8 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -70,7 +70,6 @@ def test_nequip_train(gpu, nequip_config, dataset, tmp_path):
 def test_nequip_save_load(nequip_config, dataset, tmp_path):
     model = NequIPModel(nequip_config)
     future_raw, _, _ = model.save(tmp_path)
-    assert not future_raw.done()
     assert _ is None
     model.initialize(dataset[:2])
     e0 = model.evaluate(dataset.get(indices=[3]))[0].result().info["energy"]
@@ -190,7 +189,6 @@ def test_allegro_train(gpu, allegro_config, dataset, tmp_path):
 def test_allegro_save_load(allegro_config, dataset, tmp_path):
     model = AllegroModel(allegro_config)
     future_raw, _, _ = model.save(tmp_path)
-    assert not future_raw.done()
     assert _ is None
     model.initialize(dataset[:2])
     e0 = model.evaluate(dataset.get(indices=[3]))[0].result().info["energy"]
@@ -287,7 +285,6 @@ def test_mace_save_load(mace_config, dataset, tmp_path):
     model.add_atomic_energy("H", 3)
     model.add_atomic_energy("Cu", 4)
     future_raw, _, _ = model.save(tmp_path)
-    assert not future_raw.done()  # do not wait for result by default
     assert _ is None
     model.initialize(dataset[:2])
     e0 = model.evaluate(dataset.get(indices=[3]))[0].result().info["energy"]