diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index ecae34d40..5df8cd95e 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -81,6 +81,7 @@ from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from autosubmit.platforms.platform import Platform from autosubmit.platforms.submitter import Submitter +from autosubmit.generators import Engine, get_engine_generator from log.log import Log, AutosubmitError, AutosubmitCritical dialog = None @@ -698,6 +699,16 @@ def parse_args() -> Tuple[int, Optional[argparse.Namespace]]: help='Select the status (one or more) to filter the list of jobs.') subparser.add_argument('-t', '--target', type=str, default="FAILED", metavar='STATUS', help='Final status of killed jobs. Default is FAILED.') + subparser = subparsers.add_parser( + 'generate', description='Generate a workflow definition for a different workflow engine', + argument_default=argparse.SUPPRESS) + subparser.add_argument('expid', help='experiment identifier') + subsubparser = subparser.add_subparsers(title="engines", dest='engine', required=True, description='Workflow engine identifier') + for engine in Engine: + generator_class = get_engine_generator(engine) + parser_engine = subsubparser.add_parser(engine.value, help=f"{generator_class.get_engine_name()}") + generator_class.add_parse_args(parser_engine) + args, unknown = parser.parse_known_args() if args.version: Log.info(Autosubmit.autosubmit_version) @@ -807,6 +818,8 @@ def run_command(args): return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) elif args.command == 'stop': return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target) + elif args.command == 'generate': + return Autosubmit.generate_workflow(args.expid, Engine[args.engine], args) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): @@ -5912,3 +5925,46 @@ def stop(expids: str, force=False, 
all_expids=False, force_all=False, cancel=Fal if cancel: job_list, _, _, _, _, _, _, _ = Autosubmit.prepare_run(expid, check_scripts=False) cancel_jobs(job_list, active_jobs_filter=current_status, target_status=status) + + @staticmethod + def generate_workflow(expid: str, engine: Engine, args: argparse.Namespace) -> None: + """Generate the workflow configuration for a different backend engine.""" + Log.info(f'Generate workflow configuration for {engine}') + + try: + Log.info("Getting job list...") + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.check_conf_files(False) + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if len(submitter.platforms) == 0: + raise ValueError('Missing platform!') + + packages_persistence = JobPackagePersistence(expid) + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=False, monitor=False) + + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + + hpc_architecture = as_conf.get_platform() + for job in job_list.get_job_list(): + if job.platform_name is None or job.platform_name == '': + job.platform_name = hpc_architecture + job.platform = submitter.platforms[job.platform_name] + job.update_parameters(as_conf, job_list.parameters) + + job_list.check_scripts(as_conf) + except AutosubmitError as e: + raise AutosubmitCritical(e.message, e.code, e.trace) + except AutosubmitCritical as e: + raise + except BaseException as e: + raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040, + str(e)) + + generator_class = get_engine_generator(engine) + parser = argparse.ArgumentParser() + generator_class.add_parse_args(parser) + generator_input_keys = vars(parser.parse_args('')).keys() + generator_kwargs = {key: args.__getattribute__(key) for key in generator_input_keys} + generator_class.generate(job_list, as_conf, **generator_kwargs) diff --git a/autosubmit/generators/__init__.py 
b/autosubmit/generators/__init__.py new file mode 100644 index 000000000..942249108 --- /dev/null +++ b/autosubmit/generators/__init__.py @@ -0,0 +1,45 @@ +from enum import Enum +from importlib import import_module +from typing import AbstractSet, Callable, cast +from abc import ABC, abstractmethod + + +"""This module provides generators to produce workflow configurations for different backend engines.""" + +class Engine(Enum): + """Workflow Manager engine flavors.""" + aiida = 'aiida' + + def __str__(self): + return self.value + + +class AbstractGenerator(ABC): + """Generator of workflow for an engine.""" + + @staticmethod + @abstractmethod + def get_engine_name() -> str: + """The engine name used for the help text.""" + raise NotImplementedError + + @staticmethod + @abstractmethod + def add_parse_args(parser) -> None: + """Adds arguments to the parser that are needed for a specific engine implementation.""" + raise NotImplementedError + + @classmethod + @abstractmethod + def generate(cls, job_list, as_conf, **arg_options) -> None: + """Generates the workflow from the created autosubmit workflow.""" + raise NotImplementedError + + +def get_engine_generator(engine: Engine) -> AbstractGenerator: + return import_module(f'autosubmit.generators.{engine.value}').Generator + +__all__ = [ + 'Engine', + 'get_engine_generator' +] diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py new file mode 100644 index 000000000..aa5cd45f6 --- /dev/null +++ b/autosubmit/generators/aiida.py @@ -0,0 +1,332 @@ +import os +from pathlib import Path +from functools import cached_property +import warnings +import re +import yaml + +from autosubmitconfigparser.config.configcommon import AutosubmitConfig + +from autosubmit.job.job_list import JobList +from autosubmit.generators import AbstractGenerator +from autosubmit.platforms.platform import Platform +from autosubmit.platforms.paramiko_platform import ParamikoPlatform + +"""The AiiDA generator for Autosubmit.""" + +# 
Autosubmit Task name separator (not to be confused with task and chunk name separator). +DEFAULT_SEPARATOR = '_' + + +class Generator(AbstractGenerator): + """Generates an aiida workflow script that initializes all required AiiDA resources. + + The generated file is structures as the following: + * header: Information about the generation of the file + * imports: All required imports + * init: Initialization of all python resources that need to be instantiated once for the whole script + * create_orm_nodes: Creation of all AiiDA's object-relational mapping (ORM) nodes covering the creation of computer and codes + * workgraph_tasks: Creation of the AiiDA-WorkGraph tasks + * workgraph_deps: Linking of the dependencies between the AiiDA-WorkGraph tasks + * workgraph_submission: Submission of the AiiDA-WorkGraph + """ + # AiiDA Slurm options + # --cpus-per-task -> num_cores_per_mpiproc + # --ntasks-per-node -> num_mpiprocs_per_machine + # --nodes -> num_machines + # --mem -> max_memory_kb (with some translation) + # --qos -> qos + # --partition -> queue_name + # + # Autosubmit Slurm options + # --cpus-per-task -> NUMTHREADS (job) + # --ntasks-per-node -> TASKS (job) + # --nodes -> NODES (job) + # --mem -> MEMORY (job) + # --qos -> CURRENT_QUEUE (job) + # --partition -> PARTITION (job) + # + SUPPORTED_JOB_KEYWORDS = ["NUMTHREADS", "TASKS", "NODES", "WALLCLOCK", "MEMORY", "PLATFORM", # these we need to transfer to aiida + "DEPENDENCIES", "FILE", "RUNNING"] # these are resolved by autosubmit internally + + SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "QUEUE", "SCRATCH_DIR", "MAX_WALLCLOCK", # these we need to transfer to aiida + "PROJECT"] # these are resolved by autosubmit internally + + def __init__(self, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str): + if not (output_path := Path(output_dir)).exists(): + raise ValueError(f"Given `output_dir` {output_path} does not exist.") + self._output_path = (output_path / job_list.expid).absolute() + 
self._output_path.mkdir(exist_ok=True) + self._job_list = job_list + self._as_conf = as_conf + + @classmethod + def generate(cls, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str) -> None: + self = cls(job_list, as_conf, output_dir) + self._validate() + workflow_script = self._generate_workflow_script() + (self._output_path / "submit_aiida_workflow.py").write_text(workflow_script) + (self._output_path / "README.md").write_text(self._generate_readme()) + + @staticmethod + def get_engine_name() -> str: + return "AiiDA" + + @staticmethod + def add_parse_args(parser) -> None: + parser.add_argument('-o', '--output_dir', dest="output_dir", default=".", help='Output directory') + + def _validate(self) -> None: + """Validate jobs""" + for job_name, job_conf in self._as_conf.jobs_data.items(): + for key in job_conf.keys(): + if key not in Generator.SUPPORTED_JOB_KEYWORDS: + msg = f"Found in job {job_name} configuration file key {key} that is not officially supported for AiiDA generator. It might result in an error." + warnings.warn(msg) + ## validate platforms + for platform_name, platform_conf in self._as_conf.platforms_data.items(): + # only validate platforms that are used in jobs + if platform_name in self._platforms_used_in_job.keys(): + for key, value in platform_conf.items(): + if key not in Generator.SUPPORTED_PLATFORM_KEYWORDS and value != '': + msg = f"Found in platform {platform_name} configuration file key {key} that is not supported for AiiDA generator." + warnings.warn(msg) + + + @cached_property + def _platforms_used_in_job(self) -> dict[str, Platform]: + platforms_used_in_jobs = {} + for job in self._job_list.get_all(): + platforms_used_in_jobs[job.platform.name] = job.platform + return platforms_used_in_jobs + + def _generate_workflow_script(self) -> str: + """Generates a PyFlow workflow using Autosubmit database. + + The ``autosubmit create`` command must have been already executed prior + to calling this function. 
This is so that the jobs are correctly loaded + to produce the PyFlow workflow. + + :param job_list: ``JobList`` Autosubmit object, that contains the parameters, jobs, and graph + :param as_conf: Autosubmit configuration + :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse + """ + header = self._generate_header_section() + imports = self._generate_imports_section() + init = self._generate_init_section() + create_orm_nodes = self._generate_create_orm_nodes_section() + workgraph_tasks = self._generate_workgraph_tasks_section() + workgraph_deps = self._generate_workgraph_deps_section() + workgraph_submission = self._generate_workgraph_submission_section() + return header + imports + init + create_orm_nodes + workgraph_tasks + workgraph_deps + workgraph_submission + + + def _generate_header_section(self) -> str: + return f"""# HEADER +# This is in autogenerated file from {self._job_list.expid} +# The computer and codes are defined for the following platforms: +# {list(self._platforms_used_in_job.keys())} + +""" + + def _generate_imports_section(self) -> str: + return """# IMPORTS +import aiida +from aiida import orm +from aiida_workgraph import WorkGraph +from aiida.orm.utils.builders.computer import ComputerBuilder +from aiida.common.exceptions import NotExistent +from pathlib import Path +import yaml + +""" + + def _generate_init_section(self) -> str: + return """# INIT +aiida.load_profile() +wg = WorkGraph() +tasks = {} + +""" + + def _generate_create_orm_nodes_section(self) -> str: + # aiida computer + + code_section = "# CREATE_ORM_NODES" + for platform in self._platforms_used_in_job.values(): + + from autosubmit.platforms.locplatform import LocalPlatform + from autosubmit.platforms.slurmplatform import SlurmPlatform + if isinstance(platform, LocalPlatform): + computer_setup = { + "label": f"{platform.name}", + "hostname": f"{platform.host}", + "work_dir": f"{platform.scratch}", + "description": "", + "transport": 
"core.local", + "scheduler": "core.direct", + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "mpiprocs_per_machine": 1, + "default_memory_per_machine": None, + "append_text": "", + "prepend_text": "", + "use_double_quotes": False, + "shebang": "#!/bin/bash", + } + elif isinstance(platform, SlurmPlatform): + computer_setup = { + "label": f"{platform.name}", + "hostname": f"{platform.host}", + "work_dir": f"{platform.scratch}", + #username": f"{platform.user}", does not work + "description": "", + "transport": "core.ssh", + "scheduler": "core.slurm", + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "mpiprocs_per_machine": platform.processors_per_node, + "default_memory_per_machine": None, # This is specified in the task option + "append_text": "", + "prepend_text": "", + "use_double_quotes": False, + "shebang": "#!/bin/bash", + } + else: + raise ValueError(f"Platform type {platform} not supported for engine aiida.") + + computer_setup_path = Path(self._output_path / f"{platform.name}/{platform.name}-setup.yml") + computer_setup_path.parent.mkdir(exist_ok=True) + computer_setup_path.write_text(yaml.dump(computer_setup)) + create_computer = f""" +try: + computer = orm.load_computer("{platform.name}") + print(f"Loaded computer {{computer.label!r}}") +except NotExistent: + setup_path = Path("{computer_setup_path}") + config_kwargs = yaml.safe_load(setup_path.read_text()) + computer = ComputerBuilder(**config_kwargs).new().store() + computer.configure(safe_interval=5.0) + computer.set_minimum_job_poll_interval(5.0) + + from aiida.transports.plugins.ssh import SshTransport + from aiida.transports import cli as transport_cli + default_kwargs = {{name: transport_cli.transport_option_default(name, computer) for name in SshTransport.get_valid_auth_params()}} + default_kwargs["port"] = int(default_kwargs["port"]) + default_kwargs["timeout"] = float(default_kwargs["timeout"]) + default_kwargs["username"] = "{platform.user}" + default_kwargs['key_filename'] = 
"{os.environ["HOME"]}/.ssh/id_rsa" + computer.configure(user=orm.User.collection.get_default(), **default_kwargs) + print(f"Created and stored computer {{computer.label}}")""" + + # aiida bash code to run script + code_setup = { + "computer": f"{platform.name}", + "filepath_executable": "/bin/bash", + "label": "bash", + "description": '', + "default_calc_job_plugin": 'core.shell', + "prepend_text": '', + "append_text": '', + "use_double_quotes": False, + "with_mpi": False + } + + code_setup_path = Path(self._output_path / f"{platform.name}/bash@{platform.name}-setup.yml") + code_setup_path.parent.mkdir(exist_ok=True) + code_setup_path.write_text(yaml.dump(code_setup)) + create_code = f""" +try: + bash_code = orm.load_code("bash@{platform.name}") +except NotExistent: + setup_path = Path("{code_setup_path}") + setup_kwargs = yaml.safe_load(setup_path.read_text()) + setup_kwargs["computer"] = orm.load_computer(setup_kwargs["computer"]) + bash_code = orm.InstalledCode(**setup_kwargs).store() + print(f"Created and stored bash@{{computer.label}}")""" + code_section += create_computer + create_code + code_section += "\n\n" + return code_section + + def _generate_workgraph_tasks_section(self): + code_section = "# WORKGRAPH_TASKS" + + for job in self._job_list.get_all(): + script_name = job.create_script(self._as_conf) + script_path = Path(job._tmp_path, script_name) + script_text = open(script_path).read() + # Let's drop the Autosubmit header and tailed. 
+ trimmed_script_text = re.findall( + r'# Autosubmit job(.*)# Autosubmit tailer', + script_text, + flags=re.DOTALL | re.MULTILINE)[0][1:-1] + trimmed_script_path = self._output_path / script_name + trimmed_script_path.write_text(trimmed_script_text) + create_task = f""" +tasks["{job.name}"] = wg.add_task( + "workgraph.shelljob", + name = "{job.name}", + command = orm.load_code("bash@{job.platform.name}"), + arguments = ["{{script}}"], + nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}} +)""" + if job.memory != "": + create_task += f""" +tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.memory}}})""" + if job.platform.max_wallclock is None: + if job.parameters["WALLCLOCK"] != "": + wallclock_seconds = int(ParamikoPlatform.parse_time(job.wallclock).total_seconds()) + else: + wallclock_seconds = int(ParamikoPlatform.parse_time(job.platform.wallclock).total_seconds()) + create_task += f""" +tasks["{job.name}"].set({{"metadata.options.max_wallclock_seconds": {wallclock_seconds}}})""" + if job.partition != "": + create_task += f""" +tasks["{job.name}"].set({{"metadata.options.queue_name": "{job.partition}"}})""" + + + code_section += create_task + code_section += "\n\n" + return code_section + + def _generate_workgraph_deps_section(self) -> str: + code_section = "# WORKGRAPH_DEPS" + for edge in self._job_list.graph.edges: + code_section += f""" +tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])""" + code_section += "\n\n" + return code_section + + + def _generate_workgraph_submission_section(self) -> str: + return """# WORKGRAPH_SUBMISSION +wg.run()""" + + def _generate_readme(self) -> str: + return f"""### Meta-information +This file has been auto generated from expid {self._job_list.expid}. + +### Instructions +To run the workflow please ensure that you have AiiDA installed and configured. 
+For that please refer to the +[installation section of AiiDA](https://aiida.readthedocs.io/projects/aiida-core/en/stable/installation/index.html) +Then you can run the generated workflow with: +```bash +python submit_aiida_workflow.py +``` +The workflow is compatible with aiida-core~=2.6.0 and aiida-workgraph==0.5.2. + +### Node Creation + +The workflow creates the computer and code nodes as required from the +autosubmit config. If the computer and code nodes have been already created for +example through a previous run, they are loaded. If you have changed the +computer parameters in your autosubmit config, then you need to delete the +corresponding computer and code nodes. Note that this has the disadvantage that +it will delete all calculations that are associated with these nodes. +It is therefore recommended to use the JOB parameters in autosubmit to override +certain computer configs. +""" + + +__all__ = [ + 'Generator' +] diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index e41d0a519..83efec2da 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1446,6 +1446,20 @@ def get_header(self, job, parameters): '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job, parameters)) return header + @staticmethod + def parse_time(wallclock): + # noinspection Annotator + regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?') + parts = regex.match(wallclock) + if not parts: + return + parts = parts.groupdict() + time_params = {} + for name, param in parts.items(): + if param: + time_params[name] = int(param) + return timedelta(**time_params) + def closeConnection(self): # Ensure to delete all references to the ssh connection, so that it frees all the file descriptors with suppress(Exception): diff --git a/autosubmit/scripts/autosubmit.py b/autosubmit/scripts/autosubmit.py index 12968672b..6afff7f5c 100755 --- a/autosubmit/scripts/autosubmit.py +++
b/autosubmit/scripts/autosubmit.py @@ -105,6 +105,7 @@ def main(): return_value = Autosubmit.run_command(args) delete_lock_file() except BaseException as e: + raise delete_lock_file() command = "" expid = "" diff --git a/test/unit/test_generator.py b/test/unit/test_generator.py new file mode 100644 index 000000000..d9dcc0f85 --- /dev/null +++ b/test/unit/test_generator.py @@ -0,0 +1,219 @@ +from autosubmit.generators.aiida import Generator as AiidaGenerator +import pytest +import os + + +@pytest.fixture +def as_conf(autosubmit_config): + expid = "dummy-id" + experiment_data = { + "CONFIG": { + "AUTOSUBMIT_VERSION": "4.1.9", + "MAXWAITINGJOBS": 20, + "TOTALJOBS": 20, + "SAFETYSLEEPTIME": 10, + "RETRIALS": 0, + "RELOAD_WHILE_RUNNING": False, + }, + "MAIL": {"NOTIFICATIONS": False, "TO": None}, + "STORAGE": {"TYPE": "pkl", "COPY_REMOTE_LOGS": True}, + "DEFAULT": {"EXPID": f"{expid}", "HPCARCH": "LOCAL"}, + "EXPERIMENT": { + "DATELIST": "20000101", + "MEMBERS": "fc0", + "CHUNKSIZEUNIT": "month", + "CHUNKSIZE": 4, + "NUMCHUNKS": 2, + "CHUNKINI": "", + "CALENDAR": "standard", + }, + "PROJECT": {"PROJECT_TYPE": "none", "PROJECT_DESTINATION": ""}, + "GIT": { + "PROJECT_ORIGIN": "", + "PROJECT_BRANCH": "", + "PROJECT_COMMIT": "", + "PROJECT_SUBMODULES": "", + "FETCH_SINGLE_BRANCH": True, + }, + "SVN": {"PROJECT_URL": "", "PROJECT_REVISION": ""}, + "LOCAL": {"PROJECT_PATH": ""}, + "PROJECT_FILES": { + "FILE_PROJECT_CONF": "", + "FILE_JOBS_CONF": "", + "JOB_SCRIPTS_TYPE": "", + }, + "RERUN": {"RERUN": False, "RERUN_JOBLIST": ""}, + "JOBS": { + "LOCAL_SETUP": { + "FILE": "LOCAL_SETUP.sh", + "PLATFORM": "LOCAL", + "RUNNING": "once", + "DEPENDENCIES": {}, + "ADDITIONAL_FILES": [], + }, + "REMOTE_SETUP": { + "FILE": "REMOTE_SETUP.sh", + "PLATFORM": "LOCAL", + "DEPENDENCIES": {"LOCAL_SETUP": {}}, + "WALLCLOCK": "00:05", + "RUNNING": "once", + "ADDITIONAL_FILES": [], + }, + "INI": { + "FILE": "INI.sh", + "PLATFORM": "LOCAL", + "DEPENDENCIES": {"REMOTE_SETUP": {}}, + "RUNNING": 
"member", + "WALLCLOCK": "00:05", + "ADDITIONAL_FILES": [], + }, + "SIM": { + "FILE": "SIM.sh", + "PLATFORM": "LOCAL", + "DEPENDENCIES": {"INI": {}, "SIM-1": {}}, + "RUNNING": "chunk", + "WALLCLOCK": "00:05", + "ADDITIONAL_FILES": [], + }, + "POST": { + "FILE": "POST.sh", + "PLATFORM": "LOCAL", + "DEPENDENCIES": {"SIM": {}}, + "RUNNING": "once", + "WALLCLOCK": "00:05", + "ADDITIONAL_FILES": [], + }, + "CLEAN": { + "FILE": "CLEAN.sh", + "PLATFORM": "LOCAL", + "DEPENDENCIES": {"POST": {}}, + "RUNNING": "once", + "WALLCLOCK": "00:05", + "ADDITIONAL_FILES": [], + }, + "TRANSFER": { + "FILE": "TRANSFER.sh", + "PLATFORM": "LOCAL", + "DEPENDENCIES": {"CLEAN": {}}, + "RUNNING": "member", + "ADDITIONAL_FILES": [], + }, + }, + "PLATFORMS": { + "MARENOSTRUM4": { + "TYPE": "slurm", + "HOST": "mn1.bsc.es", + "PROJECT": "bsc32", + "USER": None, + "QUEUE": "debug", + "SCRATCH_DIR": "/gpfs/scratch", + "ADD_PROJECT_TO_HOST": False, + "MAX_WALLCLOCK": "48:00", + "TEMP_DIR": "", + }, + "MARENOSTRUM_ARCHIVE": { + "TYPE": "ps", + "HOST": "dt02.bsc.es", + "PROJECT": "bsc32", + "USER": None, + "SCRATCH_DIR": "/gpfs/scratch", + "ADD_PROJECT_TO_HOST": False, + "TEST_SUITE": False, + }, + "TRANSFER_NODE": { + "TYPE": "ps", + "HOST": "dt01.bsc.es", + "PROJECT": "bsc32", + "USER": None, + "ADD_PROJECT_TO_HOST": False, + "SCRATCH_DIR": "/gpfs/scratch", + }, + "TRANSFER_NODE_BSCEARTH000": { + "TYPE": "ps", + "HOST": "bscearth000", + "USER": None, + "PROJECT": "Earth", + "ADD_PROJECT_TO_HOST": False, + "QUEUE": "serial", + "SCRATCH_DIR": "/esarchive/scratch", + }, + "BSCEARTH000": { + "TYPE": "ps", + "HOST": "bscearth000", + "USER": None, + "PROJECT": "Earth", + "ADD_PROJECT_TO_HOST": False, + "QUEUE": "serial", + "SCRATCH_DIR": "/esarchive/scratch", + }, + "NORD3": { + "TYPE": "SLURM", + "HOST": "nord1.bsc.es", + "PROJECT": "bsc32", + "USER": None, + "QUEUE": "debug", + "SCRATCH_DIR": "/gpfs/scratch", + "MAX_WALLCLOCK": "48:00", + }, + "ECMWF-XC40": { + "TYPE": "ecaccess", + "VERSION": "pbs", + 
"HOST": "cca", + "USER": None, + "PROJECT": "spesiccf", + "ADD_PROJECT_TO_HOST": False, + "SCRATCH_DIR": "/scratch/ms", + "QUEUE": "np", + "SERIAL_QUEUE": "ns", + "MAX_WALLCLOCK": "48:00", + }, + }, + "LOCAL_TMP_DIR": "/dummy/local/temp/dir" + } + + as_conf = autosubmit_config(expid, experiment_data=experiment_data) + return as_conf + +@pytest.fixture +def job_list(as_conf, monkeypatch): + from autosubmit.job.job_list import JobList + orig_generate = JobList.generate + def mockgenerate(self, *args, **kwargs): + kwargs['create'] = True + orig_generate(self, *args, **kwargs) + + monkeypatch.setattr(JobList, 'generate', mockgenerate) + from autosubmit.autosubmit import Autosubmit + + job_list = Autosubmit.load_job_list( + as_conf.expid, as_conf, notransitive=False, monitor=False + ) + jobs = job_list.get_all() + for job in jobs: + job._platform = jobs[0].platform # for jobs[0] the local platform is correctly set + job.het = {} + return job_list + +def test_aiida_generator(as_conf, job_list, tmp_path): + AiidaGenerator.generate(job_list, as_conf, str(tmp_path)) + + generated_paths_in_experiment_folder = set([ + f'{as_conf.expid}_LOCAL_SETUP.cmd', + f'{as_conf.expid}_REMOTE_SETUP.cmd', # date from experiment data + f'{as_conf.expid}_20000101_fc0_INI.cmd', + f'{as_conf.expid}_20000101_fc0_TRANSFER.cmd', # date from experiment data + f'{as_conf.expid}_20000101_fc0_1_SIM.cmd', # date from experiment data + f'{as_conf.expid}_20000101_fc0_2_SIM.cmd', # date from experiment data + f'{as_conf.expid}_POST.cmd', + f'{as_conf.expid}_CLEAN.cmd', + 'local', + "README.md", + "submit_aiida_workflow.py", + ]) + assert set(os.listdir(tmp_path / as_conf.expid)) == generated_paths_in_experiment_folder + + generated_paths_in_local_folder = set([ + "bash@local-setup.yml", + "local-setup.yml" + ]) + assert set(os.listdir(tmp_path / as_conf.expid / "local")) == generated_paths_in_local_folder