Skip to content

Commit 0a2b9e8

Browse files
committed
Fixing performance issues
1 parent 906bb4c commit 0a2b9e8

File tree

1 file changed

+89
-44
lines changed

1 file changed

+89
-44
lines changed

autosubmit/autosubmit.py

Lines changed: 89 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5258,6 +5258,83 @@ def _apply_ftc(job_list: JobList, filter_type_chunk_split: str) -> list[Job]:
52585258
final_list.append(job)
52595259
return final_list
52605260

5261+
@staticmethod
def select_jobs_by_chunks(job_list: "JobList",
                          filter_chunks: str) -> list[Job]:
    """Select jobs from *job_list* according to the *filter_chunks* specification.

    The specification is case-insensitive and has the form
    ``<chunk filter>[,SECTION1,SECTION2,...]``:

    - "ANY" selects every job of every section.
    - "ANY,ANY" selects every job of every section.
    - "ANY,SECTION1,SECTION2" selects every job of SECTION1 and SECTION2.
    - Any other chunk filter is parsed with ``Autosubmit._create_json``. The
      parsed result is read as a list of dates (``data['sds']``, key ``sd``),
      each holding members (key ``ms``, member name under ``m``) and the
      chunks requested for that member (key ``cs``). A date, a member or the
      first chunk entry may be "ANY", which acts as a wildcard.

    When no section is given, jobs of all sections are considered.

    Jobs are matched on the exact chunk numbers listed in the filter. Jobs
    with a truthy ``synchronize`` attribute are selected regardless of their
    chunk, as long as they match the date and member.

    :param job_list: JobList object providing ``get_job_list()``.
    :param filter_chunks: filter chunks specification.
    :return: the selected jobs; a job may appear more than once when the
        specification selects it through several date/member entries.
    """
    filter_chunks = filter_chunks.upper()
    chunk_spec, *raw_sections = filter_chunks.split(",")
    sections = {sect.strip() for sect in raw_sections if sect.strip()}

    # No section part (or an explicit ANY) means "do not restrict by section".
    if not sections or "ANY" in sections:
        matching_jobs = list(job_list.get_job_list())
    else:
        matching_jobs = [job for job in job_list.get_job_list() if job.section in sections]

    # ANY in the chunk part: nothing to parse, every section-matching job is selected.
    if chunk_spec.strip() == "ANY":
        return matching_jobs

    data = json.loads(Autosubmit._create_json(filter_chunks))

    # Format each job date only once instead of once per requested date.
    jobs_by_date = {}
    for job in matching_jobs:
        job_date = getattr(job, "date", None)
        if job_date:
            jobs_by_date.setdefault(date2str(job_date, "D"), []).append(job)

    final_list = []
    for date_json in data['sds']:
        date = date_json['sd']
        jobs_of_this_date = matching_jobs if date == "ANY" else jobs_by_date.get(date, [])

        for member_json in date_json['ms']:
            member = member_json['m']
            if member == "ANY":
                jobs_of_this_member = jobs_of_this_date
            else:
                # Jobs without a member (None) never match a named member.
                jobs_of_this_member = [
                    j for j in jobs_of_this_date
                    if (getattr(j, "member", None) or "").upper() == member
                ]

            requested_chunks = member_json['cs']
            any_chunk = bool(requested_chunks) and str(requested_chunks[0]).upper() == "ANY"
            # Exact chunk numbers requested for this member (unused when any_chunk).
            wanted_chunks = set() if any_chunk else {int(chunk) for chunk in requested_chunks}

            for job in jobs_of_this_member:
                chunk = getattr(job, "chunk", None)
                chunk_selected = chunk is not None and (any_chunk or chunk in wanted_chunks)
                if chunk_selected or job.synchronize:
                    final_list.append(job)

    return final_list
5337+
52615338
@staticmethod
52625339
def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: str, filter_chunks: str,
52635340
filter_status: str, filter_section: str, filter_type_chunk: str, filter_type_chunk_split: str,
@@ -5343,6 +5420,7 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
53435420
Autosubmit._validate_set_status_filters(as_conf, job_list, filter_list, filter_chunks, filter_status,
53445421
filter_section, filter_type_chunk, filter_type_chunk_split)
53455422
#### Starts the filtering process ####
5423+
Log.info(f"Filtering jobs...")
53465424
final_list = []
53475425
jobs_filtered = []
53485426
final_status = Autosubmit._get_status(final)
@@ -5359,49 +5437,13 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
53595437
if job.section == section:
53605438
final_list.append(job)
53615439
if filter_chunks:
5362-
ft = filter_chunks.split(",")[1:]
5363-
# Any located in section part
5364-
if str(ft).upper() == "ANY":
5365-
for job in job_list.get_job_list():
5366-
final_list.append(job)
5367-
for job in job_list.get_job_list():
5368-
if job.section == section:
5369-
if filter_chunks:
5370-
jobs_filtered.append(job)
5371-
if len(jobs_filtered) == 0:
5372-
jobs_filtered = job_list.get_job_list()
5373-
fc = filter_chunks
5374-
# Any located in chunks part
5375-
if str(fc).upper() == "ANY":
5376-
for job in jobs_filtered:
5377-
final_list.append(job)
5378-
else:
5379-
data = json.loads(Autosubmit._create_json(fc))
5380-
for date_json in data['sds']:
5381-
date = date_json['sd']
5382-
if len(str(date)) < 9:
5383-
format_ = "D"
5384-
elif len(str(date)) < 11:
5385-
format_ = "H"
5386-
elif len(str(date)) < 13:
5387-
format_ = "M"
5388-
elif len(str(date)) < 15:
5389-
format_ = "S"
5390-
else:
5391-
format_ = "D"
5392-
jobs_date = [j for j in jobs_filtered if date2str(
5393-
j.date, format_) == date]
5394-
5395-
for member_json in date_json['ms']:
5396-
member = member_json['m']
5397-
jobs_member = [j for j in jobs_date if j.member == member]
5440+
start = time.time()
5441+
Log.debug(f"Filtering jobs with chunks {filter_chunks}")
5442+
# The extend is because the code was thought to have multiple filters at the same time
5443+
final_list.extend(Autosubmit.select_jobs_by_chunks(job_list, filter_chunks))
5444+
final_list = list(set(final_list))
5445+
Log.info(f"Chunk filtering took {time.time() - start:.2f} seconds.")
53985446

5399-
for chunk_json in member_json['cs']:
5400-
chunk = int(chunk_json)
5401-
for job in [j for j in jobs_date if j.chunk == chunk and j.synchronize is not None]:
5402-
final_list.append(job)
5403-
for job in [j for j in jobs_member if j.chunk == chunk]:
5404-
final_list.append(job)
54055447
if filter_status:
54065448
status_list = filter_status.split()
54075449
Log.debug(f"Filtering jobs with status {filter_status}")
@@ -5439,6 +5481,7 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
54395481
# Time to change status
54405482
final_list = list(set(final_list))
54415483
performed_changes = {}
5484+
Log.info(f"The selected number of jobs to change is: {len(final_list)}")
54425485
for job in final_list:
54435486
if final_status in [Status.WAITING, Status.PREPARED, Status.DELAYED, Status.READY]:
54445487
job.fail_count = 0
@@ -5466,15 +5509,17 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
54665509
status_change=performed_changes))
54675510
else:
54685511
Log.warning("No changes were performed.")
5469-
5512+
Log.info(f"Updating JobList for experiment {expid}...")
54705513
job_list.update_list(as_conf, False, True)
5471-
5514+
start = time.time()
54725515
if save and wrongExpid == 0:
54735516
for job in final_list:
54745517
job.update_parameters(as_conf, set_attributes=True, reset_logs=True)
54755518

54765519
job_list.recover_last_data()
54775520
job_list.save()
5521+
end = time.time()
5522+
Log.info(f"JobList saved in {end - start:.2f} seconds.")
54785523
exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR,
54795524
historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
54805525
exp_history.initialize_database()

0 commit comments

Comments
 (0)