From 10c0afa874f0273a49c268bd31afaa484415d806 Mon Sep 17 00:00:00 2001
From: Diego Marvid
Date: Thu, 4 Apr 2024 14:56:02 -0300
Subject: [PATCH] change saving method to save fitted encoders

---
 pipeline_lib/core/data_container.py          | 27 ++++++++++++-
 pipeline_lib/core/pipeline.py                | 40 +++++++++----------
 pipeline_lib/core/steps/encode.py            | 31 +++++++++++---
 .../core/steps/explainer_dashboard.py        |  2 +-
 pipeline_lib/core/steps/predict.py           | 16 +-------
 5 files changed, 74 insertions(+), 42 deletions(-)

diff --git a/pipeline_lib/core/data_container.py b/pipeline_lib/core/data_container.py
index 3b45ca5..c66accf 100644
--- a/pipeline_lib/core/data_container.py
+++ b/pipeline_lib/core/data_container.py
@@ -10,6 +10,7 @@
 
 import pandas as pd
 import yaml
+from sklearn.compose import ColumnTransformer
 
 from pipeline_lib.core.model import Model
 
@@ -166,7 +167,7 @@ def save(self, file_path: str, keys: Optional[Union[str, list[str]]] = None):
         if isinstance(keys, str):
             keys = [keys]
 
-        data_to_save = {k: self.data[k] for k in keys} if keys else self.data
+        data_to_save = {k: self.data.get(k) for k in keys} if keys else self.data
 
         serialized_data = pickle.dumps(data_to_save)
         data_size_bytes = sys.getsizeof(serialized_data)
@@ -619,6 +620,30 @@ def is_train(self, value: bool):
         """
         self["is_train"] = value
 
+    @property
+    def _encoder(self) -> ColumnTransformer:
+        """
+        Get the encoder from the DataContainer.
+
+        Returns
+        -------
+        ColumnTransformer
+            The encoder stored in the DataContainer.
+        """
+        return self["encoder"]
+
+    @_encoder.setter
+    def _encoder(self, value: ColumnTransformer):
+        """
+        Set the encoder in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The encoder to be stored in the DataContainer.
+        """
+        self["encoder"] = value
+
     def __eq__(self, other) -> bool:
         """
         Compare this DataContainer with another for equality.
diff --git a/pipeline_lib/core/pipeline.py b/pipeline_lib/core/pipeline.py
index 23809f4..5158e54 100644
--- a/pipeline_lib/core/pipeline.py
+++ b/pipeline_lib/core/pipeline.py
@@ -20,13 +20,13 @@ class Pipeline:
     step_registry = StepRegistry()
     model_registry = ModelRegistry()
 
+    KEYS_TO_SAVE = ["model", "encoder", "_drop_columns", "target"]
+
    def __init__(self, initial_data: Optional[DataContainer] = None):
         self.steps = []
         self.initial_data = initial_data
-        self.save_path = None
-        self.load_path = None
-        self.model_path = None
         self.config = None
+        self.save_data_path = None
 
     def add_steps(self, steps: list[PipelineStep]):
         """Add steps to the pipeline."""
@@ -35,22 +35,32 @@ def add_steps(self, steps: list[PipelineStep]):
 
     def run(self, is_train: bool, save: bool = True) -> DataContainer:
         """Run the pipeline on the given data."""
-        data = DataContainer.from_pickle(self.load_path) if self.load_path else DataContainer()
-        data.is_train = is_train
+        if not self.save_data_path:
+            raise ValueError(
+                "A path for saving the data must be provided. Use the `save_data_path` attribute."
+            )
+
+        data = DataContainer()
 
         if is_train:
             steps_to_run = [step for step in self.steps if step.used_for_training]
             self.logger.info("Training the pipeline")
         else:
+            data = DataContainer.from_pickle(self.save_data_path)
             steps_to_run = [step for step in self.steps if step.used_for_prediction]
             self.logger.info("Predicting with the pipeline")
 
+        data.is_train = is_train
+
         for i, step in enumerate(steps_to_run):
             Pipeline.logger.info(
                 f"Running {step.__class__.__name__} - {i + 1} / {len(steps_to_run)}"
             )
             data = step.execute(data)
 
+        if is_train:
+            data.save(self.save_data_path, keys=self.KEYS_TO_SAVE)
+
         if save:
             self.save_run(data)
 
@@ -78,17 +88,16 @@ def from_json(cls, path: str) -> Pipeline:
         if custom_steps_path:
             cls.step_registry.load_and_register_custom_steps(custom_steps_path)
 
+        save_data_path = config["pipeline"].get("save_data_path")
+
         pipeline = Pipeline()
-        pipeline.load_path = config.get("load_path")
-        pipeline.save_path = config.get("save_path")
         pipeline.config = config
+        pipeline.save_data_path = save_data_path
 
-        steps = []
+        cls.logger.info(f"Saved data path: {save_data_path}")
 
-        model_path = None
-        drop_columns = None
-        target = None
+        steps = []
 
         for step_config in config["pipeline"]["steps"]:
             step_type = step_config["step_type"]
 
@@ -103,15 +112,6 @@
                 model_class_name = parameters.pop("model_class")
                 model_class = cls.model_registry.get_model_class(model_class_name)
                 parameters["model_class"] = model_class
-                model_path = parameters.get("save_path")
-                drop_columns = parameters.get("drop_columns")
-                target = parameters.get("target")
-
-            # if step type is prediction, add model path
-            if step_type == "PredictStep":
-                parameters["load_path"] = model_path
-                parameters["drop_columns"] = drop_columns
-                parameters["target"] = target
 
             step_class = cls.step_registry.get_step_class(step_type)
             step = step_class(**parameters)
diff --git a/pipeline_lib/core/steps/encode.py b/pipeline_lib/core/steps/encode.py
index f602c84..99055a7 100644
--- a/pipeline_lib/core/steps/encode.py
+++ b/pipeline_lib/core/steps/encode.py
@@ -36,6 +36,10 @@ def execute(self, data: DataContainer) -> DataContainer:
         """Execute the encoding step."""
         self.logger.info("Encoding data")
         df = data.flow
+
+        if not data.target and not self.target:
+            raise ValueError("Target column not set in either the step parameters or the DataContainer.")
+
         target_column_name = self.target or data.target
 
         target_original_dtype = None
@@ -49,11 +53,21 @@
         if pd.api.types.is_numeric_dtype(df[target_column_name]):
             target_original_dtype = df[target_column_name].dtype
 
-        column_transformer = self._create_column_transformer(
-            high_cardinality_features, low_cardinality_features
-        )
+        if data.is_train:
+            column_transformer = self._create_column_transformer(
+                high_cardinality_features, low_cardinality_features
+            )
+            # Save the encoder for prediction
+            data._encoder = column_transformer
+        else:
+            column_transformer = data._encoder
 
-        encoded_data = self._transform_data(df, target_column_name, column_transformer)
+        encoded_data = self._transform_data(
+            df,
+            target_column_name,
+            column_transformer,
+            data.is_train,
+        )
         encoded_data = self._restore_column_order(df, encoded_data)
         encoded_data = self._convert_ordinal_encoded_columns_to_int(encoded_data)
         encoded_data = self._restore_numeric_dtypes(encoded_data, original_numeric_dtypes)
@@ -157,10 +171,15 @@ def _create_column_transformer(
         )
 
     def _transform_data(
-        self, df: pd.DataFrame, target_column_name: str, column_transformer: ColumnTransformer
+        self,
+        df: pd.DataFrame,
+        target_column_name: str,
+        column_transformer: ColumnTransformer,
+        is_train: bool,
     ) -> pd.DataFrame:
         """Transform the data using the ColumnTransformer."""
-        column_transformer.fit(df, df[target_column_name])
+        if is_train:
+            column_transformer.fit(df, df[target_column_name])
         transformed_data = column_transformer.transform(df)
         self.logger.debug(f"Transformed data shape: {transformed_data.shape}")
         return pd.DataFrame(transformed_data, columns=column_transformer.get_feature_names_out())
diff --git a/pipeline_lib/core/steps/explainer_dashboard.py b/pipeline_lib/core/steps/explainer_dashboard.py
index 042c104..8dd3b92 100644
--- a/pipeline_lib/core/steps/explainer_dashboard.py
+++ b/pipeline_lib/core/steps/explainer_dashboard.py
@@ -39,7 +39,7 @@ def execute(self, data: DataContainer) -> DataContainer:
             self.logger.info(f"Sampling {self.max_samples} data points from the dataset.")
             df = df.sample(n=self.max_samples, random_state=42)
 
-        drop_columns = data._drop_columns
+        drop_columns = (data._drop_columns or []) + ["predictions"]
 
         if drop_columns:
             df = df.drop(columns=drop_columns)
diff --git a/pipeline_lib/core/steps/predict.py b/pipeline_lib/core/steps/predict.py
index 98d1ed3..a2f19ef 100644
--- a/pipeline_lib/core/steps/predict.py
+++ b/pipeline_lib/core/steps/predict.py
@@ -1,7 +1,4 @@
-from typing import List, Optional
-
 from pipeline_lib.core import DataContainer
-from pipeline_lib.core.model import Model
 from pipeline_lib.core.steps.base import PipelineStep
 
 
@@ -13,23 +10,16 @@ class PredictStep(PipelineStep):
 
     def __init__(
         self,
-        load_path: str,
-        target: str,
-        drop_columns: Optional[List[str]] = None,
     ) -> None:
         """Initialize Predict Step."""
         super().__init__()
         self.init_logger()
-        self.load_path = load_path
-        self.model = Model.from_file(load_path)
-        self.target = target
-        self.drop_columns = drop_columns or []
 
     def execute(self, data: DataContainer) -> DataContainer:
         """Execute the step."""
         self.logger.info("Obtaining predictions")
 
-        drop_columns = self.drop_columns + [self.target]
+        drop_columns = (data._drop_columns or []) + [data.target]
 
         missing_columns = [col for col in drop_columns if col not in data.flow.columns]
         if missing_columns:
@@ -39,10 +29,8 @@ def execute(self, data: DataContainer) -> DataContainer:
             self.logger.warning(error_message)
             raise KeyError(error_message)
 
-        data.predictions = self.model.predict(data.flow.drop(columns=drop_columns))
+        data.predictions = data.model.predict(data.flow.drop(columns=drop_columns))
         data.flow["predictions"] = data.predictions
 
-        data.target = self.target
-
         return data
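
Usage sketch (editor's note, not part of the patch): how the train/predict flow fits together after this change, assuming a pipeline config whose "pipeline" section sets "save_data_path"; the file name config.json is hypothetical.

    from pipeline_lib.core.pipeline import Pipeline

    # Train: EncodeStep fits the ColumnTransformer and stores it in the
    # DataContainer; after the steps run, KEYS_TO_SAVE ("model", "encoder",
    # "_drop_columns", "target") are pickled to `save_data_path`.
    pipeline = Pipeline.from_json("config.json")
    pipeline.run(is_train=True)

    # Predict: the run reloads the pickled DataContainer from `save_data_path`,
    # so EncodeStep transforms with the already-fitted encoder instead of
    # re-fitting, and PredictStep reads the model, target, and drop columns
    # from the container rather than from step parameters.
    pipeline.run(is_train=False)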