Skip to content

Commit

Permalink
add pipeline from config
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 4, 2024
1 parent 48ee9e4 commit c80bf68
Show file tree
Hide file tree
Showing 16 changed files with 143 additions and 39 deletions.
4 changes: 4 additions & 0 deletions pipeline_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .core.pipeline import Pipeline

Pipeline.auto_register_steps_from_package("pipeline_lib.core.steps")
Pipeline.auto_register_steps_from_package("pipeline_lib.implementation.tabular.xgboost")
96 changes: 78 additions & 18 deletions pipeline_lib/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,105 @@
from __future__ import annotations

import importlib
import json
import logging
from abc import ABC, abstractmethod
import os
import pkgutil
from typing import Optional

from pipeline_lib.core.data_container import DataContainer
from pipeline_lib.core.steps import PipelineStep


class Pipeline:
    """Base class for pipelines.

    Step classes are registered by name in a shared class-level registry,
    and pipelines can be assembled either programmatically via
    ``add_steps`` or declaratively from a JSON config via ``from_json``.
    """

    # Shared mapping of step-class name -> step class, used by from_json().
    _step_registry = {}

    def __init__(self, initial_data: Optional[DataContainer] = None):
        """Create an empty pipeline, optionally seeded with initial data."""
        self.steps = []
        self.initial_data = initial_data
        self.init_logger()

    def init_logger(self) -> None:
        """Initialize the logger."""
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.debug(f"{self.__class__.__name__} initialized")

    @classmethod
    def register_step(cls, step_class):
        """Register a step class using its class name.

        Raises:
            ValueError: If ``step_class`` is not a subclass of PipelineStep.
        """
        if not issubclass(step_class, PipelineStep):
            raise ValueError(f"{step_class} must be a subclass of PipelineStep")
        cls._step_registry[step_class.__name__] = step_class

    @classmethod
    def get_step_class(cls, step_name):
        """Retrieve a registered step class by name.

        Raises:
            ValueError: If no step named ``step_name`` has been registered.
        """
        try:
            return cls._step_registry[step_name]
        except KeyError:
            raise ValueError(f"Step class '{step_name}' not found in registry.")

    def add_steps(self, steps: list[PipelineStep]):
        """Append steps to the pipeline after validating their type.

        Raises:
            TypeError: If any element is not a PipelineStep instance.
        """
        if not all(isinstance(step, PipelineStep) for step in steps):
            raise TypeError("All steps must be instances of PipelineStep")
        self.steps.extend(steps)

    @classmethod
    def auto_register_steps_from_package(cls, package_name):
        """
        Automatically registers all step classes found within a specified package.
        """
        package = importlib.import_module(package_name)
        prefix = package.__name__ + "."
        for _, modname, _ in pkgutil.walk_packages(package.__path__, prefix):
            module = importlib.import_module(modname)
            for name in dir(module):
                attribute = getattr(module, name)
                if (
                    isinstance(attribute, type)
                    and issubclass(attribute, PipelineStep)
                    and attribute is not PipelineStep
                ):
                    cls.register_step(attribute)

    def run(self) -> DataContainer:
        """Run the pipeline and return the final data container.

        Starts from ``initial_data`` when one was supplied to the
        constructor, otherwise from a fresh DataContainer, threading the
        result of each step into the next.
        """
        data = self.initial_data if self.initial_data is not None else DataContainer()

        total = len(self.steps)
        for i, step in enumerate(self.steps):
            self.logger.info(f"Running {step.__class__.__name__} - {i + 1} / {total}")
            data = step.execute(data)
        return data

    @classmethod
    def from_json(cls, path: str) -> Pipeline:
        """Load a pipeline from a JSON file.

        The file must look like
        ``{"pipeline": {"steps": [{"step_type": ..., "parameters": {...}}]}}``.

        Raises:
            ValueError: If ``path`` is not a .json file, or a step type is
                not present in the registry.
        """
        # Cheap sanity check before touching the filesystem.
        if not path.endswith(".json"):
            raise ValueError(f"File {path} is not a JSON file")

        with open(path, "r") as config_file:
            config = json.load(config_file)

        # Use cls() so subclasses loading a config get their own type.
        pipeline = cls()
        steps = []

        for step_config in config["pipeline"]["steps"]:
            step_type = step_config["step_type"]
            parameters = step_config.get("parameters", {})

            step_class = cls.get_step_class(step_type)
            pipeline.logger.debug(f"Got step class: {step_class}")
            steps.append(step_class(config=parameters))

        pipeline.add_steps(steps)
        return pipeline

def __str__(self) -> str:
step_names = [f"{i + 1}. {step.__class__.__name__}" for i, step in enumerate(self.steps)]
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/augment.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class AugmentStep(PipelineStep):
"""Augment the data."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize AugmentStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
6 changes: 5 additions & 1 deletion pipeline_lib/core/steps/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import logging
from abc import ABC, abstractmethod
from typing import Optional

from pipeline_lib.core.data_container import DataContainer
from pipeline_lib.core import DataContainer


class PipelineStep(ABC):
"""Base class for pipeline steps."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Store the step's optional configuration dict for subclasses to read."""
    self.config = config

@abstractmethod
def execute(self, data: DataContainer) -> DataContainer:
"""Abstract method for executing the step."""
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/calculate_features.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class CalculateFeaturesStep(PipelineStep):
"""Calculate features."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize CalculateFeaturesStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/calculate_metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

Expand All @@ -9,8 +11,9 @@
class CalculateMetricsStep(PipelineStep):
"""Calculate metrics."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize CalculateMetricsStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/calculate_reports.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class CalculateReportsStep(PipelineStep):
"""Calculate reports."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize CalculateReportsStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/encode.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class EncodeStep(PipelineStep):
"""Encode the data."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize EncodeStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/explainer_dashboard.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class ExplainerDashboardStep(PipelineStep):
"""Explainer Dashboard."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize ExplainerDashboardStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/fit_encoders.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class FitEncodersStep(PipelineStep):
"""Fit encoders."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize FitEncodersStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/fit_model.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class FitModelStep(PipelineStep):
"""Fit the model."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize FitModelStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/input_scaling.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class InputScalingStep(PipelineStep):
"""Scale the input."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize InputScalingStep.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
5 changes: 4 additions & 1 deletion pipeline_lib/core/steps/predict.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,8 +8,9 @@
class PredictStep(PipelineStep):
"""Obtain the predictions."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize Predict Step.

    Args:
        config: Optional step configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
7 changes: 4 additions & 3 deletions pipeline_lib/core/steps/tabular_split.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Optional, Tuple

import pandas as pd
from sklearn.model_selection import train_test_split
Expand All @@ -11,8 +11,9 @@
class TabularSplitStep(PipelineStep):
"""Split the data."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize SplitStep.

    Args:
        config: Optional split configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def _id_based_split(
Expand Down Expand Up @@ -75,7 +76,7 @@ def execute(self, data: DataContainer) -> DataContainer:
"""Execute the split based on IDs."""
self.logger.info("Splitting tabular data...")

split_configs = data[DataContainer.SPLIT_CONFIGS]
split_configs = self.config

if split_configs is None:
self.logger.info("No split_configs found. No splitting will be performed.")
Expand Down
10 changes: 6 additions & 4 deletions pipeline_lib/core/steps/target_scaling.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pipeline_lib.core import DataContainer

from .base import PipelineStep
Expand All @@ -6,14 +8,14 @@
class TargetScalingStep(PipelineStep):
"""Scale the target."""

def __init__(self, config: Optional[dict] = None) -> None:
    """Initialize TargetScalingStep.

    Args:
        config: Optional scaling configuration forwarded to PipelineStep.
    """
    super().__init__(config=config)
    self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
target_scaling_configs = data.get(DataContainer.TARGET_SCALING_CONFIGS)

if target_scaling_configs is None:
"""Execute the step."""
if not self.config:
self.logger.info("No target scaling configs found. Skipping target scaling.")
return data

Expand Down
9 changes: 6 additions & 3 deletions pipeline_lib/implementation/tabular/xgboost/fit_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ def execute(self, data: DataContainer) -> DataContainer:

start_time = time.time()

model_configs = data[DataContainer.MODEL_CONFIGS]
model_configs = self.config

if model_configs is None:
raise ValueError("No model configs found")

target = model_configs.get("target")

if target is None:
raise ValueError("Target column not found in model_configs.")

drop_columns: list[str] = model_configs.get("drop_columns")

df_train = data[DataContainer.TRAIN]
df_valid = data[DataContainer.VALIDATION]

drop_columns = model_configs.get("drop_columns")

if drop_columns:
df_train = df_train.drop(columns=drop_columns)
df_valid = df_valid.drop(columns=drop_columns)
Expand Down

0 comments on commit c80bf68

Please sign in to comment.