Commit 1ccf89c

first commit documenting the wrappers (#2310)
* first commit documenting the wrappers
* small adjustments
* small documentation fix
1 parent d6cc5b7 commit 1ccf89c

File tree

autosubmit/job/job.py
autosubmit/platforms/paramiko_platform.py

2 files changed: +127 -37 lines


autosubmit/job/job.py

Lines changed: 112 additions & 26 deletions
@@ -1539,8 +1539,8 @@ def check_completion(self, default_status=Status.FAILED, over_wallclock=False):
         :param default_status: status to set if job is not completed. By default, it is FAILED
         :type default_status: Status
         """
-        completed_file = os.path.join(self._tmp_path, self.name + '_COMPLETED')
-        completed_file_location = os.path.join(self._tmp_path, f"LOG_{self.expid}", self.name + '_COMPLETED')
+        completed_file = os.path.join(str(self._tmp_path), self.name + '_COMPLETED')
+        completed_file_location = os.path.join(str(self._tmp_path), f"LOG_{self.expid}", self.name + '_COMPLETED')
         # I'm not fan of this but, it is the only way of doing it without a rework.
         if os.path.exists(completed_file) or os.path.exists(completed_file_location):
             if not over_wallclock:
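For context on the two paths touched above: completion is signalled by a "<job name>_COMPLETED" touch file that may sit either directly under the experiment tmp directory or under its LOG_<expid> subdirectory, and either location counts. A minimal stand-alone sketch of that lookup (the function and parameter names are illustrative, not Autosubmit's API):

import os

def is_completed(tmp_path: str, expid: str, job_name: str) -> bool:
    """Return True if a COMPLETED touch file exists in either of the two locations checked above."""
    candidates = [
        os.path.join(str(tmp_path), f"{job_name}_COMPLETED"),
        os.path.join(str(tmp_path), f"LOG_{expid}", f"{job_name}_COMPLETED"),
    ]
    return any(os.path.exists(path) for path in candidates)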
@@ -2722,7 +2722,14 @@ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, nu
         self.is_wrapper = True


-    def _queuing_reason_cancel(self, reason):
+    def _queuing_reason_cancel(self, reason: str) -> bool:
+        """
+        Return True if the job was cancelled for a listed (known) reason.
+        :param reason: reason reported by the scheduler for the job being cancelled
+        :type reason: str
+        :return: True if the job was cancelled for a known reason, False otherwise
+        :rtype: bool
+        """
         try:
             if len(reason.split('(', 1)) > 1:
                 reason = reason.split('(', 1)[1].split(')')[0]
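The body shown in context extracts the text inside the first pair of parentheses of the scheduler's reason string before matching it against a list of known cancellation reasons (the list itself is not part of this hunk). A rough stand-alone sketch of that parsing step, with a hypothetical reason set:

# Hypothetical subset of cancellation reasons; the real list lives in the method body,
# which is not shown in this hunk.
KNOWN_CANCEL_REASONS = {"AssociationJobLimit", "QOSJobLimit", "InvalidAccount"}

def queuing_reason_cancel(reason: str) -> bool:
    """Return True when the parenthesised reason matches a known cancellation cause."""
    try:
        if len(reason.split('(', 1)) > 1:
            reason = reason.split('(', 1)[1].split(')')[0]  # keep the text inside the first (...)
        return reason in KNOWN_CANCEL_REASONS
    except Exception:
        return False

# e.g. queuing_reason_cancel("JobHeldUser (AssociationJobLimit)") -> True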
@@ -2738,7 +2745,13 @@ def _queuing_reason_cancel(self, reason):
         except Exception as e:
             return False

-    def check_status(self, status):
+    def check_status(self, status: str) -> None:
+        """
+        Update the status of the wrapper job, saving its previous status; in case of failure
+        it logs all the files that were correctly created.
+        :param status: new status of the job
+        :type status: str
+        """
         prev_status = self.status
         self.prev_status = prev_status
         self.status = status
@@ -2787,7 +2800,12 @@ def check_status(self, status):
             if not still_running:
                 self.cancel_failed_wrapper_job()

-    def check_inner_jobs_completed(self, jobs):
+    def check_inner_jobs_completed(self, jobs: [Job]) -> None:
+        """
+        Get all inner jobs whose status is not COMPLETED and check whether they have completed.
+        :param jobs: jobs inside the wrapper
+        :type jobs: [Job]
+        """
         not_completed_jobs = [
             job for job in jobs if job.status != Status.COMPLETED]
         not_completed_job_names = [job.name for job in not_completed_jobs]
@@ -2812,7 +2830,15 @@ def check_inner_jobs_completed(self, jobs):
         for job in not_completed_jobs:
             self._check_finished_job(job)

-    def _check_inner_jobs_queue(self, prev_status):
+    def _check_inner_jobs_queue(self, prev_status :str) -> None:
+        """
+        Update the previous status of the inner jobs and move them to their new status.
+        If the platform is Slurm, the function gets the status of all the jobs, parses the
+        queuing reason, and cancels and fails the jobs that were cancelled for a known reason.
+        If a job is held by the administrator or the user, it is kept on hold to be executed later.
+        :param prev_status: previous status of the job
+        :type prev_status: str
+        """
         reason = str()
         if self._platform.type == 'slurm':
             self._platform.send_command(
@@ -2854,7 +2880,14 @@ def _check_inner_jobs_queue(self, prev_status):
             job.hold = self.hold
             job.status = self.status

-    def _check_inner_job_wallclock(self, job):
+    def _check_inner_job_wallclock(self, job: Job) -> bool:
+        """
+        Check whether the inner job has been running longer than its wallclock limit.
+        :param job: an inner job of the wrapper
+        :type job: Job
+        :return: True if the job has been running longer than its wallclock, False otherwise
+        :rtype: bool
+        """
         start_time = self.running_jobs_start[job]
         if self._is_over_wallclock(start_time, job.wallclock):
             if job.wrapper_type != "vertical":
28652898

2866-
def _check_running_jobs(self):
2899+
def _check_running_jobs(self) -> None:
2900+
"""
2901+
Get all jobs that are not "COMPLETED" or "FAILED", for each of the jobs still not completed that are still
2902+
running a command will be created and executed to either read the first few lines of the _STAT file created or
2903+
just print the JOB's name if the file don't exist.
2904+
Depending on the output of the file the status of a job will be set to
2905+
RUNNING if not over wallclock
2906+
FAILED if over wallclock and not vertical wrapper
2907+
If after 5 retries no file is created the status of the job is set to FAIL
2908+
"""
28672909
not_finished_jobs_dict = OrderedDict()
28682910
self.inner_jobs_running = list()
28692911
not_finished_jobs = [job for job in self.job_list if job.status not in [
@@ -2878,7 +2920,7 @@ def _check_running_jobs(self):
         not_finished_jobs_names = ' '.join(list(not_finished_jobs_dict.keys()))
         remote_log_dir = self._platform.get_remote_log_dir()
         # PREPARE SCRIPT TO SEND
-        # When a inner_job is running? When the job has an _STAT file
+        # When an inner_job is running? When the job has an _STAT file
         command = textwrap.dedent("""
         cd {1}
         for job in {0}
@@ -2892,18 +2934,17 @@ def _check_running_jobs(self):
         done
         """).format(str(not_finished_jobs_names), str(remote_log_dir), '\n'.ljust(13))

-        log_dir = os.path.join(
-            self._tmp_path, 'LOG_{0}'.format(self.expid))
-        multiple_checker_inner_jobs = os.path.join(
-            log_dir, "inner_jobs_checker.sh")
+        log_dir = Path(str(self._tmp_path) + f'/LOG_{self.expid}')
+        multiple_checker_inner_jobs = Path(log_dir / "inner_jobs_checker.sh")
         if not os.stat(log_dir):
             os.mkdir(log_dir)
             os.chmod(log_dir, 0o770)
         open(multiple_checker_inner_jobs, 'w+').write(command)
         os.chmod(multiple_checker_inner_jobs, 0o770)
         if self.platform.name != "local":  # already "sent"...
             self._platform.send_file(multiple_checker_inner_jobs, False)
-            command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}"
+            command = (f"cd {self._platform.get_files_path()}; "
+                       f"{os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}")
         else:
             command = f"cd {self._platform.get_files_path()}; ./inner_jobs_checker.sh; cd {os.getcwd()}"
         #
@@ -2918,38 +2959,46 @@ def _check_running_jobs(self):
            for line in content[:-1]:
                out = line.split()
                if out:
-                   jobname = out[0]
-                   job = not_finished_jobs_dict[jobname]
+                   job_name = out[0]
+                   job = not_finished_jobs_dict[job_name]
                    if len(out) > 1:
                        if job not in self.running_jobs_start:
                            start_time = self._check_time(out, 1)
                            Log.info("Job {0} started at {1}".format(
-                               jobname, str(parse_date(start_time))))
+                               job_name, str(parse_date(start_time))))
                            self.running_jobs_start[job] = start_time
                            job.new_status = Status.RUNNING
                            #job.status = Status.RUNNING
                            job.update_status(self.as_config)
                        if len(out) == 2:
-                           Log.info("Job {0} is RUNNING".format(jobname))
+                           Log.info("Job {0} is RUNNING".format(job_name))
                            over_wallclock = self._check_inner_job_wallclock(
                                job)  # messaged included
                            if over_wallclock:
                                if job.wrapper_type != "vertical":
                                    job.status = Status.FAILED
                                    Log.printlog(
-                                       "Job {0} is FAILED".format(jobname), 6009)
+                                       "Job {0} is FAILED".format(job_name), 6009)
                        elif len(out) == 3:
                            end_time = self._check_time(out, 2)
                            self._check_finished_job(job)
                            Log.info("Job {0} finished at {1}".format(
-                               jobname, str(parse_date(end_time))))
+                               job_name, str(parse_date(end_time))))
            if content == '':
                sleep(wait)
            retries = retries - 1
        if retries == 0 or over_wallclock:
            self.status = Status.FAILED

-    def _check_finished_job(self, job, failed_file=False):
+    def _check_finished_job(self, job :Job, failed_file :bool=False) -> None:
+        """
+        Set the job status to FAILED unless the job has completed,
+        in which case set it to COMPLETED.
+        :param job: the job whose status is updated
+        :type job: Job
+        :param failed_file: True if the system has created a file for a failed execution
+        :type failed_file: bool
+        """
         job.new_status = Status.FAILED
         if not failed_file:
             wait = 2
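The parsing above relies on how many whitespace-separated fields each line of the checker output has: one field means only the job name was echoed (no _STAT file yet), two fields add a start epoch (the job is running), and three fields add an end epoch (the job finished). An illustrative stand-alone parser for that convention (names are hypothetical, not Autosubmit's API):

from datetime import datetime

def classify_stat_line(line: str):
    """Interpret one checker-output line by its field count:
    1 field  -> no _STAT file yet (job name only)
    2 fields -> job started (name + start epoch)
    3 fields -> job finished (name + start + end epochs)."""
    out = line.split()
    job_name = out[0]
    start = datetime.fromtimestamp(int(out[1])) if len(out) > 1 else None
    end = datetime.fromtimestamp(int(out[2])) if len(out) > 2 else None
    if end is not None:
        return job_name, "FINISHED", start, end
    if start is not None:
        return job_name, "RUNNING", start, None
    return job_name, "NO_STAT_YET", None, None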
@@ -2967,7 +3016,13 @@ def _check_finished_job(self, job, failed_file=False):
             job.update_status(self.as_config, failed_file)
         self.running_jobs_start.pop(job, None)

-    def update_failed_jobs(self, check_ready_jobs=False):
+    def update_failed_jobs(self, check_ready_jobs :bool=False) -> None:
+        """
+        Check all associated jobs and update their status to either COMPLETED or FAILED;
+        jobs that are still running are appended to the wrapper's inner running jobs.
+        :param check_ready_jobs: if True, also check running jobs with status READY, SUBMITTED or QUEUING
+        :type check_ready_jobs: bool
+        """
         running_jobs = self.inner_jobs_running
         real_running = copy.deepcopy(self.inner_jobs_running)
         if check_ready_jobs:
@@ -2981,7 +3036,12 @@ def update_failed_jobs(self, check_ready_jobs=False):
                if job in real_running:
                    self.inner_jobs_running.append(job)

-    def cancel_failed_wrapper_job(self):
+    def cancel_failed_wrapper_job(self) -> None:
+        """
+        When the wrapper is cancelled or runs into a problem, all of its jobs are cancelled.
+        Jobs in the list that are not RUNNING, COMPLETED or FAILED are set back to WAITING;
+        for a vertical wrapper, their fail_count is also set to the number of retrials.
+        """
         try:
             if self.platform_name == "local":
                 # Check if the job is still running to avoid a misleading message in the logs
@@ -3005,7 +3065,17 @@ def cancel_failed_wrapper_job(self):
                    job.fail_count = job.retrials


-    def _is_over_wallclock(self, start_time, wallclock):
+    def _is_over_wallclock(self, start_time: str, wallclock: str) -> bool:
+        """
+        Calculate whether the job has exceeded its wallclock time,
+        which would indicate that it has been running for too long.
+        :param start_time: time at which the job started to execute
+        :type start_time: str
+        :param wallclock: time limit the job is allowed to run for
+        :type wallclock: str
+        :return: True if the elapsed time exceeds the wallclock, False otherwise
+        :rtype: bool
+        """
         elapsed = datetime.datetime.now() - parse_date(start_time)
         wallclock = datetime.datetime.strptime(wallclock, '%H:%M')
         total = 0.0
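Only the start of the computation is visible in this hunk: the wallclock string is parsed with the '%H:%M' format and compared against the time elapsed since start_time. A simplified stand-alone sketch of that comparison (it assumes start_time is already a datetime and the wallclock is always given as HH:MM):

import datetime

def is_over_wallclock(start_time: datetime.datetime, wallclock: str) -> bool:
    """Return True when the elapsed time since start_time exceeds the HH:MM wallclock limit."""
    elapsed = datetime.datetime.now() - start_time
    limit = datetime.datetime.strptime(wallclock, '%H:%M')   # e.g. '02:30'
    limit_seconds = limit.hour * 3600 + limit.minute * 60    # allowed run time in seconds
    return elapsed.total_seconds() > limit_seconds

# e.g. a job started three hours ago with wallclock '02:30' -> True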
@@ -3026,12 +3096,28 @@ def _is_over_wallclock(self, start_time, wallclock):
             return True
         return False

-    def _parse_timestamp(self, timestamp):
+    def _parse_timestamp(self, timestamp: int) -> datetime:
+        """
+        Parse a date from an int timestamp to datetime.
+        :param timestamp: time to be converted
+        :type timestamp: int
+        :return: the converted time
+        :rtype: datetime
+        """
         value = datetime.datetime.fromtimestamp(timestamp)
         time = value.strftime('%Y-%m-%d %H:%M:%S')
         return time

-    def _check_time(self, output, index):
+    def _check_time(self, output: [str], index: int) -> datetime:
+        """
+        Get the start (or end) time of a job from the output of the generated checker command.
+        :param output: the output of the executed command, split into fields
+        :type output: [str]
+        :param index: index of the field in "output" that holds the time
+        :type index: int
+        :return: the time at which the job started
+        :rtype: datetime
+        """
         time = int(output[index])
         time = self._parse_timestamp(time)
         return time
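For reference, the two helpers above turn an epoch field of the checker output into a readable date: _check_time picks the integer at the given index and _parse_timestamp formats it. An equivalent stand-alone sketch (the job name in the usage comment is hypothetical):

import datetime

def parse_timestamp(timestamp: int) -> str:
    """Format a Unix epoch (seconds) as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

def check_time(output: list, index: int) -> str:
    """Pick the epoch field at `index` from a split checker-output line and format it."""
    return parse_timestamp(int(output[index]))

# e.g. check_time("a000_SIM 1714988000 1714991600".split(), 1) returns the start time as a string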

autosubmit/platforms/paramiko_platform.py

Lines changed: 15 additions & 11 deletions
@@ -555,7 +555,7 @@ def submit_job(self, job, script_name, hold=False, export="none"):
         :param job: job object
         :type job: autosubmit.job.job.Job
         :param script_name: job script's name
-        :rtype scriptname: str
+        :rtype script_name: str
         :param hold: send job hold
         :type hold: boolean
         :return: job id for the submitted job
@@ -597,7 +597,7 @@ def check_job_energy(self, job_id):

     def submit_Script(self, hold=False):
         """
-        Sends a Submitfile Script, exec in platform and retrieve the Jobs_ID.
+        Sends a Submit file Script, exec in platform and retrieve the Jobs_ID.

         :param hold: send job hold
         :type hold: boolean
@@ -615,6 +615,7 @@ def get_estimated_queue_time_cmd(self, job_id):
         :return: command to get estimated queue time
         """
         raise NotImplementedError
+
     def parse_estimated_time(self, output):
         """
         Parses estimated queue time from output of get_estimated_queue_time_cmd
@@ -675,8 +676,7 @@ def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold
         self.send_command(self.get_checkjob_cmd(job_id))
         while self.get_ssh_output().strip(" ") == "" and retries > 0:
             retries = retries - 1
-            Log.debug(
-                'Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
+            Log.debug('Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
             Log.debug('retries left {0}', retries)
             Log.debug('Will be retrying in {0} seconds', sleep_time)
             sleep(sleep_time)
@@ -742,6 +742,11 @@ def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold
         job.new_status = job_status

     def _check_jobid_in_queue(self, ssh_output, job_list_cmd):
+        """
+
+        :param ssh_output: ssh output
+        :type ssh_output: str
+        """
         for job in job_list_cmd[:-1].split(','):
             if job not in ssh_output:
                 return False
@@ -752,8 +757,6 @@ def parse_joblist(self, job_list):

         :param job_list: list of jobs
         :type job_list: list
-        :param ssh_output: ssh output
-        :type ssh_output: str
         :return: job status
         :rtype: str
         """
@@ -768,18 +771,17 @@ def parse_joblist(self, job_list):
             job_list_cmd=job_list_cmd[:-1]

        return job_list_cmd
+
     def check_Alljobs(self, job_list, as_conf, retries=5):
         """
         Checks jobs running status

         :param job_list: list of jobs
         :type job_list: list
-        :param job_list_cmd: list of jobs in the queue system
-        :type job_list_cmd: str
-        :param remote_logs: remote logs
-        :type remote_logs: str
+        :param as_conf: config
+        :type as_conf: as_conf
         :param retries: retries
-        :type default_status: bool
+        :type retries: int
         :return: current job status
         :rtype: autosubmit.job.job_common.Status
         """
@@ -873,6 +875,7 @@ def check_Alljobs(self, job_list, as_conf, retries=5):
                # job.new_status=job_status
        if slurm_error:
            raise AutosubmitError(e_msg,6000)
+
     def get_jobid_by_jobname(self,job_name,retries=2):
         """
         Get job id by job name
@@ -1479,6 +1482,7 @@ def check_absolute_file_exists(self, src):
                return False
        except:
            return False
+
 class ParamikoPlatformException(Exception):
     """
     Exception raised from HPC queues
