Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class Job(object):
'_platform', '_queue', '_partition', 'retry_delay', '_section',
'_wallclock', 'wchunkinc', '_tasks', '_nodes',
'_threads', '_processors', '_memory', '_memory_per_task', '_chunk',
'_chunk_length', '_chunk_unit',
'_member', 'date', 'date_split', '_splits', '_split', '_delay',
'_frequency', '_synchronize', 'skippable', 'repacked', '_long_name',
'date_format', 'type', '_name',
Expand Down Expand Up @@ -217,6 +218,8 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
self._memory = None
self._memory_per_task = None
self._chunk = None
self._chunk_length = None
self._chunk_unit = None
self._member = None
self.date = None
self.date_split = None
Expand Down Expand Up @@ -508,6 +511,26 @@ def chunk(self):
def chunk(self, value):
self._chunk = value

@property
@autosubmit_parameter(name='chunk_length')
def chunk_length(self):
"""Length of the chunk."""
return self._chunk_length

@chunk_length.setter
def chunk_length(self, value):
self._chunk_length = value

@property
@autosubmit_parameter(name='chunk_unit')
def chunk_unit(self):
"""Unit of the chunk."""
return self._chunk_unit

@chunk_unit.setter
def chunk_unit(self, value):
self._chunk_unit = value

@property
@autosubmit_parameter(name='split')
def split(self):
Expand Down Expand Up @@ -2113,14 +2136,14 @@ def calendar_chunk(self, parameters):

parameters['CHUNK'] = chunk
total_chunk = int(parameters.get('EXPERIMENT.NUMCHUNKS', 1))
chunk_length = int(parameters.get('EXPERIMENT.CHUNKSIZE', 1))
chunk_unit = str(parameters.get('EXPERIMENT.CHUNKSIZEUNIT', "day")).lower()
self.chunk_length = int(parameters.get('EXPERIMENT.CHUNKSIZE', 1))
self.chunk_unit = str(parameters.get('EXPERIMENT.CHUNKSIZEUNIT', "day")).lower()
cal = str(parameters.get('EXPERIMENT.CALENDAR', "")).lower()
chunk_start = chunk_start_date(
self.date, chunk, chunk_length, chunk_unit, cal)
self.date, chunk, self.chunk_length, self.chunk_unit, cal)
chunk_end = chunk_end_date(
chunk_start, chunk_length, chunk_unit, cal)
if chunk_unit == 'hour':
chunk_start, self.chunk_length, self.chunk_unit, cal)
if self.chunk_unit == 'hour':
chunk_end_1 = chunk_end
else:
chunk_end_1 = previous_day(chunk_end, cal)
Expand Down Expand Up @@ -2771,7 +2794,6 @@ def recover_last_ready_date(self) -> None:
self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S')
Log.debug(f"Failed to recover ready date for the job {self.name}")


class WrapperJob(Job):
"""
Defines a wrapper from a package.
Expand Down
17 changes: 17 additions & 0 deletions autosubmit/notifications/mail_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,23 @@ def notify_status_change(

self._send_message(mail_to, self.config.MAIL_FROM, message)

def notify_custom_alert(self, subject: str, mail_to: list[str], message: str) -> None:
"""
Send a custom alert email.
Args:
subject (str): The subject of the email.
message (str): The body of the email.
"""
_check_mail_address(mail_to)

message = MIMEText(message)
message['From'] = email.utils.formataddr(
('Autosubmit', self.config.MAIL_FROM))
message['Subject'] = subject
message['Date'] = email.utils.formatdate(localtime=True)

self._send_message(mail_to, self.config.MAIL_FROM, message)

def _send_message(self, mail_to: list[str], mail_from: str, message) -> None:
formatted_addresses = [
email.utils.formataddr(
Expand Down
Empty file.
116 changes: 116 additions & 0 deletions autosubmit/performance/base_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2015-2025 Earth Sciences Department, BSC-CNS
#
# This file is part of Autosubmit.
#
# Autosubmit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Autosubmit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, NamedTuple
from autosubmit.notifications.mail_notifier import MailNotifier

if TYPE_CHECKING:
from autosubmit.job.job import Job


class PerformanceMetricInfo(NamedTuple):
"""
Class to hold information about a performance metric.

Attributes:
metric (str): The name of the metric.
under_threshold (bool): Whether the metric is under the threshold.
value (float): The current value of the metric.
threshold (float): The threshold value for the metric.
under_performance (Optional[float]): The percentage of underperformance, if applicable.
"""

metric: str
under_threshold: bool
value: float
threshold: float
under_performance: Optional[float] = None


class BasePerformance(ABC):
"""Base class for performance metrics calculation"""


def __init__(self, performance_config: dict[str, any], mail_notifier: MailNotifier):
"""
Initialize the BasePerformance class.

:param performance_config: Performance configuration containing performance settings.
:type performance_config: dict[str, any]
"""
self._performance_config = performance_config
self._mail_notifier = mail_notifier

@abstractmethod
def compute_and_check_performance_metrics(self, job: "Job") -> list[PerformanceMetricInfo]:
"""
Compute performance metrics for a job.

:param job: Job instance containing the necessary attributes.
:type job: Job

:return: A list of PerformanceMetricInfo instances containing metric details.
:rtype: list[PerformanceMetricInfo]
"""
pass

# Build mail message for the metrics

@staticmethod
def _template_metric_message(metric_info: PerformanceMetricInfo, job: "Job") -> str:
"""
Generate a message template for the performance metric.

:param metric_info: PerformanceMetricInfo instance containing metric details.
:type metric_info: PerformanceMetricInfo

:param job: Job instance containing the necessary attributes.
:type job: Job

:return: A formatted message string.
:rtype: str
"""
return f"""
🧪 Experiment ID: {job.expid}
🎯 Job Name: {job.name}

Metric: {metric_info.metric}
📉 Current Value: {metric_info.value:.4f}
🎚️ Expected Threshold: {metric_info.threshold}

🔍 Performance is {metric_info.under_performance:.1f}% below expected threshold.
"""

# Get Autosubmit configuration

def _get_mail_recipients(self) -> list[str]:
"""
Get the email recipients for performance notifications from the Autosubmit configuration.

:return: A list of email addresses to notify.
:rtype: list[str]
"""

notify_to = self._performance_config.get("NOTIFY_TO", [])

if not notify_to:
raise ValueError(
"No email recipients configured for performance notifications."
)

return notify_to
106 changes: 106 additions & 0 deletions autosubmit/performance/factory_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2015-2025 Earth Sciences Department, BSC-CNS
#
# This file is part of Autosubmit.
#
# Autosubmit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Autosubmit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING
from autosubmit.job.job_common import Status
from autosubmit.performance.base_performance import BasePerformance
from autosubmit.performance.utils import UtilsPerformance

if TYPE_CHECKING:
from autosubmit.job.job import Job


class PerformanceFactory:

def create_performance(self, job: "Job", performance_config: dict[str, any]) -> BasePerformance:
"""
Factory method to create a performance metric calculator for a job.

:param job: Job instance containing the necessary attributes.
:type job: Job

:param performance_config: Autosubmit configuration containing performance settings.
:type performance_config: dict[str, any]

:return: An instance of a class derived from BasePerformance.
:rtype: BasePerformance
"""

if not performance_config:
raise ValueError(
"Performance configuration is not set."
)

project = performance_config.get("PROJECT", {})

if not project:
raise ValueError("Project configuration is not set in the performance configuration.")

sections = performance_config.get("SECTION", [])

if not sections:
raise ValueError("Sections configuration is not set in the performance configuration.")

notify_on = performance_config.get("NOTIFY_ON", [])

if not notify_on:
raise ValueError("Notify on configuration is not set in the performance configuration.")

job_in_sections = job.section in sections if sections else False

job_status_string = Status.VALUE_TO_KEY.get(job.status, 'UNKNOWN')
job_in_status = job_status_string in notify_on if notify_on else False

if not job_in_sections:
return None

if not job_in_status:
return None

return self._create_performance_by_type(f"{job.section}_{project}", performance_config)

@staticmethod
def _create_performance_by_type(job_type: str, performance_config: dict[str, any]) -> BasePerformance:
"""
Create a performance calculator based on the job type.

:param job_type: Type of the job, e.g., "SIM_DESTINE" or "SIM_DEFAULT".
:type job_type: str

:param performance_config: Autosubmit configuration containing performance settings.
:type performance_config: dict[str, any]

:return: An instance of a class derived from BasePerformance specific to the job type.
:rtype: BasePerformance
"""

if job_type == "SIM_DESTINE":
from autosubmit.performance.type_job.SIM.project.SIM_DestinE import (
SIMDestinEPerformance,
)

return SIMDestinEPerformance(performance_config, UtilsPerformance.get_mail_notifier())
elif job_type == "SIM_DEFAULT":
from autosubmit.performance.type_job.SIM.SIM_performance import (
SIMPerformance,
)

return SIMPerformance(performance_config, UtilsPerformance.get_mail_notifier())

raise ValueError(
f"Unsupported job type: {job_type}. Cannot create performance calculator."
)
Loading
Loading