Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ platform does not support wrappers anymore (it was used for testing).
- Fixes a bug in the dependencies of multi-day applications #2631
- Fixes an issue with all-filter #2565
- Fixes an issue when setting a dependency to a different date or member #2466 (partially fixes #2518)

- Fixes an issue with some placeholders not being replaced in templates #2426

**Enhancements:**

- autosubmit/autosubmit container now includes the `$USER` environment variable
Expand Down
105 changes: 60 additions & 45 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
self.updated_log = False
self._log_recovered = False
self.log_recovered = False
self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time
self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time
self.start_time_timestamp = None
self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial
self._script = None # Inline code to be executed
self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial
self._script = None # Inline code to be executed
self._log_recovery_retries = None
self.ready_date = None
self.wrapper_name = None
Expand Down Expand Up @@ -401,6 +401,7 @@ def x11(self, value):
def x11_options(self):
"""Allows to set salloc parameters for x11"""
return self._x11_options

@x11_options.setter
def x11_options(self, value):
self._x11_options = value
Expand Down Expand Up @@ -669,11 +670,13 @@ def custom_directives(self):
@custom_directives.setter
def custom_directives(self, value):
self._custom_directives = value

@property # type: ignore
@autosubmit_parameter(name='splits')
def splits(self):
"""Max number of splits."""
return self._splits

@splits.setter
def splits(self, value):
self._splits = value
Expand Down Expand Up @@ -745,7 +748,7 @@ def read_header_tailer_script(self, script_path: str, as_conf: AutosubmitConfig,
# TODO: Check why the fstring is not working for the text below
raise AutosubmitCritical(
"Extended {1} script: couldn't figure out script {0} type\n".format(script_name,
error_message_type), 7011)
error_message_type), 7011)

if not found_hashbang:
# TODO: Check why the fstring is not working for the text below
Expand Down Expand Up @@ -1282,7 +1285,6 @@ def _sync_retrieve_logfiles(self):
# Update local logs
self.local_logs = remote_logs


def retrieve_external_retrials_logfiles(self):
log_recovered = False
self.remote_logs = self.get_new_remotelog_name()
Expand Down Expand Up @@ -1644,7 +1646,6 @@ def update_current_parameters(self, as_conf: AutosubmitConfig, parameters: dict)

return parameters


def process_scheduler_parameters(self, job_platform, chunk):
"""
Parsers yaml data stored in the dictionary and calculates the components of the heterogeneous job if any
Expand All @@ -1656,7 +1657,7 @@ def process_scheduler_parameters(self, job_platform, chunk):
else:
hetsize = 1
if type(self.nodes) is list:
hetsize = max(hetsize,len(self.nodes))
hetsize = max(hetsize, len(self.nodes))
self.het['HETSIZE'] = hetsize
self.het['PROCESSORS'] = list()
self.het['NODES'] = list()
Expand Down Expand Up @@ -1957,7 +1958,7 @@ def update_platform_associated_parameters(self, as_conf: AutosubmitConfig, param

return parameters

def update_wrapper_parameters(self,as_conf: AutosubmitConfig, parameters: dict) -> dict:
def update_wrapper_parameters(self, as_conf: AutosubmitConfig, parameters: dict) -> dict:
wrappers = as_conf.experiment_data.get("WRAPPERS", {})
if len(wrappers) > 0:
parameters['WRAPPER'] = as_conf.get_wrapper_type()
Expand Down Expand Up @@ -2244,8 +2245,7 @@ def reset_logs(self, as_conf: AutosubmitConfig) -> None:
self.workflow_commit = as_conf.experiment_data.get("AUTOSUBMIT", {}).get("WORKFLOW_COMMIT", "")

def update_placeholders(self, as_conf: AutosubmitConfig, parameters: dict, replace_by_empty=False) -> dict:
"""
Find and substitute dynamic placeholders in `parameters` using the provided
"""Find and substitute dynamic placeholders in `parameters` using the provided
Autosubmit configuration helpers.

:param as_conf: Autosubmit configuration object.
Expand All @@ -2257,12 +2257,25 @@ def update_placeholders(self, as_conf: AutosubmitConfig, parameters: dict, repla
:return: Parameters with placeholders substituted.
:rtype: dict
"""

as_conf.deep_read_loops(parameters)
as_conf.substitute_dynamic_variables(parameters)
if replace_by_empty and as_conf.dynamic_variables:
for var in as_conf.dynamic_variables.keys():
parameters[var] = ""
as_conf.dynamic_variables = {}
# At this point, the ^ and not ^ is the same
for key, value in as_conf.special_dynamic_variables.items():
if isinstance(value, str):
as_conf.dynamic_variables[key] = value.replace('^', '')
parameters[key] = as_conf.dynamic_variables[key]
elif isinstance(value, list):
as_conf.dynamic_variables[key] = [v.replace('^', '') if isinstance(v, str) else v for v in value]
parameters[key] = as_conf.dynamic_variables[key]
as_conf.special_dynamic_variables = dict()

as_conf.substitute_dynamic_variables(parameters, in_the_end=False)

# Only replace CURRENT_ placeholders when requested and dynamic_variables exists.
if replace_by_empty:
for key in as_conf.dynamic_variables.keys():
parameters[key] = ""
as_conf.dynamic_variables = dict()

return parameters

Expand Down Expand Up @@ -2397,7 +2410,7 @@ def queuing_reason_cancel(self, reason):
'QOSMaxMemoryPerNode', 'QOSMaxMemoryMinutesPerJob', 'QOSMaxNodeMinutesPerJob',
'InactiveLimit', 'JobLaunchFailure', 'NonZeroExitCode', 'PartitionNodeLimit',
'PartitionTimeLimit', 'SystemFailure', 'TimeLimit', 'QOSUsageThreshold',
'QOSTimeLimit','QOSResourceLimit','QOSJobLimit','InvalidQOS','InvalidAccount']:
'QOSTimeLimit', 'QOSResourceLimit', 'QOSJobLimit', 'InvalidQOS', 'InvalidAccount']:
return True
return False
except Exception:
Expand Down Expand Up @@ -2510,16 +2523,15 @@ def construct_real_additional_file_name(self, file_name: str) -> str:
real_name = real_name.replace(f"{self.expid}_", "")
return real_name


def create_wrapped_script(self, as_conf: AutosubmitConfig, wrapper_tag='wrapped') -> str:
parameters = self.update_parameters(as_conf, set_attributes=False)
template_content = self.get_wrapped_content(as_conf, parameters)
for key, value in parameters.items():
template_content = re.sub(
'%(?<!%%)' + key + '%(?!%%)', str(parameters[key]), template_content,flags=re.I)
'%(?<!%%)' + key + '%(?!%%)', str(parameters[key]), template_content, flags=re.I)
for variable in self.undefined_variables:
template_content = re.sub(
'%(?<!%%)' + variable + '%(?!%%)', '', template_content,flags=re.I)
'%(?<!%%)' + variable + '%(?!%%)', '', template_content, flags=re.I)
template_content = template_content.replace("%%", "%")
script_name = '{0}.{1}.cmd'.format(self.name, wrapper_tag)
open(os.path.join(self._tmp_path, script_name),
Expand All @@ -2536,11 +2548,11 @@ def check_script(self, as_conf: AutosubmitConfig, show_logs="false") -> bool:
"""
parameters = self.update_parameters(as_conf, set_attributes=False)
template_content, additional_templates = self.update_content(as_conf, parameters)
variables = re.findall('%(?<!%%)[a-zA-Z0-9_.-]+%(?!%%)', template_content,flags=re.IGNORECASE)
variables = re.findall('%(?<!%%)[a-zA-Z0-9_.-]+%(?!%%)', template_content, flags=re.IGNORECASE)
variables = [variable[1:-1] for variable in variables]
variables = [variable for variable in variables if variable not in as_conf.default_parameters]
for template in additional_templates:
variables_tmp = re.findall('%(?<!%%)[a-zA-Z0-9_.-]+%(?!%%)', template,flags=re.IGNORECASE)
variables_tmp = re.findall('%(?<!%%)[a-zA-Z0-9_.-]+%(?!%%)', template, flags=re.IGNORECASE)
variables_tmp = [variable[1:-1] for variable in variables_tmp]
variables_tmp = [variable for variable in variables_tmp if variable not in as_conf.default_parameters]
variables.extend(variables_tmp)
Expand All @@ -2552,7 +2564,7 @@ def check_script(self, as_conf: AutosubmitConfig, show_logs="false") -> bool:
if str(show_logs).lower() != "false":
Log.printlog("The following set of variables to be substituted in template script is not part "
"of parameters set, and will be replaced by a blank value: {0}".format(
self.undefined_variables), 5013)
self.undefined_variables), 5013)
if not set(variables).issuperset(set(parameters)):
Log.printlog(f"The following set of variables are not being used in the templates: {str(set(parameters) - set(variables))}", 5013)

Expand All @@ -2567,7 +2579,7 @@ def update_local_logs(self, count: int = -1, update_submit_time: bool = True) ->
else:
self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out",
f"{self.name}.{self.submit_time_timestamp}.err")

def check_compressed_local_logs(self) -> None:
"""
Checks if the current local log files are compressed versions (.gz or .xz)
Expand Down Expand Up @@ -2611,14 +2623,14 @@ def write_submit_time(self) -> None:
children=self.children_names_str, workflow_commit=self.workflow_commit)

def update_start_time(self, count=-1):
start_time_ = self.check_start_time(count) # last known start time from the .cmd file
start_time_ = self.check_start_time(count) # last known start time from the .cmd file
if start_time_:
self.start_time_timestamp = start_time_
else:
Log.warning(f"Start time for job {self.name} not found in the .cmd file, using last known time.")
self.start_time_timestamp = self.start_time_timestamp if self.start_time_timestamp else time.time()
if count > 0 or self.wrapper_name in self.platform.processed_wrapper_logs:
self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp),'S')
self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S')

def fix_local_logs_timestamps(self, current_timestamp: str, new_timestamp: str) -> None:
"""
Expand Down Expand Up @@ -2660,12 +2672,13 @@ def write_start_time(self, count=-1, vertical_wrapper=False):
f.write(date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S'))
# Writing database
exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
children=self.children_names_str)
exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id,
wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
children=self.children_names_str)
return True

def write_vertical_time(
self, count: int = -1, first_submit_timestamp: str = ''
self, count: int = -1, first_submit_timestamp: str = ''
) -> None:
self.update_start_time(count=count)
self.update_local_logs(update_submit_time=False, count=count)
Expand Down Expand Up @@ -2707,7 +2720,9 @@ def write_end_time(self, completed, count=-1):

# Launch second as threaded function only for slurm
if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm":
thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform))
thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR,
historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish,
args=(job_data_dc, self.platform))
thread_write_finish.name = "JOB_data_{}".format(self.name)
thread_write_finish.start()

Expand Down Expand Up @@ -2761,7 +2776,7 @@ def is_ancestor(self, job):
return False

def synchronize_logs(
self, platform: "Platform", remote_logs, local_logs, last=True
self, platform: "Platform", remote_logs, local_logs, last=True
):
platform.move_file(remote_logs[0], local_logs[0], True) # .out
platform.move_file(remote_logs[1], local_logs[1], True) # .err
Expand Down Expand Up @@ -2831,16 +2846,16 @@ class WrapperJob(Job):
"""

def __init__(
self,
name: str,
job_id: int,
status: str,
priority: int,
job_list: List[Job],
total_wallclock: str,
platform: 'ParamikoPlatform',
as_config: AutosubmitConfig,
hold: bool,
self,
name: str,
job_id: int,
status: str,
priority: int,
job_list: List[Job],
total_wallclock: str,
platform: 'ParamikoPlatform',
as_config: AutosubmitConfig,
hold: bool,
):
super(WrapperJob, self).__init__(name, job_id, status, priority)
self.failed = False
Expand Down Expand Up @@ -2871,7 +2886,7 @@ def _queuing_reason_cancel(self, reason: str) -> bool:
'QOSMaxMemoryPerNode', 'QOSMaxMemoryMinutesPerJob', 'QOSMaxNodeMinutesPerJob',
'InactiveLimit', 'JobLaunchFailure', 'NonZeroExitCode', 'PartitionNodeLimit',
'PartitionTimeLimit', 'SystemFailure', 'TimeLimit', 'QOSUsageThreshold',
'QOSTimeLimit','QOSResourceLimit','QOSJobLimit','InvalidQOS','InvalidAccount']:
'QOSTimeLimit', 'QOSResourceLimit', 'QOSJobLimit', 'InvalidQOS', 'InvalidAccount']:
return True
return False
except Exception:
Expand Down Expand Up @@ -2908,8 +2923,8 @@ def check_status(self, status: str) -> None:
# Fail can come from check function or running/completed checkers.
if self.status in [Status.FAILED, Status.UNKNOWN]:
self.status = Status.FAILED
if self.prev_status in [Status.SUBMITTED,Status.QUEUING]:
self.update_failed_jobs(True) # check false ready jobs
if self.prev_status in [Status.SUBMITTED, Status.QUEUING]:
self.update_failed_jobs(True) # check false ready jobs
elif self.prev_status in [Status.FAILED, Status.UNKNOWN]:
self.failed = True
self._check_running_jobs()
Expand Down Expand Up @@ -3100,7 +3115,7 @@ def _check_running_jobs(self) -> None:
job_name, str(parse_date(start_time))))
self.running_jobs_start[job] = start_time
job.new_status = Status.RUNNING
#job.status = Status.RUNNING
# job.status = Status.RUNNING
job.update_status(self.as_config)
if len(out) == 2:
Log.info("Job {0} is RUNNING".format(job_name))
Expand Down Expand Up @@ -3139,7 +3154,7 @@ def _check_finished_job(self, job: Job, failed_file: bool = False) -> None:
if output is None or len(output) == 0:
sleep(wait)
retries = retries - 1
if (output is not None and len(str(output)) > 0 ) or 'COMPLETED' in output:
if (output is not None and len(str(output)) > 0) or 'COMPLETED' in output:
job.new_status = Status.COMPLETED
else:
failed_file = True
Expand Down
8 changes: 8 additions & 0 deletions test/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1915,6 +1915,10 @@ def test_job_parameters_resolves_all_placeholders(autosubmit_config, monkeypatch
"WRAPPER_HAS_PRIO": "%CURRENT_NOT_EXISTENT_PLACEHOLDER%",
}
},
"TESTDATES": {
"START_DATE": "%CHUNK_START_DATE%",
"START_DATE_WITH_SPECIAL": "%^CHUNK_START_DATE%",
},
"PLATFORMS": {
"TEST_SLURM": {
"ADD_PROJECT_TO_HOST": False,
Expand Down Expand Up @@ -1994,6 +1998,10 @@ def test_job_parameters_resolves_all_placeholders(autosubmit_config, monkeypatch
assert parameters["CURRENT_JOB_HAS_PRIO"] == "whatever"
assert parameters["CURRENT_WRAPPER_HAS_PRIO"] == "whatever_from_wrapper"
assert parameters["CURRENT_PLATFORM_HAS_PRIO"] == "whatever_from_platform"
assert parameters["SDATE"] == "20200101"
assert parameters["TESTDATES.START_DATE"] == "20200101"
assert parameters["TESTDATES.START_DATE_WITH_SPECIAL"] == "20200101"
assert parameters["EXPERIMENT.DATELIST"] == 20200101


def test_process_scheduler_parameters(local):
Expand Down
Loading