diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e35516e0..19f817eb5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,7 +17,8 @@ platform does not support wrappers anymore (it was used for testing).
 - Fixes an issue with multi-day applications dependencies bug #2631
 - Fixes an issue with all-filter #2565
 - Fixes an issue when setting a dependency to a different date or member # 2466 ( #2518 partially)
-
+- Fixes an issue with some placeholders not being replaced in templates #2426
+
 - **Enhancements:**
 
 - autosubmit/autosubmit container now includes the `$USER` environment variable
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index d0250d971..dbcc9de0b 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -273,10 +273,10 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
         self.updated_log = False
         self._log_recovered = False
         self.log_recovered = False
-        self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time
+        self.submit_time_timestamp = None  # for wrappers, all jobs inside a wrapper are submitted at the same time
         self.start_time_timestamp = None
-        self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial
-        self._script = None # Inline code to be executed
+        self.finish_time_timestamp = None  # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial
+        self._script = None  # Inline code to be executed
         self._log_recovery_retries = None
         self.ready_date = None
         self.wrapper_name = None
@@ -401,6 +401,7 @@ def x11(self, value):
     def x11_options(self):
         """Allows to set salloc parameters for x11"""
         return self._x11_options
+
     @x11_options.setter
     def x11_options(self, value):
         self._x11_options = value
@@ -669,11 +670,13 @@ def custom_directives(self):
     @custom_directives.setter
     def custom_directives(self, value):
         self._custom_directives = value
+
     @property  # type: ignore
     @autosubmit_parameter(name='splits')
     def splits(self):
         """Max number of splits."""
         return self._splits
+
     @splits.setter
     def splits(self, value):
         self._splits = value
@@ -745,7 +748,7 @@ def read_header_tailer_script(self, script_path: str, as_conf: AutosubmitConfig,
                 # TODO: Check why the fstring is not working for the text below
                 raise AutosubmitCritical(
                     "Extended {1} script: couldn't figure out script {0} type\n".format(script_name,
-                                                                                        error_message_type), 7011)
+                                                                                         error_message_type), 7011)
 
         if not found_hashbang:
             # TODO: Check why the fstring is not working for the text below
@@ -1282,7 +1285,6 @@ def _sync_retrieve_logfiles(self):
             # Update local logs
             self.local_logs = remote_logs
 
-
     def retrieve_external_retrials_logfiles(self):
         log_recovered = False
         self.remote_logs = self.get_new_remotelog_name()
@@ -1644,7 +1646,6 @@ def update_current_parameters(self, as_conf: AutosubmitConfig, parameters: dict)
         return parameters
 
-
     def process_scheduler_parameters(self, job_platform, chunk):
         """
         Parsers yaml data stored in the dictionary and calculates the components of the heterogeneous job if any
@@ -1656,7 +1657,7 @@ def process_scheduler_parameters(self, job_platform, chunk):
         else:
             hetsize = 1
         if type(self.nodes) is list:
-            hetsize = max(hetsize,len(self.nodes))
+            hetsize = max(hetsize, len(self.nodes))
         self.het['HETSIZE'] = hetsize
         self.het['PROCESSORS'] = list()
         self.het['NODES'] = list()
@@ -1957,7 +1958,7 @@ def update_platform_associated_parameters(self, as_conf: AutosubmitConfig, param
         return parameters
 
-    def update_wrapper_parameters(self,as_conf: AutosubmitConfig, parameters: dict) -> dict:
+    def update_wrapper_parameters(self, as_conf: AutosubmitConfig, parameters: dict) -> dict:
         wrappers = as_conf.experiment_data.get("WRAPPERS", {})
         if len(wrappers) > 0:
             parameters['WRAPPER'] = as_conf.get_wrapper_type()
@@ -2244,8 +2245,7 @@ def reset_logs(self, as_conf: AutosubmitConfig) -> None:
         self.workflow_commit = as_conf.experiment_data.get("AUTOSUBMIT", {}).get("WORKFLOW_COMMIT", "")
 
     def update_placeholders(self, as_conf: AutosubmitConfig, parameters: dict, replace_by_empty=False) -> dict:
-        """
-        Find and substitute dynamic placeholders in `parameters` using the provided
+        """Find and substitute dynamic placeholders in `parameters` using the provided
         Autosubmit configuration helpers.
 
         :param as_conf: Autosubmit configuration object.
@@ -2257,12 +2257,25 @@ def update_placeholders(self, as_conf: AutosubmitConfig, parameters: dict, repla
         :return: Parameters with placeholders substituted.
         :rtype: dict
         """
+
         as_conf.deep_read_loops(parameters)
-        as_conf.substitute_dynamic_variables(parameters)
-        if replace_by_empty and as_conf.dynamic_variables:
-            for var in as_conf.dynamic_variables.keys():
-                parameters[var] = ""
-        as_conf.dynamic_variables = {}
+        # At this point, the ^ and not ^ is the same
+        for key, value in as_conf.special_dynamic_variables.items():
+            if isinstance(value, str):
+                as_conf.dynamic_variables[key] = value.replace('^', '')
+                parameters[key] = as_conf.dynamic_variables[key]
+            elif isinstance(value, list):
+                as_conf.dynamic_variables[key] = [v.replace('^', '') if isinstance(v, str) else v for v in value]
+                parameters[key] = as_conf.dynamic_variables[key]
+        as_conf.special_dynamic_variables = dict()
+
+        as_conf.substitute_dynamic_variables(parameters, in_the_end=False)
+
+        # Only replace CURRENT_ placeholders when requested and dynamic_variables exists.
+        if replace_by_empty:
+            for key in as_conf.dynamic_variables.keys():
+                parameters[key] = ""
+        as_conf.dynamic_variables = dict()
 
         return parameters
 
@@ -2397,7 +2410,7 @@ def queuing_reason_cancel(self, reason):
                               'QOSMaxMemoryPerNode', 'QOSMaxMemoryMinutesPerJob', 'QOSMaxNodeMinutesPerJob',
                               'InactiveLimit', 'JobLaunchFailure', 'NonZeroExitCode', 'PartitionNodeLimit',
                               'PartitionTimeLimit', 'SystemFailure', 'TimeLimit', 'QOSUsageThreshold',
-                              'QOSTimeLimit','QOSResourceLimit','QOSJobLimit','InvalidQOS','InvalidAccount']:
+                              'QOSTimeLimit', 'QOSResourceLimit', 'QOSJobLimit', 'InvalidQOS', 'InvalidAccount']:
                 return True
             return False
         except Exception:
@@ -2510,16 +2523,15 @@ def construct_real_additional_file_name(self, file_name: str) -> str:
         real_name = real_name.replace(f"{self.expid}_", "")
         return real_name
 
-
     def create_wrapped_script(self, as_conf: AutosubmitConfig, wrapper_tag='wrapped') -> str:
         parameters = self.update_parameters(as_conf, set_attributes=False)
         template_content = self.get_wrapped_content(as_conf, parameters)
         for key, value in parameters.items():
             template_content = re.sub(
-                '%(? bool:
         """
         parameters = self.update_parameters(as_conf, set_attributes=False)
         template_content, additional_templates = self.update_content(as_conf, parameters)
-        variables = re.findall('%(? bool:
         if str(show_logs).lower() != "false":
             Log.printlog("The following set of variables to be substituted in template script is not part "
                          "of parameters set, and will be replaced by a blank value: {0}".format(
-                self.undefined_variables), 5013)
+                             self.undefined_variables), 5013)
 
         if not set(variables).issuperset(set(parameters)):
             Log.printlog(f"The following set of variables are not being used in the templates: {str(set(parameters) - set(variables))}", 5013)
@@ -2567,7 +2579,7 @@ def update_local_logs(self, count: int = -1, update_submit_time: bool = True) ->
         else:
             self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out",
                                f"{self.name}.{self.submit_time_timestamp}.err")
-    
+
     def check_compressed_local_logs(self) -> None:
         """
         Checks if the current local log files are compressed versions (.gz or .xz)
@@ -2611,14 +2623,14 @@ def write_submit_time(self) -> None:
                                       children=self.children_names_str, workflow_commit=self.workflow_commit)
 
     def update_start_time(self, count=-1):
-        start_time_ = self.check_start_time(count) # last known start time from the .cmd file
+        start_time_ = self.check_start_time(count)  # last known start time from the .cmd file
         if start_time_:
             self.start_time_timestamp = start_time_
         else:
             Log.warning(f"Start time for job {self.name} not found in the .cmd file, using last known time.")
             self.start_time_timestamp = self.start_time_timestamp if self.start_time_timestamp else time.time()
         if count > 0 or self.wrapper_name in self.platform.processed_wrapper_logs:
-            self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp),'S')
+            self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S')
 
     def fix_local_logs_timestamps(self, current_timestamp: str, new_timestamp: str) -> None:
         """
@@ -2660,12 +2672,13 @@ def write_start_time(self, count=-1, vertical_wrapper=False):
                 f.write(date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S'))
         # Writing database
         exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
-        exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
-                                     children=self.children_names_str)
+        exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id,
+                                     wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
+                                     children=self.children_names_str)
         return True
 
     def write_vertical_time(
-        self, count: int = -1, first_submit_timestamp: str = ''
+            self, count: int = -1, first_submit_timestamp: str = ''
     ) -> None:
         self.update_start_time(count=count)
         self.update_local_logs(update_submit_time=False, count=count)
@@ -2707,7 +2720,9 @@ def write_end_time(self, completed, count=-1):
 
         # Launch second as threaded function only for slurm
         if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm":
-            thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform))
+            thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR,
+                                                                  historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish,
+                                         args=(job_data_dc, self.platform))
             thread_write_finish.name = "JOB_data_{}".format(self.name)
             thread_write_finish.start()
 
@@ -2761,7 +2776,7 @@ def is_ancestor(self, job):
         return False
 
     def synchronize_logs(
-        self, platform: "Platform", remote_logs, local_logs, last=True
+            self, platform: "Platform", remote_logs, local_logs, last=True
     ):
         platform.move_file(remote_logs[0], local_logs[0], True)  # .out
         platform.move_file(remote_logs[1], local_logs[1], True)  # .err
@@ -2831,16 +2846,16 @@ class WrapperJob(Job):
     """
 
     def __init__(
-        self,
-        name: str,
-        job_id: int,
-        status: str,
-        priority: int,
-        job_list: List[Job],
-        total_wallclock: str,
-        platform: 'ParamikoPlatform',
-        as_config: AutosubmitConfig,
-        hold: bool,
+            self,
+            name: str,
+            job_id: int,
+            status: str,
+            priority: int,
+            job_list: List[Job],
+            total_wallclock: str,
+            platform: 'ParamikoPlatform',
+            as_config: AutosubmitConfig,
+            hold: bool,
     ):
         super(WrapperJob, self).__init__(name, job_id, status, priority)
         self.failed = False
@@ -2871,7 +2886,7 @@ def _queuing_reason_cancel(self, reason: str) -> bool:
                               'QOSMaxMemoryPerNode', 'QOSMaxMemoryMinutesPerJob', 'QOSMaxNodeMinutesPerJob',
                               'InactiveLimit', 'JobLaunchFailure', 'NonZeroExitCode', 'PartitionNodeLimit',
                               'PartitionTimeLimit', 'SystemFailure', 'TimeLimit', 'QOSUsageThreshold',
-                              'QOSTimeLimit','QOSResourceLimit','QOSJobLimit','InvalidQOS','InvalidAccount']:
+                              'QOSTimeLimit', 'QOSResourceLimit', 'QOSJobLimit', 'InvalidQOS', 'InvalidAccount']:
                 return True
             return False
         except Exception:
@@ -2908,8 +2923,8 @@ def check_status(self, status: str) -> None:
         # Fail can come from check function or running/completed checkers.
         if self.status in [Status.FAILED, Status.UNKNOWN]:
             self.status = Status.FAILED
-            if self.prev_status in [Status.SUBMITTED,Status.QUEUING]:
-                self.update_failed_jobs(True) # check false ready jobs
+            if self.prev_status in [Status.SUBMITTED, Status.QUEUING]:
+                self.update_failed_jobs(True)  # check false ready jobs
             elif self.prev_status in [Status.FAILED, Status.UNKNOWN]:
                 self.failed = True
                 self._check_running_jobs()
@@ -3100,7 +3115,7 @@ def _check_running_jobs(self) -> None:
                             job_name, str(parse_date(start_time))))
                         self.running_jobs_start[job] = start_time
                         job.new_status = Status.RUNNING
-                        #job.status = Status.RUNNING
+                        # job.status = Status.RUNNING
                         job.update_status(self.as_config)
                     if len(out) == 2:
                         Log.info("Job {0} is RUNNING".format(job_name))
@@ -3139,7 +3154,7 @@ def _check_finished_job(self, job: Job, failed_file: bool = False) -> None:
             if output is None or len(output) == 0:
                 sleep(wait)
                 retries = retries - 1
-        if (output is not None and len(str(output)) > 0 ) or 'COMPLETED' in output:
+        if (output is not None and len(str(output)) > 0) or 'COMPLETED' in output:
             job.new_status = Status.COMPLETED
         else:
             failed_file = True
diff --git a/test/unit/test_job.py b/test/unit/test_job.py
index eb64ff392..7b9c199c1 100644
--- a/test/unit/test_job.py
+++ b/test/unit/test_job.py
@@ -1915,6 +1915,10 @@ def test_job_parameters_resolves_all_placeholders(autosubmit_config, monkeypatch
                 "WRAPPER_HAS_PRIO": "%CURRENT_NOT_EXISTENT_PLACEHOLDER%",
             }
         },
+        "TESTDATES": {
+            "START_DATE": "%CHUNK_START_DATE%",
+            "START_DATE_WITH_SPECIAL": "%^CHUNK_START_DATE%",
+        },
         "PLATFORMS": {
             "TEST_SLURM": {
                 "ADD_PROJECT_TO_HOST": False,
@@ -1994,6 +1998,10 @@ def test_job_parameters_resolves_all_placeholders(autosubmit_config, monkeypatch
     assert parameters["CURRENT_JOB_HAS_PRIO"] == "whatever"
     assert parameters["CURRENT_WRAPPER_HAS_PRIO"] == "whatever_from_wrapper"
     assert parameters["CURRENT_PLATFORM_HAS_PRIO"] == "whatever_from_platform"
+    assert parameters["SDATE"] == "20200101"
+    assert parameters["TESTDATES.START_DATE"] == "20200101"
+    assert parameters["TESTDATES.START_DATE_WITH_SPECIAL"] == "20200101"
+    assert parameters["EXPERIMENT.DATELIST"] == 20200101
 
 
 def test_process_scheduler_parameters(local):