Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2408,15 +2408,6 @@ def run_experiment(expid: str, start_time: Optional[str] = None, start_after: Op
job_list.save()
as_conf.save()

# Submit jobs that are prepared to be held (if the remote dependencies parameter is enabled).
# This is currently unused, as SLURM no longer allows jobs to acquire priority while in the hold state.
# This only works for SLURM (the PREPARED status cannot be reached on other platforms).
if as_conf.get_remote_dependencies() == "true" and len(job_list.get_prepared()) > 0:
Autosubmit.submit_ready_jobs(
as_conf, job_list, platforms_to_test, packages_persistence, hold=True)
job_list.update_list(as_conf, submitter=submitter)
job_list.save()
as_conf.save()
# Safe spot to store changes
try:
exp_history = Autosubmit.process_historical_data_iteration(job_list, job_changes_tracker,
Expand Down
11 changes: 0 additions & 11 deletions autosubmit/config/configcommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2554,17 +2554,6 @@ def update_dict(original_dict: dict, updated_dict: collections.abc.Mapping) -> d
yaml.dump(final_dict, yaml_file)
ini_file.rename(Path(root_dir, ini_file.stem + ".yml"))

# noinspection PyMethodMayBeStatic
def get_remote_dependencies(self) -> str:
    """Return whether remote dependencies (the PRESUBMISSION option) are enabled.

    The feature is disabled because it no longer works with newer SLURM
    versions, so the ``CONFIG.PRESUBMISSION`` setting is not read and the
    result is hard-coded.

    :return: always the string ``"false"``.
    :rtype: str
    """
    return "false"

def get_wrapper_type(self, wrapper=None) -> Optional[str]:
"""Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, MULTI NONE) the user
has configured in the autosubmit's config.
Expand Down
9 changes: 0 additions & 9 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2331,15 +2331,6 @@ def update_content(self, as_conf: AutosubmitConfig, parameters: dict) -> tuple[s
if as_conf.get_project_type().lower() != "none" and len(as_conf.get_project_type()) > 0:
template_file = open(os.path.join(as_conf.get_project_dir(), self.file), 'r')
template = ''
if as_conf.get_remote_dependencies() == "true":
if self.type == Language.BASH:
template = 'sleep 5' + "\n"
elif self.type == Language.PYTHON2:
template = 'time.sleep(5)' + "\n"
elif self.type == Language.PYTHON3 or self.type == Language.PYTHON:
template = 'time.sleep(5)' + "\n"
elif self.type == Language.R:
template = 'Sys.sleep(5)' + "\n"
template += template_file.read()
template_file.close()
else:
Expand Down
68 changes: 0 additions & 68 deletions autosubmit/job/job_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -2743,7 +2743,6 @@ def update_list(self, as_conf: AutosubmitConfig, store_change: bool = True,
"for parents completion...")
Log.debug('Updating WAITING jobs')
if not fromSetStatus:
all_parents_completed = []
for job in self.get_delayed():
if datetime.datetime.now() >= job.delay_end:
job.status = Status.READY
Expand All @@ -2760,8 +2759,6 @@ def update_list(self, as_conf: AutosubmitConfig, store_change: bool = True,
job.status = Status.READY
job.hold = False
Log.debug(f"Setting job: {job.name} status to: READY (all parents completed)...")
if as_conf.get_remote_dependencies() == "true":
all_parents_completed.append(job.name)
if job.status != Status.READY:
if len(tmp3) != len(job.parents):
if len(tmp2) == len(job.parents):
Expand All @@ -2781,8 +2778,6 @@ def update_list(self, as_conf: AutosubmitConfig, store_change: bool = True,
Log.debug(f"Setting job: {job.name} status to: READY "
"(conditional jobs are completed/failed)...")
break
if as_conf.get_remote_dependencies() == "true":
all_parents_completed.append(job.name)
else:
if len(tmp3) == 1 and len(job.parents) == 1:
for parent in job.parents:
Expand All @@ -2793,69 +2788,6 @@ def update_list(self, as_conf: AutosubmitConfig, store_change: bool = True,
Log.debug(f"Setting job: {job.name} status to: READY"
" (conditional jobs are completed/failed)...")
break
if as_conf.get_remote_dependencies() == "true":
for job in self.get_prepared():
tmp2 = [parent for parent in job.parents if
parent.status == Status.COMPLETED or parent.status == Status.SKIPPED
or parent.status == Status.FAILED]
tmp3 = [parent for parent in job.parents if
parent.status == Status.SKIPPED or parent.status == Status.FAILED]
if len(tmp2) == len(job.parents) and len(tmp3) != len(job.parents):
job.status = Status.READY
# Run start time in format (YYYYMMDDHH:MM:SS) from current time
job.hold = False
save = True
Log.debug("A job in prepared status has all parent completed, job:"
f"{job.name} status set to: READY ...")
Log.debug('Updating WAITING jobs eligible for be prepared')
# Setup job name should be a variable
for job in self.get_waiting_remote_dependencies('slurm'):
if job.name not in all_parents_completed:
tmp = [parent for parent in job.parents if (
(parent.status == Status.SKIPPED or parent.status == Status.COMPLETED
or parent.status == Status.QUEUING or parent.status == Status.RUNNING)
and "setup" not in parent.name.lower())]
if len(tmp) == len(job.parents):
job.status = Status.PREPARED
job.hold = True
Log.debug(
f"Setting job: {job.name} status to: Prepared for be held ("
"all parents queuing, running or completed)...")

Log.debug('Updating Held jobs')
if self.job_package_map:
held_jobs = [job for job in self.get_held_jobs() if (
job.id not in list(self.job_package_map.keys()))]
held_jobs += [wrapper_job for wrapper_job in list(self.job_package_map.values())
if wrapper_job.status == Status.HELD]
else:
held_jobs = self.get_held_jobs()

for job in held_jobs:
# Wrappers and inner jobs
if self.job_package_map and job.id in list(self.job_package_map.keys()):
hold_wrapper = False
for inner_job in job.job_list:
valid_parents = [parent
for parent in inner_job.parents if parent not in job.job_list]
tmp = [parent
for parent in valid_parents if parent.status == Status.COMPLETED]
if len(tmp) < len(valid_parents):
hold_wrapper = True
job.hold = hold_wrapper
if not job.hold:
for inner_job in job.job_list:
inner_job.hold = False
Log.debug(
f"Setting job: {job.name} status to: Queuing (all parents completed)...")
else: # Non-wrapped jobs
tmp = [
parent for parent in job.parents if parent.status == Status.COMPLETED]
if len(tmp) == len(job.parents):
job.hold = False
Log.debug(f"Setting job: {job.name} status to: Queuing (all parents completed)...")
else:
job.hold = True
jobs_to_skip = self.get_skippable_jobs(
as_conf.get_wrapper_jobs()) # Get A Dict with all jobs that are listed as skippable

Expand Down
17 changes: 10 additions & 7 deletions autosubmit/platforms/paramiko_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(self, expid: str, name: str, config: dict, auth_password: Optional[
self._host_config_id = None
self.submit_cmd = ""
self._ftpChannel: Optional[paramiko.SFTPClient] = None
self.transport = None
self.transport: Optional[paramiko.Transport] = None
self.channels = {}
if sys.platform != "linux":
self.poller = select.kqueue()
Expand Down Expand Up @@ -347,11 +347,15 @@ def connect(
self._init_local_x11_display()
self._ssh = _create_ssh_client()
self._ssh_config = paramiko.SSHConfig()
if as_conf:
self.map_user_config_file(as_conf)
user_ssh_config = Path("~/.ssh/config").expanduser()
if user_ssh_config.is_file():
if as_conf:
self.map_user_config_file(as_conf)
else:
with open(os.path.expanduser("~/.ssh/config"), "r") as fd:
self._ssh_config.parse(fd)
else:
with open(os.path.expanduser("~/.ssh/config"), "r") as fd:
self._ssh_config.parse(fd)
Log.debug("No user SSH config file found")

self._host_config = self._ssh_config.lookup(self.host)
if "," in self._host_config['hostname']:
Expand Down Expand Up @@ -1159,8 +1163,7 @@ def exec_command(
self.restore_connection(None)
timeout = timeout + 60
retries = retries - 1
if retries <= 0:
return False, False, False
return False, False, False

def send_command_non_blocking(self, command, ignore_log):
thread = threading.Thread(target=self.send_command, args=(command, ignore_log))
Expand Down
2 changes: 1 addition & 1 deletion autosubmit/platforms/paramiko_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ParamikoSubmitter:

def __init__(self, as_conf: 'AutosubmitConfig', auth_password: Optional[str] = None,
             local_auth_password=None):
    """Create the submitter and load its platforms.

    :param as_conf: Autosubmit configuration, forwarded to ``load_platforms``.
    :param auth_password: optional password, forwarded to ``load_platforms``.
    :param local_auth_password: optional local password, forwarded to ``load_platforms``.
    """
    # Declared once, with its type, before ``load_platforms`` runs.
    # NOTE(review): presumably ``load_platforms`` fills this mapping -- confirm.
    self.platforms: Optional[dict[str, 'ParamikoPlatform']] = None
    self.load_platforms(as_conf=as_conf, auth_password=auth_password, local_auth_password=local_auth_password)

def load_local_platform(self, as_conf: 'AutosubmitConfig', experiment_data: Optional[dict] = None,
Expand Down
15 changes: 14 additions & 1 deletion test/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from re import sub
from subprocess import check_output
from tempfile import TemporaryDirectory
from textwrap import dedent
from time import time_ns
from typing import cast, Any, ContextManager, Generator, Iterator, Optional, Protocol, Union, TYPE_CHECKING

Expand Down Expand Up @@ -334,6 +335,8 @@ def _ssh_connect(*args, **kwargs):
kwargs['port'] = ssh_port
if 'password' not in kwargs:
kwargs['password'] = password
kwargs['look_for_keys'] = False
kwargs['allow_agent'] = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is harmless, and Erick's environment has something different in his ssh-agent and/or ssh config that prevents him from running the tests. It shouldn't cause any issues to others here as we don't rely on ssh-agent or config, and instead connect using the password directly for the Docker container.

if len(args) > 1:
# tuple to list, and then replace the port...
args = [x for x in args]
Expand Down Expand Up @@ -457,11 +460,21 @@ def ssh_server(mocker, tmp_path, request) -> Generator[DockerContainer, None, No
ssh_client = _make_ssh_client(ssh_port, _SSH_DOCKER_PASSWORD, None, mfa)
mocker.patch('autosubmit.platforms.paramiko_platform._create_ssh_client', return_value=ssh_client)

ssh_config = Path(tmp_path, '.ssh/ssh_config')
ssh_config.parent.mkdir(exist_ok=True, parents=True)
ssh_config.touch()
ssh_config.write_text(dedent(f"""\
Host localhost
Hostname localhost
User {user}
ForwardX11 yes
Port 22
"""))
if mfa:
# It uses a Transport and not an SSH client directly. Ideally, we would be able
# to use just one way
original_paramiko_config = paramiko.SSHConfig()
with open(Path('~/.ssh/config').expanduser()) as f:
with open(ssh_config) as f:
original_paramiko_config.parse(f)
modified_config = original_paramiko_config.lookup('localhost')
modified_config['port'] = f'{ssh_port}'
Expand Down
35 changes: 25 additions & 10 deletions test/integration/test_paramiko_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import cast, Generator, Optional, Protocol, Union, TYPE_CHECKING

import pytest
from paramiko import ChannelFile # type: ignore[import]

from autosubmit.job.job import Job
from autosubmit.job.job_common import Status
Expand Down Expand Up @@ -107,7 +108,8 @@ def exp_platform_server(autosubmit_exp, ssh_server, request) -> ExperimentPlatfo
# to maintain and test).
submitter = ParamikoSubmitter(as_conf=exp.as_conf)

ps_platform: 'PsPlatform' = submitter.platforms[_PLATFORM_NAME]
assert submitter.platforms
ps_platform: 'PsPlatform' = cast('PsPlatform', submitter.platforms[_PLATFORM_NAME])

return ExperimentPlatformServer(exp, ps_platform, ssh_server)

Expand All @@ -123,14 +125,15 @@ class CreateJobParametersPlatformFixture(Protocol):

def __call__(
        self,
        experiment_data: Optional[dict] = None,
        /
) -> JobParametersPlatform:
    """Call signature of the job-parameters/platform factory fixture.

    :param experiment_data: optional experiment configuration; positional-only,
        so an implementation may name this parameter differently.
    :return: a ``JobParametersPlatform`` value.
    """
    ...


@pytest.fixture
def create_job_parameters_platform(autosubmit_exp) -> CreateJobParametersPlatformFixture:
def job_parameters_platform(experiment_data: dict) -> JobParametersPlatform:
def job_parameters_platform(experiment_data: Optional[dict] = None) -> JobParametersPlatform:
exp = autosubmit_exp(_EXPID, experiment_data=experiment_data)
slurm_platform: 'SlurmPlatform' = cast('SlurmPlatform', exp.platform)

Expand Down Expand Up @@ -237,8 +240,8 @@ def test_send_file_errors(exp_platform_server: ExperimentPlatformServer):
]
)
@pytest.mark.docker
def test_send_command(cmd: str, error: Optional, x11_enabled: bool, mfa_enabled: bool, request: pytest.FixtureRequest,
mocker):
def test_send_command(cmd: str, error: Optional[Exception], x11_enabled: bool, mfa_enabled: bool,
request: pytest.FixtureRequest, mocker):
"""This test opens an SSH connection (via sftp) and sends a command."""
if x11_enabled:
request.applymarker('x11')
Expand All @@ -258,7 +261,7 @@ def test_send_command(cmd: str, error: Optional, x11_enabled: bool, mfa_enabled:

if error:
assert exp_platform_server.platform.get_ssh_output_err() == ''
with pytest.raises(error):
with pytest.raises(error): # type: ignore
exp_platform_server.platform.send_command(cmd, ignore_log=False, x11=x11_enabled)

stderr = exp_platform_server.platform.get_ssh_output_err()
Expand All @@ -282,6 +285,7 @@ def test_exec_command(exp_platform_server: 'ExperimentPlatformServer'):
assert stdin is not False
assert stderr is not False
# The stdout contents should be [b"user_name\n"]; thus the ugly list comprehension + extra code.
assert isinstance(stdout, ChannelFile)
assert user == str(''.join([x.decode('UTF-8').strip() for x in stdout.readlines()]))


Expand Down Expand Up @@ -310,6 +314,7 @@ def test_exec_command_invalid_command(command: str, expected: str, x11: bool, re
exp_platform_server.platform.connect(None, reconnect=False, log_recovery_process=False)

stdin, stdout, stderr = exp_platform_server.platform.exec_command(command, x11=x11)
assert isinstance(stdout, ChannelFile)
assert stdin is not False
assert stderr is not False
# The stdout contents should be [b"user_name\n"]; thus the ugly list comprehension + extra code.
Expand All @@ -327,6 +332,7 @@ def test_exec_command_after_a_reset(exp_platform_server: 'ExperimentPlatformServ
exp_platform_server.platform.connect(None, reconnect=False, log_recovery_process=False)

stdin, stdout, stderr = exp_platform_server.platform.exec_command('whoami')
assert isinstance(stdout, ChannelFile)
assert stdin is not False
assert stderr is not False
# The stdout contents should be [b"user_name\n"]; thus the ugly list comprehension + extra code.
Expand Down Expand Up @@ -358,6 +364,7 @@ def test_exec_command_ssh_session_not_active(x11: bool, retries: int, command: s
# But while that's OK, we can also avoid mocking by simply
# closing the connection.

assert exp_platform_server.platform.transport
exp_platform_server.platform.transport.close()

stdin, stdout, stderr = exp_platform_server.platform.exec_command(
Expand All @@ -367,6 +374,7 @@ def test_exec_command_ssh_session_not_active(x11: bool, retries: int, command: s
)

# This will be true iff the ``ps_platform.restore_connection(None)`` ran without errors.
assert isinstance(stdout, ChannelFile)
assert stdin is not False
assert stderr is not False
# The stdout contents should be [b"user_name\n"]; thus the ugly list comprehension + extra code.
Expand Down Expand Up @@ -406,16 +414,19 @@ def test_fs_operations(exp_platform_server: 'ExperimentPlatformServer', request)

exp_platform_server.platform.connect(None, reconnect=False, log_recovery_process=False)

file_not_found = Path('/app', 'this-file-does-not-exist')
file_not_found = Path('/app', test_name, 'this-file-does-not-exist')

assert exp_platform_server.platform.send_file(local_file.name)

contents = exp_platform_server.platform.read_file(str(remote_file))
assert contents
assert contents.decode('UTF-8').strip() == text
assert None is exp_platform_server.platform.read_file(str(file_not_found))
assert exp_platform_server.platform.read_file(str(file_not_found)) is None

assert exp_platform_server.platform.get_file_size(str(remote_file)) > 0
assert None is exp_platform_server.platform.get_file_size(str(file_not_found))
file_size: Optional[int] = exp_platform_server.platform.get_file_size(str(remote_file))
assert file_size
assert file_size > 0
assert exp_platform_server.platform.get_file_size(str(file_not_found)) is None

assert exp_platform_server.platform.check_absolute_file_exists(str(remote_file))
assert not exp_platform_server.platform.check_absolute_file_exists(str(file_not_found))
Expand Down Expand Up @@ -463,6 +474,7 @@ def test_exec_command_with_x11(x11_enabled: bool, user_or_false: Union[str, bool
if type(user_or_false) is bool:
assert user_or_false == stdout
else:
assert isinstance(stdout, ChannelFile)
assert user_or_false == stdout.readline().decode('UTF-8').strip()


Expand All @@ -483,6 +495,9 @@ def test_xclock(exp_platform_server: ExperimentPlatformServer):

_, stdout, stderr = ps_platform.exec_command('timeout 1 xclock', x11=True)

assert isinstance(stdout, ChannelFile)
assert isinstance(stderr, ChannelFile)

assert ''.join(stdout.readlines()) == ''
assert ''.join(stderr.readlines()) == ''

Expand Down
Loading