Skip to content

Commit

Permalink
improve efficiency and error handling in calculate features step
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 18, 2024
1 parent b85720f commit 3b3db93
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
75 changes: 55 additions & 20 deletions pipeline_lib/core/steps/calculate_features.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,86 @@
from typing import List, Optional

import pandas as pd
from pandas.api.types import is_datetime64_any_dtype

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps.base import PipelineStep


class UnsupportedFeatureError(Exception):
    """Raised when a requested datetime feature is not supported by this step."""


class CalculateFeaturesStep(PipelineStep):
"""Calculate features."""

def __init__(
    self,
    datetime_columns: Optional[List[str]] = None,
    features: Optional[List[str]] = None,
    config: Optional[dict] = None,
) -> None:
    """Initialize CalculateFeaturesStep.

    Args:
        datetime_columns: Column name(s) holding datetime values. A single
            string is accepted and normalized into a one-element list.
        features: Datetime attributes to extract (keys of
            ``feature_extractors``, e.g. ``"year"``, ``"month"``).
        config: Optional step configuration forwarded to the base class.

    Raises:
        UnsupportedFeatureError: If any requested feature is not supported.
            Raised here so misconfiguration fails at construction time,
            not mid-pipeline.
    """
    # NOTE(review): the diff showed both super().__init__(config=config)
    # and super().__init__(); forwarding config keeps the parameter from
    # being silently dropped — confirm against PipelineStep's signature.
    super().__init__(config=config)
    self.init_logger()
    self.datetime_columns = datetime_columns
    self.features = features

    # Accept a bare string for convenience and normalize to a list.
    if self.datetime_columns and not isinstance(self.datetime_columns, list):
        self.datetime_columns = [self.datetime_columns]

    # Supported feature name -> accessor over a datetime Series.
    self.feature_extractors = {
        "year": lambda col: col.dt.year,
        "month": lambda col: col.dt.month,
        "day": lambda col: col.dt.day,
        "hour": lambda col: col.dt.hour,
        "minute": lambda col: col.dt.minute,
        "second": lambda col: col.dt.second,
        "weekday": lambda col: col.dt.weekday,
        "dayofyear": lambda col: col.dt.dayofyear,
    }

    # Validate features during initialization (fail fast).
    if self.features:
        unsupported_features = set(self.features) - set(self.feature_extractors)
        if unsupported_features:
            raise UnsupportedFeatureError(
                f"Unsupported datetime features: {unsupported_features}"
            )

def _convert_column_to_datetime(self, df: pd.DataFrame, column: str) -> None:
    """Convert ``column`` of ``df`` to datetime dtype, in place.

    Columns that already have a datetime dtype are left untouched.

    Args:
        df: DataFrame holding the column to convert.
        column: Name of the column to convert.

    Raises:
        ValueError: If the column cannot be parsed as datetime.
    """
    # Skip columns that are already datetime-typed.
    if is_datetime64_any_dtype(df[column]):
        self.logger.debug(f"Column '{column}' is already a datetime type.")
        return
    try:
        df.loc[:, column] = pd.to_datetime(df[column], errors="raise")
    except (ValueError, TypeError) as e:
        # Re-raise instead of swallowing: the original log-and-continue left
        # the column unconverted, so later `.dt` feature extraction failed
        # with a confusing AttributeError far from the real cause.
        self.logger.error(f"Error converting column '{column}' to datetime: {e}")
        raise ValueError(
            f"Could not convert column '{column}' to datetime"
        ) from e
    self.logger.info(f"Column '{column}' converted to datetime.")

def _extract_feature(self, df: pd.DataFrame, column: str, feature: str) -> None:
    """Add a ``<column>_<feature>`` column derived from a datetime column.

    Looks the extractor up in ``self.feature_extractors`` (keys were
    validated at construction time) and writes the result in place.
    """
    derived_name = f"{column}_{feature}"
    df.loc[:, derived_name] = self.feature_extractors[feature](df[column])

def execute(self, data: DataContainer) -> DataContainer:
"""Execute the step."""
self.logger.info("Calculating features")

df = data[DataContainer.CLEAN]
created_features = []

if self.datetime_columns:
for column in self.datetime_columns:
if column in df.columns:
self._convert_column_to_datetime(df, column)
if self.features:
for feature in self.features:
if feature == "year":
df.loc[:, f"{column}_year"] = df[column].dt.year
elif feature == "month":
df.loc[:, f"{column}_month"] = df[column].dt.month
elif feature == "day":
df.loc[:, f"{column}_day"] = df[column].dt.day
elif feature == "hour":
df.loc[:, f"{column}_hour"] = df[column].dt.hour
elif feature == "minute":
df.loc[:, f"{column}_minute"] = df[column].dt.minute
elif feature == "second":
df.loc[:, f"{column}_second"] = df[column].dt.second
elif feature == "weekday":
df.loc[:, f"{column}_weekday"] = df[column].dt.weekday
elif feature == "dayofyear":
df.loc[:, f"{column}_dayofyear"] = df[column].dt.dayofyear
else:
self.logger.warning(f"Unsupported datetime feature: {feature}")
self._extract_feature(df, column, feature)
created_features.append(f"{column}_{feature}")
else:
self.logger.warning(
"No datetime features specified. Skipping feature extraction."
Expand All @@ -62,6 +95,8 @@ def execute(self, data: DataContainer) -> DataContainer:
df = df.drop(columns=self.datetime_columns)
self.logger.info(f"Dropped datetime columns: {self.datetime_columns}")

self.logger.info(f"Created new features: {created_features}")

data[DataContainer.FEATURES] = df

return data
9 changes: 5 additions & 4 deletions pipeline_lib/core/steps/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def __init__(
convert_dtypes: Optional[dict] = None,
drop_na_columns: Optional[list] = None,
drop_ids: Optional[dict] = None,

):
self.init_logger()
self.fill_missing = fill_missing
Expand Down Expand Up @@ -93,9 +92,12 @@ def execute(self, data: DataContainer) -> DataContainer:
if dropped_ids:
df = df.loc[~df[column].isin(dropped_ids)].copy()
dropped_rows = initial_rows - len(df)
percentage_dropped = (dropped_rows / initial_rows) * 100 # Calculate the percentage of rows dropped
percentage_dropped = (
dropped_rows / initial_rows
) * 100 # Calculate the percentage of rows dropped
self.logger.info(
f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs {list(dropped_ids)} in column '{column}'"
f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs"
f" {list(dropped_ids)} in column '{column}'"
)
else:
self.logger.info(
Expand All @@ -109,7 +111,6 @@ def execute(self, data: DataContainer) -> DataContainer:
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")


data[DataContainer.CLEAN] = df

return data

0 comments on commit 3b3db93

Please sign in to comment.