From bc9a4dfdc34922aefa823016074ca715d0fff703 Mon Sep 17 00:00:00 2001
From: Diego Marvid
Date: Mon, 25 Mar 2024 15:11:50 -0300
Subject: [PATCH] improve tabular split and add calc train metrics

---
 pipeline_lib/core/data_container.py        | 84 ++++---------------
 pipeline_lib/core/model.py                 | 19 +++++
 pipeline_lib/core/pipeline.py              | 10 +--
 .../core/steps/calculate_train_metrics.py  | 83 ++++++++++++++++++
 pipeline_lib/core/steps/fit_model.py       |  5 +-
 pipeline_lib/core/steps/predict.py         |  5 +-
 pipeline_lib/core/steps/tabular_split.py   | 64 ++++++++++++--
 7 files changed, 185 insertions(+), 85 deletions(-)
 create mode 100644 pipeline_lib/core/steps/calculate_train_metrics.py

diff --git a/pipeline_lib/core/data_container.py b/pipeline_lib/core/data_container.py
index f0fb134..92ab07a 100644
--- a/pipeline_lib/core/data_container.py
+++ b/pipeline_lib/core/data_container.py
@@ -377,76 +377,52 @@ def validation(self, value: Any):
         self["validation"] = value
 
     @property
-    def model(self) -> Any:
-        """
-        Get the model from the DataContainer.
-
-        Returns
-        -------
-        Any
-            The model stored in the DataContainer.
-        """
-        return self["model"]
-
-    @model.setter
-    def model(self, value: Any):
-        """
-        Set the model in the DataContainer.
-
-        Parameters
-        ----------
-        value
-            The model to be stored in the DataContainer.
-        """
-        self["model"] = value
-
-    @property
-    def model_input(self) -> Any:
+    def test(self) -> Any:
         """
-        Get the model input from the DataContainer.
+        Get the test data from the DataContainer.
 
         Returns
         -------
         Any
-            The model input stored in the DataContainer.
+            The test data stored in the DataContainer.
         """
-        return self["model_input"]
+        return self["test"]
 
-    @model_input.setter
-    def model_input(self, value: Any):
+    @test.setter
+    def test(self, value: Any):
         """
-        Set the model input in the DataContainer.
+        Set the test data in the DataContainer.
 
         Parameters
         ----------
         value
-            The model input to be stored in the DataContainer.
+            The test data to be stored in the DataContainer.
         """
-        self["model_input"] = value
+        self["test"] = value
 
     @property
-    def model_output(self) -> Any:
+    def model(self) -> Any:
         """
-        Get the model output from the DataContainer.
+        Get the model from the DataContainer.
 
         Returns
         -------
         Any
-            The model output stored in the DataContainer.
+            The model stored in the DataContainer.
         """
-        return self["model_output"]
+        return self["model"]
 
-    @model_output.setter
-    def model_output(self, value: Any):
+    @model.setter
+    def model(self, value: Any):
         """
-        Set the model output in the DataContainer.
+        Set the model in the DataContainer.
 
         Parameters
         ----------
         value
-            The model output to be stored in the DataContainer.
+            The model to be stored in the DataContainer.
         """
-        self["model_output"] = value
+        self["model"] = value
 
     @property
     def metrics(self) -> Any:
@@ -568,30 +544,6 @@ def target(self, value: Any):
         """
         self["target"] = value
 
-    @property
-    def features(self) -> Any:
-        """
-        Get the features from the DataContainer.
-
-        Returns
-        -------
-        Any
-            The features stored in the DataContainer.
-        """
-        return self["features"]
-
-    @features.setter
-    def features(self, value: Any):
-        """
-        Set the features in the DataContainer.
-
-        Parameters
-        ----------
-        value
-            The features to be stored in the DataContainer.
-        """
-        self["features"] = value
-
     @property
     def flow(self) -> Any:
         """
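The renamed accessors above follow the container's existing pattern: each property is sugar over keyed access on the container itself (`self["test"]`). A minimal usage sketch of the new `test` slot; the no-argument constructor is an assumption, everything else mirrors the accessors:

    import pandas as pd

    from pipeline_lib.core import DataContainer

    data = DataContainer()  # assumes a no-argument constructor
    data.test = pd.DataFrame({"target": [1, 2, 3]})  # stored under the "test" key
    assert data.test is data["test"]  # property and key access return the same object
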
- """ - self["features"] = value - @property def flow(self) -> Any: """ diff --git a/pipeline_lib/core/model.py b/pipeline_lib/core/model.py index 36b50cd..1655395 100644 --- a/pipeline_lib/core/model.py +++ b/pipeline_lib/core/model.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod +from pathlib import Path from typing import List, Optional, Tuple +import joblib import pandas as pd @@ -20,3 +22,20 @@ def fit( @abstractmethod def predict(self, X: pd.DataFrame) -> pd.Series: """Abstract method for making predictions.""" + + def save(self, path: str) -> None: + """Save the model.""" + if not path.endswith(".joblib"): + raise ValueError("The path must end with .joblib") + joblib.dump(self, path) + + @classmethod + def from_file(cls, path: str) -> "Model": + """Load the model from a .joblib file.""" + if not Path(path).exists(): + raise FileNotFoundError(f"File not found: {path}") + + if not path.endswith(".joblib"): + raise ValueError("The path must end with .joblib") + + return joblib.load(path) diff --git a/pipeline_lib/core/pipeline.py b/pipeline_lib/core/pipeline.py index 112293d..2d6bcf8 100644 --- a/pipeline_lib/core/pipeline.py +++ b/pipeline_lib/core/pipeline.py @@ -37,8 +37,10 @@ def run(self, is_train: bool) -> DataContainer: if is_train: steps_to_run = [step for step in self.steps if step.used_for_training] + self.logger.info("Training the pipeline") else: steps_to_run = [step for step in self.steps if step.used_for_prediction] + self.logger.info("Predicting with the pipeline") for i, step in enumerate(steps_to_run): Pipeline.logger.info( @@ -53,17 +55,11 @@ def run(self, is_train: bool) -> DataContainer: def train(self) -> DataContainer: """Run the pipeline on the given data.""" - self.logger.info("Training the pipeline") return self.run(is_train=True) def predict(self) -> DataContainer: """Run the pipeline on the given data.""" - self.logger.info("Predicting with the pipeline") - data = self.run(is_train=False) - data.predictions = data.model.predict(data.flow) - self.logger.info("Predictions:") - self.logger.info(data.predictions) - return data + return self.run(is_train=False) @classmethod def from_json(cls, path: str) -> Pipeline: diff --git a/pipeline_lib/core/steps/calculate_train_metrics.py b/pipeline_lib/core/steps/calculate_train_metrics.py new file mode 100644 index 0000000..7329830 --- /dev/null +++ b/pipeline_lib/core/steps/calculate_train_metrics.py @@ -0,0 +1,83 @@ +import json +import time +from typing import List, Optional + +import numpy as np +import pandas as pd +from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score + +from pipeline_lib.core import DataContainer +from pipeline_lib.core.model import Model +from pipeline_lib.core.steps.base import PipelineStep + + +class CalculateTrainMetricsStep(PipelineStep): + """Calculate metrics.""" + + used_for_prediction = False + used_for_training = True + + def __init__(self) -> None: + """Initialize CalculateMetricsStep.""" + super().__init__() + self.init_logger() + + def _calculate_metrics(self, true_values: pd.Series, predictions: pd.Series) -> dict: + return { + "MAE": str(mean_absolute_error(true_values, predictions)), + "RMSE": str(np.sqrt(mean_squared_error(true_values, predictions))), + "R^2": str(r2_score(true_values, predictions)), + "Mean Error": str(np.mean(true_values - predictions)), + "Max Error": str(np.max(np.abs(true_values - predictions))), + "Median Absolute Error": str(np.median(np.abs(true_values - predictions))), + } + + def _get_predictions( + self, model: Model, df: 
diff --git a/pipeline_lib/core/steps/calculate_train_metrics.py b/pipeline_lib/core/steps/calculate_train_metrics.py
new file mode 100644
index 0000000..7329830
--- /dev/null
+++ b/pipeline_lib/core/steps/calculate_train_metrics.py
@@ -0,0 +1,83 @@
+import json
+import time
+from typing import List, Optional
+
+import numpy as np
+import pandas as pd
+from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
+
+from pipeline_lib.core import DataContainer
+from pipeline_lib.core.model import Model
+from pipeline_lib.core.steps.base import PipelineStep
+
+
+class CalculateTrainMetricsStep(PipelineStep):
+    """Calculate metrics for the train, validation, and test datasets."""
+
+    used_for_prediction = False
+    used_for_training = True
+
+    def __init__(self) -> None:
+        """Initialize CalculateTrainMetricsStep."""
+        super().__init__()
+        self.init_logger()
+
+    def _calculate_metrics(self, true_values: pd.Series, predictions: pd.Series) -> dict:
+        return {
+            "MAE": str(mean_absolute_error(true_values, predictions)),
+            "RMSE": str(np.sqrt(mean_squared_error(true_values, predictions))),
+            "R^2": str(r2_score(true_values, predictions)),
+            "Mean Error": str(np.mean(true_values - predictions)),
+            "Max Error": str(np.max(np.abs(true_values - predictions))),
+            "Median Absolute Error": str(np.median(np.abs(true_values - predictions))),
+        }
+
+    def _get_predictions(
+        self, model: Model, df: pd.DataFrame, target: str, drop_columns: Optional[List[str]] = None
+    ) -> pd.Series:
+        drop_columns = (drop_columns or []) + [target]
+        return model.predict(df.drop(columns=drop_columns))
+
+    def _log_metrics(self, dataset_name: str, metrics: dict) -> None:
+        self.logger.info(f"Metrics for {dataset_name} dataset:")
+        for metric, value in metrics.items():
+            self.logger.info(f"{metric}: {value}")
+
+    def execute(self, data: DataContainer) -> DataContainer:
+        self.logger.debug("Starting metric calculation")
+
+        target_column_name = data.target
+        if target_column_name is None:
+            raise ValueError("Target column not found in any configuration.")
+
+        metrics = {}
+
+        for dataset_name in ["train", "validation", "test"]:
+            start_time = time.time()
+            dataset = getattr(data, dataset_name, None)
+
+            if dataset is None:
+                self.logger.warning(
+                    f"Dataset '{dataset_name}' not found. Skipping metric calculation."
+                )
+                continue
+
+            predictions = self._get_predictions(
+                model=data.model,
+                df=dataset,
+                target=target_column_name,
+                drop_columns=data._drop_columns,
+            )
+            metrics[dataset_name] = self._calculate_metrics(
+                true_values=dataset[target_column_name],
+                predictions=predictions,
+            )
+            elapsed_time = time.time() - start_time
+            self.logger.info(f"Elapsed time for {dataset_name} dataset: {elapsed_time:.2f} seconds")
+
+        # pretty print metrics
+        self.logger.info(f"Metrics: {json.dumps(metrics, indent=4)}")
+
+        data.metrics = metrics
+
+        return data
diff --git a/pipeline_lib/core/steps/fit_model.py b/pipeline_lib/core/steps/fit_model.py
index 9f2e7a6..2fcde63 100644
--- a/pipeline_lib/core/steps/fit_model.py
+++ b/pipeline_lib/core/steps/fit_model.py
@@ -1,7 +1,6 @@
 from typing import Optional, Type
 
 import optuna
-from joblib import dump
 from sklearn.metrics import mean_absolute_error
 
 from pipeline_lib.core import DataContainer
@@ -70,11 +69,11 @@ def execute(self, data: DataContainer) -> DataContainer:
 
         data.model = self.model
         data.target = self.target
-        data.model_path = self.save_path
+        data._drop_columns = self.drop_columns
 
         if self.save_path:
             self.logger.info(f"Saving the model to {self.save_path}")
-            dump(self.model, self.save_path)
+            self.model.save(self.save_path)
 
         return data
 
diff --git a/pipeline_lib/core/steps/predict.py b/pipeline_lib/core/steps/predict.py
index 4025649..98d1ed3 100644
--- a/pipeline_lib/core/steps/predict.py
+++ b/pipeline_lib/core/steps/predict.py
@@ -1,8 +1,7 @@
 from typing import List, Optional
 
-from joblib import load
-
 from pipeline_lib.core import DataContainer
+from pipeline_lib.core.model import Model
 from pipeline_lib.core.steps.base import PipelineStep
 
 
@@ -22,7 +21,7 @@ def __init__(
         super().__init__()
         self.init_logger()
         self.load_path = load_path
-        self.model = load(self.load_path)
+        self.model = Model.from_file(load_path)
         self.target = target
         self.drop_columns = drop_columns or []
 
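The metric block in `_calculate_metrics` is plain scikit-learn; reproduced outside the step with made-up series so the outputs are easy to sanity-check:

    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

    true_values = pd.Series([3.0, 5.0, 2.5, 7.0])
    predictions = pd.Series([2.5, 5.0, 3.0, 8.0])

    metrics = {
        "MAE": str(mean_absolute_error(true_values, predictions)),
        "RMSE": str(np.sqrt(mean_squared_error(true_values, predictions))),
        "R^2": str(r2_score(true_values, predictions)),
        "Mean Error": str(np.mean(true_values - predictions)),
        "Max Error": str(np.max(np.abs(true_values - predictions))),
        "Median Absolute Error": str(np.median(np.abs(true_values - predictions))),
    }
    print(metrics)

The values are stringified on purpose: the step dumps the whole dict through `json.dumps`, and Python's json module cannot serialize NumPy scalars directly.
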
"""Initialize SplitStep.""" self.init_logger() self.train_percentage = train_percentage + self.validation_percentage = validation_percentage + self.test_percentage = test_percentage if self.train_percentage <= 0 or self.train_percentage >= 1: raise ValueError("train_percentage must be between 0 and 1.") + if self.validation_percentage is not None: + if self.validation_percentage <= 0 or self.validation_percentage >= 1: + raise ValueError("validation_percentage must be between 0 and 1.") + if self.test_percentage is None: + if self.train_percentage + self.validation_percentage != 1: + raise ValueError( + "The sum of train_percentage and validation_percentage must equal 1 when" + " test_percentage is not specified." + ) + else: + if self.train_percentage + self.validation_percentage + self.test_percentage != 1: + raise ValueError( + "The sum of train_percentage, validation_percentage, and test_percentage" + " must equal 1." + ) + + if self.test_percentage is not None: + if self.test_percentage <= 0 or self.test_percentage >= 1: + raise ValueError("test_percentage must be between 0 and 1.") + if self.validation_percentage is None: + raise ValueError( + "validation_percentage must be provided when test_percentage is specified." + ) + def execute(self, data: DataContainer) -> DataContainer: - """Execute the random train-validation split.""" + """Execute the random train-validation-test split.""" self.logger.info("Splitting tabular data...") df = data.flow - train_df, validation_df = train_test_split( - df, train_size=self.train_percentage, random_state=42 - ) + if self.test_percentage is not None: + train_val_df, test_df = train_test_split( + df, test_size=self.test_percentage, random_state=42 + ) + train_df, validation_df = train_test_split( + train_val_df, + train_size=self.train_percentage + / (self.train_percentage + self.validation_percentage), + random_state=42, + ) + else: + train_df, validation_df = train_test_split( + df, train_size=self.train_percentage, random_state=42 + ) + test_df = None train_rows = len(train_df) validation_rows = len(validation_df) - total_rows = train_rows + validation_rows + test_rows = len(test_df) if test_df is not None else 0 + total_rows = train_rows + validation_rows + test_rows self.logger.info( f"Number of rows in training set: {train_rows} | {train_rows/total_rows:.2%}" @@ -39,8 +85,14 @@ def execute(self, data: DataContainer) -> DataContainer: f"Number of rows in validation set: {validation_rows} |" f" {validation_rows/total_rows:.2%}" ) + if test_df is not None: + self.logger.info( + f"Number of rows in test set: {test_rows} | {test_rows/total_rows:.2%}" + ) data.train = train_df data.validation = validation_df + if test_df is not None: + data.test = test_df return data