diff --git a/pipeline_lib/core/steps/calculate_features.py b/pipeline_lib/core/steps/calculate_features.py
index 2c52397..d6e0227 100644
--- a/pipeline_lib/core/steps/calculate_features.py
+++ b/pipeline_lib/core/steps/calculate_features.py
@@ -1,9 +1,18 @@
 from typing import List, Optional
 
+import pandas as pd
+from pandas.api.types import is_datetime64_any_dtype
+
 from pipeline_lib.core import DataContainer
 from pipeline_lib.core.steps.base import PipelineStep
 
 
+class UnsupportedFeatureError(Exception):
+    """Custom exception for unsupported features."""
+
+    pass
+
+
 class CalculateFeaturesStep(PipelineStep):
     """Calculate features."""
 
@@ -11,43 +20,67 @@ def __init__(
         self,
         datetime_columns: Optional[List[str]] = None,
         features: Optional[List[str]] = None,
-        config: Optional[dict] = None,
     ) -> None:
         """Initialize CalculateFeaturesStep."""
-        super().__init__(config=config)
+        super().__init__()
         self.init_logger()
         self.datetime_columns = datetime_columns
         self.features = features
 
+        if self.datetime_columns and not isinstance(self.datetime_columns, list):
+            self.datetime_columns = [self.datetime_columns]
+
+        self.feature_extractors = {
+            "year": lambda col: col.dt.year,
+            "month": lambda col: col.dt.month,
+            "day": lambda col: col.dt.day,
+            "hour": lambda col: col.dt.hour,
+            "minute": lambda col: col.dt.minute,
+            "second": lambda col: col.dt.second,
+            "weekday": lambda col: col.dt.weekday,
+            "dayofyear": lambda col: col.dt.dayofyear,
+        }
+
+        # Validate features during initialization
+        if self.features:
+            unsupported_features = set(self.features) - set(self.feature_extractors.keys())
+            if unsupported_features:
+                raise UnsupportedFeatureError(
+                    f"Unsupported datetime features: {unsupported_features}"
+                )
+
+    def _convert_column_to_datetime(self, df: pd.DataFrame, column: str) -> None:
+        """Convert a column to datetime."""
+        # Check if the column is already a datetime type
+        if not is_datetime64_any_dtype(df[column]):
+            try:
+                df.loc[:, column] = pd.to_datetime(df[column], errors="raise")
+                self.logger.info(f"Column '{column}' converted to datetime.")
+            except Exception as e:
+                self.logger.error(f"Error converting column '{column}' to datetime: {e}")
+        else:
+            self.logger.debug(f"Column '{column}' is already a datetime type.")
+
+    def _extract_feature(self, df: pd.DataFrame, column: str, feature: str) -> None:
+        """Extract a single feature from a datetime column."""
+        extractor = self.feature_extractors[feature]
+        df.loc[:, f"{column}_{feature}"] = extractor(df[column])
+
     def execute(self, data: DataContainer) -> DataContainer:
         """Execute the step."""
         self.logger.info("Calculating features")
         df = data[DataContainer.CLEAN]
+        created_features = []
 
         if self.datetime_columns:
             for column in self.datetime_columns:
                 if column in df.columns:
+                    self._convert_column_to_datetime(df, column)
                     if self.features:
                         for feature in self.features:
-                            if feature == "year":
-                                df.loc[:, f"{column}_year"] = df[column].dt.year
-                            elif feature == "month":
-                                df.loc[:, f"{column}_month"] = df[column].dt.month
-                            elif feature == "day":
-                                df.loc[:, f"{column}_day"] = df[column].dt.day
-                            elif feature == "hour":
-                                df.loc[:, f"{column}_hour"] = df[column].dt.hour
-                            elif feature == "minute":
-                                df.loc[:, f"{column}_minute"] = df[column].dt.minute
-                            elif feature == "second":
-                                df.loc[:, f"{column}_second"] = df[column].dt.second
-                            elif feature == "weekday":
-                                df.loc[:, f"{column}_weekday"] = df[column].dt.weekday
-                            elif feature == "dayofyear":
-                                df.loc[:, f"{column}_dayofyear"] = df[column].dt.dayofyear
-                            else:
-                                self.logger.warning(f"Unsupported datetime feature: {feature}")
+                            self._extract_feature(df, column, feature)
+                            created_features.append(f"{column}_{feature}")
                     else:
                         self.logger.warning(
                             "No datetime features specified. Skipping feature extraction."
                         )
@@ -62,6 +95,8 @@ def execute(self, data: DataContainer) -> DataContainer:
             df = df.drop(columns=self.datetime_columns)
             self.logger.info(f"Dropped datetime columns: {self.datetime_columns}")
 
+        self.logger.info(f"Created new features: {created_features}")
+
         data[DataContainer.FEATURES] = df
 
         return data
diff --git a/pipeline_lib/core/steps/clean.py b/pipeline_lib/core/steps/clean.py
index a52ee14..64a1576 100644
--- a/pipeline_lib/core/steps/clean.py
+++ b/pipeline_lib/core/steps/clean.py
@@ -12,7 +12,6 @@ def __init__(
         self,
         convert_dtypes: Optional[dict] = None,
         drop_na_columns: Optional[list] = None,
         drop_ids: Optional[dict] = None,
-
     ):
         self.init_logger()
         self.fill_missing = fill_missing
@@ -93,9 +92,12 @@ def execute(self, data: DataContainer) -> DataContainer:
                     if dropped_ids:
                         df = df.loc[~df[column].isin(dropped_ids)].copy()
                         dropped_rows = initial_rows - len(df)
-                        percentage_dropped = (dropped_rows / initial_rows) * 100  # Calculate the percentage of rows dropped
+                        percentage_dropped = (
+                            dropped_rows / initial_rows
+                        ) * 100  # Calculate the percentage of rows dropped
                         self.logger.info(
-                            f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs {list(dropped_ids)} in column '{column}'"
+                            f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs"
+                            f" {list(dropped_ids)} in column '{column}'"
                         )
                     else:
                         self.logger.info(
@@ -109,7 +111,6 @@ def execute(self, data: DataContainer) -> DataContainer:
             else:
                 self.logger.warning(f"Column '{column}' not found in the DataFrame")
 
-
         data[DataContainer.CLEAN] = df
 
         return data