From 5042f0f3499f5364beed613d7dbc83217d6a5332 Mon Sep 17 00:00:00 2001 From: Johannes Bulin Date: Mon, 13 Jan 2025 18:28:37 +0000 Subject: [PATCH] Reimplemented launching procedure (#11). --- VERSION | 2 +- ifsbench/__init__.py | 3 +- ifsbench/application.py | 156 ++++++++ ifsbench/arch.py | 531 +++++--------------------- ifsbench/benchmark.py | 228 ----------- ifsbench/env.py | 197 ++++++++++ ifsbench/ifs.py | 4 - ifsbench/job.py | 294 +++++--------- ifsbench/launcher.py | 287 -------------- ifsbench/launcher/__init__.py | 11 + ifsbench/launcher/bashlauncher.py | 75 ++++ ifsbench/launcher/launcher.py | 90 +++++ ifsbench/launcher/mpirunlauncher.py | 81 ++++ ifsbench/launcher/srunlauncher.py | 99 +++++ tests/launcher/test_mpirunlauncher.py | 78 ++++ tests/launcher/test_srunlauncher.py | 77 ++++ tests/test_application.py | 47 +++ tests/test_arch.py | 211 +++------- tests/test_benchmark.py | 240 ------------ tests/test_envhandler.py | 121 ++++++ tests/test_job.py | 163 +++----- tests/test_launcher.py | 170 --------- 22 files changed, 1366 insertions(+), 1799 deletions(-) create mode 100644 ifsbench/application.py delete mode 100644 ifsbench/benchmark.py create mode 100644 ifsbench/env.py delete mode 100644 ifsbench/launcher.py create mode 100644 ifsbench/launcher/__init__.py create mode 100644 ifsbench/launcher/bashlauncher.py create mode 100644 ifsbench/launcher/launcher.py create mode 100644 ifsbench/launcher/mpirunlauncher.py create mode 100644 ifsbench/launcher/srunlauncher.py create mode 100644 tests/launcher/test_mpirunlauncher.py create mode 100644 tests/launcher/test_srunlauncher.py create mode 100644 tests/test_application.py delete mode 100644 tests/test_benchmark.py create mode 100644 tests/test_envhandler.py delete mode 100644 tests/test_launcher.py diff --git a/VERSION b/VERSION index d917d3e..0ea3a94 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.2 +0.2.0 diff --git a/ifsbench/__init__.py b/ifsbench/__init__.py index 05ca20a..a4eb88b 100644 --- 
a/ifsbench/__init__.py +++ b/ifsbench/__init__.py @@ -13,11 +13,12 @@ from importlib.metadata import version, PackageNotFoundError +from .application import * # noqa from .arch import * # noqa -from .benchmark import * # noqa from .cli import * # noqa from .darshanreport import * # noqa from .drhook import * # noqa +from .env import * # noqa from .files import * # noqa from .gribfile import * # noqa from .ifs import * # noqa diff --git a/ifsbench/application.py b/ifsbench/application.py new file mode 100644 index 0000000..f3e1bcd --- /dev/null +++ b/ifsbench/application.py @@ -0,0 +1,156 @@ +# (C) Copyright 2020- ECMWF. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +from abc import ABC, abstractmethod +from pathlib import Path + + +from ifsbench.data import DataHandler +from ifsbench.env import EnvHandler +from ifsbench.job import Job + +__all__ = ['Application', 'DefaultApplication'] + +class Application(ABC): + """ + Base class for applications that can be launched. + """ + + @abstractmethod + def get_data_handlers(self, run_dir, job): + """ + Return necessary data handlers. + + Return all application-specific handler objects that must be run before + launching the executable. + + Parameters + ---------- + run_dir: :any:`pathlib.Path` + The working directory from where the executable will be launched. + job: Job + The parallel job setup. + + Returns + ------- + list[DataHandler]: + List of DataHandler objects. + """ + return NotImplemented + + @abstractmethod + def get_env_handlers(self, run_dir, job): + """ + Return necessary environment handlers. + + Return all application-specific environment handler objects that are + needed for launching the executable. 
+ + Parameters + ---------- + run_dir: :any:`pathlib.Path` + The working directory from where the executable will be launched. + job: Job + The parallel job setup. + + Returns + ------- + list[EnvHandler]: + List of EnvHandler objects. + """ + return NotImplemented + + @abstractmethod + def get_library_paths(self, run_dir, job): + """ + Return necessary library paths. + + Return all library paths that are required to launch this executable. + + Parameters + ---------- + run_dir: :any:`pathlib.Path` + The working directory from where the executable will be launched. + job: Job + The parallel job setup. + + Returns + ------- + list[pathlib.Path]: + List of the library paths. + """ + return NotImplemented + + @abstractmethod + def get_command(self, run_dir, job): + """ + Return the corresponding command. + + Parameters + ---------- + run_dir: :any:`pathlib.Path` + The working directory from where the executable will be launched. + job: Job + The parallel job setup. + + Returns + ------- + list[str]: + The command and (if necessary) corresponding flags. + """ + return NotImplemented + + +class DefaultApplication(Application): + """ + Default application implementation. + + Simple implementation of the application class where all returned data is + static and doesn't depend on the ``run_dir`` or ``job`` parameters. + + Parameters + ---------- + command: list[str] + The command that corresponds to this application. + data_handlers: list[DataHandler] + The DataHandler list that is returned by get_data_handlers. + env_handlers: list[EnvHandler] + The EnvHandler list that is returned by get_env_handlers. + library_paths: list[pathlib.Path] + The library path list that is returned by get_library_paths. 
+ """ + + def __init__(self, command, data_handlers=None, env_handlers=None, library_paths=None): + self._command = list(command) + + if data_handlers: + self._data_handlers = list(data_handlers) + else: + self._data_handlers = [] + + if env_handlers: + self._env_handlers = list(env_handlers) + else: + self._env_handlers = [] + + if library_paths: + self._library_paths = list(library_paths) + else: + self._library_paths = [] + + def get_data_handlers(self, run_dir, job): + return list(self._data_handlers) + + def get_env_handlers(self, run_dir, job): + return list(self._env_handlers) + + def get_library_paths(self, run_dir, job): + return list(self._library_paths) + + def get_command(self, run_dir, job): + return list(self._command) + \ No newline at end of file diff --git a/ifsbench/arch.py b/ifsbench/arch.py index 1dc250b..577033a 100644 --- a/ifsbench/arch.py +++ b/ifsbench/arch.py @@ -9,467 +9,144 @@ Architecture specifications """ from abc import ABC, abstractmethod -import os +from dataclasses import dataclass, field -from .job import CpuConfiguration, CpuBinding, Job -from .launcher import Launcher, MpirunLauncher, SrunLauncher, AprunLauncher -from .util import as_tuple, execute +from ifsbench.env import EnvHandler +from ifsbench.launcher.launcher import Launcher +__all__ = ['Arch', 'DefaultArch'] -__all__ = ['Arch', 'Workstation', 'XC40Cray', 'XC40Intel', 'AtosAaIntel', - 'LumiC', 'LumiG', 'arch_registry'] - - -class Arch(ABC): +@dataclass +class ArchResult: """ - Machine and compiler architecture on which to run the IFS - - This provides compiler and environment assumptions for MPI-parallel runs. + Holds results of an :meth:`Arch.process` run. - An implementation of this class must be provided for each target system. - It must declare the CPU configuration in :attr:`cpu_config` and the - launcher in :attr:`launcher` that is used for MPI-parallel execution. 
- - For multiple toolchains/runtime environments on a single physical system - it can be beneficial to create an intermediate class describing common - properties (such as :attr:`cpu_config`) and derived classes to specify - the bespoke configuration. + Attributes: + job: Job + The updated job after the architecture processing. + env_handlers: list[EnvHandler] + Additional EnvHandler objects that set architecture-specific environment + flags. + default_launcher: Launcher + The default launcher that is used on this system. + default_launcher_flags : list[str] + Additional launcher flags that should be added to launcher invocations. """ + job = None + env_handlers : list[EnvHandler] = field(default_factory=list) + default_launcher : Launcher = None + default_launcher_flags : list[str] = field(default_factory=list) - cpu_config: CpuConfiguration - """The CPU configuration of the architecture - This describes the available compute capability on each compute node - from which the job resource requirements can be derived. +class Arch(ABC): """ + Architecture/system description. - launcher: Launcher - """The MPI launcher that is used on the architecture - - This is used to derive the relevant launch command to execute the application - using the resource requirements specified in a run's :any:`Job` description. + Additional machine/software environment information that is used to run jobs. + It provides information about the parallel setup (number of cores per node, + available GPUs, etc.) as well as system-specific environment variables. 
""" - @classmethod @abstractmethod - def run(cls, cmd, tasks, cpus_per_task, threads_per_core, launch_cmd=None, - launch_user_options=None, logfile=None, env=None, gpus_per_task=None, - **kwargs): + def get_default_launcher(self): """ - Arch-specific general purpose executable execution - - This method must be implemented by architecture-specific implementations, - either to launch :data:`cmd` directly or to build a job specification - :any:`Job` and launch this via :meth:`run_job`. + Return the launcher that is usually used on this system (e.g. + SLURM, PBS, MPI). - Parameters - ---------- - cmd : list or str - The command of the MPI executable (without the launch command) - tasks : int - The total number of MPI tasks to use - cpus_per_task : int - The number of threads to start for each MPI task - threads_per_core : int - The number of hyperthreads to use on each core - launch_cmd : list or str, optional - User-provided launch command that is used instead of generating - a launch command based on :data:`job` specifications - launch_user_options : list or str, optional - User-provided launch command options that are added to the - generated launch command. Ignored if :data:`launch_cmd` is given. - logfile : str or :any:`pathlib.Path`, optional - An optional logfile to store the output - env : dict, optional - Custom environment to use - gpus_per_task: int, optional - The number of GPUs that are used per MPI task - kwargs : - Other arguments that may be used in the architecture implementation - or may be passed on to :any:`execute` + Returns + ------- + Launcher + The default launcher used in this architecture. """ + return NotImplemented - @classmethod - def run_job(cls, cmd, job, launch_cmd=None, launch_user_options=None, - logfile=None, env=None, **kwargs): + @abstractmethod + def get_cpu_configuration(self): """ - Arch-specific general purpose executable execution for :any:`Job` specification + Return the hardware setup that is used. 
- This method can be used by the architecture-specific implementations. + Returns + ------- + CpuConfiguration + """ + return NotImplemented - It launches :data:`cmd` using the resource requirements specified in :data:`job`. - The architecture-specific :attr:`launcher` is used to generate the - required launch command for that. The user can override this behaviour - by providing a custom launch command in :data:`launch_cmd`. + @abstractmethod + def process_job(self, job, **kwargs): + """ + Process a given job and add architecture-dependent tweaks. This will + return a :class:`ArchResult` object that + * holds an updated Job object. + * may specify additional EnvHandler objects needed on this architecture. + * may specify additional flags that should be passed to the default + launcher. Parameters ---------- - cmd : list or str - The command of the MPI executable (without the launch command) - job : :any:`Job` - The resource requirements of the job to be run - launch_cmd : list or str, optional - User-provided launch command that is used instead of generating - a launch command based on :data:`job` specifications - launch_user_options : list or str, optional - User-provided launch command options that are added to the - generated launch command. Ignored if :data:`launch_cmd` is given. - logfile : str or :any:`pathlib.Path`, optional - An optional logfile to store the output - env : dict, optional - Custom environment to use - kwargs : - Other arguments that may be used in the architecture implementation - or may be passed on to :any:`execute` + job: Job + The initial job specification. This object is not updated. + + Returns + ------- + ArchResult """ - assert isinstance(job, Job) - assert job.cpu_config == cls.cpu_config - - # Just pop a few arguments from kwargs that are used by some - # architectures but not by all. If we don't remove them here, they may - # cause trouble inside the subsequent "execute" call. 
- kwargs.pop('mpi_gpu_aware', False) - - if env is None: - env = os.environ.copy() - - if launch_cmd is None: - launch_cmd = as_tuple(cls.launcher.get_launch_cmd(job, user_options=launch_user_options)) - else: - launch_cmd = as_tuple(launch_cmd) - - full_cmd = ' '.join(launch_cmd + as_tuple(cmd)) - execute(full_cmd, logfile=logfile, env=env, **kwargs) - - -class Workstation(Arch): - """ - Default setup for ECMWF workstations. - """ - - class WorkstationCpuConfig(CpuConfiguration): - """Most workstations have a quad-core CPU with hyperthreading""" - - sockets_per_node = 1 - cores_per_socket = 4 - threads_per_core = 2 - - cpu_config = WorkstationCpuConfig - - launcher = MpirunLauncher - - @classmethod - def run(cls, cmd, tasks, cpus_per_task, threads_per_core, launch_cmd=None, - launch_user_options=None, logfile=None, env=None, gpus_per_task=None, - **kwargs): - """Build job description using :attr:`cpu_config`""" - - # Setup environment - if env is None: - env = os.environ.copy() - env['OMP_NUM_THREADS'] = cpus_per_task - # TODO: Ensure proper pinning - - # Build job description - job = Job(cls.cpu_config, tasks=tasks, cpus_per_task=cpus_per_task, - threads_per_core=threads_per_core) - - # Launch via generic run - cls.run_job(cmd, job, launch_cmd=launch_cmd, launch_user_options=launch_user_options, - logfile=logfile, env=env, **kwargs) - - -class XC40(Arch): - """ - Hardware and launcher setup for ECMWF's Cray XC40 system - - Toolchain specific settings are provided in derived - classes :class:`XC40Cray` and :class:`XC40Intel` for Cray compiler and - Intel Compiler, respectively - """ - - class XC40CpuConfig(CpuConfiguration): - """Dual-socket Intel Xeon E5-2650 v4 (12 cores/24 threads, 2.2 GHz)""" - - sockets_per_node = 2 - cores_per_socket = 12 - threads_per_core = 2 - - cpu_config = XC40CpuConfig - - launcher = AprunLauncher - - -class XC40Cray(XC40): - """ - Cray compiler-toolchain setup for :any:`XC40` - """ - - @classmethod - def run(cls, cmd, tasks, 
cpus_per_task, threads_per_core, launch_cmd=None, - launch_user_options=None, logfile=None, env=None, gpus_per_task=None, - **kwargs): - """Build job description using :attr:`XC40.cpu_config`""" - - # Setup environment - if env is None: - env = os.environ.copy() - env['OMP_NUM_THREADS'] = cpus_per_task - - # Fill nodes without hyperthreading - tasks_per_node = kwargs.pop('tasks_per_node', min(tasks, cls.cpu_config.cores_per_node)) - tasks_per_socket = tasks_per_node // 2 - - # Binding-strategy -cc cpu - bind = CpuBinding.BIND_CORES - - # Build job description - job = Job(cls.cpu_config, tasks=tasks, tasks_per_node=tasks_per_node, - tasks_per_socket=tasks_per_socket, cpus_per_task=cpus_per_task, - threads_per_core=threads_per_core, bind=bind) - - # Strict memory containment - if launch_user_options is None: - launch_user_options = [] - launch_user_options += ['-ss'] - - # Launch via generic run - cls.run_job(cmd, job, launch_cmd=launch_cmd, launch_user_options=launch_user_options, - logfile=logfile, env=env, **kwargs) - - -class XC40Intel(XC40): - """ - Intel compiler-toolchain setup for :any:`XC40` - """ - - @classmethod - def run(cls, cmd, tasks, cpus_per_task, threads_per_core, launch_cmd=None, - launch_user_options=None, logfile=None, env=None, gpus_per_task=None, - **kwargs): - """Build job description using :attr:`XC40.cpu_config`""" - - # Setup environment - if env is None: - env = os.environ.copy() - env['OMP_NUM_THREADS'] = cpus_per_task - env['OMP_PLACES'] = 'threads' - env['OMP_PROC_BIND'] = 'close' - - # Fill nodes without hyperthreading - tasks_per_node = kwargs.pop('tasks_per_node', min(tasks, cls.cpu_config.cores_per_node)) - tasks_per_socket = tasks_per_node // 2 + return NotImplemented - # Binding-strategy -cc depth - bind = CpuBinding.BIND_THREADS +class DefaultArch(Arch): - # Build job description - job = Job(cls.cpu_config, tasks=tasks, tasks_per_node=tasks_per_node, - tasks_per_socket=tasks_per_socket, cpus_per_task=cpus_per_task, - 
threads_per_core=threads_per_core, bind=bind) - - # Launch via generic run - cls.run_job(cmd, job, launch_cmd=launch_cmd, launch_user_options=launch_user_options, - logfile=logfile, env=env, **kwargs) - - -class Atos(Arch): - """ - Hardware setup for ECMWF's aa system in Bologna - """ - - @classmethod - def run(cls, cmd, tasks, cpus_per_task, threads_per_core, launch_cmd=None, - launch_user_options=None, logfile=None, env=None, gpus_per_task=None, - **kwargs): - """Build job description using :attr:`cpu_config`""" - - # Setup environment - if env is None: - env = os.environ.copy() - env['OMP_NUM_THREADS'] = cpus_per_task - # TODO: Ensure proper pinning - - # Fill nodes as much as possible - max_tasks_per_node = cls.cpu_config.cores_per_node * threads_per_core // cpus_per_task - tasks_per_node = kwargs.pop('tasks_per_node', min(tasks, max_tasks_per_node)) - - launch_user_options = list(as_tuple(launch_user_options)) - - # If GPUs are used, request the GPU partition. - if gpus_per_task is not None and gpus_per_task > 0: - if cls.cpu_config.gpus_per_node // gpus_per_task <= 0: - raise ValueError(f"Not enough GPUs are available on the " - f"architecture {cls.__name__}!") - - launch_user_options.insert(0, '--qos=ng') - tasks_per_node = min( - tasks_per_node, - cls.cpu_config.gpus_per_node // gpus_per_task - ) - elif tasks * cpus_per_task > 32: - # By default, stuff on Atos runs on the GPIL nodes which allow only - # up to 32 cores. If more resources are needed, the compute - # partition should be requested. 
- launch_user_options.insert(0, '--qos=np') - - - # Bind to cores - bind = CpuBinding.BIND_CORES - - # Build job description - job = Job(cls.cpu_config, tasks=tasks, tasks_per_node=tasks_per_node, - cpus_per_task=cpus_per_task, threads_per_core=threads_per_core, - bind=bind, gpus_per_task=gpus_per_task) - - # Launch via generic run - cls.run_job(cmd, job, launch_cmd=launch_cmd, launch_user_options=launch_user_options, - logfile=logfile, env=env, **kwargs) - -class AtosAaIntel(Atos): - """ - Intel compiler-toolchain setup for :any:`AtosAa` - """ - - class AtosAaCpuConfig(CpuConfiguration): - """Dual-socket AMD EPYC 7H12 (64 core/128 thread, 2.6 GHz)""" - - sockets_per_node = 2 - cores_per_socket = 64 - threads_per_core = 2 - - cpu_config = AtosAaCpuConfig - - launcher = SrunLauncher - - -class AtosAc(Atos): - """ - Architecture for the Atos ac partition that also offers NVIDIA A100 GPUs. - """ - - class AtosAcCpuConfig(CpuConfiguration): - """Dual-socket AMD EPYC 7H12 (64 core/128 thread, 2.6 GHz)""" - - sockets_per_node = 2 - cores_per_socket = 64 - threads_per_core = 2 - gpus_per_node = 4 - - cpu_config = AtosAcCpuConfig - - launcher = SrunLauncher - -class Lumi(Arch): - # Define the default partition - partition : str - - @classmethod - def run(cls, cmd, tasks, cpus_per_task, threads_per_core, launch_cmd=None, - launch_user_options=None, logfile=None, env=None, gpus_per_task=None, - **kwargs): - """Build job description using :attr:`cpu_config`""" - - # Setup environment - if env is None: - env = os.environ.copy() - env['OMP_NUM_THREADS'] = cpus_per_task - # TODO: Ensure proper pinning - - # Fill nodes as much as possible - max_tasks_per_node = cls.cpu_config.cores_per_node * threads_per_core // cpus_per_task - tasks_per_node = kwargs.pop('tasks_per_node', min(tasks, max_tasks_per_node)) - - # Use the correct partition. 
- launch_user_options = list(as_tuple(launch_user_options)) - launch_user_options.insert(0, f"--partition={cls.partition}") - - - # If GPUs are used, limit the number of tasks per node. - if gpus_per_task is not None and gpus_per_task > 0: - if cls.cpu_config.gpus_per_node // gpus_per_task <= 0: - raise ValueError(f"Not enough GPUs are available on the " - f"architecture {cls.__name__}!") - - tasks_per_node = min( - tasks_per_node, - cls.cpu_config.gpus_per_node // gpus_per_task - ) - - use_gpu_mpi = kwargs.pop('mpi_gpu_aware', False) - - if use_gpu_mpi: - env['MPICH_GPU_SUPPORT_ENABLED'] = '1' - env['MPICH_SMP_SINGLE_COPY_MODE'] = 'NONE' - env['MPICH_GPU_IPC_ENABLED'] = '0' - - # Bind to cores - bind = CpuBinding.BIND_CORES - - # Build job description - job = Job(cls.cpu_config, tasks=tasks, tasks_per_node=tasks_per_node, - cpus_per_task=cpus_per_task, threads_per_core=threads_per_core, - bind=bind, gpus_per_task=gpus_per_task) - - # Launch via generic run - cls.run_job(cmd, job, launch_cmd=launch_cmd, launch_user_options=launch_user_options, - logfile=logfile, env=env, **kwargs) - - -class LumiC(Lumi): - """ - Architecture for the LUMI-C partition. - """ - - partition = "standard" - - class LumiCCpuConfig(CpuConfiguration): - sockets_per_node = 2 - cores_per_socket = 64 - threads_per_core = 2 + def __init__(self, launcher, cpu_config, set_explicit=False, + account=None, partition=None): + """ + Default architecture that can be used for various systems. - cpu_config = LumiCCpuConfig + Parameters + ---------- + launcher : Launcher + The default launcher that is used on this system. + cpu_config : CpuConfiguration + The hardware setup of the system. + set_explicit : bool + If set to True, the following attributes in result.job are + calculated and set explicitly, using ``cpu_config``: + * tasks + * nodes + * tasks_per_node + If not, these values will stay None, if not specified. + account : str + The account that is passed to the launcher. 
+ partition : str + The partition that will be passed to the launcher. + """ + self._cpu_config = cpu_config + self._launcher = launcher + self._set_explicit = bool(set_explicit) + self._account = account + self._partition = partition - launcher = SrunLauncher + def get_default_launcher(self): + return self._launcher + def get_cpu_configuration(self): + return self._cpu_config -class LumiG(Lumi): - """ - Architecture for the LUMI-G partition. Only 56 cores per node are available - per node! - """ + def process_job(self, job, **kwargs): + result = ArchResult() - partition = "standard-g" + account = self._account + partition = self._partition - class LumiGCpuConfig(CpuConfiguration): - """ - Single 64 core AMD EPYC 7A53 "Trento" CPU. One core per L3 region is - deactivated, giving 56 usable cores in total. - The node is also equipped with four AMD MI250X GPUs which in turn - contain two GPU dies (the SLURM scheduler treats this as eight distinct - GPUs). - The CPU has 128GiB main memory and each MI250X has 2x64GB HBM memory. - """ - sockets_per_node = 1 - cores_per_socket = 56 - threads_per_core = 2 - gpus_per_node = 8 + result.job = job.copy() - cpu_config = LumiGCpuConfig + if partition: + result.job.set('partition', partition) + if account: + result.job.set('account', account) - launcher = SrunLauncher + if self._set_explicit: + result.job.calculate_missing(self._cpu_config) + result.default_launcher = self.get_default_launcher() -arch_registry = { - None: AtosAaIntel, - 'workstation': Workstation, - 'xc40': XC40Cray, - 'xc40cray': XC40Cray, - 'xc40intel': XC40Intel, - 'atos_aa': AtosAaIntel, - 'atos_ac': AtosAc, - 'lumi_c': LumiC, - 'lumi_g': LumiG -} -"""String-lookup of :any:`Arch` implementations""" + return result + \ No newline at end of file diff --git a/ifsbench/benchmark.py b/ifsbench/benchmark.py deleted file mode 100644 index c17431f..0000000 --- a/ifsbench/benchmark.py +++ /dev/null @@ -1,228 +0,0 @@ -# (C) Copyright 2020- ECMWF. 
-# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. - -""" -Classes to set-up a benchmark -""" -from abc import ABC, abstractmethod -from pathlib import Path -from subprocess import CalledProcessError -from pprint import pformat -import sys - -from .drhook import DrHook -from .logging import warning, error, debug -from .util import copy_data, symlink_data, as_tuple, flatten -from .runrecord import RunRecord - - -__all__ = ['Benchmark', 'ExperimentFilesBenchmark'] - - -class Benchmark(ABC): - """ - Definition of a general benchmark setup - - Parameters - ---------- - expid : str - The experiment id corresponding to the input data set. - ifs : :any:`IFS` - The IFS configuration object. - rundir : str or :any:`pathlib.Path`, optional - The default working directory to be used for :meth:`run`. - """ - - def __init__(self, **kwargs): - self.expid = kwargs.get('expid') - self.rundir = kwargs.get('rundir', None) - - self.ifs = kwargs.get('ifs') - - @property - @classmethod - @abstractmethod - def input_files(cls): - """ - List of relative paths that define all necessary input data files to - run this benchmark - - Returns - ------- - list of str or :any:`pathlib.Path` - Relative paths for all input files required to run this benchmark. - The relative paths will be reproduced in :attr:`Benchmark.rundir`. 
- """ - - @classmethod - def from_files(cls, **kwargs): - """ - Create instance of :class:`Benchmark` by globbing a set of input paths - for the necessary input data and copying or linking it into rundir - - Parameters - ---------- - rundir : str or :any:`pathlib.Path` - Run directory to copy/symlink input data into - srcdir : (list of) str or :any:`pathlib.Path` - One or more source directories to search for input data - ifsdata : str or :any:`pathlib.Path`, optional - `ifsdata` directory to link as a whole - (default: :attr:`Benchmark.input_data`) - input_files : list of str, optional - Relative paths of necessary input files - copy : bool, optional - Copy files into :data:`rundir` instead of symlinking them (default: False) - force : bool, optional - Overwrite existing input files and re-link/copy (default: False) - """ - srcdir = as_tuple(kwargs.get('srcdir')) - rundir = Path(kwargs.get('rundir')) - copy = kwargs.pop('copy', False) - force = kwargs.pop('force', False) - ifsdata = kwargs.get('ifsdata', None) - input_files = kwargs.get('input_files', cls.input_files) - - if ifsdata is not None: - symlink_data(Path(ifsdata), rundir/'ifsdata', force=force) - - # Copy / symlink input files into rundir - for path in input_files: - path = Path(path) - dest = Path(rundir) / path - candidates = flatten([list(Path(s).glob(f'**/{path.name}')) for s in srcdir]) - if len(candidates) == 0: - warning(f'Input file {path.name} not found in {srcdir}') - continue - if len(candidates) == 1: - source = candidates[0] - else: - warning(f'More than one input file {path.name} found in {srcdir}') - source = candidates[0] - - if copy: - copy_data(source, dest, force=force) - else: - symlink_data(source, dest, force=force) - - return cls(**kwargs) - - @classmethod - def from_tarball(cls): - """ - Create instance of ``Benchmark`` object from given tarball - """ - raise NotImplementedError - - def to_tarball(self, filepath): - """ - Dump input files and configuration to a tarball for 
off-line - benchmarking. - """ - raise NotImplementedError - - def check_input(self): - """ - Check input file list matches benchmark configuration. - """ - for path in self.input_files: - filepath = self.rundir / path - if not filepath.exists(): - raise RuntimeError(f'Required input file "{filepath}" not found!') - - def run(self, **kwargs): - """ - Run the specified benchmark and validate against stored results. - """ - if 'rundir' in kwargs: - if kwargs['rundir'] != self.rundir: - error(f'Stored run directory: {self.rundir}') - error(f'Given run directory: {kwargs["rundir"]}') - raise RuntimeError('Conflicting run directories provided!') - else: - kwargs['rundir'] = self.rundir - - try: - self.ifs.run(**kwargs) - - except CalledProcessError: - error('Benchmark run failed!') - debug(f'Execution parameters:\n{pformat(kwargs, indent=2)}') - sys.exit(-1) - - # Provide DrHook output path only if DrHook is active - drhook = kwargs.get('drhook', DrHook.OFF) - drhook_path = None if drhook == DrHook.OFF else self.rundir/'drhook.*' - - dryrun = kwargs.get('dryrun', False) - if not dryrun: - return RunRecord.from_run(nodefile=self.rundir/'NODE.001_01', drhook=drhook_path) - return None - - -class ExperimentFilesBenchmark(Benchmark): - """ - General :class:`Benchmark` setup created from input file description - provided by :any:`ExperimentFiles` - - """ - - def __init__(self, **kwargs): - self._input_files = kwargs.pop('input_files') - super().__init__(**kwargs) - - @property - @classmethod - def special_paths(cls): - """ - List of :any:`SpecialRelativePath` patterns that define transformations - for converting a file path to a particular relative path object. 
- - Returns - ------- - list of :any:`SpecialRelativePath` - """ - - @property - def input_files(self): # pylint: disable=arguments-differ - return self._input_files - - @classmethod - def from_experiment_files(cls, **kwargs): - """ - Instantiate :class:`Benchmark` using input file lists captured in an - :any:`ExperimentFiles` object - """ - rundir = Path(kwargs.get('rundir')) - exp_files = kwargs.pop('exp_files') - copy = kwargs.pop('copy', False) - force = kwargs.pop('force', False) - ifsdata = kwargs.get('ifsdata', None) - - if ifsdata is not None: - symlink_data(Path(ifsdata), rundir/'ifsdata', force=force) - - special_paths = cls.special_paths if isinstance(cls.special_paths, (list, tuple)) else () - input_files = [] - for f in exp_files.files: - dest, source = str(f.fullpath), str(f.fullpath) - for pattern in special_paths: - dest = pattern(dest) - if dest != source: - break - else: - dest = str(Path(dest).name) - - input_files += [dest] - source, dest = Path(source), rundir/dest - if copy: - copy_data(source, dest, force=force) - else: - symlink_data(source, dest, force=force) - - obj = cls(input_files=input_files, **kwargs) - return obj diff --git a/ifsbench/env.py b/ifsbench/env.py new file mode 100644 index 0000000..fd0b789 --- /dev/null +++ b/ifsbench/env.py @@ -0,0 +1,197 @@ +# (C) Copyright 2020- ECMWF. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +from abc import ABC, abstractmethod +from collections import UserList +from enum import auto, Enum + +from ifsbench.logging import debug + +__all__ = ['EnvHandler', 'EnvOperation', 'EnvPipeline'] + + +class EnvOperation(Enum): + """ + Enum of environment operations. 
+
+    Specifies operations on environment variables.
+
+    Attributes
+    ----------
+
+    SET
+        Set a given environment variable.
+    APPEND
+        Append to a given environment variable.
+    PREPEND
+        Prepend to a given environment variable.
+    DELETE
+        Delete/unset a given environment variable.
+    CLEAR
+        Clear the whole environment
+    """
+    SET = auto()
+    APPEND = auto()
+    PREPEND = auto()
+    DELETE = auto()
+    CLEAR = auto()
+
+class EnvHandler:
+    """
+    Specify changes to environment variables.
+
+    Parameters
+    ----------
+    mode: EnvOperation
+        The operation that will be performed.
+    key: str
+        The name of the environment variable that is updated. Must be specified
+        unless mode == CLEAR.
+    value: str
+        The value that is used for the operation. Must be specified unless
+        mode in (SET, DELETE, CLEAR).
+
+    Raises
+    ------
+    ValueError
+        If key or value is None but must be specified.
+    """
+
+    def __init__(self, mode, key=None, value=None):
+        if key is not None:
+            self._key = str(key)
+        else:
+            self._key = None
+
+        if value is not None:
+            self._value = str(value)
+        else:
+            self._value = None
+
+        self._mode = mode
+
+        if self._key is None and self._mode != EnvOperation.CLEAR:
+            raise ValueError(f"The key must be specified for operation {mode.name}!")
+
+        if self._value is None:
+            if self._mode in (EnvOperation.APPEND, EnvOperation.PREPEND):
+                raise ValueError(f"The value must be specified for operation {mode.name}!")
+
+    def execute(self, env):
+        """
+        Apply the stored changes to a given environment.
+
+        Parameters
+        ----------
+        env: dict
+            An environment dictionary. Will be updated in place.
+ """ + + if self._mode == EnvOperation.SET: + debug(f"Set environment entry {self._key} = {self._value}.") + env[self._key] = self._value + elif self._mode == EnvOperation.APPEND: + if env.get(self._key, None) is not None: + env[self._key] += ":" + self._value + else: + env[self._key] = self._value + + debug(f"Append {self._value} to environment variable {self._key}.") + elif self._mode == EnvOperation.PREPEND: + if env.get(self._key, None) is not None: + env[self._key] = self._value + ':' + env[self._key] + else: + env[self._key] = self._value + + debug(f"Prepend {self._value} to environment variable {self._key}.") + + elif self._mode == EnvOperation.DELETE: + if self._key in env: + debug(f"Delete environment variable {str(self._key)}.") + del env[self._key] + + elif self._mode == EnvOperation.CLEAR: + debug('Clear whole environment.') + env.clear() + +class EnvPipeline(ABC, UserList): + """ + Abstract base class for environment update pipelines. + + Instances of this class can be used to update environment variables. + EnvPipeline supports all list operations to add/remove EnvHandler objects. + """ + + @abstractmethod + def execute(self): + """ + Create an environment using the pipeline. + + Returns + ------- + dict: + The created environment. + """ + return NotImplemented + + @abstractmethod + def add(self, handler): + """ + Add new EnvHandlers to the pipeline. + + Parameters + ---------- + handler: EnvHandler or list[EnvHandler] + Handler(s) that are added. + """ + +class DefaultEnvPipeline(EnvPipeline): + """ + Default environment pipeline. + + Parameters + ---------- + overrides: list[EnvHandler] + The environment operations that should be incorporated. + env_initial: dict or None + The initial environment. If None, an empty environment is used. + + Raises + ------ + ValueError: + If overrides contains entries that are not EnvOverride objects. 
+ """ + + def __init__(self, handlers=None, env_initial=None): + if handlers: + self._handlers = list(handlers) + else: + self._handlers = [] + + for handler in self._handlers: + if not isinstance(handler, EnvHandler): + raise ValueError("Only EnvHandler objects are accepted!") + + if env_initial is not None: + self._env_initial = dict(env_initial) + else: + self._env_initial = {} + + def add(self, handler): + if isinstance(handler, EnvHandler): + self._handlers.append(handler) + else: + self._handlers += handler + + def execute(self): + env = dict(self._env_initial) + + for handler in self._handlers: + handler.execute(env) + + return env + \ No newline at end of file diff --git a/ifsbench/ifs.py b/ifsbench/ifs.py index 6a2d04b..7f8ffd2 100644 --- a/ifsbench/ifs.py +++ b/ifsbench/ifs.py @@ -9,7 +9,6 @@ from pathlib import Path from os import getenv -from .arch import arch_registry, Arch from .drhook import DrHook from .logging import warning from .namelist import IFSNamelist @@ -222,9 +221,6 @@ def run(self, namelist, rundir, nproc=1, nproc_io=0, nthread=1, hyperthread=1, a Further named parameters that will be passed to :meth:`IFS.setup_env`, :meth:`IFS.setup_nml` and :meth:`Arch.run` """ - # Select architecture preset from registry - if not isinstance(arch, Arch): - arch = arch_registry[arch] # Setup the run environment env, kwargs = self.setup_env(namelist=namelist, rundir=rundir, nproc=nproc, nproc_io=nproc_io, diff --git a/ifsbench/job.py b/ifsbench/job.py index c080515..c2d282a 100644 --- a/ifsbench/job.py +++ b/ifsbench/job.py @@ -6,23 +6,19 @@ # nor does it submit to any jurisdiction. """ -Hardware and job resource description classes +Hardware and job resource description classes. 
__all__ = ['CpuBinding', 'CpuDistribution', 'CpuConfiguration', 'Job']


@dataclass
class CpuConfiguration:
    """
    This class describes the hardware configuration of compute nodes.

    Attributes
    ----------
    sockets_per_node : int
        The number of sockets (or NUMA domains) per node.
    cores_per_socket : int
        The number of physical cores per socket.
    threads_per_core : int
        The number of logical cores per physical core (i.e. the number of SMT
        threads each core can execute). Typically, this is 1 (no
        hyperthreading), 2 or 4.
    gpus_per_node : int
        The number of available GPUs per node.
    """

    sockets_per_node : int = 1
    cores_per_socket : int = 1
    threads_per_core : int = 1
    gpus_per_node : int = 0

    def cores_per_node(self):
        """
        The number of physical cores per node
        """
        return self.sockets_per_node * self.cores_per_socket

    def threads_per_node(self):
        """
        The number of logical cores (threads) per node
        """
        # cores_per_node is a plain method now (the old classproperty is
        # gone), so it must be *called* here.
        return self.cores_per_node() * self.threads_per_core


class CpuBinding(Enum):
    """
    Description of CPU binding strategy to use, for which the launch
    command may provide the relevant options.
    """
    BIND_NONE = auto()
    """Disable all binding specification"""

    BIND_SOCKETS = auto()
    """Bind tasks to sockets"""

    BIND_CORES = auto()
    """Bind tasks to cores"""

    BIND_THREADS = auto()
    """Bind tasks to hardware threads"""

    BIND_USER = auto()
    """Indicate that a different user-specified strategy should be used"""


class CpuDistribution(Enum):
    """
    Description of the task distribution strategy to use, for which the
    launch command may provide the relevant options.
    """
    DISTRIBUTE_DEFAULT = auto()
    """Use the launch command's default distribution strategy"""

    DISTRIBUTE_BLOCK = auto()
    """Distribute tasks in a block-wise fashion"""

    DISTRIBUTE_CYCLIC = auto()
    """Distribute tasks in a cyclic fashion"""

    DISTRIBUTE_USER = auto()
    """Indicate that a different user-specified strategy should be used"""


@dataclass
class Job:
    """
    Description of a parallel job setup.

    Parameters
    ----------
    tasks : int
        The number of tasks/processes.
    nodes : int
        The number of nodes.
    tasks_per_node : int
        The number of tasks per node.
    tasks_per_socket : int
        The number of tasks per socket.
    cpus_per_task : int
        The number of cpus assigned to each task.
    threads_per_core : int
        The number of threads that each CPU core should run.
    gpus_per_task : int
        The number of GPUs that are required by each task.
    account : str, optional
        The account that is passed to the scheduler.
    partition: str, optional
        The partition that is passed to the scheduler.
    bind : :any:`CpuBinding`, optional
        Specify the binding strategy to use for pinning.
    distribute_remote : :any:`CpuDistribution`, optional
        Specify the distribution strategy to use for task distribution across nodes.
    distribute_local : :any:`CpuDistribution`, optional
        Specify the distribution strategy to use for task distribution across
        sockets within a node.
    """

    tasks : int = None
    nodes : int = None
    tasks_per_node : int = None
    tasks_per_socket : int = None
    cpus_per_task : int = None
    threads_per_core : int = None
    gpus_per_task : int = None
    account : str = None
    partition : str = None
    bind : CpuBinding = None
    distribute_remote : CpuDistribution = None
    distribute_local : CpuDistribution = None

    def copy(self):
        """
        Return an independent copy of this object.

        All fields hold immutable values (ints, strings, enum members), so
        the field-wise copy created by :func:`dataclasses.replace` is fully
        independent of the original.
        """
        return replace(self)

    def calculate_missing(self, cpu_configuration):
        """
        If at least one of

        * the total number of MPI tasks (:data:`tasks`)
        * the number of nodes (:data:`nodes`) and the number of tasks per node
          (:data:`tasks_per_node`)
        * the number of nodes (:data:`nodes`) and the number of tasks per socket
          (:data:`tasks_per_socket`)

        is specified, this function calculates missing values for

        * tasks
        * nodes
        * tasks_per_node

        given the hardware configuration. The resulting values are stored in
        this object.

        Parameters
        ----------
        cpu_configuration : :any:`CpuConfiguration`
            The hardware configuration of the compute nodes.

        Raises
        ------
        ValueError
            If not enough data is available to compute the missing values or
            if the given values contradict each other.
        """
        # Treat unset values as their neutral defaults for the calculation.
        cpus_per_task = self.cpus_per_task or 1
        threads_per_core = self.threads_per_core or 1
        gpus_per_task = self.gpus_per_task or 0

        if not self.tasks_per_node:
            # If tasks_per_node wasn't specified, calculate it from the other
            # values.
            if self.tasks_per_socket:
                self.tasks_per_node = self.tasks_per_socket * cpu_configuration.sockets_per_node
            elif self.tasks:
                self.tasks_per_node = cpu_configuration.cores_per_node() // cpus_per_task
            else:
                raise ValueError('The number of tasks per node could not be determined!')

            # If GPUs are used, make sure that tasks_per_node is compatible
            # with the number of available GPUs.
            if gpus_per_task > 0:
                self.tasks_per_node = min(
                    self.tasks_per_node,
                    cpu_configuration.gpus_per_node // gpus_per_task
                )

            if self.tasks_per_node <= 0:
                raise ValueError('Failed to determine the number of tasks per node!')

        elif gpus_per_task > 0:
            # tasks_per_node was given explicitly; verify that the GPU demand
            # fits on a node.
            if self.tasks_per_node * gpus_per_task > cpu_configuration.gpus_per_node:
                raise ValueError('Not enough GPUs are available on a node.')

        if self.nodes is None:
            # Logical threads that one node can host, taking SMT usage into
            # account.
            threads_per_node = self.tasks_per_node * threads_per_core * cpus_per_task

            if not self.tasks:
                raise ValueError('The number of nodes could not be determined!')

            # Ceiling division: enough nodes to host tasks * cpus_per_task
            # threads.
            self.nodes = (self.tasks * cpus_per_task + threads_per_node - 1) // threads_per_node

        if self.tasks is None:
            self.tasks = self.nodes * self.tasks_per_node
-# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. - -""" -Implementation of launch commands for various MPI launchers -""" -from abc import ABC, abstractmethod - -from .job import CpuBinding, CpuDistribution -from .logging import debug, warning -from .util import as_tuple - - -class Launcher(ABC): - """ - Base class for MPI launch command implementations - """ - - job_options_map: dict - """ - A mapping of :any:`Job` attributes to launch cmd options - - See :meth:`get_options_from_job` for how this is used to build - launch command options. - """ - - bind_options_map: dict - """ - A mapping of :any:`CpuBinding` values to launch cmd options - - See :meth:`get_options_from_binding` for how this is used to build - launch command options. - """ - - @classmethod - def get_options_from_job(cls, job): - """ - Build a list of launch command options from the provided :data:`job` specification - - This uses the :attr:`job_options_map` to compile all options according to what - is specified in :data:`job`. - The format of :attr:`job_options_map` should be a `dict` with the name of - :any:`Job` attributes as keys and launch command-specific format strings as - values, e.g., ``{'tasks': '--ntasks={}'}``. - - Only attributes from :attr:`job_options_map` that are defined (i.e. do not raise - :any:`AttributeError`) are added to the list of options, thus this provides a - direct mapping from the parameters provided to :any:`Job` to the launch command. 
- - Parameters - ---------- - job : :any:`Job` - The job description specifying required hardware resources - - Returns - ------- - list - A list of strings with the rendered job options - """ - options = [] - for attr, option in cls.job_options_map.items(): - try: - value = getattr(job, attr) - except AttributeError: - continue - options += [option.format(value)] - return options - - @classmethod - def get_options_from_binding(cls, bind): - """ - Build a list of launch command options from the provided :data:`bind` specification - - This uses the :attr:`bind_options_map` to map the specified binding strategy - to the relevant launch command option. The format of :attr:`bind_options_map` - should be a `dict` with the binding values declared in the enum :any:`CpuBinding` as - keys and (a list of) launch-command option strings as values, e.g., - ``{CpuBinding.BIND_CORES: '--cpu-bind=cores'}``. - - Parameters - ---------- - bind : :any:`CpuBinding` - - Returns - ------- - list - A list of strings with the rendered job options - """ - return list(as_tuple(cls.bind_options_map[bind])) - - @classmethod - @abstractmethod - def get_launch_cmd(cls, job, user_options=None): - """ - Return the launch command for a provided :data:`job` specification - - This must be implemented by derived classes. 
- - Parameters - ---------- - job : :any:`Job` - The specification of hardware resources to use - user_options : list - Any user-provided options that should be appended to the option - list of the launch command - """ - - -class SrunLauncher(Launcher): - """ - :any:`Launcher` implementation for Slurm's srun - """ - - job_options_map = { - 'nodes': '--nodes={}', - 'tasks': '--ntasks={}', - 'tasks_per_node': '--ntasks-per-node={}', - 'tasks_per_socket': '--ntasks-per-socket={}', - 'cpus_per_task': '--cpus-per-task={}', - 'threads_per_core': '--ntasks-per-core={}', - 'gpus_per_task': '--gpus-per-task={}' - } - - bind_options_map = { - CpuBinding.BIND_NONE: ['--cpu-bind=none'], - CpuBinding.BIND_SOCKETS: ['--cpu-bind=sockets'], - CpuBinding.BIND_CORES: ['--cpu-bind=cores'], - CpuBinding.BIND_THREADS: ['--cpu-bind=threads'], - CpuBinding.BIND_USER: [], - } - - distribution_options_map = { - CpuDistribution.DISTRIBUTE_DEFAULT: '*', - CpuDistribution.DISTRIBUTE_BLOCK: 'block', - CpuDistribution.DISTRIBUTE_CYCLIC: 'cyclic', - } - - @classmethod - def get_distribution_options(cls, job): - """Return options for task distribution""" - if not(hasattr(job, 'distribute_remote') or hasattr(job, 'distribute_local')): - return [] - - distribute_remote = getattr(job, 'distribute_remote', CpuDistribution.DISTRIBUTE_DEFAULT) - distribute_local = getattr(job, 'distribute_local', CpuDistribution.DISTRIBUTE_DEFAULT) - - if distribute_remote is CpuDistribution.DISTRIBUTE_USER: - debug(('Not applying task distribution options because remote distribution' - ' of tasks is set to use user-provided settings')) - return [] - if distribute_local is CpuDistribution.DISTRIBUTE_USER: - debug(('Not applying task distribution options because local distribution' - ' of tasks is set to use user-provided settings')) - return [] - - return [(f'--distribution={cls.distribution_options_map[distribute_remote]}' - f':{cls.distribution_options_map[distribute_local]}')] - - - @classmethod - def 
get_launch_cmd(cls, job, user_options=None): - """ - Return the srun command for the provided :data:`job` specification - """ - - cmd = ['srun'] + cls.get_options_from_job(job) - if hasattr(job, 'bind'): - cmd += cls.get_options_from_binding(job.bind) - cmd += cls.get_distribution_options(job) - if user_options is not None: - cmd += list(as_tuple(user_options)) - return cmd - - -class AprunLauncher(Launcher): - """ - :any:`Launcher` implementation for Cray's aprun - """ - - job_options_map = { - 'tasks': '-n {}', - 'tasks_per_node': '-N {}', - 'tasks_per_socket': '-S {}', - 'cpus_per_task': '-d {}', - 'threads_per_core': '-j {}', - } - - bind_options_map = { - CpuBinding.BIND_NONE: ['-cc none'], - CpuBinding.BIND_SOCKETS: ['-cc numa_node'], - CpuBinding.BIND_CORES: ['-cc cpu'], - CpuBinding.BIND_THREADS: ['-cc depth'], - CpuBinding.BIND_USER: [], - } - - @classmethod - def get_distribution_options(cls, job): - """Return options for task distribution""" - do_nothing = [CpuDistribution.DISTRIBUTE_DEFAULT, CpuDistribution.DISTRIBUTE_USER] - if hasattr(job, 'distribute_remote') and job.distribute_remote not in do_nothing: - warning('Specified remote distribution option ignored in AprunLauncher') - if hasattr(job, 'distribute_local') and job.distribute_local not in do_nothing: - warning('Specified local distribution option ignored in AprunLauncher') - - return [] - - @classmethod - def get_launch_cmd(cls, job, user_options=None): - """ - Return the aprun command for the provided :data:`job` specification - """ - - cmd = ['aprun'] - # Aprun has no option to specify node counts and tasks relative to - # nodes, thus we derive the number of total tasks if - # it has not been specified explicitly - if not hasattr(job, 'tasks'): - cmd += [f'-n {job.get_tasks()}'] - cmd += cls.get_options_from_job(job) - if hasattr(job, 'bind'): - cmd += cls.get_options_from_binding(job.bind) - cmd += cls.get_distribution_options(job) - if user_options is not None: - cmd += 
list(as_tuple(user_options)) - return cmd - - -class MpirunLauncher(Launcher): - """ - :any:`Launcher` implementation for a standard mpirun - """ - - job_options_map = { - 'tasks': '-np {}', - 'tasks_per_node': '-npernode {}', - 'tasks_per_socket': '-npersocket {}', - 'cpus_per_task': '-cpus-per-proc {}', - } - - bind_options_map = { - CpuBinding.BIND_NONE: ['--bind-to none'], - CpuBinding.BIND_SOCKETS: ['--bind-to socket'], - CpuBinding.BIND_CORES: ['--bind-to core'], - CpuBinding.BIND_THREADS: ['--bind-to hwthread'], - CpuBinding.BIND_USER: [], - } - - distribution_options_map = { - CpuDistribution.DISTRIBUTE_BLOCK: 'core', - CpuDistribution.DISTRIBUTE_CYCLIC: 'numa', - } - - @classmethod - def get_distribution_options(cls, job): - """Return options for task distribution""" - do_nothing = [CpuDistribution.DISTRIBUTE_DEFAULT, CpuDistribution.DISTRIBUTE_USER] - if hasattr(job, 'distribute_remote') and job.distribute_remote not in do_nothing: - warning('Specified remote distribution option ignored in MpirunLauncher') - - if not hasattr(job, 'distribute_local') or job.distribute_local in do_nothing: - return [] - - return [f'--map-by {cls.distribution_options_map[job.distribute_local]}'] - - @classmethod - def get_launch_cmd(cls, job, user_options=None): - """ - Return the mpirun command for the provided :data:`job` specification - """ - - cmd = ['mpirun'] - # Mpirun has no option to specify tasks relative to nodes without also - # modifying the mapping, thus we derive the number of total tasks if - # it has not been specified explicitly - if not hasattr(job, 'tasks'): - cmd += [f'-np {job.get_tasks()}'] - cmd += cls.get_options_from_job(job) - if hasattr(job, 'bind'): - cmd += cls.get_options_from_binding(job.bind) - cmd += cls.get_distribution_options(job) - if user_options is not None: - cmd += list(as_tuple(user_options)) - return cmd diff --git a/ifsbench/launcher/__init__.py b/ifsbench/launcher/__init__.py new file mode 100644 index 0000000..6f0dd63 --- 
class BashLauncher(Launcher):
    """
    UNTESTED!

    This launcher encapsulates another launcher and creates a bash script
    ``<script_dir>/run-<executable>-<timestamp>.sh`` which is then executed
    via ``/bin/bash``.

    Parameters
    ----------
    script_dir: str or :any:`pathlib.Path`
        The directory where the resulting bash script is placed. If a relative path
        is given, this will be relative to the ``run_dir`` argument in
        :meth:`prepare`.
    base_launcher : Launcher
        The underlying launcher that is used to generate the launch command.
    base_launcher_flags: list[str] or None
        Additional flags that are passed to the ``base_launcher``.
    """

    def __init__(self, script_dir, base_launcher, base_launcher_flags=None):
        self._script_dir = Path(script_dir)
        self._base_launcher = base_launcher
        self._base_launcher_flags = base_launcher_flags

    def prepare(self, run_dir, job, cmd, library_paths=None, env_pipeline=None, custom_flags=None):
        """
        Build the wrapped launch command, write it to a bash script and
        return a LaunchData object that executes that script.
        """
        import shlex

        # Let the wrapped launcher assemble the actual command and
        # environment first.
        child_data = self._base_launcher.prepare(run_dir, job, cmd, library_paths,
                                                 env_pipeline, self._base_launcher_flags)

        if self._script_dir.is_absolute():
            script_dir = self._script_dir
        else:
            script_dir = run_dir/self._script_dir

        # If a regular *file* blocks the script directory path, remove it so
        # the directory can be created.
        if script_dir.is_file():
            script_dir.unlink()

        script_dir.mkdir(exist_ok=True, parents=True)

        # Timestamp makes successive launches write distinct scripts.
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d.%H:%M:%S.%f')

        # Guess the executable name from the command.
        exec_name = Path(cmd[0]).name
        path = script_dir/f"run-{exec_name}-{timestamp}.sh"

        with path.open('w') as f:
            f.write('#! /bin/bash')
            f.write("\n")

            # Quote the values so that spaces and shell metacharacters in
            # environment entries survive the round-trip through bash.
            for key, value in child_data.env.items():
                f.write(f"export {key}={shlex.quote(str(value))}\n")

            f.write("\n")

            f.write(' '.join(child_data.cmd))
            f.write("\n")

        return LaunchData(
            cmd=['/bin/bash', str(path)],
            run_dir=run_dir,
            env={}
        )
__all__ = ['LaunchData', 'Launcher']


@dataclass
class LaunchData:
    """
    Dataclass that contains all data necessary for launching a command.

    Attributes
    ----------
    run_dir: Path
        The working directory for launching.
    cmd: list[str]
        The command that gets launched.
    env: dict[str,str]
        The environment variables that are used.
    """
    run_dir : Path
    cmd : list[str]
    env: dict = field(default_factory=dict)

    def launch(self):
        """
        Launch the actual executable.
        """
        info(f"Launch command {self.cmd} in {self.run_dir}.")

        debug("Environment variables:")
        for key, value in self.env.items():
            debug(f"\t{key}={value}")

        execute(
            command = self.cmd,
            cwd = self.run_dir,
            env = self.env
        )


class Launcher(ABC):
    """
    Abstract base class for launching parallel jobs.
    Subclasses must implement the prepare function.
    """

    @abstractmethod
    def prepare(self, run_dir, job, cmd, library_paths=None, env_pipeline=None, custom_flags=None):
        """
        Prepare a launch by building a LaunchData object (which in turn can
        perform the actual launch).

        Parameters
        ----------
        run_dir: Path
            The working directory for launching.
        job: Job
            The job object that holds all necessary parallel data.
        cmd: list[str]
            The command that should be launched.
        library_paths: list[Path]
            Additional library paths that are needed for launching.
        env_pipeline: EnvPipeline
            Pipeline for modifying environment variables.
        custom_flags: list[str]
            Additional flags that are added to the launcher command.

        Returns
        -------

        LaunchData
        """
        return NotImplemented


class MpirunLauncher(Launcher):
    """
    :any:`Launcher` implementation for a standard mpirun
    """

    # Mapping of Job attributes to mpirun flags.
    # NOTE(review): `--n=...`/`--npernode=...` long forms — confirm the
    # `--option=value` spelling is accepted by the target MPI implementation
    # (Open MPI documents `-np N` / `--npernode N`).
    job_options_map = {
        'tasks': '--n={}',
        'tasks_per_node': '--npernode={}',
        'tasks_per_socket': '--npersocket={}',
        'cpus_per_task': '--cpus-per-proc={}',
    }

    bind_options_map = {
        CpuBinding.BIND_NONE: ['--bind-to', 'none'],
        CpuBinding.BIND_SOCKETS: ['--bind-to', 'socket'],
        CpuBinding.BIND_CORES: ['--bind-to', 'core'],
        CpuBinding.BIND_THREADS: ['--bind-to', 'hwthread'],
        CpuBinding.BIND_USER: [],
    }

    distribution_options_map = {
        CpuDistribution.DISTRIBUTE_BLOCK: 'core',
        CpuDistribution.DISTRIBUTE_CYCLIC: 'numa',
    }

    def _get_distribution_options(self, job):
        """Return options for task distribution"""
        do_nothing = [CpuDistribution.DISTRIBUTE_DEFAULT, CpuDistribution.DISTRIBUTE_USER]

        # mpirun cannot control distribution across nodes. Warn only when the
        # user explicitly requested a non-default remote distribution: on the
        # Job dataclass the attribute always exists and defaults to None, so
        # a plain hasattr/`not in` check would warn on every default job.
        if job.distribute_remote is not None and job.distribute_remote not in do_nothing:
            warning('Specified remote distribution option ignored in MpirunLauncher')

        if job.distribute_local is None or job.distribute_local in do_nothing:
            return []

        return ['--map-by', f'{self.distribution_options_map[job.distribute_local]}']

    def prepare(self, run_dir, job, cmd, library_paths=None, env_pipeline=None, custom_flags=None):
        """
        Build the mpirun LaunchData for the provided job specification.
        """
        executable = 'mpirun'
        if env_pipeline is None:
            env_pipeline = DefaultEnvPipeline()

        flags = []

        # Render all job attributes that have a corresponding mpirun flag.
        for attr, option in self.job_options_map.items():
            value = getattr(job, attr, None)

            if value is not None:
                flags += [option.format(value)]

        if job.bind:
            flags += list(self.bind_options_map[job.bind])

        flags += self._get_distribution_options(job)

        if custom_flags:
            flags += custom_flags

        # Additional library paths are passed via LD_LIBRARY_PATH.
        if library_paths:
            for path in library_paths:
                env_pipeline.add(EnvHandler(EnvOperation.APPEND, 'LD_LIBRARY_PATH', str(path)))

        flags += cmd

        env = env_pipeline.execute()

        return LaunchData(
            run_dir=run_dir,
            cmd=[executable] + flags,
            env=env
        )
+ """ + + job_options_map = { + 'nodes': '--nodes={}', + 'tasks': '--ntasks={}', + 'tasks_per_node': '--ntasks-per-node={}', + 'tasks_per_socket': '--ntasks-per-socket={}', + 'cpus_per_task': '--cpus-per-task={}', + 'threads_per_core': '--ntasks-per-core={}', + 'gpus_per_task': '--gpus-per-task={}', + 'account': '--account={}', + 'partition': '--partition={}', + } + + bind_options_map = { + CpuBinding.BIND_NONE: ['--cpu-bind=none'], + CpuBinding.BIND_SOCKETS: ['--cpu-bind=sockets'], + CpuBinding.BIND_CORES: ['--cpu-bind=cores'], + CpuBinding.BIND_THREADS: ['--cpu-bind=threads'], + CpuBinding.BIND_USER: [], + } + + distribution_options_map = { + None: '*', + CpuDistribution.DISTRIBUTE_DEFAULT: '*', + CpuDistribution.DISTRIBUTE_BLOCK: 'block', + CpuDistribution.DISTRIBUTE_CYCLIC: 'cyclic', + } + + def _get_distribution_options(self, job): + """Return options for task distribution""" + if (job.distribute_remote is None) and (job.distribute_local is None): + return [] + + distribute_remote = job.distribute_remote + distribute_local = job.distribute_local + + if distribute_remote is CpuDistribution.DISTRIBUTE_USER: + debug(('Not applying task distribution options because remote distribution' + ' of tasks is set to use user-provided settings')) + return [] + if distribute_local is CpuDistribution.DISTRIBUTE_USER: + debug(('Not applying task distribution options because local distribution' + ' of tasks is set to use user-provided settings')) + return [] + + return [(f'--distribution={self.distribution_options_map[distribute_remote]}' + f':{self.distribution_options_map[distribute_local]}')] + + def prepare(self, run_dir, job, cmd, library_paths=None, env_pipeline=None, custom_flags=None): + executable = 'srun' + if env_pipeline is None: + env_pipeline = DefaultEnvPipeline() + + flags = [] + + for attr, option in self.job_options_map.items(): + value = getattr(job, attr, None) + + if value is not None: + flags += [option.format(value)] + + if job.bind: + flags += 
list(self.bind_options_map[job.bind]) + + flags += self._get_distribution_options(job) + + if custom_flags: + flags += custom_flags + + if library_paths: + for path in library_paths: + env_pipeline.add(EnvHandler(EnvOperation.APPEND, 'LD_LIBRARY_PATH', str(path))) + + flags += cmd + + env = env_pipeline.execute() + + return LaunchData( + run_dir=run_dir, + cmd=[executable] + flags, + env=env + ) diff --git a/tests/launcher/test_mpirunlauncher.py b/tests/launcher/test_mpirunlauncher.py new file mode 100644 index 0000000..940c12b --- /dev/null +++ b/tests/launcher/test_mpirunlauncher.py @@ -0,0 +1,78 @@ +# (C) Copyright 2020- ECMWF. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +""" +Some sanity tests for :any:`Arch` implementations +""" + +import pytest + +from ifsbench import (Job, MpirunLauncher, EnvHandler, EnvOperation, + DefaultEnvPipeline, CpuBinding, CpuDistribution) + + +_test_env = DefaultEnvPipeline([ + EnvHandler(EnvOperation.SET, 'SOME_VALUE', '5'), + EnvHandler(EnvOperation.SET, 'OTHER_VALUE', '6'), + EnvHandler(EnvOperation.DELETE, 'SOME_VALUE'), +]) + +@pytest.mark.parametrize('cmd,job_in,library_paths,env_pipeline,custom_flags,env_out', [ + (['ls', '-l'], {'tasks': 64, 'cpus_per_task': 4}, [], None, [], {}), + (['something'], {}, ['/library/path', '/more/paths'], None, [], {'LD_LIBRARY_PATH': '/library/path:/more/paths'}), + (['whatever'], {'nodes': 12}, ['/library/path'], _test_env, [], + {'LD_LIBRARY_PATH': '/library/path', 'OTHER_VALUE': '6'}), +]) +def test_mpirunLauncher_prepare_env(tmp_path, cmd, job_in, library_paths, env_pipeline, custom_flags, env_out): + """ + Test the env component of the LaunchData object that is returned by 
"""
Some sanity tests for the :any:`MpirunLauncher` implementation.
"""

import pytest

from ifsbench import (Job, MpirunLauncher, EnvHandler, EnvOperation,
                      DefaultEnvPipeline, CpuBinding, CpuDistribution)


# Shared pipeline fixture: SOME_VALUE is set and deleted again, so only
# OTHER_VALUE survives.
_test_env = DefaultEnvPipeline([
    EnvHandler(EnvOperation.SET, 'SOME_VALUE', '5'),
    EnvHandler(EnvOperation.SET, 'OTHER_VALUE', '6'),
    EnvHandler(EnvOperation.DELETE, 'SOME_VALUE'),
])

@pytest.mark.parametrize('cmd,job_in,library_paths,env_pipeline,custom_flags,env_out', [
    (['ls', '-l'], {'tasks': 64, 'cpus_per_task': 4}, [], None, [], {}),
    (['something'], {}, ['/library/path', '/more/paths'], None, [], {'LD_LIBRARY_PATH': '/library/path:/more/paths'}),
    (['whatever'], {'nodes': 12}, ['/library/path'], _test_env, [],
     {'LD_LIBRARY_PATH': '/library/path', 'OTHER_VALUE': '6'}),
])
def test_mpirun_launcher_prepare_env(tmp_path, cmd, job_in, library_paths, env_pipeline, custom_flags, env_out):
    """
    Test the env component of the LaunchData object that is returned by
    MpirunLauncher.prepare.
    """
    launcher = MpirunLauncher()
    job = Job(**job_in)

    result = launcher.prepare(tmp_path, job, cmd, library_paths, env_pipeline, custom_flags)

    assert result.env == {**env_out}

@pytest.mark.parametrize('cmd,job_in,library_paths,env_pipeline,custom_flags', [
    (['ls', '-l'], {'tasks': 64, 'cpus_per_task': 4}, [], None, []),
    (['something'], {}, ['/library/path', '/more/paths'], None, []),
    (['whatever'], {'nodes': 12}, ['/library/path'], _test_env, []),
])
def test_mpirun_launcher_prepare_run_dir(tmp_path, cmd, job_in, library_paths, env_pipeline, custom_flags):
    """
    Test the run_dir component of the LaunchData object that is returned by MpirunLauncher.prepare.
    """
    launcher = MpirunLauncher()
    job = Job(**job_in)

    result = launcher.prepare(tmp_path, job, cmd, library_paths, env_pipeline, custom_flags)

    assert result.run_dir == tmp_path

@pytest.mark.parametrize('cmd,job_in,library_paths,env_pipeline,custom_flags, cmd_out', [
    (['ls', '-l'], {'tasks': 64, 'cpus_per_task': 4}, [], None, [],
     ['mpirun', '--n=64', '--cpus-per-proc=4', 'ls', '-l']),
    (['something'], {}, ['/library/path', '/more/paths'], None, ['--some-more'],
     ['mpirun', '--some-more', 'something']),
    (['whatever'], {'nodes': 12, 'gpus_per_task': 2}, ['/library/path'], _test_env, [],
     ['mpirun', 'whatever']),
    (['bind_hell'], {'bind': CpuBinding.BIND_THREADS, 'distribute_local': CpuDistribution.DISTRIBUTE_CYCLIC},
     ['/library/path'], _test_env, [],
     ['mpirun', '--bind-to', 'hwthread', '--map-by', 'numa', 'bind_hell']),
])
def test_mpirun_launcher_prepare_cmd(tmp_path, cmd, job_in, library_paths, env_pipeline, custom_flags, cmd_out):
    """
    Test the cmd component of the LaunchData object that is returned by MpirunLauncher.prepare.
    """
    launcher = MpirunLauncher()
    job = Job(**job_in)

    result = launcher.prepare(tmp_path, job, cmd, library_paths, env_pipeline, custom_flags)

    # There is no fixed order of the mpirun flags, so we test for the sorted command array.
    assert sorted(cmd_out) == sorted(result.cmd)
+ """ + launcher = SrunLauncher() + job = Job(**job_in) + + result = launcher.prepare(tmp_path, job, cmd, library_paths, env_pipeline, custom_flags) + + assert result.env == {**env_out} + +@pytest.mark.parametrize('cmd,job_in,library_paths,env_pipeline,custom_flags', [ + (['ls', '-l'], {'tasks': 64, 'cpus_per_task': 4}, [], None, []), + (['something'], {}, ['/library/path', '/more/paths'], None, []), + (['whatever'], {'nodes': 12}, ['/library/path'], _test_env, []), +]) +def test_srunlauncher_prepare_run_dir(tmp_path, cmd, job_in, library_paths, env_pipeline, custom_flags): + """ + Test the run_dir component of the LaunchData object that is returned by SrunLauncher.prepare. + """ + launcher = SrunLauncher() + job = Job(**job_in) + + result = launcher.prepare(tmp_path, job, cmd, library_paths, env_pipeline, custom_flags) + + assert result.run_dir == tmp_path + +@pytest.mark.parametrize('cmd,job_in,library_paths,env_pipeline,custom_flags, cmd_out', [ + (['ls', '-l'], {'tasks': 64, 'cpus_per_task': 4}, [], None, [], + ['srun', '--ntasks=64', '--cpus-per-task=4', 'ls', '-l']), + (['something'], {}, ['/library/path', '/more/paths'], None, ['--some-more'], + ['srun', '--some-more', 'something']), + (['whatever'], {'nodes': 12, 'gpus_per_task': 2}, ['/library/path'], _test_env, [], + ['srun', '--nodes=12', '--gpus-per-task=2', 'whatever']), + (['bind_hell'], {'bind': CpuBinding.BIND_THREADS, 'distribute_local': CpuDistribution.DISTRIBUTE_CYCLIC}, + ['/library/path'], _test_env, [], + ['srun', '--cpu-bind=threads', '--distribution=*:cyclic', 'bind_hell']), +]) +def test_srunlauncher_prepare_cmd(tmp_path, cmd, job_in, library_paths, env_pipeline, custom_flags, cmd_out): + """ + Test the cmd component of the LaunchData object that is returned by SrunLauncher.prepare. 
+ """ + launcher = SrunLauncher() + job = Job(**job_in) + + result = launcher.prepare(tmp_path, job, cmd, library_paths, env_pipeline, custom_flags) + + # There is no fixed order of the srun flags, so we test for the sorted command array. + assert sorted(cmd_out) == sorted(result.cmd) diff --git a/tests/test_application.py b/tests/test_application.py new file mode 100644 index 0000000..5e80def --- /dev/null +++ b/tests/test_application.py @@ -0,0 +1,47 @@ +# (C) Copyright 2020- ECMWF. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +""" +Some sanity tests for :any:`Application` implementations. +""" + +import pytest + +from ifsbench import DefaultApplication, Job, EnvHandler, EnvOperation +from ifsbench.data import ExtractHandler + +@pytest.mark.parametrize('job, command, data_handlers, env_handlers, library_paths', [ + (Job(tasks=5), ['ls', '-l'], None, None, None), + (Job(nodes=12), ['ls', '-l'], [], [], []), + (Job(nodes=12), ['ls', '-l'], [ExtractHandler('in', 'out')], [], ['/some/path']), + (Job(nodes=12), ['ls', '-l'], [], [EnvHandler(EnvOperation.CLEAR)], []), +]) +def test_default_application(tmp_path, job, command, data_handlers, env_handlers, library_paths): + application = DefaultApplication(command, data_handlers, env_handlers, library_paths) + + assert application.get_command(tmp_path, job) == command + + if library_paths: + assert application.get_library_paths(tmp_path, job) == library_paths + else: + assert application.get_library_paths(tmp_path, job) == [] + + + if env_handlers: + env_out = application.get_env_handlers(tmp_path, job) + assert len(env_out) == len(env_handlers) + assert [type(x) for x in env_out] == [type(x) for x in env_handlers] + 
"""
Some sanity tests for :any:`Application` implementations.
"""

import pytest

from ifsbench import DefaultApplication, Job, EnvHandler, EnvOperation
from ifsbench.data import ExtractHandler

@pytest.mark.parametrize('job, command, data_handlers, env_handlers, library_paths', [
    (Job(tasks=5), ['ls', '-l'], None, None, None),
    (Job(nodes=12), ['ls', '-l'], [], [], []),
    (Job(nodes=12), ['ls', '-l'], [ExtractHandler('in', 'out')], [], ['/some/path']),
    (Job(nodes=12), ['ls', '-l'], [], [EnvHandler(EnvOperation.CLEAR)], []),
])
def test_default_application(tmp_path, job, command, data_handlers, env_handlers, library_paths):
    """
    Check that DefaultApplication returns exactly the command, handlers and
    library paths it was constructed with (empty lists when omitted).
    """
    application = DefaultApplication(command, data_handlers, env_handlers, library_paths)

    assert application.get_command(tmp_path, job) == command

    # Unset library paths must come back as an empty list.
    assert application.get_library_paths(tmp_path, job) == (library_paths or [])

    env_out = application.get_env_handlers(tmp_path, job)
    if env_handlers:
        # Handlers may be copied, so compare counts and types rather than identity.
        assert len(env_out) == len(env_handlers)
        assert [type(x) for x in env_out] == [type(x) for x in env_handlers]
    else:
        assert env_out == []

    data_out = application.get_data_handlers(tmp_path, job)
    if data_handlers:
        assert len(data_out) == len(data_handlers)
        assert [type(x) for x in data_out] == [type(x) for x in data_handlers]
    else:
        assert data_out == []
"""
Some sanity tests for the :class:`DefaultArch` implementation.
"""

import pytest

# Removed unused `logger` import.
from ifsbench import DefaultArch, CpuConfiguration, Job, MpirunLauncher, SrunLauncher

_cpu_config_1 = CpuConfiguration(
    sockets_per_node = 2,
    cores_per_socket = 64,
    threads_per_core = 2,
    gpus_per_node = 0
)


_cpu_config_2 = CpuConfiguration(
    sockets_per_node = 2,
    cores_per_socket = 56,
    threads_per_core = 2,
    gpus_per_node = 8
)


@pytest.mark.parametrize('arch_in, job_in, job_out, launcher_out', [
    (
        {'launcher': SrunLauncher(), 'cpu_config': _cpu_config_1, 'set_explicit': False},
        {'tasks': 64, 'cpus_per_task': 4, 'threads_per_core': 1},
        {'tasks': 64, 'cpus_per_task': 4, 'threads_per_core': 1},
        SrunLauncher()
    ),
    (
        {'launcher': MpirunLauncher(), 'cpu_config': _cpu_config_1, 'set_explicit': True},
        {'tasks': 64, 'cpus_per_task': 4, 'threads_per_core': 1},
        {'tasks': 64, 'cpus_per_task': 4, 'threads_per_core': 1, 'nodes': 2, 'tasks_per_node': 32},
        MpirunLauncher()
    ),
    (
        {'launcher': SrunLauncher(), 'cpu_config': _cpu_config_2, 'set_explicit': False},
        {'tasks': 1},
        {'tasks': 1},
        SrunLauncher()
    ),
    (
        {'launcher': MpirunLauncher(), 'cpu_config': _cpu_config_2, 'set_explicit': True},
        {'tasks': 64, 'gpus_per_task': 32},
        None,
        None
    )])
def test_defaultarch_process(arch_in, job_in, job_out, launcher_out):
    """
    Test the :meth:`DefaultArch.process_job` implementation by checking that
    the resulting Job and Launcher objects are correct.
    """
    arch = DefaultArch(**arch_in)
    job = Job(**job_in)

    if job_out is None:
        # Oversubscribed GPU setup must be rejected; no result to inspect.
        with pytest.raises(ValueError):
            arch.process_job(job)
        return

    result = arch.process_job(job)

    # DefaultArch shouldn't add any handlers or default flags.
    assert result.env_handlers == []
    assert result.default_launcher_flags == []

    # Check that the right launcher is returned. Check only the type here,
    # as the launchers don't implement __eq__ by default (identity comparison
    # of the types is the idiomatic form).
    assert type(result.default_launcher) is type(launcher_out)

    # Check that the resulting job is what we expect.
    for key, value in job_out.items():
        assert value == getattr(result.job, key)
'/some/path/to/some/source/dir': { - 'sub/directory/inputA': { - 'fullpath': '/some/path/to/some/source/dir/sub/directory/inputA', - 'sha256sum': 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c' - }, - 'sub/directory/inputC': { - 'fullpath': '/some/path/to/some/source/dir/sub/directory/inputC', - 'sha256sum': 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' - }, - }, - '/some/other/path/to/some/input/dir': { - 'subsub/dir/inputB': { - 'fullpath': '/some/other/path/to/some/input/dir/subsub/dir/inputB', - 'sha256sum': '7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded97730' - }, - }, - '/the/path/to/ifsdata': { - 'some/inputD': { - 'fullpath': '/the/path/to/ifsdata/some/inputD', - 'sha256sum': 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' - }, - }, - }, -} - - -class SimpleBenchmark(Benchmark): - """ - Example configuration of simple :any:`Benchmark` - """ - - input_files = [ - 'inputA', - './someplace/inputB', - Path('./inputC'), - ] - - -class SimpleExperimentFilesBenchmark(ExperimentFilesBenchmark): - """ - Example configuration of simple :any:`ExperimentFilesBenchmark` - """ - - special_paths = [ - SpecialRelativePath.from_filename('inputB', './someplace/inputB'), - ] - - -@pytest.mark.parametrize('copy', [True, False]) -def test_benchmark_from_files(here, copy): - """ - Test input file verification for a simple benchmark setup. - """ - with temporary_rundir(here) as rundir: - benchmark = SimpleBenchmark.from_files(rundir=rundir, srcdir=here/'inidata', copy=copy) - - # Let benchmark test itself - benchmark.check_input() - - # And then we just make sure - assert (rundir/'inputA').exists() - assert (rundir/'someplace/inputB').exists() - assert (rundir/'inputC').exists() - - -def test_benchmark_execute(here, watcher): - """ - Test the basic benchmark execution mechanism. - """ - # Example of how to create and run one of the above... 
- ifs = IFS.create_cycle('default', builddir=here) - with temporary_rundir(here) as rundir: - benchmark = SimpleBenchmark.from_files(ifs=ifs, srcdir=here/'inidata', rundir=rundir) - - benchmark.check_input() - with watcher: - benchmark.run(dryrun=True, namelist=here/'t21_fc.nml') - - ifscmd = str(rundir.parent/'bin/ifsMASTER.DP') - assert ifscmd in watcher.output - - # Ensure fort.4 config file was generated - config = rundir/'fort.4' - assert config.exists() - - # Clean up config file - config.unlink() - - -@pytest.mark.parametrize('copy', [True, False]) -def test_benchmark_from_experiment_files(here, copy): - """ - Test input file verification for a simple benchmark setup - """ - exp_files = ExperimentFiles.from_dict(experiment_files_dict.copy(), verify_checksum=False) - exp_files.update_srcdir(here, with_ifsdata=True) - - with temporary_rundir(here) as rundir: - benchmark = SimpleExperimentFilesBenchmark.from_experiment_files( - rundir=rundir, exp_files=exp_files, copy=copy) - - # Let benchmark test itself - benchmark.check_input() - - # And then we just make sure - assert (rundir/'inputA').exists() - assert (rundir/'someplace/inputB').exists() - assert (rundir/'inputC').exists() - assert (rundir/'inputD').exists() - - -def test_benchmark_from_experiment_files_execute(here, watcher): - """ - Test the basic benchmark execution mechanism. 
- """ - exp_files = ExperimentFiles.from_dict(experiment_files_dict.copy(), verify_checksum=False) - exp_files.update_srcdir(here, with_ifsdata=True) - ifs = IFS.create_cycle('default', builddir=here) - - with temporary_rundir(here) as rundir: - benchmark = SimpleExperimentFilesBenchmark.from_experiment_files( - rundir=rundir, exp_files=exp_files, ifs=ifs) - - benchmark.check_input() - with watcher: - benchmark.run(dryrun=True, namelist=here/'t21_fc.nml') - - ifscmd = str(rundir.parent/'bin/ifsMASTER.DP') - assert ifscmd in watcher.output - - # Ensure fort.4 config file was generated - config = rundir/'fort.4' - assert config.exists() - - # Clean up config file - config.unlink() - - -def test_benchmark_from_tarball(here, watcher): - """ - Test running a benchmark from a tarball - """ - with temporary_tarballdir(here) as tarballdir: - # Pack experiment files to tarballs - exp_files = ExperimentFiles.from_dict(experiment_files_dict.copy(), verify_checksum=False) - exp_files.update_srcdir(here, with_ifsdata=True) - exp_files.to_tarball(tarballdir, with_ifsdata=True) - yaml_file = tarballdir/(exp_files.exp_id+'.yml') - exp_files.to_yaml(yaml_file) - - # Unpack experiments - reloaded_exp_files = ExperimentFiles.from_tarball( - yaml_file, input_dir=tarballdir, output_dir=tarballdir, with_ifsdata=True) - - with temporary_rundir(here) as rundir: - ifs = IFS.create_cycle('default', builddir=here) - benchmark = SimpleExperimentFilesBenchmark.from_experiment_files( - rundir=rundir, exp_files=reloaded_exp_files, ifs=ifs) - - benchmark.check_input() - with watcher: - benchmark.run(dryrun=True, namelist=here/'t21_fc.nml') - - ifscmd = str(rundir.parent/'bin/ifsMASTER.DP') - assert ifscmd in watcher.output - - # Ensure fort.4 config file was generated - config = rundir/'fort.4' - assert config.exists() - - # Clean up config file - config.unlink() diff --git a/tests/test_envhandler.py b/tests/test_envhandler.py new file mode 100644 index 0000000..4d8a0f7 --- /dev/null +++ 
"""
Tests for the :any:`EnvHandler` and :any:`DefaultEnvPipeline` classes.
"""

from contextlib import nullcontext
import pytest

from ifsbench.env import (
    EnvHandler, EnvOperation, DefaultEnvPipeline
)

@pytest.mark.parametrize('mode,key,value,success', [
    (EnvOperation.SET, 'some_key', 'some_value', True),
    (EnvOperation.SET, None, 'some_value', False),
    (EnvOperation.SET, 'some_key', None, True),
    (EnvOperation.SET, None, None, False),
    (EnvOperation.DELETE, 'some_key', 'some_value', True),
    (EnvOperation.DELETE, None, 'some_value', False),
    (EnvOperation.DELETE, 'some_key', None, True),
    (EnvOperation.DELETE, None, None, False),
    (EnvOperation.APPEND, 'some_key', 'some_value', True),
    (EnvOperation.APPEND, None, 'some_value', False),
    (EnvOperation.APPEND, 'some_key', None, False),
    (EnvOperation.APPEND, None, None, False),
    (EnvOperation.PREPEND, 'some_key', 'some_value', True),
    (EnvOperation.PREPEND, None, 'some_value', False),
    (EnvOperation.PREPEND, 'some_key', None, False),
    (EnvOperation.PREPEND, None, None, False),
    (EnvOperation.CLEAR, 'some_key', 'some_value', True),
    (EnvOperation.CLEAR, None, 'some_value', True),
    (EnvOperation.CLEAR, 'some_key', None, True),
    (EnvOperation.CLEAR, None, None, True),
])
def test_envhandler_init(mode, key, value, success):
    """
    Initialise the EnvHandler and make sure that only correct values are
    accepted.
    """

    if success:
        context = nullcontext()
    else:
        context = pytest.raises(ValueError)

    with context:
        EnvHandler(mode, key, value)

@pytest.mark.parametrize('mode,key,value,env_in,env_out', [
    (EnvOperation.SET, 'some_key', 'some_value', {}, {'some_key': 'some_value'}),
    (EnvOperation.SET, 'some_key', 'new_value', {'some_key': 'some_value'}, {'some_key': 'new_value'}),
    (EnvOperation.SET, 'some_key', None, {'some_key': 'some_value'}, {'some_key': None}),
    (EnvOperation.DELETE, 'some_key', None, {}, {}),
    (EnvOperation.DELETE, 'some_key', 'new_value', {'some_key': 'some_value'}, {}),
    (EnvOperation.CLEAR, 'some_key', None, {}, {}),
    (EnvOperation.CLEAR, None, None, {'some_key': 'some_value', 'other_key': None}, {}),
    (EnvOperation.APPEND, 'some_list', 'some_value', {}, {'some_list': 'some_value'}),
    (EnvOperation.APPEND, 'some_list', 'new_value', {'some_list': 'some_value'}, {'some_list': 'some_value:new_value'}),
    (EnvOperation.PREPEND, 'some_list', 'some_value', {}, {'some_list': 'some_value'}),
    (EnvOperation.PREPEND, 'some_list', 'some_value', {'some_list': None}, {'some_list': 'some_value'}),
    (EnvOperation.PREPEND, 'some_list', 'new_value', {'some_list': 'some_value'},
     {'some_list': 'new_value:some_value'}),
])
def test_envhandler_execute(mode, key, value, env_in, env_out):
    """
    Execute an EnvHandler and make sure that the output is correct.
    """
    handler = EnvHandler(mode, key, value)
    env = {**env_in}

    handler.execute(env)

    assert env == {**env_out}


@pytest.mark.parametrize('handler_data, env_in, env_out', [
    ((), {}, {}),
    ((), {'some_value': None, 'other_value': '2'}, {'some_value': None, 'other_value': '2'}),
    (
        ((EnvOperation.SET, 'some_value'), (EnvOperation.CLEAR,), (EnvOperation.SET, 'other_value', '3')),
        {}, {'other_value': '3'}
    ),
    (
        ((EnvOperation.APPEND, 'some_list', 'end'), (EnvOperation.APPEND, 'some_list', 'endend'),
         (EnvOperation.PREPEND, 'some_list', 'start')),
        {'some_list': 'path'}, {'some_list': 'start:path:end:endend'}
    ),
    (
        ((EnvOperation.DELETE, 'some_list'), (EnvOperation.APPEND, 'some_list', 'endend'),
         (EnvOperation.PREPEND, 'some_list', 'start')),
        {'some_list': 'path'}, {'some_list': 'start:endend'}
    ),
    (
        ((EnvOperation.PREPEND, 'some_list', 'start'), (EnvOperation.SET, 'some_list', 'override'),
         (EnvOperation.PREPEND, 'some_list', 'start')),
        {'some_list': 'path'}, {'some_list': 'start:override'}
    ),
    (
        ((EnvOperation.DELETE, 'some_list'), (EnvOperation.SET, 'some_value', 'override'),
         (EnvOperation.PREPEND, 'some_list', 'start')),
        {'some_list': 'path', 'some_value': '3'}, {'some_list': 'start' , 'some_value': 'override'}
    ),
])
def test_defaultenvpipeline_execute(handler_data, env_in, env_out):
    """
    Execute a DefaultEnvPipeline and check the resulting environment.
    """
    handlers = []
    for data in handler_data:
        handlers.append(EnvHandler(*data))

    pipeline = DefaultEnvPipeline(handlers, env_in)
    new_env = pipeline.execute()

    assert new_env == {**env_out}
'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 64, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 64, 'nodes': 4, 'tasks_per_node': 16, 'tasks_per_socket': 8, + 'cpus_per_task': None, 'threads_per_core': None, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Specify nodes and number of tasks per socket with hyperthreading: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'nodes': 4, 'tasks_per_socket': 16, 'threads_per_core': 2}, - {'tasks': None, 'get_tasks': 128, 'nodes': 4, 'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 32, 'tasks_per_socket': 16, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': 2, 'get_threads_per_core': 2, - 'get_threads': 128, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 128, 'nodes': 4, 'tasks_per_node': 32, 'tasks_per_socket': 16, + 'cpus_per_task': None, 'threads_per_core': 2, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Undersubscribe nodes: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'nodes': 4, 'tasks_per_socket': 2}, - {'tasks': None, 'get_tasks': 16, 'nodes': 4, 'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 4, 'tasks_per_socket': 2, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 16, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 16, 'nodes': 4, 'tasks_per_node': 4, 'tasks_per_socket': 2, + 'cpus_per_task': None, 'threads_per_core': None, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Less tasks than available: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'tasks': 60, 'nodes': 4}, - {'tasks': 60, 'get_tasks': 60, 'nodes': 4, 'get_nodes': 4, - 'tasks_per_node': None, 
'get_tasks_per_node': 16, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 60, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 60, 'nodes': 4, 'tasks_per_node': 16, 'tasks_per_socket': None, + 'cpus_per_task': None, 'threads_per_core': None, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Specify number of tasks that is less than total available in required nodes ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'tasks': 60}, - {'tasks': 60, 'get_tasks': 60, 'nodes': None, 'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 16, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 60, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 60, 'nodes': 4, 'tasks_per_node': 16, 'tasks_per_socket': None, + 'cpus_per_task': None, 'threads_per_core': None, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Hybrid MPI+OpenMP ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'tasks': 16, 'cpus_per_task': 4}, - {'tasks': 16, 'get_tasks': 16, 'nodes': None, 'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 4, 'tasks_per_socket': None, - 'cpus_per_task': 4, 'get_cpus_per_task': 4, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 64, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 16, 'nodes': 4, 'tasks_per_node': 4, 'tasks_per_socket': None, + 'cpus_per_task': 4, 'threads_per_core': None, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Hybrid MPI+OpenMP+SMT ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'tasks': 16, 'cpus_per_task': 8, 'threads_per_core': 2}, - {'tasks': 16, 'get_tasks': 16, 'nodes': None, 
'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 2, 'tasks_per_socket': None, - 'cpus_per_task': 8, 'get_cpus_per_task': 8, - 'threads_per_core': 2, 'get_threads_per_core': 2, - 'get_threads': 128, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 16, 'nodes': 4, 'tasks_per_node': 2, 'tasks_per_socket': None, + 'cpus_per_task': 8, 'threads_per_core': 2, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Hybrid MPI+OpenMP+SMT undersubscribed ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'tasks': 14, 'cpus_per_task': 8, 'threads_per_core': 2}, - {'tasks': 14, 'get_tasks': 14, 'nodes': None, 'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 2, 'tasks_per_socket': None, - 'cpus_per_task': 8, 'get_cpus_per_task': 8, - 'threads_per_core': 2, 'get_threads_per_core': 2, - 'get_threads': 112, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 14, 'nodes': 4, 'tasks_per_node': 2, 'tasks_per_socket': None, + 'cpus_per_task': 8, 'threads_per_core': 2, 'bind': None, + 'distribute_remote': None, 'distribute_local': None}), # Hybrid MPI+OpenMP+SMT with binding ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'tasks': 16, 'cpus_per_task': 8, 'threads_per_core': 2, 'bind': CpuBinding.BIND_CORES}, - {'tasks': 16, 'get_tasks': 16, 'nodes': None, 'get_nodes': 4, - 'tasks_per_node': None, 'get_tasks_per_node': 2, 'tasks_per_socket': None, - 'cpus_per_task': 8, 'get_cpus_per_task': 8, - 'threads_per_core': 2, 'get_threads_per_core': 2, - 'get_threads': 128, 'bind': CpuBinding.BIND_CORES, + {'tasks': 16, 'nodes': 4, 'tasks_per_node': 2, 'tasks_per_socket': None, + 'cpus_per_task': 8, 'threads_per_core': 2, 'bind': CpuBinding.BIND_CORES, 'distribute_remote': None, 'distribute_local': None}), # Specify nodes and number of tasks per node with remote distribution strategy: ({'sockets_per_node': 2, 'cores_per_socket': 8, 
'threads_per_core': 2}, {'nodes': 4, 'tasks_per_node': 16, 'distribute_remote': CpuDistribution.DISTRIBUTE_CYCLIC}, - {'tasks': None, 'get_tasks': 64, 'nodes': 4, 'get_nodes': 4, - 'tasks_per_node': 16, 'get_tasks_per_node': 16, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 64, 'bind': None, + {'tasks': 64, 'nodes': 4, 'tasks_per_node': 16, 'tasks_per_socket': None, + 'cpus_per_task': None, 'threads_per_core': None, 'bind': None, 'distribute_remote': CpuDistribution.DISTRIBUTE_CYCLIC, 'distribute_local': None}), # Specify nodes and number of tasks per node with local distribution strategy: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2}, {'nodes': 4, 'tasks_per_node': 16, 'distribute_local': CpuDistribution.DISTRIBUTE_BLOCK}, - {'tasks': None, 'get_tasks': 64, 'nodes': 4, 'get_nodes': 4, - 'tasks_per_node': 16, 'get_tasks_per_node': 16, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 64, 'bind': None, + {'tasks': 64, 'nodes': 4, 'tasks_per_node': 16, 'tasks_per_socket': None, + 'cpus_per_task': None, 'threads_per_core': None, 'bind': None, 'distribute_remote': None, 'distribute_local': CpuDistribution.DISTRIBUTE_BLOCK}), # Only specify number of tasks and use GPUs: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2, 'gpus_per_node': 4}, {'tasks': 64, 'gpus_per_task': 1}, - {'tasks': 64, 'get_tasks': 64, 'nodes': None, 'get_nodes': 16, - 'tasks_per_node': None, 'get_tasks_per_node': 4, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'gpus_per_task': 1, 'get_gpus_per_task': 1, - 'get_threads': 64, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 64, 'nodes': 16, 'tasks_per_node': 4, 'tasks_per_socket': None, + 
'cpus_per_task': None, 'threads_per_core': None, 'gpus_per_task': 1, + 'bind': None, 'distribute_remote': None, 'distribute_local': None}), # Only specify number of tasks and tasks_per_node and use GPUs: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2, 'gpus_per_node': 4}, {'tasks': 64, 'tasks_per_node': 4, 'gpus_per_task': 1}, - {'tasks': 64, 'get_tasks': 64, 'nodes': None, 'get_nodes': 16, - 'tasks_per_node': 4, 'get_tasks_per_node': 4, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'gpus_per_task': 1, 'get_gpus_per_task': 1, - 'get_threads': 64, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 64, 'nodes': 16, 'tasks_per_node': 4, 'tasks_per_socket': None, + 'cpus_per_task': None, 'threads_per_core': None, 'gpus_per_task': 1, + 'bind': None, 'distribute_remote': None, 'distribute_local': None}), # Only specify number of tasks and tasks_per_node and an incomatible # number of GPUs: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2, 'gpus_per_node': 4}, @@ -148,41 +113,35 @@ # Only specify number of tasks and tasks_per_node and use GPUs: ({'sockets_per_node': 2, 'cores_per_socket': 8, 'threads_per_core': 2, 'gpus_per_node': 4}, {'tasks': 64, 'tasks_per_node': 2, 'gpus_per_task': 1}, - {'tasks': 64, 'get_tasks': 64, 'nodes': None, 'get_nodes': 32, - 'tasks_per_node': 2, 'get_tasks_per_node': 2, 'tasks_per_socket': None, - 'cpus_per_task': None, 'get_cpus_per_task': 1, - 'gpus_per_task': 1, 'get_gpus_per_task': 1, - 'threads_per_core': None, 'get_threads_per_core': 1, - 'get_threads': 64, 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + {'tasks': 64, 'nodes': 32, 'tasks_per_node': 2, 'tasks_per_socket': None, + 'cpus_per_task': None, 'gpus_per_task': 1, 'threads_per_core': None, + 'bind': None, 'distribute_remote': None, 'distribute_local': None}), + # Mismatch between gpus_per_node and 
gpus_per_task. + ({'sockets_per_node': 2, 'cores_per_socket': 56, 'threads_per_core': 2, 'gpus_per_node': 8}, + {'tasks': 64, 'gpus_per_task': 32}, + None), ]) -def test_job(cpu_config, jobargs, jobattrs): +def test_job_calculate_missing(cpu_config, jobargs, jobattrs): """ Test various configuration specifications and fill-in for derived values. If jobattrs is empty, creating the job is assumed to fail with a ValueError. """ - class MyCpuConfig(CpuConfiguration): - """Dummy cpu configuration""" - sockets_per_node = cpu_config['sockets_per_node'] - cores_per_socket = cpu_config['cores_per_socket'] - threads_per_core = cpu_config['threads_per_core'] + cpu_config = CpuConfiguration( + sockets_per_node = cpu_config['sockets_per_node'], + cores_per_socket = cpu_config['cores_per_socket'], + threads_per_core = cpu_config['threads_per_core'], gpus_per_node = cpu_config.pop('gpus_per_node', 0) + ) + + job = Job(**jobargs) if jobattrs: - job = Job(MyCpuConfig, **jobargs) + job.calculate_missing(cpu_config) else: with pytest.raises(ValueError): - job = Job(MyCpuConfig, **jobargs) + job.calculate_missing(cpu_config) + return for attr, value in jobattrs.items(): - if value is None: - with pytest.raises(AttributeError): - if attr.startswith('get_'): - _ = getattr(job, attr)() - else: - _ = getattr(job, attr) - else: - if attr.startswith('get_'): - assert getattr(job, attr)() == value - else: - assert getattr(job, attr) == value + assert getattr(job, attr) == value diff --git a/tests/test_launcher.py b/tests/test_launcher.py deleted file mode 100644 index fa2a0e9..0000000 --- a/tests/test_launcher.py +++ /dev/null @@ -1,170 +0,0 @@ -# (C) Copyright 2020- ECMWF. -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. 
-# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. - -""" -Test various implementations of :any:`Launcher` -""" - -import pytest - -from ifsbench import ( - CpuConfiguration, CpuBinding, CpuDistribution, Job, - SrunLauncher, AprunLauncher, MpirunLauncher -) - - -@pytest.fixture(scope='module', name='cpu_config') -def fixture_cpu_config(): - """ - A typical :any:`CpuConfiguration` - """ - class MyCpuConfig(CpuConfiguration): - """Intentionally awkward values""" - sockets_per_node = 3 - cores_per_socket = 37 - threads_per_core = 3 - - return MyCpuConfig - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--cpu-bind=none'}), - (AprunLauncher, {'aprun', '-n 185', '-cc none'}), - (MpirunLauncher, {'mpirun', '-np 185', '--bind-to none'}) -]) -def test_mpi_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification""" - job = Job(cpu_config, tasks=185, bind=CpuBinding.BIND_NONE) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--cpus-per-task=4', '--cpu-bind=none'}), - (AprunLauncher, {'aprun', '-n 185', '-d 4', '-cc none'}), - (MpirunLauncher, {'mpirun', '-np 185', '-cpus-per-proc 4', '--bind-to none'}) -]) -def test_hybrid_mpi_job(cpu_config, launcher, cmd): - """A hybrid MPI + OpenMP :any:`Job` specification""" - job = Job(cpu_config, tasks=185, cpus_per_task=4, bind=CpuBinding.BIND_NONE) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--ntasks-per-core=2', '--cpu-bind=threads'}), - (AprunLauncher, {'aprun', '-n 185', '-j 2', '-cc depth'}), - (MpirunLauncher, {'mpirun', '-np 185', '--bind-to hwthread'}) -]) -def test_mpi_smt_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` 
specification with hyperthreading""" - job = Job(cpu_config, tasks=185, threads_per_core=2, bind=CpuBinding.BIND_THREADS) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--nodes=7', '--ntasks-per-node=3', '--cpu-bind=sockets'}), - (AprunLauncher, {'aprun', '-n 21', '-N 3', '-cc numa_node'}), - (MpirunLauncher, {'mpirun', '-np 21', '-npernode 3', '--bind-to socket'}) -]) -def test_mpi_per_node_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with resources specified per node""" - job = Job(cpu_config, nodes=7, tasks_per_node=3, bind=CpuBinding.BIND_SOCKETS) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--nodes=7', '--ntasks-per-socket=30', '--cpu-bind=cores'}), - (AprunLauncher, {'aprun', '-n 630', '-S 30', '-cc cpu'}), - (MpirunLauncher, {'mpirun', '-np 630', '-npersocket 30', '--bind-to core'}) -]) -def test_mpi_per_socket_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with resources specified per socket""" - job = Job(cpu_config, nodes=7, tasks_per_socket=30, bind=CpuBinding.BIND_CORES) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--nodes=2', '--ntasks-per-socket=10', '--cpus-per-task=3'}), - (AprunLauncher, {'aprun', '-n 60', '-S 10', '-d 3'}), - (MpirunLauncher, {'mpirun', '-np 60', '-npersocket 10', '-cpus-per-proc 3'}) -]) -def test_hybrid_per_socket_job(cpu_config, launcher, cmd): - """A hybrid :any:`Job` specification with resources specified per socket""" - job = Job(cpu_config, nodes=2, tasks_per_socket=10, cpus_per_task=3) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--distribution=block:*'}), - (AprunLauncher, {'aprun', '-n 185'}), - (MpirunLauncher, {'mpirun', '-np 185'}) -]) 
-def test_mpi_distribute_remote_block_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with the distribution of ranks - across nodes prescribed - """ - job = Job(cpu_config, tasks=185, distribute_remote=CpuDistribution.DISTRIBUTE_BLOCK) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--distribution=*:cyclic'}), - (AprunLauncher, {'aprun', '-n 185'}), - (MpirunLauncher, {'mpirun', '-np 185', '--map-by numa'}) -]) -def test_mpi_distribute_local_cyclic_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with the distribution of ranks - across sockets prescribed - """ - job = Job(cpu_config, tasks=185, distribute_local=CpuDistribution.DISTRIBUTE_CYCLIC) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--distribution=block:block'}), - (AprunLauncher, {'aprun', '-n 185'}), - (MpirunLauncher, {'mpirun', '-np 185', '--map-by core'}) -]) -def test_mpi_distribute_block_job(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with the distribution of ranks - fully prescribed - """ - job = Job(cpu_config, tasks=185, distribute_remote=CpuDistribution.DISTRIBUTE_BLOCK, - distribute_local=CpuDistribution.DISTRIBUTE_BLOCK) - assert set(launcher.get_launch_cmd(job)) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--my-option'}), - (AprunLauncher, {'aprun', '-n 185', '--my-option'}), - (MpirunLauncher, {'mpirun', '-np 185', '--my-option'}) -]) -def test_mpi_custom_option(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with custom option handed through""" - job = Job(cpu_config, tasks=185) - assert set(launcher.get_launch_cmd(job, user_options='--my-option')) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=185', '--my-option', 
'--full-custom'}), - (AprunLauncher, {'aprun', '-n 185', '--my-option', '--full-custom'}), - (MpirunLauncher, {'mpirun', '-np 185', '--my-option', '--full-custom'}) -]) -def test_mpi_custom_options(cpu_config, launcher, cmd): - """An MPI-only :any:`Job` specification with custom options handed through""" - job = Job(cpu_config, tasks=185) - assert set(launcher.get_launch_cmd(job, user_options=['--my-option', '--full-custom'])) == cmd - - -@pytest.mark.parametrize('launcher,cmd', [ - (SrunLauncher, {'srun', '--ntasks=1'}), - (AprunLauncher, {'aprun', '-n 1'}), - (MpirunLauncher, {'mpirun', '-np 1'}) -]) -def test_mpi_non_parallel(cpu_config, launcher, cmd): - """A non-parallel :any:`Job` specification""" - job = Job(cpu_config, tasks=1) - assert set(launcher.get_launch_cmd(job)) == cmd