Skip to content

Commit

Permalink
add model interface
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 25, 2024
1 parent 4808ab6 commit 2c96b4b
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pipeline_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .core.pipeline import Pipeline

# Discover and register the built-in pipeline steps as soon as the package is imported.
Pipeline.step_registry.auto_register_steps_from_package("pipeline_lib.core.steps")
# Model implementations are kept in the model registry (not the step registry),
# so the XGBoost package is scanned for Model subclasses instead of steps.
Pipeline.model_registry.auto_register_models_from_package(
    "pipeline_lib.implementation.tabular.xgboost"
)
48 changes: 48 additions & 0 deletions pipeline_lib/core/data_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,54 @@ def flow(self, value: Any):
"""
self["flow"] = value

@property
def _drop_columns(self) -> Any:
    """
    Read the drop columns kept in the DataContainer.

    Returns
    -------
    Any
        Whatever is stored under the ``"_drop_columns"`` key.
    """
    stored_columns = self["_drop_columns"]
    return stored_columns

@_drop_columns.setter
def _drop_columns(self, value: Any):
    """
    Store the drop columns in the DataContainer.

    Parameters
    ----------
    value
        The drop columns to keep under the ``"_drop_columns"`` key.
    """
    self["_drop_columns"] = value

@property
def is_train(self) -> bool:
    """
    Tell whether the DataContainer is made for training.

    Returns
    -------
    bool
        The flag stored under the ``"is_train"`` key: True when the
        DataContainer holds training data, False otherwise.
    """
    training_flag = self["is_train"]
    return training_flag

@is_train.setter
def is_train(self, value: bool):
    """
    Record whether the DataContainer is made for training.

    Parameters
    ----------
    value
        The flag to keep under the ``"is_train"`` key.
    """
    self["is_train"] = value

def __eq__(self, other) -> bool:
"""
Compare this DataContainer with another for equality.
Expand Down
22 changes: 22 additions & 0 deletions pipeline_lib/core/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple

import pandas as pd


class Model(ABC):
    """Abstract interface that every model implementation has to provide.

    A concrete subclass must override both :meth:`fit` and :meth:`predict`;
    until it does, it cannot be instantiated.
    """

    @abstractmethod
    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        eval_set: Optional[List[Tuple[pd.DataFrame, pd.Series]]] = None,
        verbose: Optional[bool] = True,
    ):
        """Train the model.

        Parameters
        ----------
        X
            Feature table to train on.
        y
            Target values matching ``X``.
        eval_set
            Optional list of ``(features, target)`` pairs; how they are used
            is left to the implementation.
        verbose
            Verbosity flag; how it is honoured is left to the implementation.
        """

    @abstractmethod
    def predict(self, X: pd.DataFrame) -> pd.Series:
        """Return predictions for the rows of ``X``."""
47 changes: 47 additions & 0 deletions pipeline_lib/core/model_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import importlib
import logging
import pkgutil

from pipeline_lib.core.model import Model


class ModelClassNotFoundError(Exception):
    """Raised when a requested model class name is not present in the registry."""


class ModelRegistry:
    """Name-indexed registry of ``Model`` subclasses.

    Classes are stored under their ``__name__`` so they can later be looked up
    from a plain string.
    """

    def __init__(self):
        # Maps class name -> class object.
        self._model_registry = {}
        self.logger = logging.getLogger(__name__)

    def register_model(self, model_class: type):
        """Register ``model_class`` under its class name.

        A class registered later under the same name replaces the earlier one.

        Raises
        ------
        ValueError
            If ``model_class`` is not a class derived from ``Model``.
        """
        # Validate before reading ``__name__``: a non-class argument would
        # otherwise surface as AttributeError/TypeError instead of ValueError.
        if not (isinstance(model_class, type) and issubclass(model_class, Model)):
            raise ValueError(f"{model_class} must be a subclass of Model")
        self._model_registry[model_class.__name__] = model_class

    def get_model_class(self, model_name: str) -> type:
        """Return the class registered as ``model_name``.

        Raises
        ------
        ModelClassNotFoundError
            If no class has been registered under that name.
        """
        try:
            return self._model_registry[model_name]
        except KeyError:
            # The KeyError adds nothing for the caller; drop it from the chain.
            raise ModelClassNotFoundError(
                f"Model class '{model_name}' not found in registry."
            ) from None

    def get_all_model_classes(self) -> dict:
        """Return a ``{name: class}`` snapshot of the registry.

        A copy is returned so callers cannot alter the registry by accident.
        """
        return dict(self._model_registry)

    def auto_register_models_from_package(self, package_name: str):
        """Import ``package_name`` and register every ``Model`` subclass found in it.

        For a package, all of its submodules are walked; for a plain module,
        the module itself is inspected. Import failures are logged and
        skipped, so one broken module does not prevent the remaining ones
        from being registered.
        """
        try:
            package = importlib.import_module(package_name)
        except ImportError as e:
            self.logger.error("Failed to import package: %s. Error: %s", package_name, e)
            return

        search_path = getattr(package, "__path__", None)
        if search_path is None:
            # A plain module has no submodules to walk; inspect it directly.
            self._register_models_from_module(package)
            return

        prefix = package.__name__ + "."
        for _, modname, _ in pkgutil.walk_packages(search_path, prefix):
            try:
                module = importlib.import_module(modname)
                self._register_models_from_module(module)
            except ImportError as e:
                self.logger.error("Failed to import module: %s. Error: %s", modname, e)

    def _register_models_from_module(self, module):
        """Register each ``Model`` subclass (other than ``Model`` itself) exposed by ``module``."""
        for name in dir(module):
            attribute = getattr(module, name)
            if (
                isinstance(attribute, type)
                and issubclass(attribute, Model)
                and attribute is not Model
            ):
                self.register_model(attribute)
52 changes: 49 additions & 3 deletions pipeline_lib/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import logging
from typing import Optional

from joblib import load

from pipeline_lib.core.data_container import DataContainer
from pipeline_lib.core.model_registry import ModelRegistry
from pipeline_lib.core.step_registry import StepRegistry
from pipeline_lib.core.steps import PipelineStep

Expand All @@ -15,31 +18,55 @@ class Pipeline:
# Class-level dict; not referenced by any of the Pipeline code visible in this view.
_step_registry = {}
# Logger shared by the class and all of its instances.
logger = logging.getLogger("Pipeline")
# Shared registry used by from_json to turn a step type name into a step class.
step_registry = StepRegistry()
# Shared registry used by from_json to turn a model class name into a model class.
model_registry = ModelRegistry()

def __init__(self, initial_data: Optional[DataContainer] = None):
    """Create a pipeline with no steps.

    ``initial_data`` is stored on the instance unchanged.
    """
    self.initial_data = initial_data
    self.steps = []
    # No persistence locations are configured at construction time.
    self.save_path = self.load_path = self.model_path = None

def add_steps(self, steps: list[PipelineStep]):
    """Append the given steps, in order, to the end of the pipeline."""
    self.steps += steps

def run(self, is_train: bool) -> DataContainer:
    """Run the steps that apply to the requested mode and return the resulting data.

    Parameters
    ----------
    is_train
        True to run the steps flagged ``used_for_training``, False to run the
        steps flagged ``used_for_prediction``. The flag is also stored on the
        DataContainer handed to the steps.

    Returns
    -------
    DataContainer
        The container returned by the last executed step (or the starting
        container when no step applies).
    """
    # Start from a pickled container when a load path is configured.
    # NOTE(review): self.initial_data is not consulted here.
    data = DataContainer.from_pickle(self.load_path) if self.load_path else DataContainer()
    data.is_train = is_train

    if is_train:
        steps_to_run = [step for step in self.steps if step.used_for_training]
    else:
        steps_to_run = [step for step in self.steps if step.used_for_prediction]

    total = len(steps_to_run)
    for position, step in enumerate(steps_to_run, start=1):
        Pipeline.logger.info(f"Running {step.__class__.__name__} - {position} / {total}")
        # Each step receives the output of the previous one.
        data = step.execute(data)

    if self.save_path:
        data.save(self.save_path)

    return data

def train(self) -> DataContainer:
    """Run the pipeline in training mode and return the resulting DataContainer."""
    self.logger.info("Training the pipeline")
    trained_data = self.run(is_train=True)
    return trained_data

def predict(self) -> DataContainer:
    """Run the pipeline in prediction mode and attach the model's predictions.

    After the prediction steps have run, the model held by the resulting
    DataContainer predicts on the container's ``flow`` and the result is
    stored as ``predictions`` on that same container, which is returned.
    """
    log = self.logger
    log.info("Predicting with the pipeline")
    data = self.run(is_train=False)
    predictions = data.model.predict(data.flow)
    data.predictions = predictions
    log.info("Predictions:")
    log.info(data.predictions)
    return data

@classmethod
def from_json(cls, path: str) -> Pipeline:
"""Load a pipeline from a JSON file."""
Expand All @@ -61,6 +88,10 @@ def from_json(cls, path: str) -> Pipeline:

steps = []

model_path = None
drop_columns = None
target = None

for step_config in config["pipeline"]["steps"]:
step_type = step_config["step_type"]
parameters = step_config.get("parameters", {})
Expand All @@ -69,6 +100,21 @@ def from_json(cls, path: str) -> Pipeline:
f"Creating step {step_type} with parameters: \n {json.dumps(parameters, indent=4)}"
)

# change model from string to class
if step_type == "FitModelStep":
model_class_name = parameters.pop("model_class")
model_class = cls.model_registry.get_model_class(model_class_name)
parameters["model_class"] = model_class
model_path = parameters.get("save_path")
drop_columns = parameters.get("drop_columns")
target = parameters.get("target")

# if step type is prediction, add model path
if step_type == "PredictStep":
parameters["load_path"] = model_path
parameters["drop_columns"] = drop_columns
parameters["target"] = target

step_class = cls.step_registry.get_step_class(step_type)
step = step_class(**parameters)
steps.append(step)
Expand Down
18 changes: 18 additions & 0 deletions pipeline_lib/implementation/tabular/xgboost/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Any

import pandas as pd
import xgboost as xgb

from pipeline_lib.core.model import Model


class XGBoostModel(Model):
    """``Model`` implementation backed by an ``xgboost.XGBRegressor``."""

    def __init__(self, **params):
        """Create the wrapped regressor; all keyword arguments go to ``xgb.XGBRegressor``."""
        regressor = xgb.XGBRegressor(**params)
        self.model = regressor

    def fit(self, X: pd.DataFrame, y: pd.Series, eval_set=None, verbose=True) -> Any:
        """Fit the wrapped regressor on ``X`` and ``y`` and return this object.

        ``eval_set`` and ``verbose`` are passed through to the regressor's
        ``fit`` unchanged.
        """
        regressor = self.model
        regressor.fit(X, y, eval_set=eval_set, verbose=verbose)
        return self

    def predict(self, X: pd.DataFrame) -> pd.Series:
        """Return the wrapped regressor's predictions for ``X``."""
        # NOTE(review): xgboost documents predict() as returning a NumPy array,
        # while the annotation promises a Series — confirm what callers expect.
        predictions = self.model.predict(X)
        return predictions

0 comments on commit 2c96b4b

Please sign in to comment.