Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
483f9a3
Remove duplicated platform attribute
pablogoitia Nov 13, 2025
e73e1f2
Temporarily downgraded the yaml version required to solve a runtime dep…
pablogoitia Nov 13, 2025
46add63
Temporarily added an entry point for debugging in VSCode
pablogoitia Nov 13, 2025
f197f9c
Integrated the Flux builder to the factories
pablogoitia Nov 13, 2025
84f79ce
Created a wrapper builder for Flux (as wrapper engine)
pablogoitia Nov 13, 2025
fe3298c
Modified the wrapper header to assign "bash" as the language for the …
pablogoitia Nov 13, 2025
223e00a
Just added some clarifying comments for myself (marked as TODOs)
pablogoitia Nov 13, 2025
5ec796e
Now the wrapping method is passed as a parameter to the vertical pack…
pablogoitia Nov 13, 2025
7d23980
Basic horizontal wrapper handling with Flux
pablogoitia Nov 13, 2025
0993b9c
Added some error handling to the builder (not the wrapper script)
pablogoitia Nov 13, 2025
14e8e1b
Minor changes in comments
pablogoitia Nov 14, 2025
1c8e0ab
Fixed exception message and horizontal wrapper generation for Flux
pablogoitia Nov 14, 2025
c62199f
Temporarily associate jobs in wrapper to the FluxOverSlurm fictitious p…
pablogoitia Nov 14, 2025
9911398
Create a fictitious platform for Flux running over Slurm
pablogoitia Nov 14, 2025
03c1cb0
Now jobs inside a wrapper contain more wrapper information
pablogoitia Nov 14, 2025
b4f8595
Header format improvements
pablogoitia Nov 14, 2025
146026d
Forced nslots in the flux batch command
pablogoitia Nov 14, 2025
3ccc0b5
Cleaned Flux script header
pablogoitia Nov 17, 2025
80e9324
Finished the vertical and horizontal wrappers. Improvements in code.
pablogoitia Nov 17, 2025
3d77764
Comment fix
pablogoitia Nov 18, 2025
8469766
Solved "Remove extraneous `f` prefix"
pablogoitia Nov 18, 2025
a9cbd2c
Processor counts are now obtained per section to support V-H and H-V
pablogoitia Nov 18, 2025
b77c887
Fix environment loader
pablogoitia Nov 18, 2025
246b403
Support for vertical-horizontal wrappers
pablogoitia Nov 18, 2025
787ab6f
Implemented H-V wrappers and fixed a bug in V-H
pablogoitia Nov 19, 2025
650eb6d
Fixes in Flux headers
pablogoitia Nov 19, 2025
ddf2de0
Added some TODOs
pablogoitia Nov 19, 2025
c337c00
Now jobs are required to be waitable through their headers
pablogoitia Nov 21, 2025
65d50d5
Remove the non-existent tasks-per-node directive
pablogoitia Nov 21, 2025
ee6240a
Refactor the FluxWrapperBuilder to remove unused processor and waitab…
pablogoitia Nov 21, 2025
5f8a9db
Reorganize TODOs and remove unused commands
pablogoitia Nov 21, 2025
5379ba2
TODO removal or update
pablogoitia Nov 21, 2025
84082ac
Remove log path flags because they are overwritten in 'flux batch'
pablogoitia Nov 21, 2025
0823ab3
Environment setup is now available as a config parameter
pablogoitia Nov 21, 2025
b85501e
Now Autosubmit exits in case of receiving hetjobs, which are not impl…
pablogoitia Nov 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,8 @@ def submit_ready_jobs(as_conf: AutosubmitConfig, job_list: JobList, platforms_to
for platform in platforms_to_test:
packager = JobPackager(as_conf, platform, job_list, hold=hold)
packages_to_submit = packager.build_packages()

#### TODO: [ENGINES] Here is where the scripts are created and submitted
save_1, failed_packages, error_message, valid_packages_to_submit, any_job_submitted = platform.submit_ready_jobs(as_conf,
job_list,
packages_persistence,
Expand All @@ -2664,6 +2666,7 @@ def submit_ready_jobs(as_conf: AutosubmitConfig, job_list: JobList, platforms_to
save_2 = False
if platform.type.lower() in ["slurm", "pjm"] and not inspect and not only_wrappers:
# Process the script generated in submit_ready_jobs
# TODO: [ENGINES] The script is submitted to Slurm here
save_2, valid_packages_to_submit = platform.process_batch_ready_jobs(valid_packages_to_submit,
failed_packages,
error_message="", hold=hold)
Expand Down
10 changes: 10 additions & 0 deletions autosubmit/config/configcommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,16 @@ def get_wrapper_policy(self, wrapper=None):
wrapper = {}
return wrapper.get('POLICY', self.experiment_data.get("WRAPPERS", {}).get("POLICY", 'flexible'))

def get_custom_env_setup(self, wrapper=None):
    """Return the custom environment setup commands for a wrapper that runs with a wrapper engine.

    The value is read from the ``CUSTOM_ENV_SETUP`` key of the given wrapper
    section. When the section does not define it, the ``CUSTOM_ENV_SETUP`` key
    of the global ``WRAPPERS`` configuration is used, and an empty string is
    returned when neither defines it.

    :param wrapper: wrapper section data; ``None`` is treated as an empty section.
    :type wrapper: dict
    :return: custom environment setup commands (empty string if not configured)
    :rtype: string
    """
    # Global value from the WRAPPERS section, used when the wrapper has no own entry.
    fallback = self.experiment_data.get("WRAPPERS", {}).get("CUSTOM_ENV_SETUP", '')
    section = {} if wrapper is None else wrapper
    return section.get('CUSTOM_ENV_SETUP', fallback)

def get_wrappers(self):
"""Returns the jobs that should be wrapped, configured in the autosubmit's config.

Expand Down
14 changes: 10 additions & 4 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from autosubmit.job.metrics_processor import UserMetricProcessor
from autosubmit.job.template import get_template_snippet, Language
from autosubmit.log.log import Log, AutosubmitCritical
from autosubmit.platforms.fluxoverslurm import FluxOverSlurmPlatform
from autosubmit.platforms.paramiko_platform import ParamikoPlatform
from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter

Expand Down Expand Up @@ -130,7 +131,7 @@ class Job(object):
"""

__slots__ = (
'rerun_only', 'delay_end', 'wrapper_type', '_wrapper_queue',
'rerun_only', 'delay_end', 'wrapper_type', 'wrapper_method', '_wrapper_queue',
'_platform', '_queue', '_partition', 'retry_delay', '_section',
'_wallclock', 'wchunkinc', '_tasks', '_nodes',
'_threads', '_processors', '_memory', '_memory_per_task', '_chunk',
Expand All @@ -141,7 +142,7 @@ class Job(object):
'file', 'additional_files', 'executable', '_local_logs',
'_remote_logs', 'script_name', 'stat_file', '_status', 'prev_status',
'new_status', 'priority', '_parents', '_children', '_fail_count', 'expid',
'parameters', '_tmp_path', '_log_path', '_platform', 'check',
'parameters', '_tmp_path', '_log_path', 'check',
'check_warnings', '_packed', 'hold', 'distance_weight', 'level', '_export',
'_dependencies', 'running', 'start_time', 'ext_header_path', 'ext_tailer_path',
'edge_info', 'total_jobs', 'max_waiting_jobs', 'exclusive', '_retrials',
Expand Down Expand Up @@ -192,6 +193,7 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
self.rerun_only = False
self.delay_end = None
self.wrapper_type = None
self.wrapper_method = None
self._wrapper_queue = None
self._platform = None
self._queue = None
Expand Down Expand Up @@ -245,7 +247,6 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
self._tmp_path = os.path.join(
BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
self._platform = None
self.check = 'true'
self.check_warnings = False
self.packed = False
Expand Down Expand Up @@ -310,6 +311,7 @@ def clean_attributes(self):
self.rerun_only = False
self.delay_end = None
self.wrapper_type = None
self.wrapper_method = None
self._wrapper_queue = None
self._queue = None
self._partition = None
Expand Down Expand Up @@ -1635,6 +1637,7 @@ def update_current_parameters(self, as_conf: AutosubmitConfig, parameters: dict)
"min_wrapped_h",
"min_wrapped_v",
"policy"
"custom_env_setup"
]:
parameters[f"CURRENT_{key.upper()}"] = value

Expand Down Expand Up @@ -2380,7 +2383,10 @@ def get_wrapped_content(self, as_conf: AutosubmitConfig, parameters: dict):
return self._get_paramiko_template(snippet, template, parameters)

def _get_paramiko_template(self, snippet: 'TemplateSnippet', template, parameters) -> str:
current_platform = self._platform
if self.wrapper_method == 'flux':
current_platform = FluxOverSlurmPlatform()
else:
current_platform = self._platform
return ''.join([
snippet.as_header(current_platform.get_header(self, parameters), self.executable),
snippet.as_body(template),
Expand Down
14 changes: 9 additions & 5 deletions autosubmit/job/job_packager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(self, as_config: 'AutosubmitConfig', platform: 'ParamikoPlatform',
self.jobs_in_wrapper = dict()
self.extensible_wallclock = dict()
self.wrapper_info = list()
self.custom_env_setup = dict()
self.calculate_job_limits(platform)
self.special_variables = dict()
self.wrappers_with_error = {}
Expand All @@ -81,7 +82,8 @@ def __init__(self, as_config: 'AutosubmitConfig', platform: 'ParamikoPlatform',
self.wrapper_method[wrapper_section] = self._as_config.get_wrapper_method(wrapper_data).lower()
self.jobs_in_wrapper[wrapper_section] = self._as_config.get_wrapper_jobs(wrapper_data)
self.extensible_wallclock[wrapper_section] = self._as_config.get_extensible_wallclock(wrapper_data)
self.wrapper_info = [self.wrapper_type,self.wrapper_policy,self.wrapper_method,self.jobs_in_wrapper,self.extensible_wallclock] # to pass to job_packages
self.custom_env_setup[wrapper_section] = self._as_config.get_custom_env_setup(wrapper_data)
self.wrapper_info = [self.wrapper_type,self.wrapper_policy,self.wrapper_method,self.jobs_in_wrapper,self.extensible_wallclock,self.custom_env_setup] # to pass to job_packages
Log.debug("Number of jobs available: {0}", self._max_wait_jobs_to_submit)
if self.hold:
Log.debug("Number of jobs prepared: {0}", len(jobs_list.get_prepared(platform)))
Expand Down Expand Up @@ -311,6 +313,8 @@ def submit_remaining_jobs(self, p: JobPackageBase, packages_to_submit: list, max
Log.warning("There are no more jobs of this section to form a wrapper, submitting the remaining jobs")
if len(p.jobs) == 1:
p.jobs[0].wrapper_type = "Simple"
# TODO: Here I could insert the code to reverse the wrapper method to None
# p.jobs[0].wrapper_method = self.wrapper_method
packages_to_submit.append(JobPackageSimple([p.jobs[0]]))
else:
packages_to_submit.append(p)
Expand Down Expand Up @@ -707,7 +711,7 @@ def _build_horizontal_packages(self, section_list, wrapper_limits, section, wrap
jobs_resources = horizontal_packager.components_dict
jobs_resources['MACHINEFILES'] = machinefile_function
current_package = JobPackageHorizontal(
package_jobs, jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config, wrapper_section=self.current_wrapper_section)
package_jobs, jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config, wrapper_section=self.current_wrapper_section, wrapper_info=wrapper_info)
packages.append(current_package)

return packages
Expand All @@ -732,7 +736,7 @@ def _build_vertical_packages(self, section_list, wrapper_limits,wrapper_info={})
dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section)
job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, wrapper_limits["max"], wrapper_limits, self._platform.max_wallclock,wrapper_info=wrapper_info)
jobs_list = job_vertical_packager.build_vertical_package(job, wrapper_info)
packages.append(JobPackageVertical(jobs_list, configuration=self._as_config,wrapper_section=self.current_wrapper_section,wrapper_info=wrapper_info))
packages.append(JobPackageVertical(jobs_list, configuration=self._as_config,wrapper_section=self.current_wrapper_section,wrapper_info=wrapper_info,method=self.wrapper_method[self.current_wrapper_section]))
else:
break
return packages
Expand Down Expand Up @@ -779,7 +783,7 @@ def _build_horizontal_vertical_package(self, horizontal_packager, section, jobs_
for job in current_package[level]:
job.level = level
return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock,
jobs_resources=jobs_resources, configuration=self._as_config, wrapper_section=self.current_wrapper_section)
jobs_resources=jobs_resources, configuration=self._as_config, wrapper_section=self.current_wrapper_section, wrapper_info=wrapper_info, method=self.wrapper_method[self.current_wrapper_section])

def _build_vertical_horizontal_package(self, horizontal_packager, jobs_resources, wrapper_info):
total_wallclock = '00:00'
Expand Down Expand Up @@ -807,7 +811,7 @@ def _build_vertical_horizontal_package(self, horizontal_packager, jobs_resources
for job in current_package[level]:
job.level = level
return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock,
jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config, wrapper_section=self.current_wrapper_section )
jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config, wrapper_section=self.current_wrapper_section, wrapper_info=wrapper_info)


# TODO: Rename and unite JobPackerVerticalMixed to JobPackerVertical since
Expand Down
53 changes: 37 additions & 16 deletions autosubmit/job/job_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from contextlib import suppress
from datetime import timedelta
from threading import Thread
from typing import Optional, TYPE_CHECKING
from typing import List, Optional, TYPE_CHECKING

from bscearth.utils.date import sum_str_hours

Expand Down Expand Up @@ -157,7 +157,7 @@ def submit_unthreaded(self, configuration: 'AutosubmitConfig', only_generate: bo
Log.result("Script {0} OK", job.name)
# looking for directives on jobs
self._custom_directives = self._custom_directives | set(job.custom_directives)
self._create_scripts(configuration)
# self._create_scripts(configuration) # TODO: [ENGINES] Duplicated. Solved by Dani in PR #2693, pending review

def submit(self, configuration: 'AutosubmitConfig', parameters: Optional[dict] = None, only_generate: bool = False, hold: bool = False):
"""
Expand Down Expand Up @@ -185,8 +185,11 @@ def submit(self, configuration: 'AutosubmitConfig', parameters: Optional[dict] =
if len(self.jobs) < thread_number or str(
configuration.experiment_data.get("CONFIG", {}).get("ENABLE_WRAPPER_THREADS",
"False")).lower() == "false":
# TODO: [ENGINES] This function (should) check the scripts validity
self.submit_unthreaded(configuration, only_generate)
Log.debug("Creating Scripts")
# TODO: [ENGINES] Here is where the scripts are created. Both the common script and
# the individual job ones.
self._create_scripts(configuration)
else:
lhandle = list()
Expand All @@ -209,6 +212,9 @@ def submit(self, configuration: 'AutosubmitConfig', parameters: Optional[dict] =
try:
if not only_generate:
Log.debug("Sending Files")
# TODO: [ENGINES] Here is where the files are sent to the platform as a compressed tar
# "WRAPPER_SCRIPTS.tar", together with the COMMON_SCRIPT, which is the wrapper script
# (AS_THREAD_*).
self._send_files()
Log.debug("Submitting")
self._do_submission(hold=hold)
Expand All @@ -226,6 +232,16 @@ def _send_files(self):
def _do_submission(self, job_scripts=None, hold: bool = False):
    """Submit package to the platform.

    This implementation is intentionally empty: it performs no action and
    implicitly returns ``None``.
    NOTE(review): presumably concrete package classes override this with the
    real submission logic -- confirm against the subclasses.

    :param job_scripts: not used by this implementation.
    :param hold: not used by this implementation.
    """
    pass  # pragma: no cover

def _assign_wrapper_method_to_jobs(self, jobs: list[Job], wrapper_method: Optional[str]) -> None:
    """
    Assign the wrapper method to each job in the list.

    The jobs are modified in place by setting their ``wrapper_method`` attribute.

    :param jobs: List of jobs.
    :param wrapper_method: Wrapper method to be assigned. ``None`` is accepted
        and clears any wrapper method previously set on the jobs.
    """
    for job in jobs:
        job.wrapper_method = wrapper_method

def process_jobs_to_submit(self, job_id: str, hold: bool = False) -> None:
for i, job in enumerate(self.jobs):
Expand All @@ -252,6 +268,9 @@ def __init__(self, jobs: list[Job]):
# Add a property that defines what is a package with wrappers
# self.name = "simple_package"

# TODO: [ENGINES] Simple jobs may have been in a wrapper previously, so we reset the wrapper method.
self._assign_wrapper_method_to_jobs(jobs, None)

def _create_scripts(self, configuration: 'AutosubmitConfig'):
for job in self.jobs:
self._job_scripts[job.name] = job.create_script(configuration)
Expand Down Expand Up @@ -321,7 +340,7 @@ def _do_submission(self, job_scripts=None, hold=False):
super(JobPackageSimpleWrapped, self)._do_submission(job_scripts, hold=hold)


class JobPackageArray(JobPackageBase):
class JobPackageArray(JobPackageBase): # TODO: [ENGINES] Never used?
"""
Class to manage an array-based package of jobs to be submitted by autosubmit
"""
Expand Down Expand Up @@ -436,6 +455,7 @@ def __init__(self, jobs: list[Job], dependency=None, jobs_resources: Optional[di
self.wrapper_method = wrapper_info[2]
self.jobs_in_wrapper = wrapper_info[3]
self.extensible_wallclock = wrapper_info[4]
self.custom_env_setup = wrapper_info[5]
else:
self.wrapper_type = None
self.wrapper_policy = None
Expand All @@ -448,8 +468,7 @@ def __init__(self, jobs: list[Job], dependency=None, jobs_resources: Optional[di
self._common_script = None
self.executable = None

self._wallclock = '00:00'
# depends on the type of wrapper
self._wallclock = '00:00' # depends on the type of wrapper

self._jobs_resources = jobs_resources
self._wrapper_factory = self.platform.wrapper
Expand Down Expand Up @@ -549,6 +568,9 @@ def __init__(self, jobs: list[Job], dependency=None, jobs_resources: Optional[di
self.parameters["EXECUTABLE"] = self.executable # have to look
self.method = method

# Assign wrapper method to jobs so that they can be assigned their platform or the wrapper engine later
self._assign_wrapper_method_to_jobs(jobs, method)

@property
def name(self):
# FIXME: Calling this raises an exception at runtime?
Expand Down Expand Up @@ -761,13 +783,12 @@ class JobPackageVertical(JobPackageThread):
:type jobs:
:param: dependency:
"""

def __init__(self, jobs: list[Job], dependency=None, configuration: Optional['AutosubmitConfig'] = None,
wrapper_section: str = "WRAPPERS", wrapper_info: Optional[list] = None):
wrapper_section: str = "WRAPPERS", wrapper_info: Optional[list] = None, method: str = 'ASThread'):
if wrapper_info is None:
wrapper_info = []
super(JobPackageVertical, self).__init__(jobs, dependency, configuration=configuration,
wrapper_section=wrapper_section, wrapper_info=wrapper_info)
wrapper_section=wrapper_section, wrapper_info=wrapper_info, method=method)
for job in jobs:
if int(job.processors) >= int(self._num_processors):
self._num_processors = job.processors
Expand Down Expand Up @@ -854,9 +875,9 @@ class JobPackageHorizontal(JobPackageThread):
Class to manage a horizontal thread-based package of jobs to be submitted by autosubmit
"""
def __init__(self, jobs: list[Job], dependency=None, jobs_resources: dict = None, method: str = 'ASThread',
configuration: Optional['AutosubmitConfig'] = None, wrapper_section="WRAPPERS"):
configuration: Optional['AutosubmitConfig'] = None, wrapper_section="WRAPPERS", wrapper_info: Optional[list] = None):
super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources, configuration=configuration,
wrapper_section=wrapper_section)
wrapper_section=wrapper_section, wrapper_info=wrapper_info, method=method)
self.method = method
self._queue = self.queue
for job in jobs:
Expand All @@ -883,17 +904,17 @@ def _common_script_content(self) -> str:

class JobPackageHybrid(JobPackageThread):
"""
Class to manage a hybrid (horizontal and vertical) thread-based package of jobs to be submitted by autosubmit
"""
Class to manage a hybrid (horizontal and vertical) thread-based package of jobs to be submitted by autosubmit
"""

def __init__(self, jobs: list[list[Job]], num_processors: str, total_wallclock, dependency=None,
jobs_resources: Optional[dict] = None, method: str = "ASThread",
configuration: Optional['AutosubmitConfig'] = None, wrapper_section="WRAPPERS"):
jobs_resources: Optional[dict] = None, method: str = "ASThread", configuration: Optional['AutosubmitConfig'] = None,
wrapper_section="WRAPPERS", wrapper_info: Optional[list] = None):
all_jobs = [item for sublist in jobs for item in sublist] # flatten list
if jobs_resources is None:
jobs_resources = {}
super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources, method,
configuration=configuration, wrapper_section=wrapper_section)
super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources, method, configuration=configuration,
wrapper_section=wrapper_section, wrapper_info=wrapper_info)
self.jobs_lists = jobs
self.method = method
self._num_processors = int(num_processors)
Expand Down
Loading