Skip to content

Commit

Permalink
add generate and clean steps to core
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 18, 2024
1 parent 67978af commit 86edc65
Show file tree
Hide file tree
Showing 21 changed files with 1,572 additions and 1,549 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ __pycache__/
# C extensions
*.so

*.joblib
*.bin
*.json

# ignore examples folder
examples/

Expand Down
1 change: 0 additions & 1 deletion pipeline_lib/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from .data_container import DataContainer # noqa: F401
from .pipeline import Pipeline # noqa: F401
from .steps import PipelineStep # noqa: F401
2 changes: 1 addition & 1 deletion pipeline_lib/core/step_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import pkgutil

from pipeline_lib.core.steps import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class StepClassNotFoundError(Exception):
Expand Down
2 changes: 2 additions & 0 deletions pipeline_lib/core/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from .calculate_features import CalculateFeaturesStep # noqa: F401
from .calculate_metrics import CalculateMetricsStep # noqa: F401
from .calculate_reports import CalculateReportsStep # noqa: F401
from .clean import CleanStep # noqa: F401
from .encode import EncodeStep # noqa: F401
from .explainer_dashboard import ExplainerDashboardStep # noqa: F401
from .fit_encoders import FitEncodersStep # noqa: F401
from .fit_model import FitModelStep # noqa: F401
from .generate import GenerateStep # noqa: F401
from .input_scaling import InputScalingStep # noqa: F401
from .predict import PredictStep # noqa: F401
from .tabular_split import TabularSplitStep # noqa: F401
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/augment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class AugmentStep(PipelineStep):
Expand Down
2 changes: 1 addition & 1 deletion pipeline_lib/core/steps/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from typing import Optional

from pipeline_lib.core import DataContainer
from pipeline_lib.core.data_container import DataContainer


class PipelineStep(ABC):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/calculate_features.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class CalculateFeaturesStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/calculate_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from sklearn.metrics import mean_absolute_error, mean_squared_error

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class CalculateMetricsStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/calculate_reports.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class CalculateReportsStep(PipelineStep):
Expand Down
69 changes: 69 additions & 0 deletions pipeline_lib/core/steps/clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Optional

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps.base import PipelineStep


class CleanStep(PipelineStep):
def __init__(
self,
fill_missing: Optional[dict] = None,
remove_outliers: Optional[dict] = None,
convert_dtypes: Optional[dict] = None,
):
self.init_logger()
self.fill_missing = fill_missing
self.remove_outliers = remove_outliers
self.convert_dtypes = convert_dtypes

def execute(self, data: DataContainer) -> DataContainer:
self.logger.info("Cleaning tabular data...")

df = data[DataContainer.RAW]

if self.fill_missing:
for column, fill_value in self.fill_missing.items():
if column in df.columns:
df[column].fillna(fill_value, inplace=True)
self.logger.info(
f"Filled missing values in column '{column}' with {fill_value}"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")

if self.remove_outliers:
for column, method in self.remove_outliers.items():
if column in df.columns:
if method == "clip":
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - (1.5 * iqr)
upper_bound = q3 + (1.5 * iqr)
df[column] = df[column].clip(lower=lower_bound, upper=upper_bound)
self.logger.info(f"Clipped outliers in column '{column}'")
elif method == "drop":
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - (1.5 * iqr)
upper_bound = q3 + (1.5 * iqr)
outliers = (df[column] < lower_bound) | (df[column] > upper_bound)
df = df[~outliers]
self.logger.info(f"Dropped outliers in column '{column}'")
else:
self.logger.warning(f"Unsupported outlier removal method '{method}'")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")

if self.convert_dtypes:
for column, dtype in self.convert_dtypes.items():
if column in df.columns:
df[column] = df[column].astype(dtype)
self.logger.info(f"Converted column '{column}' to {dtype}")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")

data[DataContainer.CLEAN] = df

return data
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/encode.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class EncodeStep(PipelineStep):
Expand Down
2 changes: 1 addition & 1 deletion pipeline_lib/core/steps/explainer_dashboard.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from explainerdashboard import RegressionExplainer

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class ExplainerDashboardStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/fit_encoders.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class FitEncodersStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/fit_model.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class FitModelStep(PipelineStep):
Expand Down
65 changes: 65 additions & 0 deletions pipeline_lib/core/steps/generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import os
from enum import Enum

import pandas as pd

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps.base import PipelineStep


class FileType(Enum):
CSV = ".csv"
PARQUET = ".parquet"


class GenerateStep(PipelineStep):
def __init__(self, path: str, **kwargs):
self.init_logger()
self.file_path = path
self.kwargs = kwargs

def execute(self, data: DataContainer) -> DataContainer:
self.logger.info(f"Generating data from file: {self.file_path}")

if not os.path.exists(self.file_path):
raise FileNotFoundError(f"File not found: {self.file_path}")

file_type = self._infer_file_type()

if file_type == FileType.CSV:
df = self._read_csv()
elif file_type == FileType.PARQUET:
df = self._read_parquet()
else:
raise ValueError(f"Unsupported file type: {file_type}")

data[DataContainer.RAW] = df

self.logger.info(f"Generated DataFrame with shape: {df.shape}")

return data

def _infer_file_type(self) -> FileType:
_, file_extension = os.path.splitext(self.file_path)
file_extension = file_extension.lower()

try:
return FileType(file_extension)
except ValueError:
raise ValueError(f"Unsupported file extension: {file_extension}")

def _read_csv(self) -> pd.DataFrame:
kwargs = self.kwargs.copy()
index_col = kwargs.pop("index", None)
df = pd.read_csv(self.file_path, **kwargs)
if index_col is not None:
df.set_index(index_col, inplace=True)
return df

def _read_parquet(self) -> pd.DataFrame:
kwargs = self.kwargs.copy()
index_col = kwargs.pop("index", None)
df = pd.read_parquet(self.file_path, **kwargs)
if index_col is not None:
df.set_index(index_col, inplace=True)
return df
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/input_scaling.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class InputScalingStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/predict.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class PredictStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/tabular_split.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from sklearn.model_selection import train_test_split

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class TabularSplitStep(PipelineStep):
Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/core/steps/target_scaling.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
from pipeline_lib.core.steps.base import PipelineStep


class TargetScalingStep(PipelineStep):
Expand Down
17 changes: 8 additions & 9 deletions pipeline_lib/implementation/tabular/xgboost/fit_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def __init__(

self.xgb_params = xgb_params
self.optuna_params = optuna_params

if save_path:
if not save_path.endswith(".joblib"):
raise ValueError("Only joblib format is supported for saving the model.")

self.save_path = save_path

def execute(self, data: DataContainer) -> DataContainer:
Expand Down Expand Up @@ -90,15 +95,9 @@ def execute(self, data: DataContainer) -> DataContainer:
# Save the model to the data container
data[DataContainer.MODEL] = model

# save model to disk
save_path = self.save_path

if save_path:
if not save_path.endswith(".joblib"):
raise ValueError("Only joblib format is supported for saving the model.")
self.logger.info(f"Saving the model to {save_path}")
dump(model, save_path)

if self.save_path:
self.logger.info(f"Saving the model to {self.save_path}")
dump(model, self.save_path)
return data

def optimize_with_optuna(self, X_train, y_train, X_valid, y_valid, optuna_params):
Expand Down
Loading

0 comments on commit 86edc65

Please sign in to comment.