From 8b8b2c489d6ee44d567652dbb0220c2d4a831e42 Mon Sep 17 00:00:00 2001 From: svdenhau Date: Fri, 8 Dec 2023 19:17:57 +0100 Subject: [PATCH] re-add WQ support --- configs/htex.yaml | 2 ++ configs/workqueue.yaml | 15 ++++++++ psiflow/execution.py | 75 ++++++++++++++++++++++++++++++++++----- psiflow/models/_mace.py | 6 ++-- psiflow/models/_nequip.py | 6 ++-- 5 files changed, 89 insertions(+), 15 deletions(-) create mode 100644 configs/workqueue.yaml diff --git a/configs/htex.yaml b/configs/htex.yaml index 514623d..87d49af 100644 --- a/configs/htex.yaml +++ b/configs/htex.yaml @@ -8,5 +8,7 @@ ModelTraining: max_walltime: 1 gpu: true ReferenceEvaluation: + mpi_command: 'mpirun -np {}' # cp2k on conda-forge comes with OpenMPI (not MPICH as in container) cores_per_worker: 1 +mode: 'htex' ... diff --git a/configs/workqueue.yaml b/configs/workqueue.yaml new file mode 100644 index 0000000..b4f0ad4 --- /dev/null +++ b/configs/workqueue.yaml @@ -0,0 +1,15 @@ +--- +ModelEvaluation: + cores_per_worker: 1 + simulation_engine: 'openmm' + gpu: false +ModelTraining: + cores_per_worker: 4 + max_walltime: 1 + gpu: true +ReferenceEvaluation: + max_walltime: 0.3 + cores_per_worker: 1 + mpi_command: 'mpirun -np {}' # cp2k on conda-forge comes with OpenMPI (not MPICH as in container) +mode: 'workqueue' +... diff --git a/psiflow/execution.py b/psiflow/execution.py index 7690100..a797c51 100644 --- a/psiflow/execution.py +++ b/psiflow/execution.py @@ -11,7 +11,6 @@ # see https://stackoverflow.com/questions/59904631/python-class-constants-in-dataclasses from typing import Any, Callable, Optional, Type, Union -from warnings import warn import parsl import psutil @@ -25,7 +24,7 @@ from parsl.providers.base import ExecutionProvider from psiflow.models import BaseModel -from psiflow.parsl_utils import ContainerizedLauncher +from psiflow.parsl_utils import ContainerizedLauncher, MyWorkQueueExecutor from psiflow.reference import BaseReference from psiflow.utils import resolve_and_check, set_logger @@ -63,9 +62,9 @@ def __post_init__(self): self.max_walltime = 1e9 def generate_parsl_resource_specification(self): - warn("use of the WQ executor is deprecated!") resource_specification = {} resource_specification["cores"] = self.cores_per_worker + # add random disk and mem usage because this is somehow required resource_specification["disk"] = 1000 memory = 2000 * self.cores_per_worker resource_specification["memory"] = int(memory) @@ -257,8 +256,10 @@ def parse_config(yaml_dict: dict): "retries": 1, "strategy": "simple", "max_idletime": 20, - "htex_address": None, "default_threads": 1, + "htex_address": None, + # "use_workqueue": False, + "mode": "taskvine", } forced = { "initialize_logging": False, # manual; to move parsl.log one level up @@ -328,8 +329,15 @@ def load( # create main parsl executors executors = [] + mode = psiflow_config.pop("mode") for definition in definitions: - if not definition.use_threadpool: + if definition.use_threadpool: + executor = ThreadPoolExecutor( + max_threads=definition.cores_per_worker, + working_dir=str(path), + label=definition.name(), + ) + elif mode == "htex": if type(definition.parsl_provider) is LocalProvider: # noqa: F405 cores_available = psutil.cpu_count(logical=False) max_workers = max( @@ -352,12 +360,61 @@ def load( provider=definition.parsl_provider, cpu_affinity=definition.cpu_affinity, ) - else: - executor = ThreadPoolExecutor( - max_threads=definition.cores_per_worker, - working_dir=str(path), + elif mode == "workqueue": + worker_options = [] + if hasattr(definition.parsl_provider, "cores_per_node"): + worker_options.append( + "--cores={}".format(definition.parsl_provider.cores_per_node), + ) + else: + worker_options.append( + "--cores={}".format(psutil.cpu_count(logical=False)), + ) + if hasattr(definition.parsl_provider, "walltime"): + walltime_hhmmss = definition.parsl_provider.walltime.split(":") + assert len(walltime_hhmmss) == 3 + walltime = 0 + walltime += 60 * float(walltime_hhmmss[0]) + walltime += float(walltime_hhmmss[1]) + walltime += 1 # whatever seconds are present + walltime -= ( + 5 # add 5 minutes of slack, e.g. for container downloading + ) + worker_options.append("--wall-time={}".format(walltime * 60)) + worker_options.append("--parent-death") + worker_options.append( + "--timeout={}".format(psiflow_config["max_idletime"]) + ) + # manager_config = TaskVineManagerConfig( + # shared_fs=True, + # max_retries=1, + # autocategory=False, + # enable_peer_transfers=False, + # port=0, + # ) + # factory_config = TaskVineFactoryConfig( + # factory_timeout=20, + # worker_options=' '.join(worker_options), + # ) + executor = MyWorkQueueExecutor( label=definition.name(), + working_dir=str(path / definition.name()), + provider=definition.parsl_provider, + shared_fs=True, + autocategory=False, + port=0, + max_retries=0, + coprocess=False, + worker_options=" ".join(worker_options), ) + else: + raise ValueError("Unknown mode {}".format(mode)) + # executor = TaskVineExecutor( + # label=definition.name(), + # provider=definition.parsl_provider, + # manager_config=manager_config, + # factory_config=factory_config, + # ) executors.append(executor) # create default executors diff --git a/psiflow/models/_mace.py b/psiflow/models/_mace.py index f66baf5..00ae7f1 100644 --- a/psiflow/models/_mace.py +++ b/psiflow/models/_mace.py @@ -120,7 +120,7 @@ class MACEConfig: ) -@typeguard.typechecked +# @typeguard.typechecked def initialize( mace_config: dict, stdout: str = "", @@ -174,14 +174,14 @@ def train( command_write, "timeout -s 15 {}s psiflow-train-mace".format(max(walltime - 15, 0)), "--config config.yaml", - "--model {};".format(inputs[0].filepath), + "--model {} || true;".format(inputs[0].filepath), "ls *;", "cp model/mace.model {};".format(outputs[0].filepath), # no swa ] return " ".join(command_list) -@typeguard.typechecked +# @typeguard.typechecked def deploy( device: str, inputs: List[File] = [], diff --git a/psiflow/models/_nequip.py b/psiflow/models/_nequip.py index 59eb12d..f067435 100644 --- a/psiflow/models/_nequip.py +++ b/psiflow/models/_nequip.py @@ -184,7 +184,7 @@ class AllegroConfig(NequIPConfig): r_max: float = 5.0 -@typeguard.typechecked +# @typeguard.typechecked def initialize( nequip_config: dict, stdout: str = "", @@ -214,7 +214,7 @@ def initialize( return " ".join(command_list) -@typeguard.typechecked +# @typeguard.typechecked def deploy( nequip_config: dict, inputs: List[File] = [], @@ -265,7 +265,7 @@ def train( "timeout -s 15 {}s".format(max(walltime - 15, 0)), # 15 s slack "psiflow-train-nequip", "--config config.yaml", - "--model {};".format(inputs[0].filepath), + "--model {} || true;".format(inputs[0].filepath), "ls;", "cp {}/best_model.pth {}".format( nequip_config["run_name"], outputs[0].filepath