change saving method to save fitted encoders
diegomarvid committed Apr 4, 2024
1 parent e284327 commit 10c0afa
Showing 5 changed files with 74 additions and 42 deletions.
27 changes: 26 additions & 1 deletion pipeline_lib/core/data_container.py
@@ -10,6 +10,7 @@
 import pandas as pd
 import yaml
+from sklearn.compose import ColumnTransformer
 
 from pipeline_lib.core.model import Model
@@ -166,7 +167,7 @@ def save(self, file_path: str, keys: Optional[Union[str, list[str]]] = None):
         if isinstance(keys, str):
             keys = [keys]
 
-        data_to_save = {k: self.data[k] for k in keys} if keys else self.data
+        data_to_save = {k: self.data.get(k) for k in keys} if keys else self.data
 
         serialized_data = pickle.dumps(data_to_save)
         data_size_bytes = sys.getsizeof(serialized_data)
@@ -619,6 +620,30 @@ def is_train(self, value: bool):
         """
         self["is_train"] = value
 
+    @property
+    def _encoder(self) -> ColumnTransformer:
+        """
+        Get the encoder from the DataContainer.
+
+        Returns
+        -------
+        ColumnTransformer
+            The encoder stored in the DataContainer.
+        """
+        return self["encoder"]
+
+    @_encoder.setter
+    def _encoder(self, value: ColumnTransformer):
+        """
+        Set the encoder in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The encoder to be stored in the DataContainer.
+        """
+        self["encoder"] = value
+
     def __eq__(self, other) -> bool:
         """
         Compare this DataContainer with another for equality.
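
The new _encoder property gives the rest of the pipeline a single place to stash and retrieve the fitted encoder. A minimal sketch of the intended round trip, assuming an empty DataContainer can be constructed the way Pipeline.run() does (the toy DataFrame and column names are illustrative, not from the repo):

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder

from pipeline_lib.core.data_container import DataContainer

df = pd.DataFrame({"city": ["ny", "la", "ny"], "price": [1.0, 2.0, 3.0]})

# Training side: fit once and store the fitted transformer in the container.
encoder = ColumnTransformer(
    [("ordinal", OrdinalEncoder(), ["city"])], remainder="passthrough"
)
encoder.fit(df)

data = DataContainer()
data._encoder = encoder

# Prediction side: retrieve the already-fitted transformer; no refitting.
encoded = data._encoder.transform(df)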
40 changes: 20 additions & 20 deletions pipeline_lib/core/pipeline.py
@@ -20,13 +20,13 @@ class Pipeline:
     step_registry = StepRegistry()
     model_registry = ModelRegistry()
 
+    KEYS_TO_SAVE = ["model", "encoder", "_drop_columns", "target"]
+
     def __init__(self, initial_data: Optional[DataContainer] = None):
         self.steps = []
         self.initial_data = initial_data
         self.save_path = None
-        self.load_path = None
-        self.model_path = None
         self.config = None
+        self.save_data_path = None
 
     def add_steps(self, steps: list[PipelineStep]):
         """Add steps to the pipeline."""
@@ -35,22 +35,32 @@ def add_steps(self, steps: list[PipelineStep]):
     def run(self, is_train: bool, save: bool = True) -> DataContainer:
         """Run the pipeline on the given data."""
 
-        data = DataContainer.from_pickle(self.load_path) if self.load_path else DataContainer()
-        data.is_train = is_train
+        if not self.save_data_path:
+            raise ValueError(
+                "A path for saving the data must be provided. Use the `save_data_path` attribute."
+            )
+
+        data = DataContainer()
+
         if is_train:
             steps_to_run = [step for step in self.steps if step.used_for_training]
             self.logger.info("Training the pipeline")
         else:
+            data = DataContainer.from_pickle(self.save_data_path)
             steps_to_run = [step for step in self.steps if step.used_for_prediction]
             self.logger.info("Predicting with the pipeline")
 
+        data.is_train = is_train
+
         for i, step in enumerate(steps_to_run):
             Pipeline.logger.info(
                 f"Running {step.__class__.__name__} - {i + 1} / {len(steps_to_run)}"
             )
             data = step.execute(data)
 
+        if is_train:
+            data.save(self.save_data_path, keys=self.KEYS_TO_SAVE)
+
         if save:
             self.save_run(data)
@@ -78,17 +88,16 @@ def from_json(cls, path: str) -> Pipeline:
         if custom_steps_path:
             cls.step_registry.load_and_register_custom_steps(custom_steps_path)
 
+        save_data_path = config["pipeline"].get("save_data_path")
+
         pipeline = Pipeline()
 
-        pipeline.load_path = config.get("load_path")
-        pipeline.save_path = config.get("save_path")
         pipeline.config = config
+        pipeline.save_data_path = save_data_path
 
+        steps = []
+        print(f"Saved data path: {save_data_path}")
 
-        model_path = None
-        drop_columns = None
-        target = None
-        steps = []
 
         for step_config in config["pipeline"]["steps"]:
             step_type = step_config["step_type"]
@@ -103,15 +112,6 @@ def from_json(cls, path: str) -> Pipeline:
                 model_class_name = parameters.pop("model_class")
                 model_class = cls.model_registry.get_model_class(model_class_name)
                 parameters["model_class"] = model_class
-                model_path = parameters.get("save_path")
-                drop_columns = parameters.get("drop_columns")
-                target = parameters.get("target")
-
-            # if step type is prediction, add model path
-            if step_type == "PredictStep":
-                parameters["load_path"] = model_path
-                parameters["drop_columns"] = drop_columns
-                parameters["target"] = target
 
             step_class = cls.step_registry.get_step_class(step_type)
             step = step_class(**parameters)
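
Taken together, the run() and from_json() changes mean a training run now persists the keys in KEYS_TO_SAVE to save_data_path, and a prediction run starts by reloading them. A hedged sketch of the resulting workflow; the config file name is hypothetical, and the config is assumed to set pipeline.save_data_path:

from pipeline_lib.core.pipeline import Pipeline

pipeline = Pipeline.from_json("config.json")

# Train: fitted artifacts ("model", "encoder", "_drop_columns", "target")
# are pickled to save_data_path when the run finishes.
pipeline.run(is_train=True)

# Predict, possibly in a fresh process: the same artifacts are reloaded
# from save_data_path before the prediction steps execute.
result = pipeline.run(is_train=False)
print(result.predictions)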
31 changes: 25 additions & 6 deletions pipeline_lib/core/steps/encode.py
@@ -36,6 +36,10 @@ def execute(self, data: DataContainer) -> DataContainer:
         """Execute the encoding step."""
         self.logger.info("Encoding data")
         df = data.flow
+
+        if not data.target and not self.target:
+            raise ValueError("Target column not found in any parameter before encoding.")
+
         target_column_name = self.target or data.target
         target_original_dtype = None
@@ -49,11 +53,21 @@ def execute(self, data: DataContainer) -> DataContainer:
         if pd.api.types.is_numeric_dtype(df[target_column_name]):
             target_original_dtype = df[target_column_name].dtype
 
-        column_transformer = self._create_column_transformer(
-            high_cardinality_features, low_cardinality_features
-        )
+        if data.is_train:
+            column_transformer = self._create_column_transformer(
+                high_cardinality_features, low_cardinality_features
+            )
+            # Save the encoder for prediction
+            data._encoder = column_transformer
+        else:
+            column_transformer = data._encoder
 
-        encoded_data = self._transform_data(df, target_column_name, column_transformer)
+        encoded_data = self._transform_data(
+            df,
+            target_column_name,
+            column_transformer,
+            data.is_train,
+        )
         encoded_data = self._restore_column_order(df, encoded_data)
         encoded_data = self._convert_ordinal_encoded_columns_to_int(encoded_data)
         encoded_data = self._restore_numeric_dtypes(encoded_data, original_numeric_dtypes)
@@ -157,10 +171,15 @@ def _create_column_transformer(
         )
 
     def _transform_data(
-        self, df: pd.DataFrame, target_column_name: str, column_transformer: ColumnTransformer
+        self,
+        df: pd.DataFrame,
+        target_column_name: str,
+        column_transformer: ColumnTransformer,
+        is_train: bool,
     ) -> pd.DataFrame:
         """Transform the data using the ColumnTransformer."""
-        column_transformer.fit(df, df[target_column_name])
+        if is_train:
+            column_transformer.fit(df, df[target_column_name])
         transformed_data = column_transformer.transform(df)
         self.logger.debug(f"Transformed data shape: {transformed_data.shape}")
         return pd.DataFrame(transformed_data, columns=column_transformer.get_feature_names_out())
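
The is_train flag is what keeps the encoder's vocabulary stable between runs: fitting happens only during training, while prediction reuses the stored category-to-integer mappings. The same pattern in plain scikit-learn, with toy data for illustration:

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder

train = pd.DataFrame({"color": ["red", "blue", "red"]})
test = pd.DataFrame({"color": ["blue", "red", "blue"]})

ct = ColumnTransformer([("ord", OrdinalEncoder(), ["color"])])

# Train time: fit and transform in one pass.
train_encoded = ct.fit_transform(train)

# Prediction time: transform only, so each category maps to the same
# integer it mapped to during training.
test_encoded = ct.transform(test)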
2 changes: 1 addition & 1 deletion pipeline_lib/core/steps/explainer_dashboard.py
@@ -39,7 +39,7 @@ def execute(self, data: DataContainer) -> DataContainer:
             self.logger.info(f"Sampling {self.max_samples} data points from the dataset.")
             df = df.sample(n=self.max_samples, random_state=42)
 
-        drop_columns = data._drop_columns
+        drop_columns = data._drop_columns + ["predictions"]
         if drop_columns:
             df = df.drop(columns=drop_columns)
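
Because PredictStep now writes its output into data.flow as a "predictions" column, the dashboard input has to drop that column along with the configured drop columns; otherwise the model's own output would appear as a feature. A small illustration with a toy frame (column names are made up):

import pandas as pd

df = pd.DataFrame({"feature_a": [1, 2], "id": [10, 11], "predictions": [0.2, 0.9]})
drop_columns = ["id"] + ["predictions"]  # i.e. data._drop_columns + ["predictions"]

# Keep only genuine model inputs for the explainer.
df = df.drop(columns=drop_columns)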
16 changes: 2 additions & 14 deletions pipeline_lib/core/steps/predict.py
@@ -1,7 +1,4 @@
-from typing import List, Optional
-
 from pipeline_lib.core import DataContainer
-from pipeline_lib.core.model import Model
 from pipeline_lib.core.steps.base import PipelineStep
 
@@ -13,23 +10,16 @@ class PredictStep(PipelineStep):
 
     def __init__(
         self,
-        load_path: str,
-        target: str,
-        drop_columns: Optional[List[str]] = None,
     ) -> None:
         """Initialize Predict Step."""
         super().__init__()
         self.init_logger()
-        self.load_path = load_path
-        self.model = Model.from_file(load_path)
-        self.target = target
-        self.drop_columns = drop_columns or []
 
     def execute(self, data: DataContainer) -> DataContainer:
         """Execute the step."""
         self.logger.info("Obtaining predictions")
 
-        drop_columns = self.drop_columns + [self.target]
+        drop_columns = data._drop_columns + [data.target]
 
         missing_columns = [col for col in drop_columns if col not in data.flow.columns]
         if missing_columns:
@@ -39,10 +29,8 @@ def execute(self, data: DataContainer) -> DataContainer:
             self.logger.warning(error_message)
             raise KeyError(error_message)
 
-        data.predictions = self.model.predict(data.flow.drop(columns=drop_columns))
+        data.predictions = data.model.predict(data.flow.drop(columns=drop_columns))
 
         data.flow["predictions"] = data.predictions
 
-        data.target = self.target
-
         return data
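
With the constructor arguments gone, PredictStep is configured entirely through the DataContainer: the model, target, and drop columns all come from the artifacts saved at training time. A hedged sketch of what the step now expects, assuming `data` is a container that Pipeline.run(is_train=False) has already restored from save_data_path:

from pipeline_lib.core.steps.predict import PredictStep

step = PredictStep()  # no load_path/target/drop_columns anymore

# `data` must already carry data.model, data.target, and data._drop_columns,
# which the prediction run reloads from save_data_path.
data = step.execute(data)
print(data.flow["predictions"].head())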
