Skip to content

Commit 3fe790b

Browse files
committed
Create tests for the PBS platform and move files within the folder structure so their locations make structural sense
1 parent 6e44e65 commit 3fe790b

File tree

12 files changed

+294
-114
lines changed

12 files changed

+294
-114
lines changed

autosubmit/autosubmit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2530,7 +2530,7 @@ def run_experiment(expid: str, start_time: Optional[str] = None, start_after: Op
25302530
Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf, expid=expid)
25312531
Log.info("Waiting for all logs to be updated")
25322532
for p in platforms_to_test:
2533-
if p.log_recovery_process:
2533+
if p.log_recovery_process and p.log_recovery_process.is_alive():
25342534
p.cleanup_event.set() # Send cleanup event
25352535
p.log_recovery_process.join()
25362536
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)

autosubmit/platforms/headers/pbs_header.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,6 @@ def get_custom_directives(self, job, parameters, het=-1):
5353
return '\n'.join(str(s) for s in parameters['CUSTOM_DIRECTIVES'])
5454
return ""
5555

56-
@staticmethod
57-
def get_partition_directive(job, parameters, het=-1):
58-
"""Returns partition directive for the specified job
59-
60-
:param parameters:
61-
:param job: job to create partition directive for
62-
:type job: Job
63-
:param het:
64-
:return: partition directive
65-
:rtype: str
66-
"""
67-
if job.partition != '':
68-
return f"PBS -q={parameters['CURRENT_QUEUE']}"
69-
return ""
70-
7156
# noinspection PyMethodMayBeStatic,PyUnusedLocal
7257
def get_account_directive(self, job, parameters, het=-1):
7358
"""Returns account directive for the specified job

autosubmit/platforms/pbsplatform.py

Lines changed: 51 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from contextlib import suppress
2323
from pathlib import Path
2424
from time import sleep
25-
from typing import Union, TYPE_CHECKING
25+
from typing import Union, TYPE_CHECKING, Optional
2626

2727
from autosubmit.config.configcommon import AutosubmitConfig
2828
from autosubmit.job.job_common import Status
@@ -93,57 +93,27 @@ def get_header(self, job: 'Job', parameters: dict) -> str:
9393
:param parameters: Parameters dictionary.
9494
:return: Job header.
9595
"""
96-
if not job.packed or str(job.wrapper_type).lower() != "vertical":
97-
out_filename = f"{job.name}.cmd.out.{job.fail_count}"
98-
err_filename = f"{job.name}.cmd.err.{job.fail_count}"
99-
else:
100-
out_filename = f"{job.name}.cmd.out"
101-
err_filename = f"{job.name}.cmd.err"
10296

10397
header = self.header.SERIAL
98+
header = header.replace('%OUT_LOG_DIRECTIVE%', f"{job.name}.cmd.out")
99+
header = header.replace('%ERR_LOG_DIRECTIVE%', f"{job.name}.cmd.err")
104100

105-
header = header.replace('%OUT_LOG_DIRECTIVE%', out_filename)
106-
header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename)
107101
if job.het.get("HETSIZE", 0) <= 1:
108102
if hasattr(self.header, 'get_queue_directive'):
109103
header = header.replace(
110104
'%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job, parameters))
111-
if hasattr(self.header, 'get_proccesors_directive'):
112-
header = header.replace(
113-
'%NUMPROC_DIRECTIVE%', self.header.get_proccesors_directive(job, parameters))
114-
if hasattr(self.header, 'get_partition_directive'):
115-
header = header.replace(
116-
'%PARTITION_DIRECTIVE%', self.header.get_partition_directive(job, parameters))
117105
if hasattr(self.header, 'get_tasks_per_node'):
118106
header = header.replace(
119107
'%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job, parameters))
120108
if hasattr(self.header, 'get_threads_per_task'):
121109
header = header.replace(
122110
'%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job, parameters))
123-
if job.x11:
124-
header = header.replace(
125-
'%X11%', "SBATCH --x11=batch")
126-
else:
127-
header = header.replace(
128-
'%X11%', "")
129-
if hasattr(self.header, 'get_scratch_free_space'):
130-
header = header.replace(
131-
'%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job, parameters))
132111
if hasattr(self.header, 'get_custom_directives'):
133112
header = header.replace(
134113
'%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job, parameters))
135-
if hasattr(self.header, 'get_exclusive_directive'):
136-
header = header.replace(
137-
'%EXCLUSIVE_DIRECTIVE%', self.header.get_exclusive_directive(job, parameters))
138-
if hasattr(self.header, 'get_select_directive'):
139-
header = header.replace(
140-
'%SELECT_DIRECTIVE%', self.header.get_select_directive(job, parameters))
141114
if hasattr(self.header, 'get_account_directive'):
142115
header = header.replace(
143116
'%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job, parameters))
144-
if hasattr(self.header, 'get_shape_directive'):
145-
header = header.replace(
146-
'%SHAPE_DIRECTIVE%', self.header.get_shape_directive(job, parameters))
147117
if hasattr(self.header, 'get_nodes_directive'):
148118
header = header.replace(
149119
'%NODES_DIRECTIVE%', self.header.get_nodes_directive(job, parameters))
@@ -156,27 +126,8 @@ def get_header(self, job: 'Job', parameters: dict) -> str:
156126
if hasattr(self.header, 'get_memory_per_task_directive'):
157127
header = header.replace(
158128
'%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job, parameters))
159-
if hasattr(self.header, 'get_hyperthreading_directive'):
160-
header = header.replace(
161-
'%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job, parameters))
162129
return header
163130

164-
def get_submit_cmd_x11(self, args: str, script_name: str) -> str:
165-
"""
166-
Returns the submit command for the platform.
167-
168-
:param args: Arguments to be used in the construction of the submit command.
169-
:type args: str
170-
:param script_name: Name of the file to be referenced.
171-
:type script_name: str
172-
173-
:return: Command PBS to allocate jobs
174-
:rtype: str
175-
"""
176-
cmd = f'qsub {args} {self._submit_cmd_x11}/{script_name}'
177-
Log.debug(f"qsub command: {cmd}")
178-
return cmd
179-
180131
def generate_new_name_submit_script_file(self) -> None:
181132
"""
182133
Delete the current file and generates a new one with a new name.
@@ -209,7 +160,7 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
209160
duplicated_jobs_already_checked = False
210161
platform = valid_packages_to_submit[0].jobs[0].platform
211162
try:
212-
jobs_id = self.submit_script(hold=hold)
163+
jobs_id = self.submit_script()
213164
except AutosubmitError as e:
214165
job_names = []
215166
duplicated_jobs_already_checked = True
@@ -221,9 +172,9 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
221172
job_names.append(package_.jobs[0].name) # job_name
222173
Log.error(f'TRACE:{e.trace}\n{e.message} JOBS:{job_names}')
223174
for job_name in job_names:
224-
jobid = self.get_jobid_by_jobname(job_name)
225-
# cancel bad submitted job if jobid is encountered
226-
for id_ in jobid:
175+
jobs_id = self.get_jobs_id_by_job_name(job_name)
176+
# cancel bad submitted job if jobs_id is encountered
177+
for id_ in jobs_id:
227178
self.send_command(self.cancel_job(id_))
228179
jobs_id = None
229180
self.connected = False
@@ -271,9 +222,9 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
271222
6015, f"Jobs_id {jobs_id}")
272223
if hold:
273224
sleep(10)
274-
jobid_index = 0
225+
jobs_id_index = 0
275226
for package in valid_packages_to_submit:
276-
current_package_id = str(jobs_id[jobid_index])
227+
current_package_id = str(jobs_id[jobs_id_index])
277228
if hold:
278229
retries = 5
279230
package.jobs[0].id = current_package_id
@@ -285,7 +236,7 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
285236
retries = retries - 1
286237
package.jobs[0].platform.send_command(
287238
package.jobs[0].platform.cancel_cmd + f" {current_package_id}")
288-
jobid_index += 1
239+
jobs_id_index += 1
289240
continue
290241
except Exception:
291242
failed_packages.append(current_package_id)
@@ -294,18 +245,18 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
294245
# Check if there are duplicated job_name
295246
if not duplicated_jobs_already_checked:
296247
job_name = package.name if hasattr(package, "name") else package.jobs[0].name
297-
jobid = self.get_jobid_by_jobname(job_name)
298-
if len(jobid) > 1: # Cancel each job that is not the associated
248+
jobs_id = self.get_jobs_id_by_job_name(job_name)
249+
if len(jobs_id) > 1: # Cancel each job that is not the associated
299250
ids_to_check = [package.jobs[0].id]
300251
if package.jobs[0].het:
301252
for i in range(1, package.jobs[0].het.get("HETSIZE", 1)): # noqa
302253
ids_to_check.append(str(int(ids_to_check[0]) + i))
303254
# TODO to optimize cancel all jobs at once
304-
for id_ in [jobid for jobid in jobid if jobid not in ids_to_check]:
255+
for id_ in [jobs_id for jobs_id in jobs_id if jobs_id not in ids_to_check]:
305256
self.send_command(self.cancel_job(id_))
306257
Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled')
307258
Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted')
308-
jobid_index += 1
259+
jobs_id_index += 1
309260
if len(failed_packages) > 0:
310261
for job_id in failed_packages:
311262
platform.send_command(platform.cancel_cmd + f" {job_id}")
@@ -341,7 +292,7 @@ def get_submit_script(self) -> str:
341292
self._submit_script_path.chmod(mode=0o750)
342293
return str(self._submit_script_path)
343294

344-
def submit_job(self, job: 'Job', script_name: str, hold: bool = False, export: str = "none") -> None:
295+
def submit_job(self, job: 'Job', script_name: str, hold: bool = False, export: str = "none") -> Optional[int]:
345296
"""
346297
Submit a job from a given job object.
347298
@@ -357,23 +308,36 @@ def submit_job(self, job: 'Job', script_name: str, hold: bool = False, export: s
357308
:return: job id for the submitted job.
358309
:rtype: int
359310
"""
360-
self.get_submit_cmd(script_name, job, hold=hold, export=export)
311+
if job is None or not job:
312+
x11 = False
313+
else:
314+
x11 = job.x11
315+
if not x11:
316+
self.get_submit_cmd(script_name, job, hold=hold, export=export)
317+
return None
318+
cmd = self.get_submit_cmd(script_name, job, hold=hold, export=export)
319+
if cmd is None:
320+
return None
321+
if self.send_command(cmd, x11=x11):
322+
job_id = self.get_submitted_job_id(self.get_ssh_output(), x11=x11)
323+
if job:
324+
Log.result(f"Job: {job.name} submitted with job_id: {str(job_id).strip()} and workflow commit: "
325+
f"{job.workflow_commit}")
326+
return int(job_id)
361327
return None
362328

363-
def submit_script(self, hold: bool = False) -> Union[list[int], int]:
329+
def submit_script(self) -> Union[list[int], int]:
364330
"""
365331
Sends a Submit file Script with sbatch instructions, execute it in the platform and
366332
retrieves the Jobs_ID of all jobs at once.
367333
368-
:param hold: Submit a job in held status. Held jobs will only earn priority status if the
369-
remote machine allows it.
370-
:type hold: bool
371334
:return: job id for submitted jobs.
372335
:rtype: Union[List[int], int]
373336
"""
374337
try:
375338
self.send_file(self.get_submit_script(), False)
376-
cmd = os.path.join(self.get_files_path(), self._submit_script_path.name)
339+
cmd = os.path.join(self.get_files_path(),
340+
os.path.basename(self._submit_script_path))
377341
# remove file after submission
378342
cmd = f"{cmd} ; rm {cmd}"
379343
try:
@@ -526,25 +490,18 @@ def get_submit_cmd(self, job_script: str, job, hold: bool = False, export: str =
526490
export = ""
527491
else:
528492
export += " ; "
529-
if job is None or not job:
530-
x11 = False
531-
else:
532-
x11 = job.x11
533493

534-
if not x11:
535-
with suppress(Exception):
536-
lang = locale.getlocale()[1]
494+
with suppress(Exception):
495+
lang = locale.getlocale()[1]
496+
if lang is None:
497+
lang = locale.getdefaultlocale()[1]
537498
if lang is None:
538-
lang = locale.getdefaultlocale()[1]
539-
if lang is None:
540-
lang = 'UTF-8'
541-
with open(self._submit_script_path, "ab") as submit_script_file:
542-
if not hold:
543-
submit_script_file.write((export + self._submit_cmd + job_script + "\n").encode(lang))
544-
else:
545-
submit_script_file.write((export + self._submit_hold_cmd + job_script + "\n").encode(lang))
546-
else:
547-
return export + self.get_submit_cmd_x11(job.x11_options.strip(""), job_script.strip(""))
499+
lang = 'UTF-8'
500+
with open(self._submit_script_path, "ab") as submit_script_file:
501+
if not hold:
502+
submit_script_file.write((export + self._submit_cmd + job_script + "\n").encode(lang))
503+
else:
504+
submit_script_file.write((export + self._submit_hold_cmd + job_script + "\n").encode(lang))
548505

549506
def get_check_job_cmd(self, job_id: str) -> list[str]: # noqa
550507
"""
@@ -572,7 +529,7 @@ def get_check_all_jobs_cmd(self, jobs_id: str) -> str: # noqa
572529
jobs_id = jobs_id.replace('{', '').replace('}', '').replace(',', ' ')
573530
return f"qstat {jobs_id} | awk" + " '{print $1, $3}' &&" + f"qstat -H {jobs_id} | awk" + " '{print $1, $3}'"
574531

575-
def get_jobid_by_jobname(self, job_name, retries=2):
532+
def get_jobs_id_by_job_name(self, job_name, retries=2):
576533
"""
577534
Get job id by job name
578535
@@ -583,7 +540,7 @@ def get_jobid_by_jobname(self, job_name, retries=2):
583540
"""
584541
# sleep(5)
585542
job_ids = ""
586-
cmd = self.get_jobid_by_jobname_cmd(job_name)
543+
cmd = self.get_jobs_id_by_job_name_cmd(job_name)
587544
self.send_command(cmd)
588545
job_id_name = self.get_ssh_output()
589546
while len(job_id_name) <= 0 < retries:
@@ -624,7 +581,7 @@ def get_queue_status_cmd(self, job_id: str) -> str:
624581
job_id = job_id.replace('{', '').replace('}', '').replace(',', ' ')
625582
return f"qstat {job_id} && echo \"BREAK\" && " + f"qstat -H {job_id}"
626583

627-
def get_jobid_by_jobname_cmd(self, job_name: str) -> str: # noqa
584+
def get_jobs_id_by_job_name_cmd(self, job_name: str) -> str: # noqa
628585
"""
629586
Looks for a job based on its name.
630587
@@ -667,22 +624,22 @@ def parse_queue_reason(self, output: str, job_id: str) -> str:
667624
return ''.join(reason)
668625
return reason # noqa F501
669626

670-
def get_queue_status(self, in_queue_jobs: list['Job'], list_queue_jobid: str, as_conf: AutosubmitConfig) -> None:
627+
def get_queue_status(self, in_queue_jobs: list['Job'], list_queue_jobs_id: str, as_conf: AutosubmitConfig) -> None:
671628
"""
672629
get_queue_status
673630
674631
:param in_queue_jobs: List of Job.
675632
:type in_queue_jobs: list[Job]
676-
:param list_queue_jobid: List of Job IDs concatenated.
677-
:type list_queue_jobid: str
633+
:param list_queue_jobs_id: List of Job IDs concatenated.
634+
:type list_queue_jobs_id: str
678635
:param as_conf: experiment configuration.
679636
:type as_conf: autosubmit.config.AutosubmitConfig
680637
681638
:rtype:None
682639
"""
683640
if not in_queue_jobs:
684641
return
685-
cmd = self.get_queue_status_cmd(list_queue_jobid)
642+
cmd = self.get_queue_status_cmd(list_queue_jobs_id)
686643
self.send_command(cmd)
687644
queue_status = self._ssh_output
688645
for job in in_queue_jobs:

autosubmit/platforms/platform.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -858,9 +858,9 @@ def clean_log_recovery_process(self) -> None:
858858
This method sets the cleanup event to signal the log recovery process to finish,
859859
waits for the process to join with a timeout, and then resets all related variables.
860860
"""
861-
if self.cleanup_event is not None:
861+
if self.cleanup_event:
862862
self.cleanup_event.set() # Indicates to old child ( if reachable ) to finish.
863-
if self.log_recovery_process is not None:
863+
if self.log_recovery_process and self.log_recovery_process.is_alive():
864864
# Waits for old child ( if reachable ) to finish. Timeout in case of it being blocked.
865865
self.log_recovery_process.join(timeout=60)
866866
# Resets everything related to the log recovery process.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)