Skip to content

Commit 8712452

Browse files
committed
WIP: this platforms.wrapper code is confusing...
1 parent f9c8100 commit 8712452

File tree

11 files changed

+654
-128
lines changed

11 files changed

+654
-128
lines changed

.github/workflows/ci.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ jobs:
9292
cache: 'pip' # caching pip dependencies
9393

9494
- name: Install system dependencies
95-
run: sudo apt-get install -y curl git graphviz rsync
95+
run: sudo apt-get install -y curl git graphviz rsync xvfb
9696

9797
- name: Install dependencies
9898
run: |
@@ -112,6 +112,9 @@ jobs:
112112
# CONTRIBUTING.md file for details how to set up your environment to run these.
113113
- name: Integration tests
114114
run: |
115+
Xvfb :99 -screen 0 1024x768x24 -ac &
116+
export DISPLAY=:99
117+
115118
pytest \
116119
--cov=autosubmit --cov-config=.coveragerc \
117120
--cov-report=xml:test/coverage.xml --cov-append \

.github/workflows/lint.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ jobs:
3131

3232
mypy-changes-in-file:
3333
runs-on: ubuntu-latest
34+
continue-on-error: true
3435
name: mypy run
3536
steps:
3637
- uses: actions/checkout@v5

autosubmit/platforms/locplatform.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from autosubmit.log.log import Log, AutosubmitError
2727
from autosubmit.platforms.headers.local_header import LocalHeader
2828
from autosubmit.platforms.paramiko_platform import ParamikoPlatform
29-
from autosubmit.platforms.wrappers.wrapper_factory import LocalWrapperFactory
3029

3130
if TYPE_CHECKING:
3231
from autosubmit.config.configcommon import AutosubmitConfig
@@ -67,8 +66,7 @@ def __init__(self, expid: str, name: str, config: dict, auth_password: Optional[
6766
self.job_status['RUNNING'] = ['0']
6867
self.job_status['QUEUING'] = []
6968
self.job_status['FAILED'] = []
70-
self._allow_wrappers = True
71-
self._wrapper = LocalWrapperFactory(self)
69+
self._allow_wrappers = False
7270

7371
self.update_cmds()
7472

autosubmit/platforms/paramiko_platform.py

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from pathlib import Path
3131
from threading import Thread
3232
from time import sleep
33-
from typing import Optional, Union, TYPE_CHECKING
33+
from typing import Any, Optional, Union, TYPE_CHECKING
3434

3535
import Xlib.support.connect as xlib_connect
3636
import paramiko
@@ -143,9 +143,7 @@ def reset(self):
143143
self.poller = select.kqueue()
144144
else:
145145
self.poller = select.poll()
146-
display = os.getenv('DISPLAY')
147-
if display is None:
148-
display = "localhost:0"
146+
display = os.getenv('DISPLAY', "localhost:0")
149147
try:
150148
self.local_x11_display = xlib_connect.get_display(display)
151149
except Exception as e:
@@ -244,6 +242,7 @@ def agent_auth(self, port: int) -> bool:
244242
return False
245243
return True
246244

245+
# noinspection PyUnusedLocal
247246
def interactive_auth_handler(self, title, instructions, prompt_list):
248247
answers = []
249248
# Walk the list of prompts that the server sent that we need to answer
@@ -254,7 +253,7 @@ def interactive_auth_handler(self, title, instructions, prompt_list):
254253
# strip() used to get rid of any padding spaces sent by the server
255254
if "password" in prompt:
256255
answers.append(self.pw)
257-
elif "token" in prompt or "2fa" in prompt or "otp" in prompt:
256+
elif "token" in prompt or "2fa" in prompt or "otp" in prompt or "code":
258257
if self.two_factor_method == "push":
259258
answers.append("")
260259
elif self.two_factor_method == "token":
@@ -506,7 +505,7 @@ def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False
506505
Log.printlog(f"Log file couldn't be retrieved: {filename}", 5000)
507506
return False
508507

509-
def delete_file(self, filename: str) -> None:
508+
def delete_file(self, filename: str) -> bool:
510509
"""
511510
Deletes a file from this platform
512511
@@ -515,7 +514,6 @@ def delete_file(self, filename: str) -> None:
515514
:return: True if successful or file does not exist
516515
:rtype: bool
517516
"""
518-
# TODO: pytests when the slurm container is avaliable
519517
remote_file = Path(self.get_files_path()) / filename
520518
try:
521519
self._ftpChannel.remove(str(remote_file))
@@ -524,12 +522,15 @@ def delete_file(self, filename: str) -> None:
524522
return False
525523
except Exception as e:
526524
# Change to Path
527-
Log.error(f'Could not remove file {str(remote_file)}, something went wrong with the platform', 6004, str(e))
525+
Log.error(f'Could not remove file {str(remote_file)}, something went wrong with the platform',
526+
6004, str(e))
528527

529528
if str(e).lower().find("garbage") != -1:
530529
raise AutosubmitCritical(
531-
"Wrong User or invalid .ssh/config. Or invalid user in the definition of PLATFORMS in YAML or public key not set ",
530+
"Wrong User or invalid .ssh/config. Or invalid user in the definition of PLATFORMS in "
531+
"YAML or public key not set ",
532532
7051, str(e))
533+
return False
533534

534535
def move_file(self, src, dest, must_exist=False):
535536
"""
@@ -776,26 +777,25 @@ def _check_jobid_in_queue(self, ssh_output, job_list_cmd):
776777
return False
777778
return True
778779

779-
def parse_joblist(self, job_list: list[list['Job']]) -> str:
780-
"""
781-
Convert a list of job_list to job_list_cmd
780+
def parse_joblist(self, job_list: list[list['Job', Any]]) -> str:
781+
"""Return a string containing a comma-separated list of job IDs.
782782
783-
:param job_list: list of jobs
784-
:type job_list: list
785-
:return: job status
786-
:rtype: str
783+
If a job in the provided list is missing its ID, this function will initialize
784+
it to a string containing the digit zero,``"0"``.
785+
786+
:param job_list: A list of jobs.
787+
:return: A comma-separated string containing the job IDs.
787788
"""
788-
job_list_cmd = ""
789-
for job, job_prev_status in job_list:
789+
job_list_cmd: list[str] = []
790+
# TODO: second item in tuple, _, is a ``job_prev_status``? What for?
791+
for job, _ in job_list:
790792
if job.id is None:
791793
job_str = "0"
792794
else:
793795
job_str = str(job.id)
794-
job_list_cmd += job_str + ","
795-
if job_list_cmd[-1] == ",":
796-
job_list_cmd = job_list_cmd[:-1]
796+
job_list_cmd.append(job_str)
797797

798-
return job_list_cmd
798+
return ','.join(job_list_cmd)
799799

800800
def check_Alljobs(self, job_list: list[list['Job']], as_conf, retries=5):
801801
"""
@@ -979,12 +979,15 @@ def get_queue_status_cmd(self, job_name):
979979

980980
def x11_handler(self, channel, xxx_todo_changeme):
981981
"""Handler for incoming x11 connections.
982+
982983
For each x11 incoming connection:
983984
984985
- get a connection to the local display
985986
- maintain bidirectional map of remote x11 channel to local x11 channel
986987
- add the descriptors to the poller
987988
- queue the channel (use transport.accept())
989+
990+
Incoming connections come from the server when we open an actual GUI application.
988991
"""
989992
(src_addr, src_port) = xxx_todo_changeme
990993
x11_chanfd = channel.fileno()
@@ -1038,7 +1041,7 @@ def exec_command(
10381041
) -> Union[tuple[paramiko.Channel, paramiko.Channel, paramiko.Channel], tuple[bool, bool, bool]]:
10391042
"""
10401043
Execute a command on the SSH server. A new `.Channel` is opened and
1041-
the requested command is execed. The command's input and output
1044+
the requested command is executed. The command's input and output
10421045
streams are returned as Python ``file``-like objects representing
10431046
stdin, stdout, and stderr.
10441047
@@ -1052,7 +1055,6 @@ def exec_command(
10521055
:param timeout: set command's channel timeout. See `Channel.settimeout`.settimeout.
10531056
:type timeout: int
10541057
:return: the stdin, stdout, and stderr of the executing command
1055-
10561058
:raises SSHException: if the server fails to execute the command
10571059
"""
10581060
while retries > 0:
@@ -1061,7 +1063,8 @@ def exec_command(
10611063
self._init_local_x11_display()
10621064
chan = self.transport.open_session()
10631065
if not chan.request_x11(single_connection=False, handler=self.x11_handler):
1064-
# FIXME: test this!
1066+
# FIXME: Apparently, the SSH session closes before reaching here?
1067+
# If so, we can just remove this check?
10651068
raise AutosubmitCritical("Remote platform does not support X11!")
10661069
else:
10671070
chan = self.transport.open_session()
@@ -1309,7 +1312,7 @@ def get_ssh_output(self):
13091312
def get_ssh_output_err(self):
13101313
return self._ssh_output_err
13111314

1312-
def get_call(self, job_script: str, job: 'Job', export="none", timeout=-1) -> str:
1315+
def get_call(self, job_script: str, job: Optional['Job'], export="none", timeout=-1) -> str:
13131316
"""Gets execution command for given job.
13141317
13151318
:param job_script: script to run
@@ -1318,7 +1321,7 @@ def get_call(self, job_script: str, job: 'Job', export="none", timeout=-1) -> st
13181321
:param timeout:
13191322
:return: command to execute script
13201323
"""
1321-
# If job is None, it is a wrapper. ( 0 clarity there, to be improved in a rework TODO )
1324+
# If job is None, it is a wrapper. (TODO: 0 clarity there, to be improved in a rework)
13221325
if job:
13231326
if job.executable != '':
13241327
executable = '' # Alternative: use job.executable with substituted placeholders
@@ -1512,10 +1515,10 @@ def check_remote_log_dir(self):
15121515
except BaseException as e:
15131516
raise AutosubmitError(f"Couldn't send the file {self.remote_log_dir} to HPC {self.host}", 6004, str(e))
15141517

1515-
def check_absolute_file_exists(self, src):
1518+
def check_absolute_file_exists(self, src) -> bool:
15161519
try:
1517-
if self._ftpChannel.stat(src):
1518-
return True
1520+
self._ftpChannel.stat(src)
1521+
return True
15191522
except Exception as e:
15201523
Log.debug(f'Failed to check absolute file {src} exists: {str(e)}')
15211524
return False

autosubmit/platforms/wrappers/wrapper_builder.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
#!/usr/bin/env python3
2-
3-
# Copyright 2015-2020 Earth Sciences Department, BSC-CNS
4-
1+
# Copyright 2015-2025 Earth Sciences Department, BSC-CNS
2+
#
53
# This file is part of Autosubmit.
6-
4+
#
75
# Autosubmit is free software: you can redistribute it and/or modify
86
# it under the terms of the GNU General Public License as published by
97
# the Free Software Foundation, either version 3 of the License, or
108
# (at your option) any later version.
11-
9+
#
1210
# Autosubmit is distributed in the hope that it will be useful,
1311
# but WITHOUT ANY WARRANTY; without even the implied warranty of
1412
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1513
# GNU General Public License for more details.
16-
14+
#
1715
# You should have received a copy of the GNU General Public License
1816
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
1917

@@ -30,22 +28,26 @@ class WrapperDirector:
3028
"""
3129
def __init__(self):
3230
self._builder = None
31+
3332
def construct(self, builder):
3433
self._builder = builder
3534

3635
header = self._builder.build_header()
3736
job_thread = self._builder.build_job_thread()
38-
#if "bash" not in header[0:15]:
37+
# if "bash" not in header[0:15]:
3938

4039
main = self._builder.build_main()
41-
#else:
40+
# else:
4241
# nodes,main = self._builder.build_main() #What to do with nodes?
4342
# change to WrapperScript object
4443
wrapper_script = header + job_thread + main
4544
wrapper_script = wrapper_script.replace("_NEWLINE_", '\\n')
4645

4746
return wrapper_script
47+
48+
4849
class WrapperBuilder(object):
50+
4951
def __init__(self, **kwargs):
5052
if "retrials" in list(kwargs.keys()):
5153
self.retrials = kwargs['retrials']
@@ -66,7 +68,7 @@ def __init__(self, **kwargs):
6668
def build_header(self):
6769
return textwrap.dedent(self.header_directive) + self.build_imports()
6870

69-
def build_imports(self):
71+
def build_imports(self) -> str:
7072
pass # pragma: no cover
7173

7274
def build_job_thread(self):

0 commit comments

Comments
 (0)