Skip to content

Commit

Permalink
enable custom step registration via folder path in config.json
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 5, 2024
1 parent e1a71c2 commit 74369b1
Showing 1 changed file with 52 additions and 9 deletions.
61 changes: 52 additions & 9 deletions pipeline_lib/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import importlib
import json
import logging
import os
import pkgutil
from typing import Optional

Expand All @@ -14,18 +15,13 @@ class Pipeline:
"""Base class for pipelines."""

_step_registry = {}
logger = logging.getLogger("Pipeline")

def __init__(self, initial_data: Optional[DataContainer] = None):
"""Create a pipeline with an empty step list and store ``initial_data``."""
self.steps = []
# self.steps was assigned an empty list on the previous line, so all(...)
# is vacuously True here and the TypeError below cannot be raised.
if not all(isinstance(step, PipelineStep) for step in self.steps):
raise TypeError("All steps must be instances of PipelineStep")
self.initial_data = initial_data
self.init_logger()

def init_logger(self) -> None:
"""Initialize the logger.

Binds ``self.logger`` to the logger named after the instance's class and
emits a debug message stating that the instance was initialized.
"""
# The logger name comes from the runtime class, not a hard-coded string.
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug(f"{self.__class__.__name__} initialized")

@classmethod
def register_step(cls, step_class):
Expand Down Expand Up @@ -65,13 +61,56 @@ def auto_register_steps_from_package(cls, package_name):
):
cls.register_step(attribute)

@staticmethod
def load_and_register_custom_steps(custom_steps_path: str) -> None:
    """
    Dynamically load and register step classes found in the specified directory.

    Scans ``custom_steps_path`` for Python files (skipping any whose name starts
    with ``__``, such as ``__init__.py``), imports each file as a module, and
    registers every class in it that derives from PipelineStep (PipelineStep
    itself is excluded).

    Loading is best-effort: a file that cannot be imported, or whose classes
    cannot be registered, is logged at error level and skipped so the remaining
    files are still processed.

    Parameters
    ----------
    custom_steps_path : str
        The path to the directory containing custom step implementation files.

    Returns
    -------
    None

    Raises
    ------
    OSError
        If ``custom_steps_path`` cannot be listed (for example, it does not
        exist or is not a directory).
    """
    # `import importlib` alone does not guarantee that the `importlib.util`
    # submodule is loaded, so import it explicitly where it is used.
    import importlib.util

    Pipeline.logger.debug(f"Loading custom steps from: {custom_steps_path}")

    # os.listdir returns entries in arbitrary order; sorting makes the
    # registration order reproducible across platforms and runs.
    for filename in sorted(os.listdir(custom_steps_path)):
        if not filename.endswith(".py") or filename.startswith("__"):
            continue

        filepath = os.path.join(custom_steps_path, filename)
        module_name = os.path.splitext(filename)[0]

        try:
            spec = importlib.util.spec_from_file_location(module_name, filepath)
            # spec_from_file_location may return None (or a spec without a
            # loader); fail this file explicitly instead of with AttributeError.
            if spec is None or spec.loader is None:
                raise ImportError(f"Cannot create an import spec for {filepath}")

            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            Pipeline.logger.debug(f"Successfully loaded module: {module_name}")

            for attribute_name in dir(module):
                attribute = getattr(module, attribute_name)
                if (
                    isinstance(attribute, type)
                    and issubclass(attribute, PipelineStep)
                    and attribute is not PipelineStep
                ):
                    Pipeline.register_step(attribute)
                    Pipeline.logger.debug(f"Registered step class: {attribute_name}")
        except Exception as e:
            # Deliberate best-effort boundary: one broken custom file must not
            # prevent the other custom steps from being registered.
            Pipeline.logger.error(f"Failed to load module: {module_name}. Error: {e}")

def run(self) -> DataContainer:
"""Run every step in ``self.steps`` in order and return the resulting data.

Execution starts from a newly created, empty ``DataContainer``; each step's
``execute`` result is passed to the next step. ``self.initial_data`` is not
read by this method.
"""

data = DataContainer()

for i, step in enumerate(self.steps):
# NOTE(review): the two logging calls below emit the same message and look
# like the removed/added pair of a diff hunk - confirm only one should remain.
self.logger.info(f"Running {step.__class__.__name__} - {i + 1} / {len(self.steps)}")
Pipeline.logger.info(f"Running {step.__class__.__name__} - {i + 1} / {len(self.steps)}")
# Each step receives the previous step's output and returns the next input.
data = step.execute(data)
return data

Expand All @@ -85,14 +124,18 @@ def from_json(cls, path: str) -> Pipeline:
with open(path, "r") as config_file:
config = json.load(config_file)

pipeline = Pipeline() # Assuming you have a default or base Pipeline class
custom_steps_path = config.get("custom_steps_path")
if custom_steps_path:
Pipeline.load_and_register_custom_steps(custom_steps_path)

pipeline = Pipeline()
steps = []

for step_config in config["pipeline"]["steps"]:
step_type = step_config["step_type"]
parameters = step_config.get("parameters", {})

pipeline.logger.info(
Pipeline.logger.info(
f"Creating step {step_type} with parameters: \n {json.dumps(parameters, indent=4)}"
)

Expand Down

0 comments on commit 74369b1

Please sign in to comment.