154 changes: 107 additions & 47 deletions autosubmit/autosubmit.py
@@ -3231,8 +3231,8 @@ def recovery(
expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid),
output_format=output_type, packages=packages, show=not hide, groups=groups_dict,
job_list_object=job_list)
except Exception as e:
Log.warning(f"An error has occurred while plotting the jobs list after recovery. "
except Exception:
Log.warning("An error has occurred while plotting the jobs list after recovery. "
"Check if you have X11 redirection and an img viewer correctly set.")
try:
if detail:
@@ -5258,6 +5258,99 @@ def _apply_ftc(job_list: JobList, filter_type_chunk_split: str) -> list[Job]:
            final_list.append(job)
        return final_list

    @staticmethod
    def select_jobs_by_chunks(job_list: "JobList",
                              filter_chunks: str) -> list[Job]:
        """Select jobs from *job_list* according to the *filter_chunks* specification.

        The accepted formats are:
        - "ANY" to select all jobs with chunks in all sections
        - "ANY,ANY" to select all jobs with chunks in all sections
        - "ANY,SECTION1,SECTION2" to select all jobs with chunks in SECTION1 and SECTION2
        - "DATE1,[MEMBER1|MEMBER2|ANY],[CHUNK1|CHUNK2|...],DATE2,[MEMBER1|MEMBER2|ANY],[CHUNK1|CHUNK2|...],...,SECTION1,SECTION2"

        :param job_list: JobList object
        :param filter_chunks: filter chunks expression
        :return: list of jobs matching the filter
        """

        def _setup_prune(json_filter: dict[str, Any]) -> tuple[list[str], list[str], int]:
            """Set up dates, members and chunks from *json_filter*.

            :param json_filter: json filter with dates, members and chunks
            :return: tuple of (dates, members, maximum chunk number)
            """
            dates = []
            members = []
            chunks = 0
            for d_json in json_filter['sds']:
                if "ANY" == str(d_json['sd']).upper():
                    return [], [], len(job_list._chunk_list)
                dates.append(d_json['sd'])
                for m_json in d_json['ms']:
                    members.append(m_json['m'])
                    if str(m_json['cs'][0]).upper() == "ANY":
                        chunks = len(job_list._chunk_list)
                    else:
                        chunks = max(len(m_json['cs']), chunks)
            chunks = min(chunks, len(job_list._chunk_list))
            return dates, members, chunks
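
        # For reference only: Autosubmit._create_json is assumed here (it is not
        # shown in this diff) to yield a structure shaped roughly like
        #     {"sds": [{"sd": "19600101",
        #               "ms": [{"m": "FC0", "cs": ["1", "2"]}]}]}
        # i.e. dates ("sd") contain members ("ms"/"m"), which contain chunks ("cs").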

        def _prune_jobs(jobs: list[Job], dates: list[str], members: list[str], chunks: int) -> list[Job]:
            """Return the jobs from *jobs* that match the given dates, members and chunk limit.

            :param jobs: list of jobs to prune
            :param dates: list of dates to match
            :param members: list of members to match
            :param chunks: maximum chunk number to match
            :return: pruned list of jobs
            """
            if not dates or "ANY" in dates:
                # _setup_prune returns an empty date list when the date filter is
                # "ANY", so both cases mean "no date restriction".
                return jobs
            return [j for j in jobs if
                    (
                        (j.date and date2str(j.date, "D") in dates)
                        and (j.member and (("ANY" in members) or (j.member.upper() in members)))
                        and (not j.synchronize or (j.chunk and j.chunk <= chunks))
                    )
                    ]

        if not filter_chunks or not isinstance(filter_chunks, str):
            return []

        final_list = []
        filter_chunks = filter_chunks.upper()
        matching_jobs = job_list.get_job_list()
        if "," in filter_chunks:
            splitted_filters = filter_chunks.split(",")
            fc = splitted_filters[0]
            sections = [sect.strip(" ,") for sect in splitted_filters[1:]]
            if "ANY" not in sections:
                matching_jobs = [job for job in matching_jobs if job.section in sections]
        else:
            fc = filter_chunks

        if str(fc).upper() != "ANY":
            data = json.loads(Autosubmit._create_json(fc))

            # Prune jobs by selected dates, members, chunks
            filtered_dates, filtered_members, filtered_chunks = _setup_prune(data)
            matching_jobs = _prune_jobs(matching_jobs, filtered_dates, filtered_members, filtered_chunks)

            # Now, build the final list according to the structure in data
            for date_json in data['sds']:
                date = date_json['sd']
                if str(date).upper() == "ANY":
                    # An "ANY" date keeps every pruned job regardless of its date
                    jobs_of_this_date = matching_jobs
                else:
                    jobs_of_this_date = [j for j in matching_jobs if j.date and date2str(j.date, "D") == date]
                for member_json in date_json['ms']:
                    member = member_json['m']
                    if str(member).upper() == "ANY":
                        jobs_of_this_member = jobs_of_this_date
                    else:
                        jobs_of_this_member = [j for j in jobs_of_this_date if j.member and j.member.upper() == member]
                    chunks_of_this_member = (len(member_json['cs'])
                                             if "ANY" != str(member_json['cs'][0]).upper()
                                             else len(job_list._chunk_list))
                    # Synchronize is checked first so jobs without a chunk do not
                    # hit a None comparison
                    final_list.extend([job for job in jobs_of_this_member
                                       if job.synchronize or (job.chunk and job.chunk <= chunks_of_this_member)])
        else:
            # "ANY" in the chunk part selects every job that survived the section filter
            final_list = matching_jobs

        return final_list
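
    # Minimal usage sketch for reviewers (section names, dates and members below
    # are hypothetical examples, not values taken from this PR):
    #
    #     Autosubmit.select_jobs_by_chunks(job_list, "ANY")
    #         -> every job, in every section
    #     Autosubmit.select_jobs_by_chunks(job_list, "ANY,SIM,POST")
    #         -> every job whose section is SIM or POST
    #     Autosubmit.select_jobs_by_chunks(job_list, "19600101,[FC0|FC1],[1|2],SIM")
    #         -> SIM jobs of date 19600101, members FC0/FC1, chunks 1 and 2,
    #            assuming Autosubmit._create_json parses this documented form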

@staticmethod
def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: str, filter_chunks: str,
filter_status: str, filter_section: str, filter_type_chunk: str, filter_type_chunk_split: str,
@@ -5343,8 +5436,8 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
Autosubmit._validate_set_status_filters(as_conf, job_list, filter_list, filter_chunks, filter_status,
filter_section, filter_type_chunk, filter_type_chunk_split)
#### Start the filtering process ####
Log.info("Filtering jobs...")
final_list = []
jobs_filtered = []
final_status = Autosubmit._get_status(final)
# This function looks as though it was written with multiple simultaneous filters
# in mind, but the original implementation could not actually combine them.
@@ -5359,49 +5452,13 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
if job.section == section:
final_list.append(job)
if filter_chunks:
ft = filter_chunks.split(",")[1:]
# Any located in section part
if str(ft).upper() == "ANY":
for job in job_list.get_job_list():
final_list.append(job)
for job in job_list.get_job_list():
if job.section == section:
if filter_chunks:
jobs_filtered.append(job)
if len(jobs_filtered) == 0:
jobs_filtered = job_list.get_job_list()
fc = filter_chunks
# Any located in chunks part
if str(fc).upper() == "ANY":
for job in jobs_filtered:
final_list.append(job)
else:
data = json.loads(Autosubmit._create_json(fc))
for date_json in data['sds']:
date = date_json['sd']
if len(str(date)) < 9:
format_ = "D"
elif len(str(date)) < 11:
format_ = "H"
elif len(str(date)) < 13:
format_ = "M"
elif len(str(date)) < 15:
format_ = "S"
else:
format_ = "D"
jobs_date = [j for j in jobs_filtered if date2str(
j.date, format_) == date]

for member_json in date_json['ms']:
member = member_json['m']
jobs_member = [j for j in jobs_date if j.member == member]
start = time.time()
Log.debug(f"Filtering jobs with chunks {filter_chunks}")
# extend() rather than assignment, so that several filters can each contribute jobs
final_list.extend(Autosubmit.select_jobs_by_chunks(job_list, filter_chunks))
final_list = list(set(final_list))
Log.info(f"Chunk filtering took {time.time() - start:.2f} seconds.")

for chunk_json in member_json['cs']:
chunk = int(chunk_json)
for job in [j for j in jobs_date if j.chunk == chunk and j.synchronize is not None]:
final_list.append(job)
for job in [j for j in jobs_member if j.chunk == chunk]:
final_list.append(job)
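# Review aid: with the branches above, several filters can now contribute to one
# call, since each branch extends final_list and duplicates are removed afterwards
# via list(set(final_list)). A hypothetical combined invocation could look like
# the following (expid, date, member and section values are illustrative, and the
# flag names are assumed from the setstatus CLI, not confirmed by this diff):
#
#     autosubmit setstatus a000 -fc "19600101,[FC0|ANY],[1|2],SIM" -fs READY -t SUSPENDED -s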
if filter_status:
status_list = filter_status.split()
Log.debug(f"Filtering jobs with status {filter_status}")
@@ -5439,6 +5496,7 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
# Time to change status
final_list = list(set(final_list))
performed_changes = {}
Log.info(f"The selected number of jobs to change is: {len(final_list)}")
for job in final_list:
if final_status in [Status.WAITING, Status.PREPARED, Status.DELAYED, Status.READY]:
job.fail_count = 0
@@ -5466,15 +5524,17 @@ def set_status(expid: str, noplot: bool, save: bool, final: str, filter_list: st
status_change=performed_changes))
else:
Log.warning("No changes were performed.")

Log.info(f"Updating JobList for experiment {expid}...")
job_list.update_list(as_conf, False, True)

start = time.time()
if save and wrongExpid == 0:
for job in final_list:
job.update_parameters(as_conf, set_attributes=True, reset_logs=True)

job_list.recover_last_data()
job_list.save()
end = time.time()
Log.info(f"JobList saved in {end - start:.2f} seconds.")
exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR,
historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
exp_history.initialize_database()
(second changed file in this PR; its path is not shown in this extract)
@@ -38,7 +38,6 @@
from autosubmit.history.database_managers.database_manager import (
DatabaseManager,
)
from autosubmit.log.log import AutosubmitCritical

CURRENT_DB_VERSION = 19 # Update this if you change the database schema
DB_EXPERIMENT_HEADER_SCHEMA_CHANGES = 14