add calculate features and improve clean step
diegomarvid committed Mar 18, 2024
1 parent 86edc65 commit b85720f
Showing 4 changed files with 103 additions and 3 deletions.
1 change: 1 addition & 0 deletions pipeline_lib/core/data_container.py
@@ -36,6 +36,7 @@ class DataContainer:
TARGET = "target"
IMPORTANCE = "importance"
DROP_COLUMNS = "drop_columns"
FEATURES = "features"

def __init__(self, initial_data: Optional[dict] = None):
"""
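The new FEATURES constant mirrors the existing CLEAN, TARGET, and IMPORTANCE keys: one step publishes its output under the constant and later steps read it back, so the string "features" is never hard-coded in two places. A minimal sketch of that producer/consumer pattern, not part of the commit (the sample frame is made up; the dict-style access is exactly what the diffs below rely on):

import pandas as pd

from pipeline_lib.core import DataContainer

data = DataContainer()
engineered_df = pd.DataFrame({"sale_date_year": [2024]})  # stand-in for a step's output

data[DataContainer.FEATURES] = engineered_df  # producer: CalculateFeaturesStep
features_df = data[DataContainer.FEATURES]    # consumer: e.g. the split step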
53 changes: 51 additions & 2 deletions pipeline_lib/core/steps/calculate_features.py
@@ -1,4 +1,4 @@
from typing import Optional
from typing import List, Optional

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps.base import PipelineStep
@@ -7,12 +7,61 @@
class CalculateFeaturesStep(PipelineStep):
"""Calculate features."""

def __init__(self, config: Optional[dict] = None) -> None:
def __init__(
self,
datetime_columns: Optional[List[str]] = None,
features: Optional[List[str]] = None,
config: Optional[dict] = None,
) -> None:
"""Initialize CalculateFeaturesStep."""
super().__init__(config=config)
self.init_logger()
self.datetime_columns = datetime_columns
self.features = features

def execute(self, data: DataContainer) -> DataContainer:
"""Execute the step."""
self.logger.info("Calculating features")

df = data[DataContainer.CLEAN]

if self.datetime_columns:
for column in self.datetime_columns:
if column in df.columns:
if self.features:
for feature in self.features:
if feature == "year":
df.loc[:, f"{column}_year"] = df[column].dt.year
elif feature == "month":
df.loc[:, f"{column}_month"] = df[column].dt.month
elif feature == "day":
df.loc[:, f"{column}_day"] = df[column].dt.day
elif feature == "hour":
df.loc[:, f"{column}_hour"] = df[column].dt.hour
elif feature == "minute":
df.loc[:, f"{column}_minute"] = df[column].dt.minute
elif feature == "second":
df.loc[:, f"{column}_second"] = df[column].dt.second
elif feature == "weekday":
df.loc[:, f"{column}_weekday"] = df[column].dt.weekday
elif feature == "dayofyear":
df.loc[:, f"{column}_dayofyear"] = df[column].dt.dayofyear
else:
self.logger.warning(f"Unsupported datetime feature: {feature}")
else:
self.logger.warning(
"No datetime features specified. Skipping feature extraction."
)
else:
self.logger.warning(f"Datetime column '{column}' not found in the DataFrame")
else:
self.logger.warning("No datetime columns specified. Skipping feature extraction.")

# drop original datetime columns
if self.datetime_columns:
df = df.drop(columns=self.datetime_columns)
self.logger.info(f"Dropped datetime columns: {self.datetime_columns}")

data[DataContainer.FEATURES] = df

return data
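A hypothetical standalone run of the new step (column names and values are invented, and the step is assumed to work outside a full pipeline config); the datetime column must already carry a datetime64 dtype, e.g. converted earlier via the clean step's convert_dtypes:

import pandas as pd

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps.calculate_features import CalculateFeaturesStep

data = DataContainer()
data[DataContainer.CLEAN] = pd.DataFrame(
    {
        "sale_date": pd.to_datetime(["2024-01-05", "2024-02-10"]),
        "price": [100, 120],
    }
)

step = CalculateFeaturesStep(
    datetime_columns=["sale_date"],
    features=["year", "month", "weekday"],
)
data = step.execute(data)

# data[DataContainer.FEATURES] now holds price plus sale_date_year,
# sale_date_month and sale_date_weekday; the original sale_date column
# has been dropped by the step.

Note that an unsupported feature name only triggers a warning, so a typo in the feature list degrades to a missing column rather than a crash.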
46 changes: 46 additions & 0 deletions pipeline_lib/core/steps/clean.py
@@ -10,11 +10,16 @@ def __init__(
fill_missing: Optional[dict] = None,
remove_outliers: Optional[dict] = None,
convert_dtypes: Optional[dict] = None,
drop_na_columns: Optional[list] = None,
drop_ids: Optional[dict] = None,

):
self.init_logger()
self.fill_missing = fill_missing
self.remove_outliers = remove_outliers
self.convert_dtypes = convert_dtypes
self.drop_na_columns = drop_na_columns
self.drop_ids = drop_ids

def execute(self, data: DataContainer) -> DataContainer:
self.logger.info("Cleaning tabular data...")
@@ -64,6 +69,47 @@ def execute(self, data: DataContainer) -> DataContainer:
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")

if self.drop_na_columns:
for column in self.drop_na_columns:
if column in df.columns:
initial_rows = len(df)
df.dropna(subset=[column], inplace=True)
dropped_rows = initial_rows - len(df)
self.logger.info(
f"Dropped {dropped_rows} rows with None values in column '{column}'"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")

if self.drop_ids:
for column, ids in self.drop_ids.items():
if column in df.columns:
initial_rows = len(df)
initial_ids = set(df[column].unique())

dropped_ids = set(ids) & initial_ids
not_found_ids = set(ids) - initial_ids

if dropped_ids:
df = df.loc[~df[column].isin(dropped_ids)].copy()
dropped_rows = initial_rows - len(df)
percentage_dropped = (dropped_rows / initial_rows) * 100 # Calculate the percentage of rows dropped
self.logger.info(
f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs {list(dropped_ids)} in column '{column}'"
)
else:
self.logger.info(
f"No rows dropped for IDs {list(ids)} in column '{column}'"
)

if not_found_ids:
self.logger.warning(
f"IDs {list(not_found_ids)} not found in column '{column}'"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")


data[DataContainer.CLEAN] = df

return data
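A hypothetical configuration of the two new cleaning options (the class name CleanStep is an assumption, since this diff only shows the modified methods, and the column/ID values are invented):

from pipeline_lib.core.steps.clean import CleanStep  # assumed class name

step = CleanStep(
    drop_na_columns=["sale_date"],      # drop rows whose sale_date is missing
    drop_ids={"store_id": [901, 902]},  # drop every row belonging to these IDs
)
# step.execute(data) then writes the cleaned frame to data[DataContainer.CLEAN].

drop_ids logs the percentage of rows removed per column and warns about IDs that were not found, which makes stale entries in the config easy to spot.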
6 changes: 5 additions & 1 deletion pipeline_lib/core/steps/tabular_split.py
@@ -19,7 +19,11 @@ def execute(self, data: DataContainer) -> DataContainer:
"""Execute the random train-validation split."""
self.logger.info("Splitting tabular data...")

df = data[DataContainer.CLEAN]
df = (
data[DataContainer.FEATURES]
if DataContainer.FEATURES in data
else data[DataContainer.CLEAN]
)

train_df, validation_df = train_test_split(
df, train_size=self.train_percentage, random_state=42
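With this change the split step consumes the feature-enriched frame when CalculateFeaturesStep ran and quietly falls back to the cleaned frame otherwise, so pipelines without a feature step keep working. A rough end-to-end sketch of how the touched pieces fit together (the TabularSplitStep class name and train_percentage argument are inferred from the attribute used above, the CleanStep class name is assumed, and the raw-input wiring is outside this diff):

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps.calculate_features import CalculateFeaturesStep
from pipeline_lib.core.steps.clean import CleanStep                  # assumed class name
from pipeline_lib.core.steps.tabular_split import TabularSplitStep   # assumed class name

steps = [
    CleanStep(drop_na_columns=["sale_date"], drop_ids={"store_id": [901]}),
    CalculateFeaturesStep(datetime_columns=["sale_date"], features=["year", "month"]),
    TabularSplitStep(train_percentage=0.8),  # assumed constructor argument
]

data = DataContainer()  # an earlier ingest step would seed the raw frame here
for step in steps:
    data = step.execute(data)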
