
Commit d6340c3

Creating tests for the PBS platform and adjusting file positions in the folder so that they make structural sense
1 parent 6e44e65 commit d6340c3

11 files changed: +225 -26 lines changed

autosubmit/autosubmit.py

Lines changed: 1 addition & 1 deletion

@@ -2530,7 +2530,7 @@ def run_experiment(expid: str, start_time: Optional[str] = None, start_after: Op
         Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf, expid=expid)
         Log.info("Waiting for all logs to be updated")
         for p in platforms_to_test:
-            if p.log_recovery_process:
+            if p.log_recovery_process and p.log_recovery_process.is_alive():
                 p.cleanup_event.set()  # Send cleanup event
                 p.log_recovery_process.join()
         Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
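The added is_alive() check avoids calling join() on a log-recovery process that never started or has already been reaped; in multiprocessing, joining a never-started Process raises an error. A minimal sketch of the guarded-shutdown pattern, with a hypothetical worker that is not Autosubmit code:

from multiprocessing import Event, Process
from time import sleep


def log_worker(cleanup_event):  # hypothetical stand-in for the log recovery loop
    while not cleanup_event.is_set():
        sleep(0.1)  # pretend to ship logs until asked to stop


if __name__ == "__main__":
    cleanup_event = Event()
    process = Process(target=log_worker, args=(cleanup_event,))
    process.start()
    sleep(0.3)
    # Guarded shutdown: only signal and join a process that exists and is running.
    # join() on a never-started Process raises an error, hence the is_alive() check.
    if process and process.is_alive():
        cleanup_event.set()  # ask the child to finish...
        process.join()       # ...then wait for it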

autosubmit/platforms/pbsplatform.py

Lines changed: 37 additions & 21 deletions

@@ -22,7 +22,7 @@
 from contextlib import suppress
 from pathlib import Path
 from time import sleep
-from typing import Union, TYPE_CHECKING
+from typing import Union, TYPE_CHECKING, Optional

 from autosubmit.config.configcommon import AutosubmitConfig
 from autosubmit.job.job_common import Status
@@ -221,9 +221,9 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
                 job_names.append(package_.jobs[0].name)  # job_name
             Log.error(f'TRACE:{e.trace}\n{e.message} JOBS:{job_names}')
             for job_name in job_names:
-                jobid = self.get_jobid_by_jobname(job_name)
-                # cancel bad submitted job if jobid is encountered
-                for id_ in jobid:
+                jobs_id = self.get_jobs_id_by_job_name(job_name)
+                # cancel bad submitted job if jobs_id is encountered
+                for id_ in jobs_id:
                     self.send_command(self.cancel_job(id_))
             jobs_id = None
             self.connected = False
@@ -271,9 +271,9 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
                     6015, f"Jobs_id {jobs_id}")
             if hold:
                 sleep(10)
-            jobid_index = 0
+            jobs_id_index = 0
             for package in valid_packages_to_submit:
-                current_package_id = str(jobs_id[jobid_index])
+                current_package_id = str(jobs_id[jobs_id_index])
                 if hold:
                     retries = 5
                     package.jobs[0].id = current_package_id
@@ -285,7 +285,7 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
                             retries = retries - 1
                             package.jobs[0].platform.send_command(
                                 package.jobs[0].platform.cancel_cmd + f" {current_package_id}")
-                            jobid_index += 1
+                            jobs_id_index += 1
                             continue
                     except Exception:
                         failed_packages.append(current_package_id)
@@ -294,18 +294,18 @@ def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages: li
                 # Check if there are duplicated job_name
                 if not duplicated_jobs_already_checked:
                     job_name = package.name if hasattr(package, "name") else package.jobs[0].name
-                    jobid = self.get_jobid_by_jobname(job_name)
-                    if len(jobid) > 1:  # Cancel each job that is not the associated
+                    jobs_id = self.get_jobs_id_by_job_name(job_name)
+                    if len(jobs_id) > 1:  # Cancel each job that is not the associated
                         ids_to_check = [package.jobs[0].id]
                         if package.jobs[0].het:
                             for i in range(1, package.jobs[0].het.get("HETSIZE", 1)):  # noqa
                                 ids_to_check.append(str(int(ids_to_check[0]) + i))
                         # TODO to optimize cancel all jobs at once
-                        for id_ in [jobid for jobid in jobid if jobid not in ids_to_check]:
+                        for id_ in [jobs_id for jobs_id in jobs_id if jobs_id not in ids_to_check]:
                             self.send_command(self.cancel_job(id_))
                             Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled')
                 Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted')
-                jobid_index += 1
+                jobs_id_index += 1
         if len(failed_packages) > 0:
             for job_id in failed_packages:
                 platform.send_command(platform.cancel_cmd + f" {job_id}")
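A side note on the comprehension in the last hunk, [jobs_id for jobs_id in jobs_id if jobs_id not in ids_to_check]: it works because the source list is evaluated before the loop variable shadows the name, and Python 3 comprehensions get their own scope, but a distinct loop variable would express the same filter without the trick. A small illustration:

ids = ["1", "2", "3"]
ids_to_check = ["2"]
# The iterable is read once, up front, so shadowing the name inside the
# comprehension still filters the original list (Python 3 scoping):
print([ids for ids in ids if ids not in ids_to_check])  # ['1', '3']
# Equivalent, and clearer, with a distinct loop variable:
print([i for i in ids if i not in ids_to_check])        # ['1', '3']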
@@ -341,7 +341,7 @@ def get_submit_script(self) -> str:
         self._submit_script_path.chmod(mode=0o750)
         return str(self._submit_script_path)

-    def submit_job(self, job: 'Job', script_name: str, hold: bool = False, export: str = "none") -> None:
+    def submit_job(self, job: 'Job', script_name: str, hold: bool = False, export: str = "none") -> Optional[int]:
         """
         Submit a job from a given job object.
@@ -357,7 +357,22 @@ def submit_job(self, job: 'Job', script_name: str, hold: bool = False, export: s
         :return: job id for the submitted job.
         :rtype: int
         """
-        self.get_submit_cmd(script_name, job, hold=hold, export=export)
+        if job is None or not job:
+            x11 = False
+        else:
+            x11 = job.x11
+        if not x11:
+            self.get_submit_cmd(script_name, job, hold=hold, export=export)
+            return None
+        cmd = self.get_submit_cmd(script_name, job, hold=hold, export=export)
+        if cmd is None:
+            return None
+        if self.send_command(cmd, x11=x11):
+            job_id = self.get_submitted_job_id(self.get_ssh_output(), x11=x11)
+            if job:
+                Log.result(f"Job: {job.name} submitted with job_id: {str(job_id).strip()} and workflow commit: "
+                           f"{job.workflow_commit}")
+            return int(job_id)
         return None

     def submit_script(self, hold: bool = False) -> Union[list[int], int]:
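After this change submit_job has two outcomes: None for non-X11 jobs, whose command is only accumulated into the batch submit script (real ids arrive later from submit_script), and an int job id for X11 jobs, which are submitted immediately. A hedged sketch of how a caller might branch on that contract, with hypothetical names rather than the real call site:

from typing import Optional


def dispatch(platform, job, script_name: str) -> Optional[int]:
    """Hypothetical caller illustrating the new submit_job contract."""
    job_id = platform.submit_job(job, script_name)
    if job_id is None:
        # Non-X11 path: the command was queued; ids come back in bulk later,
        # e.g. from platform.submit_script().
        return None
    # X11 path: the job was submitted immediately and we already have its id.
    job.id = job_id
    return job_id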
@@ -373,7 +388,8 @@ def submit_script(self, hold: bool = False) -> Union[list[int], int]:
         """
         try:
             self.send_file(self.get_submit_script(), False)
-            cmd = os.path.join(self.get_files_path(), self._submit_script_path.name)
+            cmd = os.path.join(self.get_files_path(),
+                               os.path.basename(self._submit_script_path))
             # remove file after submission
             cmd = f"{cmd} ; rm {cmd}"
             try:
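The unchanged context line cmd = f"{cmd} ; rm {cmd}" is worth a note: the uploaded submit script runs and then deletes itself in the same remote shell invocation, so no stale script is left on the platform. A tiny illustration of the composed command, with a hypothetical path:

import os

files_path = "/scratch/project/user/a000/tmp"         # hypothetical remote files dir
script = os.path.join(files_path, "submit_a000.sh")   # hypothetical script name
command = f"{script} ; rm {script}"
print(command)
# /scratch/project/user/a000/tmp/submit_a000.sh ; rm /scratch/project/user/a000/tmp/submit_a000.sh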
@@ -572,7 +588,7 @@ def get_check_all_jobs_cmd(self, jobs_id: str) -> str:  # noqa
         jobs_id = jobs_id.replace('{', '').replace('}', '').replace(',', ' ')
         return f"qstat {jobs_id} | awk" + " '{print $1, $3}' &&" + f"qstat -H {jobs_id} | awk" + " '{print $1, $3}'"

-    def get_jobid_by_jobname(self, job_name, retries=2):
+    def get_jobs_id_by_job_name(self, job_name, retries=2):
         """
         Get job id by job name
@@ -583,7 +599,7 @@ def get_jobid_by_jobname(self, job_name, retries=2):
         """
         # sleep(5)
         job_ids = ""
-        cmd = self.get_jobid_by_jobname_cmd(job_name)
+        cmd = self.get_jobs_id_by_job_name_cmd(job_name)
         self.send_command(cmd)
         job_id_name = self.get_ssh_output()
         while len(job_id_name) <= 0 < retries:
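The renamed lookup pairs a remote qstat-style command with output parsing (job_id_name = self.get_ssh_output()); the command body itself is outside this hunk. A hedged sketch of the parsing half, assuming "id name" pairs per line in the spirit of the qstat ... | awk '{print $1, $3}' commands elsewhere in this file:

def parse_jobs_id_by_job_name(output: str, job_name: str) -> list[str]:
    """Collect ids whose name column matches job_name (output format assumed)."""
    ids = []
    for line in output.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[1] == job_name:
            # PBS often reports ids as '123.server'; keep only the numeric part.
            ids.append(fields[0].split('.')[0])
    return ids


print(parse_jobs_id_by_job_name("101.pbs01 run1\n102.pbs01 run2\n", "run1"))  # ['101']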
@@ -624,7 +640,7 @@ def get_queue_status_cmd(self, job_id: str) -> str:
         job_id = job_id.replace('{', '').replace('}', '').replace(',', ' ')
         return f"qstat {job_id} && echo \"BREAK\" && " + f"qstat -H {job_id}"

-    def get_jobid_by_jobname_cmd(self, job_name: str) -> str:  # noqa
+    def get_jobs_id_by_job_name_cmd(self, job_name: str) -> str:  # noqa
         """
         Looks for a job based on its name.
@@ -667,22 +683,22 @@ def parse_queue_reason(self, output: str, job_id: str) -> str:
             return ''.join(reason)
         return reason  # noqa F501

-    def get_queue_status(self, in_queue_jobs: list['Job'], list_queue_jobid: str, as_conf: AutosubmitConfig) -> None:
+    def get_queue_status(self, in_queue_jobs: list['Job'], list_queue_jobs_id: str, as_conf: AutosubmitConfig) -> None:
         """
         get_queue_status

         :param in_queue_jobs: List of Job.
         :type in_queue_jobs: list[Job]
-        :param list_queue_jobid: List of Job IDs concatenated.
-        :type list_queue_jobid: str
+        :param list_queue_jobs_id: List of Job IDs concatenated.
+        :type list_queue_jobs_id: str
         :param as_conf: experiment configuration.
         :type as_conf: autosubmit.config.AutosubmitConfig

         :rtype: None
         """
         if not in_queue_jobs:
             return
-        cmd = self.get_queue_status_cmd(list_queue_jobid)
+        cmd = self.get_queue_status_cmd(list_queue_jobs_id)
         self.send_command(cmd)
         queue_status = self._ssh_output
         for job in in_queue_jobs:

autosubmit/platforms/platform.py

Lines changed: 2 additions & 2 deletions

@@ -858,9 +858,9 @@ def clean_log_recovery_process(self) -> None:
         This method sets the cleanup event to signal the log recovery process to finish,
         waits for the process to join with a timeout, and then resets all related variables.
         """
-        if self.cleanup_event is not None:
+        if self.cleanup_event:
             self.cleanup_event.set()  # Indicates to old child ( if reachable ) to finish.
-        if self.log_recovery_process is not None:
+        if self.log_recovery_process and self.log_recovery_process.is_alive():
             # Waits for old child ( if reachable ) to finish. Timeout in case of it being blocked.
             self.log_recovery_process.join(timeout=60)
         # Resets everything related to the log recovery process.
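One caveat the comment hints at: join(timeout=60) only waits; if the child is still blocked after 60 seconds it keeps running, and the reset afterwards merely drops the reference. A minimal demonstration of that semantics, with a hypothetical worker that is not Autosubmit code:

from multiprocessing import Process
from time import sleep


def blocked_worker():  # hypothetical child that outlives the join timeout
    sleep(5)


if __name__ == "__main__":
    p = Process(target=blocked_worker)
    p.start()
    p.join(timeout=1)    # returns after ~1 s; it does NOT kill the child
    print(p.is_alive())  # True: the worker is still running
    p.terminate()        # explicit cleanup for this demo only
    p.join()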
File renamed without changes.
File renamed without changes.
New file

Lines changed: 183 additions & 0 deletions

@@ -0,0 +1,183 @@
# Copyright 2015-2025 Earth Sciences Department, BSC-CNS
#
# This file is part of Autosubmit.
#
# Autosubmit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Autosubmit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the PBS platform."""

from pathlib import Path

import pytest

from autosubmit.job.job import Job
from autosubmit.job.job_common import Status
from autosubmit.job.job_packages import JobPackageSimple
from autosubmit.log.log import AutosubmitCritical, AutosubmitError
from autosubmit.platforms.pbsplatform import PBSPlatform


@pytest.fixture
def platform(autosubmit_config):
    expid = 'a000'
    as_conf = autosubmit_config(expid, experiment_data={})
    exp_path = Path(as_conf.basic_config.LOCAL_ROOT_DIR, expid)
    aslogs_dir = exp_path / as_conf.basic_config.LOCAL_TMP_DIR / as_conf.basic_config.LOCAL_ASLOG_DIR
    submit_platform_script = aslogs_dir / 'submit_local.sh'
    Path(submit_platform_script).touch()
    return PBSPlatform(expid='a000', name='local', config=as_conf.experiment_data)


def test_properties(platform):
    props = {
        'name': 'foo',
        'host': 'localhost1',
        'user': 'sam',
        'project': 'proj1',
        'budget': 100,
        'reservation': 1,
        'exclusivity': True,
        'hyperthreading': True,
        'type': 'SuperPBS',
        'scratch': '/scratch/1',
        'project_dir': '/proj1',
        'root_dir': '/root_1',
        'partition': 'inter',
        'queue': 'prio1'
    }
    for prop, value in props.items():
        setattr(platform, prop, value)
    for prop, value in props.items():
        assert value == getattr(platform, prop)


def test_pbs_platform_submit_script_raises_autosubmit_critical_with_trace(mocker, platform):
    package = mocker.MagicMock()
    package.jobs.return_value = []
    valid_packages_to_submit = [
        package
    ]

    ae = AutosubmitError(message='violates resource limits', code=123, trace='ERR!')
    platform.submit_script = mocker.MagicMock(side_effect=ae)

    # AS will handle the AutosubmitError above, but then raise an AutosubmitCritical.
    # This new error won't contain all the info from the upstream error.
    with pytest.raises(AutosubmitCritical) as cm:
        platform.process_batch_ready_jobs(
            valid_packages_to_submit=valid_packages_to_submit,
            failed_packages=[]
        )

    # AS will handle the error and then later will raise another error message.
    # But the AutosubmitError object we created will have been correctly used
    # without raising any exceptions (such as AttributeError).
    assert cm.value.message != ae.message


@pytest.fixture
def as_conf(autosubmit_config, tmpdir):
    exp_data = {
        "PLATFORMS": {
            "pytest-pbs": {
                "type": "pbs",
                "host": "localhost",
                "user": "user",
                "project": "project",
                "scratch_dir": "/scratch",
                "QUEUE": "queue",
                "ADD_PROJECT_TO_HOST": False,
                "MAX_WALLCLOCK": "00:01",
                "TEMP_DIR": "",
                "MAX_PROCESSORS": 99999,
            },
        },
        "LOCAL_ROOT_DIR": str(tmpdir),
        "LOCAL_TMP_DIR": str(tmpdir),
        "LOCAL_PROJ_DIR": str(tmpdir),
        "LOCAL_ASLOG_DIR": str(tmpdir),
    }
    as_conf = autosubmit_config("dummy-expid", exp_data)
    return as_conf


@pytest.fixture
def pbs_platform(as_conf):
    platform = PBSPlatform(expid="dummy-expid", name='pytest-pbs', config=as_conf.experiment_data)
    return platform


@pytest.fixture
def create_packages(as_conf, pbs_platform):
    simple_jobs_1 = [Job("dummy-1", 1, Status.SUBMITTED, 0)]
    simple_jobs_2 = [Job("dummy-1", 1, Status.SUBMITTED, 0),
                     Job("dummy-2", 2, Status.SUBMITTED, 0),
                     Job("dummy-3", 3, Status.SUBMITTED, 0)]
    simple_jobs_3 = [Job("dummy-1", 1, Status.SUBMITTED, 0),
                     Job("dummy-2", 2, Status.SUBMITTED, 0),
                     Job("dummy-3", 3, Status.SUBMITTED, 0)]
    for job in simple_jobs_1 + simple_jobs_2 + simple_jobs_3:
        job._platform = pbs_platform
        job._platform.name = pbs_platform.name
        job.platform_name = pbs_platform.name
        job.processors = 2
        job.section = "dummysection"
        job._init_runtime_parameters()
        job.wallclock = "00:01"
    packages = [
        JobPackageSimple(simple_jobs_1),
        JobPackageSimple(simple_jobs_2),
        JobPackageSimple(simple_jobs_3),
    ]
    return packages


def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, pbs_platform, as_conf, create_packages):
    valid_packages_to_submit = create_packages
    failed_packages = []
    pbs_platform.get_jobs_id_by_job_name = mocker.MagicMock()
    pbs_platform.send_command = mocker.MagicMock()
    pbs_platform.submit_script = mocker.MagicMock()
    jobs_id = [1, [1, 2, 3], [1, 2, 3]]
    pbs_platform.get_jobs_id_by_job_name.return_value = jobs_id
    pbs_platform.submit_script.return_value = jobs_id
    pbs_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages)
    for i, package in enumerate(valid_packages_to_submit):
        for job in package.jobs:
            assert job.hold is False
            assert job.id == str(jobs_id[i])
            assert job.status == Status.SUBMITTED
            assert job.wrapper_name is None
    assert failed_packages == []


def test_submit_job(mocker, pbs_platform):
    pbs_platform.get_submit_cmd = mocker.MagicMock(return_value="dummy")
    pbs_platform.send_command = mocker.MagicMock(return_value="dummy")
    pbs_platform._ssh_output = "10000"
    job = Job("dummy", 10000, Status.SUBMITTED, 0)
    job._platform = pbs_platform
    job.platform_name = pbs_platform.name
    jobs_id = pbs_platform.submit_job(job, "dummy")
    assert not jobs_id
    job.x11 = True
    jobs_id = pbs_platform.submit_job(job, "dummy")
    assert jobs_id == 10000
    job.workflow_commit = "dummy"
    jobs_id = pbs_platform.submit_job(job, "dummy")
    assert jobs_id == 10000
    pbs_platform._ssh_output = "10000\n"
    jobs_id = pbs_platform.submit_job(job, "dummy")
    assert jobs_id == 10000

test/unit/test_pjm.py renamed to test/unit/platforms/test_pjm.py

Lines changed: 2 additions & 2 deletions

@@ -116,12 +116,12 @@ def remote_platform(autosubmit_config, autosubmit):
         }
     })

-    yml_file = Path(__file__).resolve().parents[1] / "files/fake-jobs.yml"
+    yml_file = Path(__file__).resolve().parents[2] / "files/fake-jobs.yml"
     factory = YAMLParserFactory()
     parser = factory.create_parser()
     parser.data = parser.load(yml_file)
     as_conf.experiment_data.update(parser.data)
-    yml_file = Path(__file__).resolve().parents[1] / "files/fake-platforms.yml"
+    yml_file = Path(__file__).resolve().parents[2] / "files/fake-platforms.yml"
     factory = YAMLParserFactory()
     parser = factory.create_parser()
     parser.data = parser.load(yml_file)
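The parents[1] to parents[2] bump follows directly from the rename: the test now lives one directory deeper, so one more parent must be stripped to reach test/files/. A quick check:

from pathlib import Path

old = Path("test/unit/test_pjm.py")
new = Path("test/unit/platforms/test_pjm.py")
assert old.parents[1] == Path("test")  # test/unit -> test
assert new.parents[2] == Path("test")  # test/unit/platforms -> test/unit -> test
print(new.parents[2] / "files/fake-jobs.yml")  # test/files/fake-jobs.yml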
File renamed without changes.
File renamed without changes.
