From 556ef5997ae693da9d94045163545269437890a1 Mon Sep 17 00:00:00 2001 From: Diego Marvid Date: Tue, 26 Mar 2024 17:30:38 -0300 Subject: [PATCH] refactor clean step --- pipeline_lib/core/steps/clean.py | 170 +++++++++++++++++-------------- 1 file changed, 92 insertions(+), 78 deletions(-) diff --git a/pipeline_lib/core/steps/clean.py b/pipeline_lib/core/steps/clean.py index 3c094f4..9c31ef4 100644 --- a/pipeline_lib/core/steps/clean.py +++ b/pipeline_lib/core/steps/clean.py @@ -28,93 +28,107 @@ def execute(self, data: DataContainer) -> DataContainer: df = data.raw - if self.fill_missing: - for column, fill_value in self.fill_missing.items(): - if column in df.columns: - df[column].fillna(fill_value, inplace=True) - self.logger.info( - f"Filled missing values in column '{column}' with {fill_value}" - ) - else: - self.logger.warning(f"Column '{column}' not found in the DataFrame") - if self.remove_outliers: - for column, method in self.remove_outliers.items(): - if column in df.columns: - if method == "clip": - q1 = df[column].quantile(0.25) - q3 = df[column].quantile(0.75) - iqr = q3 - q1 - lower_bound = q1 - (1.5 * iqr) - upper_bound = q3 + (1.5 * iqr) - df[column] = df[column].clip(lower=lower_bound, upper=upper_bound) - self.logger.info(f"Clipped outliers in column '{column}'") - elif method == "drop": - q1 = df[column].quantile(0.25) - q3 = df[column].quantile(0.75) - iqr = q3 - q1 - lower_bound = q1 - (1.5 * iqr) - upper_bound = q3 + (1.5 * iqr) - outliers = (df[column] < lower_bound) | (df[column] > upper_bound) - df = df[~outliers] - self.logger.info(f"Dropped outliers in column '{column}'") - else: - self.logger.warning(f"Unsupported outlier removal method '{method}'") - else: - self.logger.warning(f"Column '{column}' not found in the DataFrame") + df = self._remove_outliers(df) + + if self.fill_missing: + df = self._fill_missing(df) if self.convert_dtypes: - for column, dtype in self.convert_dtypes.items(): - if column in df.columns: - df[column] = df[column].astype(dtype) - self.logger.info(f"Converted column '{column}' to {dtype}") - else: - self.logger.warning(f"Column '{column}' not found in the DataFrame") + df = self._convert_dtypes(df) if self.drop_na_columns: - for column in self.drop_na_columns: - if column in df.columns: - initial_rows = len(df) - df.dropna(subset=[column], inplace=True) - dropped_rows = initial_rows - len(df) - self.logger.info( - f"Dropped {dropped_rows} rows with None values in column '{column}'" - ) - else: - self.logger.warning(f"Column '{column}' not found in the DataFrame") + df = self._drop_na_columns(df) if self.drop_ids: - for column, ids in self.drop_ids.items(): - if column in df.columns: - initial_rows = len(df) - initial_ids = set(df[column].unique()) - - dropped_ids = set(ids) & initial_ids - not_found_ids = set(ids) - initial_ids - - if dropped_ids: - df = df.loc[~df[column].isin(dropped_ids)].copy() - dropped_rows = initial_rows - len(df) - percentage_dropped = ( - dropped_rows / initial_rows - ) * 100 # Calculate the percentage of rows dropped - self.logger.info( - f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs" - f" {list(dropped_ids)} in column '{column}'" - ) - else: - self.logger.info( - f"No rows dropped for IDs {list(ids)} in column '{column}'" - ) - - if not_found_ids: - self.logger.warning( - f"IDs {list(not_found_ids)} not found in column '{column}'" - ) - else: - self.logger.warning(f"Column '{column}' not found in the DataFrame") + df = self._drop_ids(df) data.clean = df data.flow = df return data + + def _remove_outliers(self, df): + for column, method in self.remove_outliers.items(): + if column in df.columns: + if method == "clip": + q1 = df[column].quantile(0.25) + q3 = df[column].quantile(0.75) + iqr = q3 - q1 + lower_bound = q1 - (1.5 * iqr) + upper_bound = q3 + (1.5 * iqr) + df[column] = df[column].clip(lower=lower_bound, upper=upper_bound) + self.logger.info(f"Clipped outliers in column '{column}'") + elif method == "drop": + q1 = df[column].quantile(0.25) + q3 = df[column].quantile(0.75) + iqr = q3 - q1 + lower_bound = q1 - (1.5 * iqr) + upper_bound = q3 + (1.5 * iqr) + outliers = (df[column] < lower_bound) | (df[column] > upper_bound) + df = df[~outliers] + self.logger.info(f"Dropped outliers in column '{column}'") + else: + self.logger.warning(f"Unsupported outlier removal method '{method}'") + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + return df + + def _fill_missing(self, df): + for column, fill_value in self.fill_missing.items(): + if column in df.columns: + df[column].fillna(fill_value, inplace=True) + self.logger.info(f"Filled missing values in column '{column}' with {fill_value}") + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + return df + + def _convert_dtypes(self, df): + for column, dtype in self.convert_dtypes.items(): + if column in df.columns: + df[column] = df[column].astype(dtype) + self.logger.info(f"Converted column '{column}' to {dtype}") + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + return df + + def _drop_na_columns(self, df): + for column in self.drop_na_columns: + if column in df.columns: + initial_rows = len(df) + df.dropna(subset=[column], inplace=True) + dropped_rows = initial_rows - len(df) + self.logger.info( + f"Dropped {dropped_rows} rows with None values in column '{column}'" + ) + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + return df + + def _drop_ids(self, df): + for column, ids in self.drop_ids.items(): + if column in df.columns: + initial_rows = len(df) + initial_ids = set(df[column].unique()) + + dropped_ids = set(ids) & initial_ids + not_found_ids = set(ids) - initial_ids + + if dropped_ids: + df = df.loc[~df[column].isin(dropped_ids)].copy() + dropped_rows = initial_rows - len(df) + percentage_dropped = ( + dropped_rows / initial_rows + ) * 100 # Calculate the percentage of rows dropped + self.logger.info( + f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs" + f" {list(dropped_ids)} in column '{column}'" + ) + else: + self.logger.info(f"No rows dropped for IDs {list(ids)} in column '{column}'") + + if not_found_ids: + self.logger.warning(f"IDs {list(not_found_ids)} not found in column '{column}'") + else: + self.logger.warning(f"Column '{column}' not found in the DataFrame") + return df