
Commit e02c85b

first commit documenting the wrappers
1 parent 16b8edd commit e02c85b

File tree

1 file changed: +111 -24 lines changed


autosubmit/job/job.py

Lines changed: 111 additions & 24 deletions
@@ -23,6 +23,7 @@
 
 from collections import OrderedDict
 from pathlib import Path
+from xmlrpc.client import DateTime
 
 from autosubmit.job import job_utils
 import copy
@@ -2721,7 +2722,14 @@ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, nu
         self.is_wrapper = True
 
 
-    def _queuing_reason_cancel(self, reason):
+    def _queuing_reason_cancel(self, reason: str) -> bool:
+        """
+        Return True if the job was cancelled for one of the listed, known reasons.
+        :param reason: reason reported for the job's cancellation
+        :type reason: str
+        :return: True if the job was cancelled for a known reason, False otherwise
+        :rtype: bool
+        """
         try:
             if len(reason.split('(', 1)) > 1:
                 reason = reason.split('(', 1)[1].split(')')[0]
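
Note on the parsing above: the scheduler typically reports the queueing or cancellation cause wrapped in parentheses, and the method keeps only the text inside them. A minimal standalone sketch of that extraction, using a made-up reason string:

    # Hypothetical Slurm reason string; only the text inside the parentheses is kept.
    raw_reason = "JobHeldAdmin (AssocMaxJobsLimit)"
    if len(raw_reason.split('(', 1)) > 1:
        parsed = raw_reason.split('(', 1)[1].split(')')[0]
    else:
        parsed = raw_reason
    print(parsed)  # -> AssocMaxJobsLimit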
@@ -2737,7 +2745,13 @@ def _queuing_reason_cancel(self, reason):
         except Exception as e:
             return False
 
-    def check_status(self, status):
+    def check_status(self, status: str) -> None:
+        """
+        Update the status of the job, saving the previous one; if the wrapper ends up failed,
+        log all the files that were correctly created.
+        :param status: new status of the job
+        :type status: str
+        """
         prev_status = self.status
         self.prev_status = prev_status
         self.status = status
@@ -2786,7 +2800,12 @@ def check_status(self, status):
         if not still_running:
             self.cancel_failed_wrapper_job()
 
-    def check_inner_jobs_completed(self, jobs):
+    def check_inner_jobs_completed(self, jobs: [Job]) -> None:
+        """
+        Check the given jobs that are not yet marked as COMPLETED and update those that have finished.
+        :param jobs: jobs inside the wrapper
+        :type jobs: [Job]
+        """
         not_completed_jobs = [
             job for job in jobs if job.status != Status.COMPLETED]
         not_completed_job_names = [job.name for job in not_completed_jobs]
@@ -2811,7 +2830,15 @@ def check_inner_jobs_completed(self, jobs):
         for job in not_completed_jobs:
             self._check_finished_job(job)
 
-    def _check_inner_jobs_queue(self, prev_status):
+    def _check_inner_jobs_queue(self, prev_status: str) -> None:
+        """
+        Save the previous status of the inner jobs and update them to a new one.
+        If the platform is Slurm, the function queries the status of all the jobs, parses the
+        queueing reason, and cancels and fails the jobs whose reason is a known cancellation reason.
+        If a job is held by the admin or the user, it is kept on hold to be executed later.
+        :param prev_status: previous status of the job
+        :type prev_status: str
+        """
         reason = str()
         if self._platform.type == 'slurm':
             self._platform.send_command(
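
To illustrate the behaviour the docstring describes (cancel and fail on known bad reasons, keep held jobs on hold), a hedged sketch follows; the reason sets below are examples assumed for illustration, not the exact lists used by the wrapper:

    # Example reason sets, assumed for illustration only.
    CANCEL_REASONS = {"AssocMaxJobsLimit", "QOSMaxJobsPerUserLimit"}
    HOLD_REASONS = {"JobHeldAdmin", "JobHeldUser"}

    def classify_queue_reason(reason: str) -> str:
        """Return the action a wrapper would take for a parsed queue reason."""
        if reason in CANCEL_REASONS:
            return "cancel_and_fail"
        if reason in HOLD_REASONS:
            return "hold"
        return "keep_queuing"

    print(classify_queue_reason("JobHeldUser"))  # -> hold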
@@ -2853,7 +2880,14 @@ def _check_inner_jobs_queue(self, prev_status):
             job.hold = self.hold
             job.status = self.status
 
-    def _check_inner_job_wallclock(self, job):
+    def _check_inner_job_wallclock(self, job: Job) -> bool:
+        """
+        Check whether the inner job has been running for longer than its wallclock limit.
+        :param job: an inner job of the wrapper
+        :type job: Job
+        :return: True if the job has run longer than its wallclock, False otherwise
+        :rtype: bool
+        """
         start_time = self.running_jobs_start[job]
         if self._is_over_wallclock(start_time, job.wallclock):
             if job.wrapper_type != "vertical":
@@ -2862,7 +2896,16 @@ def _check_inner_job_wallclock(self, job):
             return True
         return False
 
-    def _check_running_jobs(self):
+    def _check_running_jobs(self) -> None:
+        """
+        Get all inner jobs that are not COMPLETED or FAILED. For each of them, a script is built and
+        executed that reads the first lines of the job's _STAT file, or simply prints the job's name
+        if the file does not exist.
+        Depending on that output, the status of a job is set to:
+        RUNNING if the job is not over its wallclock,
+        FAILED if it is over its wallclock and the wrapper is not vertical.
+        If no output is produced after 5 retries, the wrapper status is set to FAILED.
+        """
         not_finished_jobs_dict = OrderedDict()
         self.inner_jobs_running = list()
         not_finished_jobs = [job for job in self.job_list if job.status not in [
@@ -2877,7 +2920,7 @@ def _check_running_jobs(self):
         not_finished_jobs_names = ' '.join(list(not_finished_jobs_dict.keys()))
         remote_log_dir = self._platform.get_remote_log_dir()
         # PREPARE SCRIPT TO SEND
-        # When a inner_job is running? When the job has an _STAT file
+        # When is an inner_job running? When the job has an _STAT file
         command = textwrap.dedent("""
         cd {1}
         for job in {0}
@@ -2891,18 +2934,17 @@ def _check_running_jobs(self):
         done
         """).format(str(not_finished_jobs_names), str(remote_log_dir), '\n'.ljust(13))
 
-        log_dir = os.path.join(
-            self._tmp_path, 'LOG_{0}'.format(self.expid))
-        multiple_checker_inner_jobs = os.path.join(
-            log_dir, "inner_jobs_checker.sh")
+        log_dir = Path(str(self._tmp_path) + f'/LOG_{self.expid}')
+        multiple_checker_inner_jobs = Path(log_dir / "inner_jobs_checker.sh")
         if not os.stat(log_dir):
             os.mkdir(log_dir)
             os.chmod(log_dir, 0o770)
         open(multiple_checker_inner_jobs, 'w+').write(command)
         os.chmod(multiple_checker_inner_jobs, 0o770)
         if self.platform.name != "local":  # already "sent"...
             self._platform.send_file(multiple_checker_inner_jobs, False)
-            command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}"
+            command = (f"cd {self._platform.get_files_path()}; "
+                       f"{os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}")
         else:
             command = f"cd {self._platform.get_files_path()}; ./inner_jobs_checker.sh; cd {os.getcwd()}"
         #
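
For readers unfamiliar with the generated script: the code above writes a small shell loop (inner_jobs_checker.sh) that prints each job's name plus the contents of its _STAT file when one exists. A self-contained sketch of that pattern, with hypothetical job names and log directory:

    import textwrap

    job_names = "job_a job_b"              # hypothetical inner job names
    remote_log_dir = "/remote/logs/a000"   # hypothetical remote log directory

    checker = textwrap.dedent("""
        cd {1}
        for job in {0}
        do
            if [ -f "${{job}}_STAT" ]
            then
                echo ${{job}} $(head "${{job}}_STAT")
            else
                echo ${{job}}
            fi
        done
        """).format(job_names, remote_log_dir)

    print(checker)  # the wrapper writes a script like this and executes it remotely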
@@ -2917,38 +2959,46 @@ def _check_running_jobs(self):
             for line in content[:-1]:
                 out = line.split()
                 if out:
-                    jobname = out[0]
-                    job = not_finished_jobs_dict[jobname]
+                    job_name = out[0]
+                    job = not_finished_jobs_dict[job_name]
                     if len(out) > 1:
                         if job not in self.running_jobs_start:
                             start_time = self._check_time(out, 1)
                             Log.info("Job {0} started at {1}".format(
-                                jobname, str(parse_date(start_time))))
+                                job_name, str(parse_date(start_time))))
                             self.running_jobs_start[job] = start_time
                             job.new_status = Status.RUNNING
                             #job.status = Status.RUNNING
                             job.update_status(self.as_config)
                         if len(out) == 2:
-                            Log.info("Job {0} is RUNNING".format(jobname))
+                            Log.info("Job {0} is RUNNING".format(job_name))
                             over_wallclock = self._check_inner_job_wallclock(
                                 job)  # messaged included
                             if over_wallclock:
                                 if job.wrapper_type != "vertical":
                                     job.status = Status.FAILED
                                     Log.printlog(
-                                        "Job {0} is FAILED".format(jobname), 6009)
+                                        "Job {0} is FAILED".format(job_name), 6009)
                         elif len(out) == 3:
                             end_time = self._check_time(out, 2)
                             self._check_finished_job(job)
                             Log.info("Job {0} finished at {1}".format(
-                                jobname, str(parse_date(end_time))))
+                                job_name, str(parse_date(end_time))))
             if content == '':
                 sleep(wait)
             retries = retries - 1
         if retries == 0 or over_wallclock:
             self.status = Status.FAILED
 
-    def _check_finished_job(self, job, failed_file=False):
+    def _check_finished_job(self, job: Job, failed_file: bool = False) -> None:
+        """
+        Set the job's status to FAILED unless the job has completed, in which case
+        its status is set to COMPLETED.
+        :param job: the job whose status is updated
+        :type job: Job
+        :param failed_file: True if the system has created a file for a failed execution
+        :type failed_file: bool
+        """
         job.new_status = Status.FAILED
         if not failed_file:
             wait = 2
@@ -2966,7 +3016,13 @@ def _check_finished_job(self, job, failed_file=False):
         job.update_status(self.as_config, failed_file)
         self.running_jobs_start.pop(job, None)
 
-    def update_failed_jobs(self, check_ready_jobs=False):
+    def update_failed_jobs(self, check_ready_jobs: bool = False) -> None:
+        """
+        Check all associated jobs and update their status to COMPLETED or FAILED;
+        jobs that are still running are appended to the wrapper's list of running inner jobs.
+        :param check_ready_jobs: if True, also check running jobs with status READY, SUBMITTED or QUEUING
+        :type check_ready_jobs: bool
+        """
         running_jobs = self.inner_jobs_running
         real_running = copy.deepcopy(self.inner_jobs_running)
         if check_ready_jobs:
@@ -2980,7 +3036,12 @@ def update_failed_jobs(self, check_ready_jobs=False):
             if job in real_running:
                 self.inner_jobs_running.append(job)
 
-    def cancel_failed_wrapper_job(self):
+    def cancel_failed_wrapper_job(self) -> None:
+        """
+        When a wrapper is cancelled or runs into a problem, all of its inner jobs are cancelled.
+        Inner jobs that are not RUNNING, COMPLETED or FAILED are set back to WAITING;
+        for the remaining jobs of a vertical wrapper, fail_count is set to the number of retrials.
+        """
         try:
             if self.platform_name == "local":
                 # Check if the job is still running to avoid a misleading message in the logs
@@ -3004,7 +3065,17 @@ def cancel_failed_wrapper_job(self):
                     job.fail_count = job.retrials
 
 
-    def _is_over_wallclock(self, start_time, wallclock):
+    def _is_over_wallclock(self, start_time: str, wallclock: str) -> bool:
+        """
+        Calculate whether the job has exceeded its wallclock time,
+        which indicates that the job has been running for too long.
+        :param start_time: time at which the job started to execute
+        :type start_time: str
+        :param wallclock: wallclock limit the job is allowed to run for
+        :type wallclock: str
+        :return: True if the elapsed time exceeds the wallclock, False otherwise
+        :rtype: bool
+        """
         elapsed = datetime.datetime.now() - parse_date(start_time)
         wallclock = datetime.datetime.strptime(wallclock, '%H:%M')
         total = 0.0
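
As a quick standalone sketch of the check documented above, assuming the 'HH:MM' wallclock format used in this method (and omitting any extra slack the real implementation may add):

    import datetime

    def is_over_wallclock(start_time: datetime.datetime, wallclock: str) -> bool:
        # Convert an 'HH:MM' wallclock into seconds and compare with the elapsed time.
        limit = datetime.datetime.strptime(wallclock, '%H:%M')
        limit_seconds = limit.hour * 3600 + limit.minute * 60
        elapsed = datetime.datetime.now() - start_time
        return elapsed.total_seconds() > limit_seconds

    # A job that started 3 hours ago with a 02:00 wallclock is over its limit.
    started = datetime.datetime.now() - datetime.timedelta(hours=3)
    print(is_over_wallclock(started, '02:00'))  # -> True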
@@ -3025,12 +3096,28 @@ def _is_over_wallclock(self, start_time, wallclock):
             return True
         return False
 
-    def _parse_timestamp(self, timestamp):
+    def _parse_timestamp(self, timestamp: int) -> str:
+        """
+        Convert a Unix timestamp into a formatted date string.
+        :param timestamp: time to be converted
+        :type timestamp: int
+        :return: the converted time, formatted as '%Y-%m-%d %H:%M:%S'
+        :rtype: str
+        """
         value = datetime.datetime.fromtimestamp(timestamp)
         time = value.strftime('%Y-%m-%d %H:%M:%S')
         return time
 
-    def _check_time(self, output, index):
+    def _check_time(self, output: [str], index: int) -> str:
+        """
+        Extract a job timestamp from one line of the generated checker script's output.
+        :param output: a split output line of the executed command
+        :type output: [str]
+        :param index: position in the output that holds the timestamp
+        :type index: int
+        :return: the time at which the job started (or finished)
+        :rtype: str
+        """
         time = int(output[index])
         time = self._parse_timestamp(time)
         return time
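
To make the two helpers above concrete: a line of the checker's output has the form '<job_name> <start_epoch> [<end_epoch>]', and _check_time pulls the epoch at a given index and formats it. A small sketch with a made-up output line:

    import datetime

    def parse_timestamp(timestamp: int) -> str:
        # Same idea as _parse_timestamp: epoch seconds -> formatted date string.
        return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

    out = "a000_SIM_1 1714557600 1714561200".split()  # hypothetical checker output line
    job_name = out[0]
    start_time = parse_timestamp(int(out[1]))  # index 1: start time, as in _check_time(out, 1)
    end_time = parse_timestamp(int(out[2]))    # index 2: end time, as in _check_time(out, 2)
    print(job_name, start_time, end_time)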
