Skip to content

Commit

Permalink
refactor clean step
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 26, 2024
1 parent 6276cc5 commit 556ef59
Showing 1 changed file with 92 additions and 78 deletions.
170 changes: 92 additions & 78 deletions pipeline_lib/core/steps/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,93 +28,107 @@ def execute(self, data: DataContainer) -> DataContainer:

df = data.raw

if self.fill_missing:
for column, fill_value in self.fill_missing.items():
if column in df.columns:
df[column].fillna(fill_value, inplace=True)
self.logger.info(
f"Filled missing values in column '{column}' with {fill_value}"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")

if self.remove_outliers:
for column, method in self.remove_outliers.items():
if column in df.columns:
if method == "clip":
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - (1.5 * iqr)
upper_bound = q3 + (1.5 * iqr)
df[column] = df[column].clip(lower=lower_bound, upper=upper_bound)
self.logger.info(f"Clipped outliers in column '{column}'")
elif method == "drop":
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - (1.5 * iqr)
upper_bound = q3 + (1.5 * iqr)
outliers = (df[column] < lower_bound) | (df[column] > upper_bound)
df = df[~outliers]
self.logger.info(f"Dropped outliers in column '{column}'")
else:
self.logger.warning(f"Unsupported outlier removal method '{method}'")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
df = self._remove_outliers(df)

if self.fill_missing:
df = self._fill_missing(df)

if self.convert_dtypes:
for column, dtype in self.convert_dtypes.items():
if column in df.columns:
df[column] = df[column].astype(dtype)
self.logger.info(f"Converted column '{column}' to {dtype}")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
df = self._convert_dtypes(df)

if self.drop_na_columns:
for column in self.drop_na_columns:
if column in df.columns:
initial_rows = len(df)
df.dropna(subset=[column], inplace=True)
dropped_rows = initial_rows - len(df)
self.logger.info(
f"Dropped {dropped_rows} rows with None values in column '{column}'"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
df = self._drop_na_columns(df)

if self.drop_ids:
for column, ids in self.drop_ids.items():
if column in df.columns:
initial_rows = len(df)
initial_ids = set(df[column].unique())

dropped_ids = set(ids) & initial_ids
not_found_ids = set(ids) - initial_ids

if dropped_ids:
df = df.loc[~df[column].isin(dropped_ids)].copy()
dropped_rows = initial_rows - len(df)
percentage_dropped = (
dropped_rows / initial_rows
) * 100 # Calculate the percentage of rows dropped
self.logger.info(
f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs"
f" {list(dropped_ids)} in column '{column}'"
)
else:
self.logger.info(
f"No rows dropped for IDs {list(ids)} in column '{column}'"
)

if not_found_ids:
self.logger.warning(
f"IDs {list(not_found_ids)} not found in column '{column}'"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
df = self._drop_ids(df)

data.clean = df
data.flow = df

return data

def _remove_outliers(self, df):
for column, method in self.remove_outliers.items():
if column in df.columns:
if method == "clip":
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - (1.5 * iqr)
upper_bound = q3 + (1.5 * iqr)
df[column] = df[column].clip(lower=lower_bound, upper=upper_bound)
self.logger.info(f"Clipped outliers in column '{column}'")
elif method == "drop":
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - (1.5 * iqr)
upper_bound = q3 + (1.5 * iqr)
outliers = (df[column] < lower_bound) | (df[column] > upper_bound)
df = df[~outliers]
self.logger.info(f"Dropped outliers in column '{column}'")
else:
self.logger.warning(f"Unsupported outlier removal method '{method}'")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
return df

def _fill_missing(self, df):
for column, fill_value in self.fill_missing.items():
if column in df.columns:
df[column].fillna(fill_value, inplace=True)
self.logger.info(f"Filled missing values in column '{column}' with {fill_value}")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
return df

def _convert_dtypes(self, df):
for column, dtype in self.convert_dtypes.items():
if column in df.columns:
df[column] = df[column].astype(dtype)
self.logger.info(f"Converted column '{column}' to {dtype}")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
return df

def _drop_na_columns(self, df):
for column in self.drop_na_columns:
if column in df.columns:
initial_rows = len(df)
df.dropna(subset=[column], inplace=True)
dropped_rows = initial_rows - len(df)
self.logger.info(
f"Dropped {dropped_rows} rows with None values in column '{column}'"
)
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
return df

def _drop_ids(self, df):
for column, ids in self.drop_ids.items():
if column in df.columns:
initial_rows = len(df)
initial_ids = set(df[column].unique())

dropped_ids = set(ids) & initial_ids
not_found_ids = set(ids) - initial_ids

if dropped_ids:
df = df.loc[~df[column].isin(dropped_ids)].copy()
dropped_rows = initial_rows - len(df)
percentage_dropped = (
dropped_rows / initial_rows
) * 100 # Calculate the percentage of rows dropped
self.logger.info(
f"Dropped {dropped_rows} rows ({percentage_dropped:.2f}%) with IDs"
f" {list(dropped_ids)} in column '{column}'"
)
else:
self.logger.info(f"No rows dropped for IDs {list(ids)} in column '{column}'")

if not_found_ids:
self.logger.warning(f"IDs {list(not_found_ids)} not found in column '{column}'")
else:
self.logger.warning(f"Column '{column}' not found in the DataFrame")
return df

0 comments on commit 556ef59

Please sign in to comment.