diff --git a/autodetect_config.py b/autodetect_config.py
new file mode 100644
index 0000000000..a0012cd052
--- /dev/null
+++ b/autodetect_config.py
@@ -0,0 +1,9 @@
+import reframe.core.config as config
+
+site_configuration = config.detect_config(
+    exclude_feats=['colum*'],
+    detect_containers=False,
+    sched_options=[],
+    time_limit=200,
+    filename='system_config'
+)
diff --git a/docs/requirements.txt b/docs/requirements.txt
index c2cb9e6696..95eb1cf5d6 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -1,6 +1,8 @@
 archspec==0.2.5
+autopep8
 docutils==0.18.1
 jsonschema==3.2.0
+jinja2
 semver==2.13.0; python_version == '3.6'
 semver==3.0.2; python_version >= '3.7'
 Sphinx==5.3.0; python_version < '3.8'
diff --git a/reframe/core/backends.py b/reframe/core/backends.py
index d9fbe7e2f6..7d044dc0a7 100644
--- a/reframe/core/backends.py
+++ b/reframe/core/backends.py
@@ -9,22 +9,24 @@
 import reframe.core.fields as fields
 from reframe.core.exceptions import ConfigError
+from reframe.core.modules import ModulesSystem
+from reframe.core.logging import getlogger
 
 _launcher_backend_modules = [
     'reframe.core.launchers.local',
-    'reframe.core.launchers.mpi',
-    'reframe.core.launchers.rsh'
+    'reframe.core.launchers.rsh',
+    'reframe.core.launchers.mpi'
 ]
 _launchers = {}
 _scheduler_backend_modules = [
-    'reframe.core.schedulers.flux',
     'reframe.core.schedulers.local',
+    'reframe.core.schedulers.ssh',
+    'reframe.core.schedulers.flux',
     'reframe.core.schedulers.lsf',
     'reframe.core.schedulers.pbs',
     'reframe.core.schedulers.oar',
     'reframe.core.schedulers.sge',
-    'reframe.core.schedulers.slurm',
-    'reframe.core.schedulers.ssh'
+    'reframe.core.schedulers.slurm'
 ]
 _schedulers = {}
@@ -62,6 +64,26 @@ def _get_backend(name, *, backend_type):
     return cls
 
+def _detect_backend(backend_type: str):
+    backend_modules = globals()[f'_{backend_type}_backend_modules']
+    backend_found = []
+    for mod in backend_modules:
+        importlib.import_module(mod)
+
+    for name in globals()[f'_{backend_type}s']:
+        bcknd, _ = globals()[f'_{backend_type}s'][name]
+        backend = bcknd.validate()
+        if backend:
+            backend_found.append((bcknd, backend))
+            getlogger().info(f'Found {backend_type}: {backend}')
+
+    if len(backend_found) == 1:
+        # Only the always-available local backend validated
+        getlogger().warning(f'No remote {backend_type} detected')
+
+    # By default, select the last one detected
+    return backend_found[-1]
+
+
 register_scheduler = functools.partial(
     _register_backend, backend_type='scheduler'
 )
@@ -70,3 +92,7 @@ def _get_backend(name, *, backend_type):
 )
 getscheduler = functools.partial(_get_backend, backend_type='scheduler')
 getlauncher = functools.partial(_get_backend, backend_type='launcher')
+detect_scheduler = functools.partial(_detect_backend, backend_type='scheduler')
+detect_launcher = functools.partial(_detect_backend, backend_type='launcher')
+# TODO find a better place for this function
+detect_modules_system = ModulesSystem.detect
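Each of the new detect_* entry points returns a (class, name) tuple, leaving the backend class uninstantiated. A minimal usage sketch (not part of this diff; it simply mirrors how detect_config() below consumes these helpers):

    from reframe.core.backends import detect_launcher, detect_scheduler

    # Import all backend modules, run every registered backend's
    # validate() hook and keep the last (class, name) pair that validated
    scheduler_cls, scheduler_name = detect_scheduler()
    launcher_cls, launcher_name = detect_launcher()
    scheduler = scheduler_cls()   # instantiate the detected backend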
diff --git a/reframe/core/config.py b/reframe/core/config.py
index feabf288cc..4a092f3866 100644
--- a/reframe/core/config.py
+++ b/reframe/core/config.py
@@ -3,6 +3,7 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
+import autopep8
 import contextlib
 import copy
 import fnmatch
@@ -14,13 +15,18 @@ import os
 import re
 
+from jinja2 import Environment, FileSystemLoader
+
 import reframe
 import reframe.core.settings as settings
 import reframe.utility as util
+import reframe.utility.color as color
 import reframe.utility.jsonext as jsonext
 import reframe.utility.osext as osext
 from reframe.core.environments import normalize_module_list
 from reframe.core.exceptions import (ConfigError, ReframeFatalError)
+from reframe.core.backends import (detect_modules_system, detect_launcher,
+                                   detect_scheduler)
 from reframe.core.logging import getlogger
 from reframe.utility import ScopedDict
@@ -330,7 +336,7 @@ def sources(self):
     def subconfig_system(self):
         return self._local_system
 
-    def load_config_python(self, filename):
+    def load_config_python(self, filename, validate=True):
         try:
             mod = util.import_module_from_file(filename)
         except ImportError as e:
@@ -345,8 +351,9 @@ def load_config_python(self, filename):
                 f"not a valid Python configuration file: '{filename}'"
             )
 
-        self._config_modules.append(mod)
-        self.update_config(mod.site_configuration, filename)
+        if validate:
+            self._config_modules.append(mod)
+            self.update_config(mod.site_configuration, filename)
 
     def load_config_json(self, filename):
         with open(filename) as fp:
@@ -408,7 +415,7 @@ def _fn():
             else:
                 self._autodetect_methods.append((m, _sh_meth(m)))
 
-    def _detect_system(self):
+    def _detect_system(self, detect_only: bool = False) -> str:
         getlogger().debug('Autodetecting system')
         if not self._autodetect_methods:
             self._setup_autodect_methods()
@@ -424,12 +431,23 @@ def _detect_system(self):
                 break
 
         if hostname is None:
-            raise ConfigError('all autodetection methods failed; '
-                              'try passing a system name explicitly using '
-                              'the `--system` option')
+            if detect_only:
+                getlogger().error(
+                    'Could not retrieve the name of the system'
+                )
+                raise ConfigError('all autodetection methods failed')
+            else:
+                raise ConfigError('all autodetection methods failed; '
+                                  'try passing a system name explicitly using '
+                                  'the `--system` option')
 
         getlogger().debug(f'Retrieved hostname: {hostname!r}')
-        getlogger().debug(f'Looking for a matching configuration entry')
+        if detect_only:
+            # Keep only the leading alphabetic part of the hostname,
+            # stripping any trailing node numbers
+            hostname = re.search(r'^[A-Za-z]+', hostname.strip())
+            return hostname.group(0)
+
+        getlogger().debug('Looking for a matching configuration entry')
         for system in self._site_config['systems']:
             for patt in system['hostnames']:
                 if re.match(patt, hostname):
@@ -650,7 +668,7 @@ def find_config_files(config_path=None, config_file=None):
     return res
 
-def load_config(*filenames):
+def load_config(*filenames, validate=True):
     ret = _SiteConfig()
     getlogger().debug('Loading the builtin configuration')
     ret.update_config(settings.site_configuration, '')
@@ -662,10 +680,106 @@ def load_config(*filenames):
         getlogger().debug(f'Loading configuration file: {f!r}')
         _, ext = os.path.splitext(f)
         if ext == '.py':
-            ret.load_config_python(f)
+            ret.load_config_python(f, validate)
         elif ext == '.json':
             ret.load_config_json(f)
         else:
             raise ConfigError(f"unknown configuration file type: '{f}'")
 
     return ret
+
+
+def detect_config(detect_containers: bool = False,
+                  exclude_feats: list = [],
+                  filename: str = 'system_config',
+                  sched_options: list = [],
+                  time_limit: int = 200):
+    '''Detect the configuration of the system automatically and
+    write the corresponding ReFrame configuration file
+
+    :param detect_containers: Submit a job to each remote partition to detect
+        container platforms
+    :param exclude_feats: List of node features to be excluded when
+        determining the system partitions
+    :param filename: File name of the ReFrame configuration file that will be
+        generated
+    :param sched_options: List of additional scheduler options that are
+        required to submit jobs to all partitions of the system
+    :param time_limit: Maximum time in seconds to wait for the remote
containers detection + ''' + + import reframe.core.runtime as rt + + # Initialize the Site Configuration object + ret = _SiteConfig() + getlogger().debug('Detecting the system configuration') + ret.update_config(settings.site_configuration, '') + + site_config = {} + + # Detect the hostname and the system name + site_config.setdefault('name', '') + site_config.setdefault('hostnames', []) + hostname = ret._detect_system(detect_only=True) + site_config['hostnames'] += [hostname] + site_config['name'] += hostname + msg = color.colorize( + f'Detected hostname: {hostname}', color.GREEN + ) + getlogger().info(msg) + + # Detect modules system + getlogger().debug('Detecting the modules system...') + site_config.setdefault('modules_system', 'nomod') + modules_system, modules_system_name = detect_modules_system() + site_config['modules_system'] = modules_system_name + msg = color.colorize( + f'Modules system set to {site_config["modules_system"]}', color.GREEN + ) + getlogger().info(msg) + + # Detect scheduler + scheduler, scheduler_name = detect_scheduler() + msg = color.colorize(f'Scheduler set to {scheduler_name}', color.GREEN) + getlogger().info(msg) + + # Detect launcher + launcher, launcher_name = detect_launcher() + msg = color.colorize(f'Launcher set to {launcher_name}', color.GREEN) + getlogger().info(msg) + + site_config.setdefault('partitions', []) + # Detect the context with the corresponding scheduler + site_config['partitions'] = scheduler().build_context( + modules_system=modules_system, launcher=launcher(), + exclude_feats=exclude_feats, detect_containers=detect_containers, + prefix=rt.runtime().prefix, sched_options=sched_options, + time_limit=time_limit + ) + + # Load the jinja2 template and format its content + template_loader = FileSystemLoader(searchpath=os.path.join( + reframe.INSTALL_PREFIX, 'reframe', 'schemas' + )) + env = Environment(loader=template_loader, + trim_blocks=True, lstrip_blocks=True) + rfm_config_template = env.get_template( + 'reframe_config_template.j2' + ) + organized_config = rfm_config_template.render(site_config) + + # Output filename for the generated configuration + output_filename = f'{filename}.py' + + # Format the content + organized_config = autopep8.fix_code(organized_config) + + # Overwrite the file with formatted content + with open(output_filename, "w") as output_file: + output_file.write(organized_config) + + getlogger().info( + f'\nThe following configuration file was created:\n' + f'PYTHON: {filename}.py' + ) diff --git a/reframe/core/launchers/__init__.py b/reframe/core/launchers/__init__.py index eef3eacd02..bc1d6e566f 100644 --- a/reframe/core/launchers/__init__.py +++ b/reframe/core/launchers/__init__.py @@ -7,6 +7,7 @@ import reframe.utility.typecheck as typ from reframe.core.meta import RegressionTestMeta from reframe.core.warnings import user_deprecation_warning +from typing import Union class _JobLauncherMeta(RegressionTestMeta, abc.ABCMeta): @@ -86,6 +87,20 @@ def run_command(self, job): cmd_tokens += self.command(job) + self.options return ' '.join(cmd_tokens) + @property + def name(self): + return self.registered_name + + @classmethod + @abc.abstractmethod + # Will not raise an error if not defined until instantiation + def validate(cls) -> Union[str, bool]: + '''Check if the launcher is in the system + + :returns: False if the launcher is not present and + the name of the launcher backend if it is + ''' + class LauncherWrapper(JobLauncher): '''Wrap a launcher object so as to modify its invocation. 
@@ -134,3 +149,7 @@ def __init__(self, target_launcher, wrapper_command, wrapper_options=None):
 
     def command(self, job):
         return self._wrapper_command + self._target_launcher.command(job)
+
+    @classmethod
+    def validate(cls) -> bool:
+        # The wrapped launcher is only known per instance, so wrapper
+        # launchers are never auto-detected
+        return False
diff --git a/reframe/core/launchers/local.py b/reframe/core/launchers/local.py
index 6f541b1b94..bee538d3b9 100644
--- a/reframe/core/launchers/local.py
+++ b/reframe/core/launchers/local.py
@@ -16,3 +16,7 @@ def command(self, job):
         # `self.options`.
         self.options = []
         return []
+
+    @classmethod
+    def validate(cls) -> str:
+        return cls.registered_name
diff --git a/reframe/core/launchers/mpi.py b/reframe/core/launchers/mpi.py
index 6e6acc56b8..f1d539b4b4 100644
--- a/reframe/core/launchers/mpi.py
+++ b/reframe/core/launchers/mpi.py
@@ -3,15 +3,20 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
+import functools
 import semver
 import re
+from typing import Union
 
 import reframe.utility.osext as osext
 from reframe.core.backends import register_launcher
+from reframe.core.exceptions import SpawnedProcessError
 from reframe.core.launchers import JobLauncher
 from reframe.core.logging import getlogger
 from reframe.utility import seconds_to_hms
 
+_run_strict = functools.partial(osext.run_command, check=True)
+
 
 @register_launcher('srun')
 class SrunLauncher(JobLauncher):
@@ -50,6 +55,14 @@ def command(self, job):
 
         return ret
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which srun')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('ibrun')
 class IbrunLauncher(JobLauncher):
@@ -58,6 +71,14 @@ class IbrunLauncher(JobLauncher):
     def command(self, job):
         return ['ibrun']
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which ibrun')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('upcrun')
 class UpcrunLauncher(JobLauncher):
@@ -72,6 +93,14 @@ def command(self, job):
         cmd += ['-n', str(job.num_tasks)]
         return cmd
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which upcrun')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('upcxx-run')
 class UpcxxrunLauncher(JobLauncher):
@@ -86,6 +115,14 @@ def command(self, job):
         cmd += ['-n', str(job.num_tasks)]
         return cmd
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which upcxx-run')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('alps')
 class AlpsLauncher(JobLauncher):
@@ -102,18 +139,42 @@ def command(self, job):
 
         return cmd
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which aprun')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('mpirun')
 class MpirunLauncher(JobLauncher):
     def command(self, job):
         return ['mpirun', '-np', str(job.num_tasks)]
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which mpirun')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('mpiexec')
 class MpiexecLauncher(JobLauncher):
     def command(self, job):
         return ['mpiexec', '-n', str(job.num_tasks)]
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which mpiexec')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('srunalloc')
 class SrunAllocationLauncher(JobLauncher):
@@ -156,6 +217,11 @@ def command(self, job):
 
         return ret
+
+    @classmethod
+    # The plain srun launcher is detected instead
+    def validate(cls) -> bool:
+        return False
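Every remote launcher's validate() above repeats the same `which <command>` probe. A possible refactoring, not part of this change, is sketched below; it assumes shutil.which is an acceptable stand-in for spawning `which` in a subprocess:

    import shutil
    from typing import Union

    def _make_which_validator(cmd: str):
        @classmethod
        def validate(cls) -> Union[str, bool]:
            # Report the backend name if `cmd` is on PATH, False otherwise
            return cls.registered_name if shutil.which(cmd) else False
        return validate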
 
 
 @register_launcher('lrun')
 class LrunLauncher(JobLauncher):
@@ -167,6 +233,14 @@ def command(self, job):
         return ['lrun', '-N', str(num_nodes),
                 '-T', str(num_tasks_per_node)]
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which lrun')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('lrun-gpu')
 class LrungpuLauncher(LrunLauncher):
@@ -174,3 +248,8 @@ class LrungpuLauncher(LrunLauncher):
 
     def command(self, job):
         return super().command(job) + ['-M "-gpu"']
+
+    @classmethod
+    def validate(cls) -> bool:
+        # The plain lrun launcher is detected instead
+        return False
diff --git a/reframe/core/launchers/rsh.py b/reframe/core/launchers/rsh.py
index d858caf162..9e765ac205 100644
--- a/reframe/core/launchers/rsh.py
+++ b/reframe/core/launchers/rsh.py
@@ -3,23 +3,46 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
+import functools
+from typing import Union
+
+import reframe.utility.osext as osext
 from reframe.core.backends import register_launcher
+from reframe.core.exceptions import SpawnedProcessError
 from reframe.core.launchers import JobLauncher
 
 # Remote shell launchers
 
+_run_strict = functools.partial(osext.run_command, check=True)
+
 
 @register_launcher('clush')
 class ClushLauncher(JobLauncher):
     def command(self, job):
         return ['clush', *job.sched_access]
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which clush')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('pdsh')
 class PdshLauncher(JobLauncher):
     def command(self, job):
         return ['pdsh', *job.sched_access]
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which pdsh')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 
 @register_launcher('ssh')
 class SSHLauncher(JobLauncher):
@@ -37,3 +60,11 @@ def run_command(self, job):
         # self.options is processed specially above
         cmd_tokens += self.command(job)
         return ' '.join(cmd_tokens)
+
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which ssh')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
diff --git a/reframe/core/modules.py b/reframe/core/modules.py
index e8c9c504b1..45cbac0b98 100644
--- a/reframe/core/modules.py
+++ b/reframe/core/modules.py
@@ -109,16 +109,6 @@ class ModulesSystem:
     @classmethod
     def create(cls, modules_kind=None, validate=True):
         getlogger().debug(f'Initializing modules system {modules_kind!r}')
-        modules_impl = {
-            None: NoModImpl,
-            'nomod': NoModImpl,
-            'tmod31': TMod31Impl,
-            'tmod': TModImpl,
-            'tmod32': TModImpl,
-            'tmod4': TMod4Impl,
-            'lmod': LModImpl,
-            'spack': SpackImpl
-        }
         try:
             impl_cls = modules_impl[modules_kind]
         except KeyError:
@@ -127,6 +117,20 @@ def create(cls, modules_kind=None, validate=True):
         impl_cls.validate = validate
         return ModulesSystem(impl_cls())
 
+    @classmethod
+    def detect(cls):
+        getlogger().debug('Detecting modules system...')
+        modules_system = NoModImpl()
+        for modules_kind in modules_impl:
+            try:
+                modules_system = modules_impl[modules_kind]()
+                if modules_kind not in ('nomod', None):
+                    return (modules_system, modules_kind)
+            except ConfigError as e:
+                getlogger().debug2(f'Error detecting {modules_kind}: '
+                                   f'{e}')
+        return (modules_system, 'nomod')
+
     def __init__(self, backend):
         self._backend = backend
         self.module_map = {}
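For reference, a sketch of how the new detect() hook is consumed; detect_config() in reframe/core/config.py binds it as detect_modules_system:

    from reframe.core.modules import ModulesSystem

    # Returns the first implementation whose constructor validates,
    # e.g. (<LModImpl instance>, 'lmod'), falling back to 'nomod'
    impl, kind = ModulesSystem.detect()
    print(f'Detected modules system: {kind}')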
@@ -743,8 +747,12 @@ def __init__(self):
 
     def _do_validate(self):
         # Try to figure out if we are indeed using the TCL version
+        modulecmd = os.getenv('MODULESHOME')
+        if modulecmd is None:
+            raise ConfigError(
+                'could not find a sane TMod31 installation'
+            )
         try:
-            modulecmd = os.getenv('MODULESHOME')
             modulecmd = os.path.join(modulecmd, 'modulecmd.tcl')
             completed = osext.run_command(modulecmd)
         except OSError as e:
@@ -1234,3 +1242,15 @@ def emit_load_instr(self, module):
 
     def emit_unload_instr(self, module):
         return [f'spack unload {module.fullname}']
+
+
+modules_impl = {
+    None: NoModImpl,
+    'nomod': NoModImpl,
+    'tmod31': TMod31Impl,
+    'tmod': TModImpl,
+    'tmod32': TModImpl,
+    'tmod4': TMod4Impl,
+    'lmod': LModImpl,
+    'spack': SpackImpl
+}
diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py
index a8565a99bc..d6f260e501 100644
--- a/reframe/core/schedulers/__init__.py
+++ b/reframe/core/schedulers/__init__.py
@@ -8,11 +8,15 @@
 #
 
 import abc
+import copy
 import os
+import tempfile
 import time
+import shutil
 
 import reframe.core.runtime as runtime
 import reframe.core.shell as shell
+import reframe.utility.config_detection as c_d
 import reframe.utility.jsonext as jsonext
 import reframe.utility.typecheck as typ
 from reframe.core.exceptions import JobError, JobNotStartedError, SkipTestError
@@ -100,6 +104,38 @@ def filternodes(self, job, nodes):
         :meta private:
         '''
 
+    @abc.abstractmethod
+    def feats_access_option(self, node_feats: list):
+        '''Return the scheduler-specific access options needed to reach
+        a node with certain features (node_feats)
+
+        :arg node_feats: A list with the node features.
+        :returns: The access options for the scheduler (list).
+        :meta private:
+        '''
+
+    @abc.abstractmethod
+    def build_context(self, modules_system: str, launcher: str,
+                      exclude_feats: list, detect_containers: bool,
+                      prefix: str, sched_options: list, time_limit: int):
+        '''Return the ReFrame context used to build the configuration
+        of the system
+
+        :arg modules_system: Name of the modules system
+        :arg launcher: Name of the launcher in the system
+        :arg exclude_feats: List of the features to be excluded in the
+            partitions detection
+        :arg detect_containers: Submit a job to each remote partition to
+            detect container platforms
+        :arg prefix: Prefix of the directory where the jobs are
+            prepared and submitted
+        :arg sched_options: List of additional scheduler options that are
+            required to submit jobs to all partitions of the system
+        :arg time_limit: Maximum time in seconds to wait for the remote
+            containers detection
+        :returns: List with the partitions of the system
+        '''
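For concreteness, each entry in the list returned by build_context() is a plain partition dict. The values below mirror the defaults used by the Slurm implementation later in this diff; the access constraint is illustrative:

    partition = {
        'name': 'partition_1',
        'scheduler': 'slurm',
        'time_limit': '10m',
        'environs': ['builtin'],
        'max_jobs': 100,
        'extras': {},
        'env_vars': [],
        'launcher': 'srun',
        'access': ['-C gpu&a100'],
        'features': ['gpu', 'a100', 'remote'],
        'container_platforms': [],
    }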
     @abc.abstractmethod
     def submit(self, job):
         '''Submit a job.
@@ -152,6 +188,20 @@ def log(self, message, level=DEBUG2):
         '''
         getlogger().log(level, f'[S] {self.registered_name}: {message}')
 
+    @property
+    def name(self):
+        return self.registered_name
+
+    @classmethod
+    @abc.abstractmethod
+    # Will not raise an error if not defined until instantiation
+    def validate(cls):
+        '''Check if the scheduler is in the system
+
+        :returns: False if the scheduler is not present and
+            the name of the scheduler backend if it is
+        '''
+
 
 def filter_nodes_by_state(nodelist, state):
     '''Filter nodes by their state
@@ -551,6 +601,16 @@ def submit_time(self):
         '''
         return self._submit_time
 
+    def add_sched_access(self, access_options: list):
+        '''Add access options to the job'''
+        self._sched_access += access_options
+
+    def rm_sched_access(self, access_options: list):
+        '''Remove access options from the job'''
+        self._sched_access = [
+            opt for opt in self._sched_access if opt not in access_options
+        ]
+
     def prepare(self, commands, environs=None, prepare_cmds=None,
                 strict_flex=False, **gen_opts):
         environs = environs or []
@@ -695,3 +755,305 @@ def in_statex(self, state):
 
     def in_state(self, state):
         return self.in_statex(state)
+
+
+class ReframeContext(abc.ABC):
+    '''Abstract base class for representing a ReFrame context.
+
+    The context contains information about the detected nodes and the
+    created partitions during the configuration autodetection process
+    '''
+
+    def __init__(self, modules_system: str, launcher: str,
+                 scheduler: JobScheduler, detect_containers: bool,
+                 prefix: str, time_limit: int):
+        self.partitions = []
+        self._modules_system = modules_system
+        self._scheduler = scheduler
+        self._launcher = launcher
+        self._time_limit = time_limit
+        self._detect_containers = detect_containers
+        self._p_n = 0  # System partitions counter
+        self._keep_tmp_dir = False
+        self.TMP_DIR = tempfile.mkdtemp(
+            prefix='reframe_config_detection_',
+            dir=prefix
+        )
+        if detect_containers:
+            getlogger().info(f'Stage directory: {self.TMP_DIR}')
+
+    @abc.abstractmethod
+    def submit_detect_job(self, job):
+        '''Submission process of the remote detect job'''
+
+    @abc.abstractmethod
+    def _find_devices(self, node_feats) -> dict:
+        '''Find the available devices in a node with a given set of features'''
+        # TODO: document the dictionary structure that should be returned
+
+    def _check_gpus_count(self, node_devices_slurm: dict,
+                          node_devices_job: dict) -> list:
+
+        gpus_slurm_count = 0  # Number of GPUs from Slurm Gres
+        gpus_job_count = 0    # Number of GPUs from remote job detection
+        devices = []
+
+        # Check that the number of GPU models is the same
+        if len(node_devices_job) != len(node_devices_slurm):
+            getlogger().warning(
+                'discrepancy between the '
+                'number of GPU models\n'
+                f'GPU models from Gres ({len(node_devices_slurm)}) '
+                f'GPU models from job ({len(node_devices_job)}) '
+            )
+
+        # Get the total number of GPUs (independently of the model)
+        for gpu_slurm in node_devices_slurm:
+            gpus_slurm_count += node_devices_slurm[gpu_slurm]
+
+        # Format the dictionary of the devices for the configuration file
+        # and get the total number of GPUs found
+        for gpu_job in node_devices_job:
+            devices.append({'type': 'gpu',
+                            'model': gpu_job,
+                            'num_devices': node_devices_job[gpu_job]})
+            gpus_job_count += node_devices_job[gpu_job]
+
+        if gpus_job_count != gpus_slurm_count:
+            getlogger().warning('The total number of detected GPUs '
+                                f'({gpus_job_count}) '
+                                'differs from the (minimum) in GRes '
+                                f'from Slurm ({gpus_slurm_count}).')
+            if gpus_job_count > gpus_slurm_count:
+                getlogger().debug('It might be that nodes in this partition '
+                                  'have a different number of GPUs '
+                                  'of the same model.\nIn the config, the '
+                                  'minimum number of GPUs that will '
+                                  'be found in the nodes of this partition '
+                                  'is set.\n')
+            elif gpus_job_count < gpus_slurm_count:
+                getlogger().error(
+                    'A lower number of GPUs was detected in the node.\n')
+
+        return devices
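A short worked example of the reconciliation above, assuming Slurm GRes advertised 'gpu:a100:4' and the detection job also found four A100 devices:

    node_devices_slurm = {'gpu:a100': 4}   # parsed from GRes
    node_devices_job = {'A100': 4}         # parsed from the job output
    # _check_gpus_count() returns:
    # [{'type': 'gpu', 'model': 'A100', 'num_devices': 4}]
    # and logs a warning only if the totals disagree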
+    def _parse_devices(self, file_path: str) -> dict:
+        '''Extract the information about the GPUs from the job output'''
+        gpu_info = {}  # Initialize the dict for GPU info
+        nvidia_gpus_found = False
+        amd_gpus_found = False
+
+        with open(file_path, 'r') as file:
+            lines = file.readlines()
+
+        for line in lines:
+            # Check for NVIDIA GPUs
+            if "NVIDIA GPUs installed" in line:
+                nvidia_gpus_found = True
+            elif line == '\n':
+                nvidia_gpus_found = False
+            elif not line or "Batch Job Summary" in line:
+                break
+            elif nvidia_gpus_found:
+                model = [
+                    gpu_m for gpu_m in c_d.nvidia_gpu_architecture
+                    if gpu_m in line
+                ]
+                if len(model) > 1:
+                    model = []
+                if model:
+                    if model[0] not in gpu_info:
+                        gpu_info.update({model[0]: 1})
+                    else:
+                        gpu_info[model[0]] += 1
+
+            # Check for AMD GPUs
+            if "AMD GPUs" in line:
+                amd_gpus_found = True
+                amd_lines = []
+            elif line == '\n' or "lspci" in line:
+                amd_gpus_found = False
+            elif not line or "Batch Job Summary" in line:
+                break
+            elif amd_gpus_found:
+                if line not in amd_lines:
+                    amd_lines.append(line)
+                    model = [
+                        gpu_m for gpu_m in c_d.amd_gpu_architecture
+                        if gpu_m in line
+                    ]
+                    if len(model) > 1:
+                        model = []
+                    if model:
+                        if model[0] not in gpu_info:
+                            gpu_info.update({model[0]: 1})
+                        else:
+                            gpu_info[model[0]] += 1
+
+        return gpu_info
+
+    def _parse_containers(self, file_path: str) -> list:
+        '''Extract the information about the containers from the job output'''
+        containers_info = []
+        containers_found = False
+
+        with open(file_path, 'r') as file:
+            lines = file.readlines()
+
+        for line in lines:
+            if "Installed containers" in line:
+                containers_found = True
+            elif "GPU" in line or line == "\n" or "Batch Job Summary" in line:
+                containers_found = False
+                break
+            elif containers_found:
+                type = line.split(' modules: ')[0].strip()
+                try:
+                    modules = line.split(' modules: ')[1].split(', ')
+                    modules = [m.strip() for m in modules]
+                    if modules[0] != '':
+                        modules.append(type.lower())
+                    else:
+                        modules = [type.lower()]
+                except Exception:
+                    modules = []
+                containers_info.append({'type': type, 'modules': modules})
+
+        return containers_info
+
+    def _extract_info(self, job: Job):
+        '''Extract the information from the detect job output'''
+        file_path = os.path.join(self.TMP_DIR, job.stdout)
+        if job.detect_containers:
+            job.container_platforms = self._parse_containers(file_path)
+
+    def _create_detection_job(self, name: str, access_node: list,
+                              access_options: list):
+        '''Create the instance of the job for remote autodetection'''
+        remote_job = Job.create(
+            self._scheduler,
+            self._launcher,
+            name=f"autodetect_{name}",
+            workdir=self.TMP_DIR,
+            sched_access=access_node,
+            sched_options=access_options
+        )
+        remote_job.max_pending_time = self._time_limit
+        remote_job.time_limit = '2m'
+        remote_job.container_platforms = []
+        remote_job.devices = {}
+        return remote_job
+
+    def _generate_job_content(self, job):
+        job.content = []
+        if job.detect_containers:
+            job.content += [c_d.containers_detect_bash]
+            job.content += ['\n\n\n']
+
+    def create_login_partition(self):
+        max_jobs = 4
+        time_limit = '2m'
+        self.partitions.append(
+            {'name': 'login',
+             'scheduler': 'local',
'time_limit': time_limit, + 'environs': ['builtin'], + 'max_jobs': max_jobs, + 'launcher': 'local'}) + + def create_remote_partition(self, node_feats: tuple, + sched_options): + + node_features = list(node_feats) + _detect_containers = copy.deepcopy(self._detect_containers) + self._p_n += 1 # Count the partition that is being created + access_options = copy.deepcopy(sched_options) + access_node = self._scheduler.feats_access_option(node_features) + name = f'partition_{self._p_n}' + getlogger().info(f'{name} : {node_feats}') + max_jobs = 100 + time_limit = '10m' + container_platforms = [] + + # Try to get the devices from the scheduler config + _detect_devices = self._find_devices(node_features) + if _detect_devices: + getlogger().info('GPUs were detected in this node type.') + + remote_job = None + if _detect_containers: + self._keep_tmp_dir = True + remote_job = self._create_detection_job( + name, access_node, access_options + ) + remote_job.detect_containers = _detect_containers + self._generate_job_content(remote_job) + submission_error, access_node = self.submit_detect_job( + remote_job, node_features + ) + if not submission_error: + try: + remote_job.wait() + except JobError as e: + submission_error = e + getlogger().warning(f'{name}: {e}') + else: + self._extract_info(remote_job) + else: + getlogger().warning( + f'encountered a job submission error in {name}:\n' + f'{submission_error}' + ) + + if remote_job and not submission_error: + if remote_job.container_platforms: + container_platforms = remote_job.container_platforms + if 'tmod' not in self._modules_system.name() and \ + 'lmod' not in self._modules_system.name(): + getlogger().warning( + 'Container platforms were detected but the automatic' + ' detection of required modules is not possible with ' + f'{self._modules_system}.' 
+ ) + # Add the container platforms in the features + for cp in container_platforms: + getlogger().info( + f'Detected container platform {cp["type"]} ' + f'in partition "{name}"' + ) + node_features.append(cp['type'].lower()) + else: + getlogger().info( + 'No container platforms were detected in ' + f'partition "{name}"' + ) + + access_options += access_node + + # Create the partition + self.partitions.append( + {'name': name, + 'scheduler': self._scheduler.name, + 'time_limit': time_limit, + 'environs': ['builtin'], + 'max_jobs': max_jobs, + 'extras': {}, + 'env_vars': [], + 'launcher': self._launcher.name, + 'access': access_options, + 'features': node_features+['remote'], + 'container_platforms': container_platforms} + ) + + def create_partitions(self, sched_options): + # TODO: asynchronous + for node in self.node_types: + self.create_remote_partition(node, sched_options) + if not self._keep_tmp_dir: + shutil.rmtree(self.TMP_DIR) + else: + getlogger().info( + f'\nYou can check the job submissions in {self.TMP_DIR}.\n' + ) diff --git a/reframe/core/schedulers/flux.py b/reframe/core/schedulers/flux.py index a1d37038ab..0f6b9d710a 100644 --- a/reframe/core/schedulers/flux.py +++ b/reframe/core/schedulers/flux.py @@ -10,12 +10,15 @@ # Lawrence Livermore National Lab # +import functools import itertools import os import time +from typing import Union +import reframe.utility.osext as osext from reframe.core.backends import register_scheduler -from reframe.core.exceptions import JobError +from reframe.core.exceptions import JobError, SpawnedProcessError from reframe.core.schedulers import JobScheduler, Job # Just import flux once @@ -30,6 +33,8 @@ WAITING_STATES = ('QUEUED', 'HELD', 'WAITING', 'PENDING') +_run_strict = functools.partial(osext.run_command, check=True) + class _FluxJob(Job): def __init__(self, *args, **kwargs): @@ -141,6 +146,16 @@ def filternodes(self, job, nodes): 'flux backend does not support node filtering' ) + def feats_access_option(self, node_feats): + raise NotImplementedError( + 'flux backend does not support configuration autodetection' + ) + + def build_context(self, node_feats): + raise NotImplementedError( + 'flux backend does not support configuration autodetection' + ) + def wait(self, job): '''Wait until a job is finished.''' @@ -154,3 +169,11 @@ def finished(self, job): raise job.exception return job.completed + + @classmethod + def validate(cls) -> Union[str, bool]: + try: + _run_strict('which flux') + return cls.registered_name + except SpawnedProcessError: + return False diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 87eead7530..ae6a0bd381 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -91,6 +91,16 @@ def allnodes(self): def filternodes(self, job, nodes): return [sched.AlwaysIdleNode(socket.gethostname())] + def feats_access_option(self, node_feats): + raise NotImplementedError( + 'local backend does not support configuration autodetection' + ) + + def build_context(self, node_feats): + raise NotImplementedError( + 'local backend does not support configuration autodetection' + ) + def _kill_all(self, job): '''Send SIGKILL to all the processes of the spawned job.''' try: @@ -207,3 +217,7 @@ def _poll_job(self, job): elif os.WIFSIGNALED(status): job._state = 'FAILURE' job._signal = os.WTERMSIG(status) + + @classmethod + def validate(cls) -> str: + return cls.registered_name diff --git a/reframe/core/schedulers/lsf.py b/reframe/core/schedulers/lsf.py index 73b6593f3b..1bd3f2eddf 
100644 --- a/reframe/core/schedulers/lsf.py +++ b/reframe/core/schedulers/lsf.py @@ -13,10 +13,11 @@ import functools import re import time +from typing import Union import reframe.utility.osext as osext from reframe.core.backends import register_scheduler -from reframe.core.exceptions import JobSchedulerError +from reframe.core.exceptions import JobSchedulerError, SpawnedProcessError from reframe.core.schedulers.pbs import PbsJobScheduler _run_strict = functools.partial(osext.run_command, check=True) @@ -151,3 +152,11 @@ def finished(self, job): raise job.exception return job.state == 'COMPLETED' + + @classmethod + def validate(cls) -> Union[str, bool]: + try: + _run_strict('which bsub') + return cls.registered_name + except SpawnedProcessError: + return False diff --git a/reframe/core/schedulers/oar.py b/reframe/core/schedulers/oar.py index 06733bf600..61d71bd18b 100644 --- a/reframe/core/schedulers/oar.py +++ b/reframe/core/schedulers/oar.py @@ -13,10 +13,12 @@ import os import re import time +from typing import Union import reframe.utility.osext as osext from reframe.core.backends import register_scheduler -from reframe.core.exceptions import JobError, JobSchedulerError +from reframe.core.exceptions import (JobError, JobSchedulerError, + SpawnedProcessError) from reframe.core.schedulers.pbs import PbsJobScheduler from reframe.utility import seconds_to_hms @@ -154,7 +156,8 @@ def poll(self, *jobs): # https://github.com/oar-team/oar/blob/37db5384c7827cca2d334e5248172bb700015434/sources/core/qfunctions/oarstat#L332 job_raw_info = completed.stdout jobid_match = re.search( - r'^(Job_Id|id):\s*(?P\S+)', completed.stdout, re.MULTILINE + r'^(Job_Id|id):\s*(?P\S+)', completed.stdout, + re.MULTILINE ) if jobid_match: jobid = jobid_match.group('jobid') @@ -198,3 +201,11 @@ def poll(self, *jobs): self.cancel(job) job._exception = JobError('maximum pending time exceeded', job.jobid) + + @classmethod + def validate(cls) -> Union[str, bool]: + try: + _run_strict('which oarsub') + return cls.registered_name + except SpawnedProcessError: + return False diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py index 86dbb6063d..76a8648c1c 100644 --- a/reframe/core/schedulers/pbs.py +++ b/reframe/core/schedulers/pbs.py @@ -14,11 +14,13 @@ import itertools import re import time +from typing import Union import reframe.core.schedulers as sched import reframe.utility.osext as osext from reframe.core.backends import register_scheduler -from reframe.core.exceptions import JobError, JobSchedulerError +from reframe.core.exceptions import (JobError, JobSchedulerError, + SpawnedProcessError) from reframe.utility import seconds_to_hms, toalphanum @@ -146,6 +148,16 @@ def filternodes(self, job, nodes): raise NotImplementedError('pbs backend does not support ' 'node filtering') + def feats_access_option(self, node_feats): + raise NotImplementedError( + 'pbs backend does not support configuration autodetection' + ) + + def build_context(self, node_feats): + raise NotImplementedError( + 'pbs backend does not support configuration autodetection' + ) + def submit(self, job): cmd_parts = ['qsub'] if self._sched_access_in_submit: @@ -305,6 +317,14 @@ def output_ready(job): job._exception = JobError('maximum pending time exceeded', job.jobid) + @classmethod + def validate(cls) -> Union[str, bool]: + try: + _run_strict('which pbsnodes') + return cls.registered_name + except SpawnedProcessError: + return False + @register_scheduler('torque') class TorqueJobScheduler(PbsJobScheduler): diff --git 
a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py
index a6dc6b08ff..24fdd918ca 100644
--- a/reframe/core/schedulers/sge.py
+++ b/reframe/core/schedulers/sge.py
@@ -12,11 +12,12 @@
 import functools
 import re
 import time
+from typing import Union
 import xml.etree.ElementTree as ET
 
 import reframe.utility.osext as osext
 from reframe.core.backends import register_scheduler
-from reframe.core.exceptions import JobSchedulerError
+from reframe.core.exceptions import (JobSchedulerError, SpawnedProcessError)
 from reframe.core.schedulers.pbs import PbsJobScheduler
 from reframe.utility import seconds_to_hms
 
@@ -40,7 +41,7 @@ def emit_preamble(self, job):
         if job.time_limit is not None:
             h, m, s = seconds_to_hms(job.time_limit)
             preamble.append(
-                self._format_option(f'-l h_rt=%d:%d:%d' % (h, m, s))
+                self._format_option('-l h_rt=%d:%d:%d' % (h, m, s))
             )
 
         # Emit the rest of the options
@@ -141,3 +142,11 @@ def finished(self, job):
             raise job.exception
 
         return job.state == 'COMPLETED'
+
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which qconf')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py
index 47d44c1dfc..65d4a32569 100644
--- a/reframe/core/schedulers/slurm.py
+++ b/reframe/core/schedulers/slurm.py
@@ -3,15 +3,18 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
+import fnmatch
 import functools
 import glob
 import itertools
 import re
 import shlex
 import time
+from typing import Union
 from argparse import ArgumentParser
 from contextlib import suppress
 
+from reframe.core.logging import getlogger
 import reframe.core.runtime as rt
 import reframe.core.schedulers as sched
 import reframe.utility.osext as osext
@@ -482,7 +485,7 @@ def _cancel_if_pending_too_long(self, job):
         t_pending = time.time() - job.submit_time
         if t_pending >= job.max_pending_time:
-            self.log(f'maximum pending time for job exceeded; cancelling it')
+            self.log('maximum pending time for job exceeded; cancelling it')
             self.cancel(job)
             job._exception = JobError('maximum pending time exceeded',
                                       job.jobid)
@@ -571,6 +574,30 @@ def finished(self, job):
 
         return slurm_state_completed(job.state)
 
+    def feats_access_option(self, node_feats: list) -> list:
+        return ['-C ' + '&'.join(node_feats)]
+
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which sacct')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
+    def build_context(self, modules_system, launcher, exclude_feats,
+                      detect_containers, prefix, sched_options, time_limit):
+        '''Create the ReFrame context, detecting the partitions with Slurm'''
+        self._context = _SlurmContext(
+            modules_system=modules_system, launcher=launcher, scheduler=self,
+            detect_containers=detect_containers, prefix=prefix,
+            sched_options=sched_options, time_limit=time_limit
+        )
+        self._context.search_node_types(exclude_feats)
+        self._context.create_login_partition()
+        self._context.create_partitions(sched_options)
+        return self._context.partitions
+
 
 @register_scheduler('squeue')
 class SqueueJobScheduler(SlurmJobScheduler):
@@ -631,6 +658,18 @@ def poll(self, *jobs):
             )
             self._cancel_if_pending_too_long(job)
 
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        # Make sure that if sacct is found this returns False
+        slurm_validate = super().validate()
+        if slurm_validate:
+            return False
+        try:
+            _run_strict('which squeue')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
+
 def _create_nodes(descriptions):
     nodes = set()
@@ -657,6 +696,8 @@ def __init__(self, node_descr):
             'ActiveFeatures', node_descr, sep=',') or set()
         self._states = self._extract_attribute(
             'State', node_descr, sep='+') or set()
+        self._generic_resources = self._extract_attribute(
+            'Gres', node_descr, sep=',') or set()
         self._descr = node_descr
 
     def __eq__(self, other):
@@ -682,20 +723,25 @@ def is_avail(self):
     def is_down(self):
         return not self.is_avail()
 
-    def satisfies(self, slurm_constraint):
+    def satisfies(self, slurm_constraint: str):
         # Convert the Slurm constraint to a Python expression and evaluate it,
         # but restrict our syntax to accept only AND or OR constraints and
         # their combinations
-        if not re.match(r'^[\w\d\(\)\|\&]*$', slurm_constraint):
+        if not re.match(r'^[\w\d\(\)\|\&\-]*$', slurm_constraint):
             return False
 
         names = {grp[0]
                  for grp in re.finditer(r'(\w(\w|\d)*)', slurm_constraint)}
-        expr = slurm_constraint.replace('|', ' or ').replace('&', ' and ')
+        slurm_constraint = slurm_constraint.replace(
+            "&", " and ").replace("|", " or ")
+        # Pattern to extract the variable names
+        pattern = r'\b(?!and\b|or\b)(\d*[a-zA-Z_]\w*)\b'
+        # Replace each variable with vars['variable']
+        expr = re.sub(pattern, r"vars['\1']", slurm_constraint)
         vars = {n: True for n in self.active_features}
         vars.update({n: False for n in names - self.active_features})
         try:
-            return eval(expr, {}, vars)
+            return eval(expr, {}, {'vars': vars})
         except BaseException:
             return False
 
@@ -703,6 +749,10 @@ def satisfies(self, slurm_constraint):
     def active_features(self):
         return self._active_features
 
+    @property
+    def generic_resources(self):
+        return self._generic_resources
+
     @property
     def name(self):
         return self._name
@@ -729,3 +779,206 @@ def _extract_attribute(self, attr_name, node_descr, sep=None):
 
     def __str__(self):
         return self._name
+
+
+class _SlurmContext(sched.ReframeContext):
+
+    def __init__(self, modules_system: str, launcher: str,
+                 scheduler: sched.JobScheduler, detect_containers: bool,
+                 prefix: str, sched_options: list, time_limit: int):
+
+        super().__init__(modules_system=modules_system, launcher=launcher,
+                         scheduler=scheduler,
+                         detect_containers=detect_containers,
+                         prefix=prefix, time_limit=time_limit)
+        self.node_types = []
+        self.default_nodes = []
+        self.reservations = []
+        self._access = sched_options
+
+    def submit_detect_job(self, job: _SlurmJob, node_features):
+        with osext.change_dir(job.workdir):
+            job.prepare(job.content)
+            try:
+                job.submit()
+            except SpawnedProcessError as e:
+                # Try resubmission with partition access
+                partition_access = self._get_access_partition(node_features)
+                if partition_access:
+                    job.add_sched_access(partition_access)
+                    # Second attempt
+                    job.prepare(job.content)
+                    try:
+                        job.submit()
+                    except SpawnedProcessError as e:
+                        # Return the error
+                        job.rm_sched_access(partition_access)
+                        return e, job.sched_access
+                else:
+                    return e, job.sched_access
+
+        return None, job.sched_access
+
+    def search_node_types(self, exclude_feats: Union[list, None] = []):
+        '''Search for node types in the system based on their features'''
+
+        getlogger().debug('Filtering nodes based on ActiveFeatures...')
+        try:
+            nodes_info = self._scheduler.allnodes()
+            raw_node_types = [(tuple(n.active_features),
+                               tuple(n.partitions)) for n in nodes_info]
+            raw_node_types = set(raw_node_types)
+            raw_node_types = [[tuple(n[0]),
+                               tuple(n[1])] for n in raw_node_types]
+        except JobSchedulerError as e:
+            getlogger().error(
+                f'Node types could not be retrieved from scontrol: {e}'
+            )
+            return
+
+        default_partition = self._scheduler._get_default_partition()
+
+        self._set_nodes_types(exclude_feats, raw_node_types,
+                              default_partition)
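To make the constraint translation in _SlurmNode.satisfies() above concrete, here is a small sketch (feature names are illustrative):

    import re

    constraint = 'gpu&(a100|mc)'.replace('&', ' and ').replace('|', ' or ')
    expr = re.sub(r"\b(?!and\b|or\b)(\d*[a-zA-Z_]\w*)\b",
                  r"vars['\1']", constraint)
    # expr == "vars['gpu'] and (vars['a100'] or vars['mc'])"
    vars = {'gpu': True, 'a100': False, 'mc': True}
    print(eval(expr, {}, {'vars': vars}))   # True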
+    def _set_nodes_types(self, exclude_feats: Union[list, None],
+                         raw_node_types: list,
+                         default_partition: Union[str, None]):
+        '''Set the node types in the system'''
+
+        default_nodes = []  # Node types in the default partition
+        # Initialize the list of node types (with filtered features)
+        node_types = []
+
+        for node in raw_node_types:
+            node_feats_raw = list(node[0])  # Before filtering features
+            node_feats = node_feats_raw
+            node_partition = node[1]
+            if exclude_feats:  # Filter features
+                node_feats = self._filter_node_feats(
+                    exclude_feats, node_feats_raw
+                )
+            if node_feats:  # If all features were removed, empty list
+                node_types.append(tuple(node_feats))
+                # The nodes in the default partition based on their raw feats
+                if default_partition in node_partition:
+                    default_nodes.append(tuple(node_feats))
+
+        default_nodes = set(default_nodes)
+        if len(default_nodes) > 1:
+            # Then all node types require the features in the access options
+            self.default_nodes = set()
+        else:
+            self.default_nodes = default_nodes  # Get the filtered features
+
+        getlogger().debug(
+            f'\nThe following {len(set(node_types))} '
+            'node types were detected:'
+        )
+        for node_t in set(node_types):
+            getlogger().debug(node_t)
+
+        self.node_types = set(node_types)  # Get the unique types
+
+    @staticmethod
+    def _filter_node_feats(exclude_feats: list, node_feats: list) -> list:
+        '''Filter the node features, excluding the specified patterns'''
+        node_valid_feats = []
+        for feat in node_feats:  # Loop over the features
+            feat_valid = not any([fnmatch.fnmatch(feat, pattern)
+                                  for pattern in exclude_feats])
+            if feat_valid:
+                node_valid_feats.append(feat)
+        return node_valid_feats
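The feature filter above uses shell-style patterns, so the exclude_feats=['colum*'] from the sample autodetect_config.py at the top of this diff would drop any feature starting with 'colum'. A self-contained sketch (feature names are illustrative):

    import fnmatch

    exclude_feats = ['colum*']
    node_feats = ['gpu', 'a100', 'columbia']
    kept = [f for f in node_feats
            if not any(fnmatch.fnmatch(f, p) for p in exclude_feats)]
    # kept == ['gpu', 'a100']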
+    def _find_devices(self, node_feats: list) -> Union[dict, None]:
+        # Retrieve a dictionary with the devices info
+        # If GRes for these nodes is 'gpu:a100:*',
+        # the returned dict will be:
+        # {'gpu:a100': min(*)}
+
+        getlogger().debug(
+            f'Detecting devices for node with features {node_feats}...')
+        try:
+            nodes_info = self._scheduler.allnodes()
+            node_feats = "&".join(node_feats)
+            devices_raw = {tuple(n.generic_resources)
+                           for n in nodes_info
+                           if n.satisfies(node_feats)}
+        except JobSchedulerError:
+            getlogger().warning('Unable to detect the devices in the node')
+            return None
+
+        if not devices_raw:
+            # No nodes matched the constraint
+            return None
+
+        if len(devices_raw) > 1:
+            # This means that the nodes with this set of features
+            # do not all have the same devices installed. If the
+            # nodes all have the same model of GPUs but a different
+            # number, they are considered the same device type,
+            # so we don't raise this msg
+            getlogger().warning('Detected different devices in nodes '
+                                'with the same set of features.\n'
+                                'Please check the devices option in '
+                                'the configuration file.')
+            return None
+        else:
+            devices = []
+            for device_i in devices_raw:
+                devices = [item.rsplit(':', 1)[0] for item in device_i]
+                devices = [','.join(devices)]
+            if '(null)' in list(devices) or 'gpu' not in next(iter(devices)):
+                # Detects if the nodes have no devices installed at
+                # all or if no GPUs are installed
+                getlogger().debug('No devices were found for this node type.')
+                return None
+            else:
+                getlogger().info('Detected GPUs')
+                # We only reach here if the devices installation
+                # is homogeneous across the nodes
+                return self._count_gpus(devices_raw.pop())
+
+    def _get_access_partition(self, node_feats: list) -> Union[list, None]:
+
+        nodes_info = self._scheduler.allnodes()
+        constraint = "&".join(node_feats)
+        nd_partitions = {tuple(n.partitions)
+                         for n in nodes_info
+                         if n.satisfies(constraint)}
+        nd_partitions = set(nd_partitions)
+        if len(nd_partitions) > 1:
+            return None
+        else:
+            nd_partitions = list(nd_partitions.pop())
+            for n_f in node_feats:
+                if n_f in nd_partitions:
+                    return [f'-p {n_f}']
+            if len(nd_partitions) == 1:
+                return [f'-p {nd_partitions[0]}']
+
+    @staticmethod
+    def _count_gpus(node_devices: tuple) -> dict:
+
+        # This method receives the GRes strings of a node type,
+        # e.g. node_devices = ('gpu:2', 'craynetwork:6')
+        devices_dic = {}
+        for dvc in node_devices:
+            # Check if the device is a GPU
+            # There will be at least 1 GPU
+            if 'gpu' in dvc:
+                # Get the device model: gpu or gpu:a100
+                device_type = dvc.rsplit(':', 1)[0]
+                # Get the number of devices
+                devices_n = int(dvc.rsplit(':', 1)[1])
+                # Save the minimum number found in all nodes
+                if device_type in devices_dic:
+                    dvc_n = devices_dic[device_type]
+                    if devices_n < dvc_n:
+                        devices_dic[device_type] = devices_n
+                else:
+                    devices_dic.update({device_type: devices_n})
+
+        return devices_dic
diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py
index cae9089585..2e20725aa6 100644
--- a/reframe/core/schedulers/ssh.py
+++ b/reframe/core/schedulers/ssh.py
@@ -3,14 +3,18 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
+import functools
 import os
 import time
+from typing import Union
 
 import reframe.utility.osext as osext
 from reframe.core.backends import register_scheduler
 from reframe.core.exceptions import ConfigError, SpawnedProcessError
 from reframe.core.schedulers import Job, JobScheduler, AlwaysIdleNode
 
+_run_strict = functools.partial(osext.run_command, check=True)
+
 
 class _SSHJob(Job):
     def __init__(self, *args, **kwargs):
@@ -229,3 +233,21 @@ def filternodes(self, job, nodes):
             return [AlwaysIdleNode(host)]
         else:
             return [AlwaysIdleNode(h) for h in self._free_hosts]
+
+    def feats_access_option(self, node_feats):
+        raise NotImplementedError(
+            'ssh backend does not support configuration autodetection'
+        )
+
+    def build_context(self, node_feats):
+        raise NotImplementedError(
+            'ssh backend does not support configuration autodetection'
+        )
+
+    @classmethod
+    def validate(cls) -> Union[str, bool]:
+        try:
+            _run_strict('which ssh')
+            return cls.registered_name
+        except SpawnedProcessError:
+            return False
diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py
index 92d9a1e01a..99636b61d0 100644
---
a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -438,6 +438,12 @@ def main(): help=('Detect the local host topology and exit, ' 'optionally saving it in FILE') ) + # CONFIGURATION + action_options.add_argument( + '--detect-configuration', metavar='FILE', action='store', + help=('Detect the configuration of the system. The content ' + 'in FILE must call the detect_config method from core/config') + ) action_options.add_argument( '--dry-run', action='store_true', help='Dry run the tests without submitting them for execution' @@ -872,6 +878,13 @@ def restrict_logging(): if not restrict_logging(): printer.adjust_verbosity(calc_verbosity(site_config, options.quiet)) + if options.detect_configuration: + runtime.init_runtime(site_config) + site_config = config.load_config( + options.detect_configuration, validate=False + ) + sys.exit(0) + # Now configure ReFrame according to the user configuration file try: # Issue a deprecation warning if the old `RFM_CONFIG_FILE` is used diff --git a/reframe/schemas/reframe_config_template.j2 b/reframe/schemas/reframe_config_template.j2 new file mode 100644 index 0000000000..4c171699fa --- /dev/null +++ b/reframe/schemas/reframe_config_template.j2 @@ -0,0 +1,74 @@ +# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +# This is a generated ReFrame configuration file +# The values in this file are dynamically filled in using the system's current configuration + +site_configuration = { + 'systems': [ + { + 'name': '{{ name }}', # Name of the system + 'descr': 'System description for {{ name }}', # Description of the system + 'hostnames': {{hostnames}}, # Hostname used by this system + 'modules_system': '{{modules_system}}', + {% if modules %} + # Specify the modules to be loaded in the system when running reframe (if any) + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.modules + 'modules': {{ modules }}, + {% endif %} + {% if resourcesdir %} + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.resourcesdir + 'resourcesdir': '{{ resourcesdir }}', # Directory path for system resources + {% endif %} + # Define the partitions of the system (based on node type or reservations) + # !!Partition autodetection is only available for the slurm scheduler + 'partitions': [ + {% for partition in partitions %} + { + 'name': '{{partition.name}}', + 'descr': '{{partition.descr}}', + 'launcher': '{{partition.launcher}}', # Launcher for parallel jobs + 'environs': {{partition.environs}}, # Check 'environments' config below + 'scheduler': '{{partition.scheduler}}', + 'time_limit': '{{partition.time_limit}}', + 'max_jobs': {{partition.max_jobs}}, + {% if partition.features | length > 1 %} + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.partitions.extras + 'extras': {{partition.extras}}, + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.partitions.env_vars + 'env_vars': {{partition.env_vars}}, + {% if partition.container_platforms %} + # Check if any container platforms are available in these nodes and add them + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#container-platform-configuration + 'container_platforms': [ + {% for c_p in partition.container_platforms %} + { 'type': '{{c_p.type}}', # Type of container platform + {% if c_p.modules %} + # Specify here the modules 
required to run the container platforms (if any)
+                            'modules': {{c_p.modules}}
+                        {% endif %}
+                        },
+                    {% endfor %}
+                ],
+                {% endif %}
+            {% endif %}
+            {% if partition.access %}
+                # Options passed to the job scheduler in order to submit a job to the specific nodes in this partition
+                'access': {{partition.access}},
+            {% endif %}
+            {% if partition.features %}
+                # Node features detected in slurm
+                'features': {{partition.features}},
+            {% endif %}
+            },
+            {% endfor %}
+        ],
+        },
+    ],
+    # The environments cannot be automatically detected, check the following links for reference
+    # 'https://github.com/eth-cscs/cscs-reframe-tests/tree/alps/config/systems': CSCS github repo
+    # 'https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#environment-configuration': ReFrame documentation
+    'environments': []
+}
diff --git a/reframe/utility/config_detection.py b/reframe/utility/config_detection.py
new file mode 100644
index 0000000000..a5b6990727
--- /dev/null
+++ b/reframe/utility/config_detection.py
@@ -0,0 +1,103 @@
+# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+containers_detect_bash = '''
+# List of containers to check
+CONTAINERS=(
+    "Sarus:sarus"
+    "Apptainer:apptainer"
+    "Docker:docker"
+    "Singularity:singularity"
+    "Shifter:shifter"
+)
+
+# Array to hold installed containers
+installed=()
+
+# Function to check for module existence (with lmod)
+check_module_spider() {
+    output=$(module spider "$1" 2>&1)
+    if echo $output | grep -q "error"; then
+        return 1
+    else
+        return 0
+    fi
+}
+
+# Function to check for module existence (with tmod)
+check_module_avail() {
+    output=$(module avail "$1" 2>&1)
+    if echo $output | grep -q "$1"; then
+        return 0
+    else
+        return 1
+    fi
+}
+
+check_lmod() {
+    if [[ -n "$LMOD_CMD" ]]; then
+        return 0
+    else
+        return 1
+    fi
+}
+
+check_tmod() {
+    if command -v modulecmd > /dev/null 2>&1; then
+        return 0
+    else
+        return 1
+    fi
+}
+
+# Check each container command
+for container in "${CONTAINERS[@]}"; do
+    IFS=":" read -r name cmd <<< "$container"
+
+    # Check if the command exists via 'which'
+    found_via_command=false
+    found_via_module=false
+
+    if which "$cmd" > /dev/null 2>&1; then
+        found_via_command=true
+    fi
+
+    if check_lmod; then
+        # Check if it is available as a module, regardless of 'which' result
+        if check_module_spider "$cmd"; then
+            output=$(module spider "$cmd" 2>&1)
+            modules_load=$(echo $output | grep -oP '(?<=available to load.).*?(?= Help)')
+            found_via_module=true
+        fi
+    fi
+
+    if check_tmod; then
+        # Check if it is available as a module, regardless of 'which' result
+        if check_module_avail "$cmd"; then
+            output=$(module avail "$cmd" 2>&1)
+            modules_load=""
+            found_via_module=true
+        fi
+    fi
+
+    # Determine the status of the container
+    if $found_via_command && $found_via_module; then
+        installed+=("$name modules: $modules_load")
+    elif $found_via_command; then
+        installed+=("$name")
+    elif $found_via_module; then
+        installed+=("$name modules: $modules_load")
+    else
+        echo "$name is not installed."
+    fi
+done
+
+# Output installed containers
+echo "Installed containers:"
+for name in "${installed[@]}"; do
+    echo "$name"
+done
+'''
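The script's stdout is what _SlurmContext._parse_containers() consumes. A hypothetical output fragment (names and modules are illustrative):

    Installed containers:
    Apptainer modules: apptainer/1.2
    Singularity

would be parsed into:

    [{'type': 'Apptainer', 'modules': ['apptainer/1.2', 'apptainer']},
     {'type': 'Singularity', 'modules': []}]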
+ fi +done + +# Output installed containers +echo "Installed containers:" +for name in "${installed[@]}"; do + echo "$name" +done +''' diff --git a/requirements.txt b/requirements.txt index b730f4ef21..1f2235de0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,14 @@ archspec==0.2.5 argcomplete==3.1.2; python_version < '3.8' argcomplete==3.5.1; python_version >= '3.8' +autopep8==2.0.4 filelock==3.4.1; python_version == '3.6' filelock==3.12.2; python_version == '3.7' filelock==3.16.1; python_version >= '3.8' importlib_metadata==4.0.1; python_version < '3.8' jsonschema==3.2.0 +jinja2==3.0; python_version < '3.7' +jinja2==3.1.2; python_version >= '3.7' lxml==5.2.0; python_version < '3.8' and platform_machine == 'aarch64' lxml==5.3.0; python_version >= '3.8' or platform_machine != 'aarch64' pytest==7.0.1; python_version < '3.8' diff --git a/setup.cfg b/setup.cfg index 642a277bc0..c12314d15c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,9 +29,11 @@ install_requires = archspec >= 0.2.4 argcomplete argcomplete <= 3.1.2; python_version < '3.8' + autopep8 filelock filelock<=3.12.2; python_version == '3.7' filelock<=3.4.1; python_version == '3.6' + jinja2 jsonschema lxml==5.2.0; python_version < '3.8' and platform_machine == 'aarch64' lxml==5.3.0; python_version >= '3.8' or platform_machine != 'aarch64' diff --git a/unittests/test_launchers.py b/unittests/test_launchers.py index 561dffd86e..c6d29493fb 100644 --- a/unittests/test_launchers.py +++ b/unittests/test_launchers.py @@ -56,9 +56,21 @@ def allnodes(self): def filternodes(self, job, nodes): pass + def feats_access_option(self, node_feats): + pass + + def build_context(self, modules_system, launcher, + exclude_feats, detect_containers, + prefix, sched_options, time_limit): + pass + def poll(self, *jobs): pass + @classmethod + def validate(cls): + pass + def _make_job(launcher, *args, **kwargs): return Job.create(FakeJobScheduler(), launcher, 'fake_job', *args, **kwargs)