Skip to content

Commit

Permalink
split step_registry to new class
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 13, 2024
1 parent b447fc9 commit fef94d9
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 177 deletions.
6 changes: 4 additions & 2 deletions pipeline_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from .core.pipeline import Pipeline

Pipeline.auto_register_steps_from_package("pipeline_lib.core.steps")
Pipeline.auto_register_steps_from_package("pipeline_lib.implementation.tabular.xgboost")
Pipeline.step_registry.auto_register_steps_from_package("pipeline_lib.core.steps")
Pipeline.step_registry.auto_register_steps_from_package(
"pipeline_lib.implementation.tabular.xgboost"
)
81 changes: 4 additions & 77 deletions pipeline_lib/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from __future__ import annotations

import importlib
import json
import logging
import os
import pkgutil
from typing import Optional

from pipeline_lib.core.data_container import DataContainer
from pipeline_lib.core.step_registry import StepRegistry
from pipeline_lib.core.steps import PipelineStep


Expand All @@ -16,89 +14,18 @@ class Pipeline:

_step_registry = {}
logger = logging.getLogger("Pipeline")
step_registry = StepRegistry()

def __init__(self, initial_data: Optional[DataContainer] = None):
self.steps = []
self.initial_data = initial_data
self.save_path = None
self.load_path = None

@classmethod
def register_step(cls, step_class):
"""Register a step class using its class name."""
step_name = step_class.__name__
if not issubclass(step_class, PipelineStep):
raise ValueError(f"{step_class} must be a subclass of PipelineStep")
cls._step_registry[step_name] = step_class

@classmethod
def get_step_class(cls, step_name):
"""Retrieve a step class by name."""
if step_name in cls._step_registry:
return cls._step_registry[step_name]
else:
raise ValueError(f"Step class '{step_name}' not found in registry.")

def add_steps(self, steps: list[PipelineStep]):
"""Add steps to the pipeline."""
self.steps.extend(steps)

@classmethod
def auto_register_steps_from_package(cls, package_name):
"""
Automatically registers all step classes found within a specified package.
"""
package = importlib.import_module(package_name)
prefix = package.__name__ + "."
for importer, modname, ispkg in pkgutil.walk_packages(package.__path__, prefix):
module = importlib.import_module(modname)
for name in dir(module):
attribute = getattr(module, name)
if (
isinstance(attribute, type)
and issubclass(attribute, PipelineStep)
and attribute is not PipelineStep
):
cls.register_step(attribute)

@classmethod
def load_and_register_custom_steps(cls, custom_steps_path: str) -> None:
"""
Dynamically loads and registers step classes found in the specified directory.
This method scans a specified directory for Python files (excluding __init__.py),
dynamically imports these files as modules, and registers all classes derived from
PipelineStep found within these modules.
Parameters
----------
custom_steps_path : str
The path to the directory containing custom step implementation files.
"""
Pipeline.logger.debug(f"Loading custom steps from: {custom_steps_path}")
for filename in os.listdir(custom_steps_path):
if filename.endswith(".py") and not filename.startswith("__"):
filepath = os.path.join(custom_steps_path, filename)
module_name = os.path.splitext(filename)[0]
spec = importlib.util.spec_from_file_location(module_name, filepath)
module = importlib.util.module_from_spec(spec)

try:
spec.loader.exec_module(module)
Pipeline.logger.debug(f"Successfully loaded module: {module_name}")

for attribute_name in dir(module):
attribute = getattr(module, attribute_name)
if (
isinstance(attribute, type)
and issubclass(attribute, PipelineStep)
and attribute is not PipelineStep
):
Pipeline.register_step(attribute)
Pipeline.logger.debug(f"Registered step class: {attribute_name}")
except Exception as e:
Pipeline.logger.error(f"Failed to load module: {module_name}. Error: {e}")

def run(self) -> DataContainer:
"""Run the pipeline on the given data."""

Expand All @@ -125,7 +52,7 @@ def from_json(cls, path: str) -> Pipeline:

custom_steps_path = config.get("custom_steps_path")
if custom_steps_path:
Pipeline.load_and_register_custom_steps(custom_steps_path)
cls.step_registry.load_and_register_custom_steps(custom_steps_path)

pipeline = Pipeline()

Expand All @@ -142,7 +69,7 @@ def from_json(cls, path: str) -> Pipeline:
f"Creating step {step_type} with parameters: \n {json.dumps(parameters, indent=4)}"
)

step_class = Pipeline.get_step_class(step_type)
step_class = cls.step_registry.get_step_class(step_type)
step = step_class(**parameters)
steps.append(step)

Expand Down
73 changes: 73 additions & 0 deletions pipeline_lib/core/step_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import importlib
import logging
import os
import pkgutil

from pipeline_lib.core.steps import PipelineStep


class StepRegistry:
"""A helper class for managing the registry of pipeline steps."""

def __init__(self):
self._step_registry = {}
self.logger = logging.getLogger(StepRegistry.__name__)

def register_step(self, step_class):
"""Register a step class using its class name."""
step_name = step_class.__name__
if not issubclass(step_class, PipelineStep):
raise ValueError(f"{step_class} must be a subclass of PipelineStep")
self._step_registry[step_name] = step_class

def get_step_class(self, step_name):
"""Retrieve a step class by name."""
if step_name in self._step_registry:
return self._step_registry[step_name]
else:
raise ValueError(f"Step class '{step_name}' not found in registry.")

def auto_register_steps_from_package(self, package_name):
"""
Automatically registers all step classes found within a specified package.
"""
package = importlib.import_module(package_name)
prefix = package.__name__ + "."
for importer, modname, ispkg in pkgutil.walk_packages(package.__path__, prefix):
module = importlib.import_module(modname)
for name in dir(module):
attribute = getattr(module, name)
if (
isinstance(attribute, type)
and issubclass(attribute, PipelineStep)
and attribute is not PipelineStep
):
self.register_step(attribute)

def load_and_register_custom_steps(self, custom_steps_path: str) -> None:
"""
Dynamically loads and registers step classes found in the specified directory.
"""
self.logger.debug(f"Loading custom steps from: {custom_steps_path}")
for filename in os.listdir(custom_steps_path):
if filename.endswith(".py") and not filename.startswith("__"):
filepath = os.path.join(custom_steps_path, filename)
module_name = os.path.splitext(filename)[0]
spec = importlib.util.spec_from_file_location(module_name, filepath)
module = importlib.util.module_from_spec(spec)

try:
spec.loader.exec_module(module)
self.logger.debug(f"Successfully loaded module: {module_name}")

for attribute_name in dir(module):
attribute = getattr(module, attribute_name)
if (
isinstance(attribute, type)
and issubclass(attribute, PipelineStep)
and attribute is not PipelineStep
):
self.register_step(attribute)
self.logger.debug(f"Registered step class: {attribute_name}")
except Exception as e:
self.logger.error(f"Failed to load module: {module_name}. Error: {e}")
2 changes: 0 additions & 2 deletions pipeline_lib/core/steps/calculate_metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import Optional

import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

Expand Down
101 changes: 8 additions & 93 deletions pipeline_lib/core/steps/tabular_split.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from typing import Optional, Tuple

import pandas as pd
from sklearn.model_selection import train_test_split

from pipeline_lib.core import DataContainer
Expand All @@ -11,118 +8,36 @@
class TabularSplitStep(PipelineStep):
"""Split the data."""

def __init__(
self,
train_percentage: float,
id_column: str,
train_ids: Optional[list[str]] = None,
validation_ids: Optional[list[str]] = None,
) -> None:
def __init__(self, train_percentage: float) -> None:
"""Initialize SplitStep."""
self.init_logger()
self.train_percentage = train_percentage
self.id_column_name = id_column
self.train_ids = train_ids
self.validation_ids = validation_ids

def _id_based_split(
self,
df: pd.DataFrame,
train_ids: list[str],
validation_ids: list[str],
id_column_name: str,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Splits the DataFrame into training and validation sets based on specified IDs.
Parameters
----------
df : pd.DataFrame
The DataFrame to split.
train_ids : List[str]
List of IDs for the training set.
validation_ids : List[str]
List of IDs for the validation set.
id_column_name : str
The name of the column in df that contains the IDs.
Returns
-------
Tuple[pd.DataFrame, pd.DataFrame]
A tuple containing the training set and the validation set.
"""
train_df = df[df[id_column_name].isin(train_ids)]
validation_df = df[df[id_column_name].isin(validation_ids)]
return train_df, validation_df

def _percentage_based_id_split(
self, df: pd.DataFrame, train_percentage: float, id_column_name: str
) -> Tuple[list[str], list[str]]:
"""
Splits the unique IDs into training and validation sets based on specified percentages.

Parameters
----------
df : pd.DataFrame
The DataFrame containing the IDs.
train_percentage : float
The percentage of IDs to include in the training set.
id_column_name : str
The name of the column containing the IDs.
Returns
-------
Tuple[List[str], List[str]]
A tuple containing lists of training and validation IDs.
"""
unique_ids = df[id_column_name].unique()
train_ids, validation_ids = train_test_split(
unique_ids, train_size=train_percentage, random_state=42
)
return train_ids.tolist(), validation_ids.tolist()
if self.train_percentage <= 0 or self.train_percentage >= 1:
raise ValueError("train_percentage must be between 0 and 1.")

def execute(self, data: DataContainer) -> DataContainer:
"""Execute the split based on IDs."""
"""Execute the random train-validation split."""
self.logger.info("Splitting tabular data...")

df = data[DataContainer.CLEAN]

if self.train_percentage:
if (
self.train_percentage is None
or self.train_percentage <= 0
or self.train_percentage >= 1
):
raise ValueError("train_percentage must be between 0 and 1.")
train_ids, validation_ids = self._percentage_based_id_split(
df, self.train_percentage, self.id_column_name
)

self.logger.info(f"Number of train ids: {len(train_ids)}")
self.logger.info(f"Number of validation ids: {len(validation_ids)}")

train_df, validation_df = self._id_based_split(
df, train_ids, validation_ids, self.id_column_name
train_df, validation_df = train_test_split(
df, train_size=self.train_percentage, random_state=42
)

train_rows = len(train_df)
validation_rows = len(validation_df)
total_rows = train_rows + validation_rows

self.logger.info(
f"Number of rows in training set: {len(train_df)} | {train_rows/total_rows:.2%}"
f"Number of rows in training set: {train_rows} | {train_rows/total_rows:.2%}"
)
self.logger.info(
f"Number of rows in validation set: {len(validation_df)} |"
f"Number of rows in validation set: {validation_rows} |"
f" {validation_rows/total_rows:.2%}"
)

left_ids = df[~df[self.id_column_name].isin(train_ids + validation_ids)][
self.id_column_name
].unique()
self.logger.info(f"Number of IDs left from total df: {len(left_ids)}")
self.logger.debug(f"IDs left from total df: {left_ids}")

data[DataContainer.TRAIN] = train_df
data[DataContainer.VALIDATION] = validation_df

Expand Down
3 changes: 1 addition & 2 deletions pipeline_lib/implementation/tabular/xgboost/fit_model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import time
from typing import Optional

import optuna
import xgboost as xgb
from joblib import dump
from optuna.pruners import MedianPruner
from sklearn.metrics import mean_absolute_error

from typing import Optional

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps import FitModelStep

Expand Down
3 changes: 2 additions & 1 deletion pipeline_lib/implementation/tabular/xgboost/predict.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Optional

import pandas as pd
from joblib import load

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps import PredictStep
from typing import Optional


class XGBoostPredictStep(PredictStep):
Expand Down

0 comments on commit fef94d9

Please sign in to comment.