From b85720fdd53f9bec87b916e3a1ef75a06827c4fa Mon Sep 17 00:00:00 2001 From: Diego Marvid Date: Mon, 18 Mar 2024 14:11:51 -0300 Subject: [PATCH] add calculate features and improve clean step --- pipeline_lib/core/data_container.py | 1 + pipeline_lib/core/steps/calculate_features.py | 53 ++++++++++++++++++- pipeline_lib/core/steps/clean.py | 46 ++++++++++++++++ pipeline_lib/core/steps/tabular_split.py | 6 ++- 4 files changed, 103 insertions(+), 3 deletions(-) diff --git a/pipeline_lib/core/data_container.py b/pipeline_lib/core/data_container.py index 8fe73c5..1f50718 100644 --- a/pipeline_lib/core/data_container.py +++ b/pipeline_lib/core/data_container.py @@ -36,6 +36,7 @@ class DataContainer: TARGET = "target" IMPORTANCE = "importance" DROP_COLUMNS = "drop_columns" + FEATURES = "features" def __init__(self, initial_data: Optional[dict] = None): """ diff --git a/pipeline_lib/core/steps/calculate_features.py b/pipeline_lib/core/steps/calculate_features.py index 0316cb1..2c52397 100644 --- a/pipeline_lib/core/steps/calculate_features.py +++ b/pipeline_lib/core/steps/calculate_features.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import List, Optional from pipeline_lib.core import DataContainer from pipeline_lib.core.steps.base import PipelineStep @@ -7,12 +7,61 @@ class CalculateFeaturesStep(PipelineStep): """Calculate features.""" - def __init__(self, config: Optional[dict] = None) -> None: + def __init__( + self, + datetime_columns: Optional[List[str]] = None, + features: Optional[List[str]] = None, + config: Optional[dict] = None, + ) -> None: """Initialize CalculateFeaturesStep.""" super().__init__(config=config) self.init_logger() + self.datetime_columns = datetime_columns + self.features = features def execute(self, data: DataContainer) -> DataContainer: """Execute the step.""" self.logger.info("Calculating features") + + df = data[DataContainer.CLEAN] + + if self.datetime_columns: + for column in self.datetime_columns: + if column in df.columns: + if self.features: + for feature in self.features: + if feature == "year": + df.loc[:, f"{column}_year"] = df[column].dt.year + elif feature == "month": + df.loc[:, f"{column}_month"] = df[column].dt.month + elif feature == "day": + df.loc[:, f"{column}_day"] = df[column].dt.day + elif feature == "hour": + df.loc[:, f"{column}_hour"] = df[column].dt.hour + elif feature == "minute": + df.loc[:, f"{column}_minute"] = df[column].dt.minute + elif feature == "second": + df.loc[:, f"{column}_second"] = df[column].dt.second + elif feature == "weekday": + df.loc[:, f"{column}_weekday"] = df[column].dt.weekday + elif feature == "dayofyear": + df.loc[:, f"{column}_dayofyear"] = df[column].dt.dayofyear + else: + self.logger.warning(f"Unsupported datetime feature: {feature}") + else: + self.logger.warning( + "No datetime features specified. Skipping feature extraction." + ) + else: + self.logger.warning(f"Datetime column '{column}' not found in the DataFrame") + else: + self.logger.warning("No datetime columns specified. Skipping feature extraction.") + + # drop original datetime columns + if self.datetime_columns: + df = df.drop(columns=self.datetime_columns) + self.logger.info(f"Dropped datetime columns: {self.datetime_columns}") + + data[DataContainer.FEATURES] = df + return data diff --git a/pipeline_lib/core/steps/clean.py b/pipeline_lib/core/steps/clean.py index 0341f5a..a52ee14 100644 --- a/pipeline_lib/core/steps/clean.py +++ b/pipeline_lib/core/steps/clean.py @@ -10,11 +10,16 @@ def __init__( fill_missing: Optional[dict] = None, remove_outliers: Optional[dict] = None, convert_dtypes: Optional[dict] = None, + drop_na_columns: Optional[list] = None, + drop_ids: Optional[dict] = None, + ): self.init_logger() self.fill_missing = fill_missing self.remove_outliers = remove_outliers self.convert_dtypes = convert_dtypes + self.drop_na_columns = drop_na_columns + self.drop_ids = drop_ids def execute(self, data: DataContainer) -> DataContainer: self.logger.info("Cleaning tabular data...") @@ -64,6 +69,47 @@ def execute(self, data: DataContainer) -> DataContainer: else: self.logger.warning(f"Column '{column}' not found in the DataFrame") + if self.drop_na_columns: + for column in self.drop_na_columns: + if column in df.columns: + initial_rows = len(df) + df.dropna(subset=[column], inplace=True) + dropped_rows = initial_rows - len(df) + self.logger.info( + f"Dropped {dropped_rows} rows with None values in column '{column}'" + ) + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + + if self.drop_ids: + for column, ids in self.drop_ids.items(): + if column in df.columns: + initial_rows = len(df) + initial_ids = set(df[column].unique()) + + dropped_ids = set(ids) & initial_ids + not_found_ids = set(ids) - initial_ids + + if dropped_ids: + df = df.loc[~df[column].isin(dropped_ids)].copy() + dropped_rows = initial_rows - len(df) + percentage_dropped = (dropped_rows / initial_rows) * 100 # Calculate the percentage of rows dropped + self.logger.info( + f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs {list(dropped_ids)} in column '{column}'" + ) + else: + self.logger.info( + f"No rows dropped for IDs {list(ids)} in column '{column}'" + ) + + if not_found_ids: + self.logger.warning( + f"IDs {list(not_found_ids)} not found in column '{column}'" + ) + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + + data[DataContainer.CLEAN] = df return data diff --git a/pipeline_lib/core/steps/tabular_split.py b/pipeline_lib/core/steps/tabular_split.py index ae5673b..68b78ce 100644 --- a/pipeline_lib/core/steps/tabular_split.py +++ b/pipeline_lib/core/steps/tabular_split.py @@ -19,7 +19,11 @@ def execute(self, data: DataContainer) -> DataContainer: """Execute the random train-validation split.""" self.logger.info("Splitting tabular data...") - df = data[DataContainer.CLEAN] + df = ( + data[DataContainer.FEATURES] + if DataContainer.FEATURES in data + else data[DataContainer.CLEAN] + ) train_df, validation_df = train_test_split( df, train_size=self.train_percentage, random_state=42