Skip to content

Commit

Permalink
change config dict in init to **params
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed Mar 13, 2024
1 parent 871bbec commit b447fc9
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 104 deletions.
2 changes: 1 addition & 1 deletion pipeline_lib/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def from_json(cls, path: str) -> Pipeline:
)

step_class = Pipeline.get_step_class(step_type)
step = step_class(config=parameters)
step = step_class(**parameters)
steps.append(step)

pipeline.add_steps(steps)
Expand Down
4 changes: 2 additions & 2 deletions pipeline_lib/core/steps/calculate_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
class CalculateMetricsStep(PipelineStep):
"""Calculate metrics."""

def __init__(self, config: Optional[dict] = None) -> None:
def __init__(self) -> None:
"""Initialize CalculateMetricsStep."""
super().__init__(config=config)
super().__init__()
self.init_logger()

def execute(self, data: DataContainer) -> DataContainer:
Expand Down
59 changes: 22 additions & 37 deletions pipeline_lib/core/steps/tabular_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@
class TabularSplitStep(PipelineStep):
"""Split the data."""

def __init__(self, config: Optional[dict] = None) -> None:
def __init__(
self,
train_percentage: float,
id_column: str,
train_ids: Optional[list[str]] = None,
validation_ids: Optional[list[str]] = None,
) -> None:
"""Initialize SplitStep."""
super().__init__(config=config)
self.init_logger()
self.train_percentage = train_percentage
self.id_column_name = id_column
self.train_ids = train_ids
self.validation_ids = validation_ids

def _id_based_split(
self,
Expand Down Expand Up @@ -76,50 +85,24 @@ def execute(self, data: DataContainer) -> DataContainer:
"""Execute the split based on IDs."""
self.logger.info("Splitting tabular data...")

split_configs = self.config

if split_configs is None:
self.logger.info("No split_configs found. No splitting will be performed.")
return data

df = data[DataContainer.CLEAN]
id_column_name = split_configs.get("id_column")
if not id_column_name:
raise ValueError("ID column name must be specified in split_configs.")

# check if both train_percentage and train_ids are provided
if "train_percentage" in split_configs and "train_ids" in split_configs:
raise ValueError(
"Both train_percentage and train_ids cannot be provided in split_configs."
)

# check if either train_percentage or train_ids are provided
if "train_percentage" not in split_configs and "train_ids" not in split_configs:
raise ValueError(
"Either train_percentage or train_ids must be provided in split_configs."
)

if "train_percentage" in split_configs:
train_percentage = split_configs.get("train_percentage")
if train_percentage is None or train_percentage <= 0 or train_percentage >= 1:
if self.train_percentage:
if (
self.train_percentage is None
or self.train_percentage <= 0
or self.train_percentage >= 1
):
raise ValueError("train_percentage must be between 0 and 1.")
train_ids, validation_ids = self._percentage_based_id_split(
df, train_percentage, id_column_name
df, self.train_percentage, self.id_column_name
)
else:
train_ids = split_configs.get("train_ids")
validation_ids = split_configs.get("validation_ids")
if not train_ids or not validation_ids:
raise ValueError(
"Both train_ids and validation_ids must be provided in split_configs unless"
" train_percentage is specified."
)

self.logger.info(f"Number of train ids: {len(train_ids)}")
self.logger.info(f"Number of validation ids: {len(validation_ids)}")

train_df, validation_df = self._id_based_split(
df, train_ids, validation_ids, id_column_name
df, train_ids, validation_ids, self.id_column_name
)

train_rows = len(train_df)
Expand All @@ -134,7 +117,9 @@ def execute(self, data: DataContainer) -> DataContainer:
f" {validation_rows/total_rows:.2%}"
)

left_ids = df[~df[id_column_name].isin(train_ids + validation_ids)][id_column_name].unique()
left_ids = df[~df[self.id_column_name].isin(train_ids + validation_ids)][
self.id_column_name
].unique()
self.logger.info(f"Number of IDs left from total df: {len(left_ids)}")
self.logger.debug(f"IDs left from total df: {left_ids}")

Expand Down
87 changes: 48 additions & 39 deletions pipeline_lib/implementation/tabular/xgboost/fit_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,70 @@
from optuna.pruners import MedianPruner
from sklearn.metrics import mean_absolute_error

from typing import Optional

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps import FitModelStep


class XGBoostFitModelStep(FitModelStep):
"""Fit the model with XGBoost."""

def execute(self, data: DataContainer) -> DataContainer:
self.logger.debug("Starting model fitting with XGBoost")
def __init__(
self,
target: str,
drop_columns: Optional[list[str]] = None,
xgb_params: Optional[dict] = None,
optuna_params: Optional[dict] = None,
save_path: Optional[str] = None,
) -> None:
self.init_logger()

start_time = time.time()
if target is None:
raise ValueError("Target column not found in the parameters.")

model_configs = self.config
self.target = target
self.drop_columns = drop_columns

if model_configs is None:
raise ValueError("No model configs found")
if optuna_params and xgb_params:
raise ValueError("Both optuna_params and xgb_params are defined. Please choose one.")

target = model_configs.get("target")
if not optuna_params and not xgb_params:
raise ValueError(
"No parameters defined. Please define either optuna_params or xgb_params."
)

if target is None:
raise ValueError("Target column not found in model_configs.")
self.xgb_params = xgb_params
self.optuna_params = optuna_params
self.save_path = save_path

data[DataContainer.TARGET] = target
def execute(self, data: DataContainer) -> DataContainer:
self.logger.debug("Starting model fitting with XGBoost")

start_time = time.time()

data[DataContainer.TARGET] = self.target

df_train = data[DataContainer.TRAIN]
df_valid = data[DataContainer.VALIDATION]

drop_columns = model_configs.get("drop_columns")

if drop_columns:
df_train = df_train.drop(columns=drop_columns)
df_valid = df_valid.drop(columns=drop_columns)
if self.drop_columns:
df_train = df_train.drop(columns=self.drop_columns)
df_valid = df_valid.drop(columns=self.drop_columns)

# Prepare the data
X_train = df_train.drop(columns=[target])
y_train = df_train[target]
X_train = df_train.drop(columns=[self.target])
y_train = df_train[self.target]

X_valid = df_valid.drop(columns=[target])
y_valid = df_valid[target]
X_valid = df_valid.drop(columns=[self.target])
y_valid = df_valid[self.target]

optuna_params = model_configs.get("optuna_params")
xgb_params = model_configs.get("xgb_params")
params = self.xgb_params

if optuna_params and xgb_params:
raise ValueError("Both optuna_params and xgb_params are defined. Please choose one.")

if not optuna_params and not xgb_params:
raise ValueError(
"No parameters defined. Please define either optuna_params or xgb_params."
if self.optuna_params:
params = self.optimize_with_optuna(
X_train, y_train, X_valid, y_valid, self.optuna_params
)

params = xgb_params

if optuna_params:
params = self.optimize_with_optuna(X_train, y_train, X_valid, y_valid, optuna_params)
data[DataContainer.TUNING_PARAMS] = params

model = xgb.XGBRegressor(**params)
Expand All @@ -69,10 +78,15 @@ def execute(self, data: DataContainer) -> DataContainer:
X_train,
y_train,
eval_set=[(X_valid, y_valid)],
early_stopping_rounds=model_configs.get("early_stopping_rounds", 100),
verbose=True,
)

end_time = time.time()
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = int(elapsed_time % 60)
self.logger.info(f"XGBoost model fitting took {minutes} minutes and {seconds} seconds.")

# Save the model to the data container
data[DataContainer.MODEL] = model

Expand All @@ -81,19 +95,14 @@ def execute(self, data: DataContainer) -> DataContainer:
data[DataContainer.IMPORTANCE] = importance

# save model to disk
save_path = model_configs.get("save_path")
save_path = self.save_path

if save_path:
if not save_path.endswith(".joblib"):
raise ValueError("Only joblib format is supported for saving the model.")
self.logger.info(f"Saving the model to {save_path}")
dump(model, save_path)

end_time = time.time()
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = int(elapsed_time % 60)
self.logger.info(f"XGBoost model fitting took {minutes} minutes and {seconds} seconds.")
return data

def optimize_with_optuna(self, X_train, y_train, X_valid, y_valid, optuna_params):
Expand Down
45 changes: 20 additions & 25 deletions pipeline_lib/implementation/tabular/xgboost/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,42 @@

from pipeline_lib.core import DataContainer
from pipeline_lib.core.steps import PredictStep
from typing import Optional


class XGBoostPredictStep(PredictStep):
"""Obtain the predictions for XGBoost model."""

def execute(self, data: DataContainer) -> DataContainer:
self.logger.debug("Obtaining predictions for XGBoost model.")

if not self.config:
raise ValueError("No prediction configs found.")

load_path = self.config.get("load_path")
if not load_path:
raise ValueError("No load path found in model_configs.")
def __init__(
self,
target: str,
load_path: str,
drop_columns: Optional[list[str]] = None,
) -> None:
self.init_logger()

if not load_path.endswith(".joblib"):
raise ValueError("Only joblib format is supported for loading the model.")
model = load(load_path)

model_input = data[DataContainer.CLEAN]
self.target = target
self.load_path = load_path
self.drop_columns = drop_columns

if not isinstance(model_input, pd.DataFrame):
raise ValueError("model_input must be a pandas DataFrame.")
self.model = load(self.load_path)

if self.config:
drop_columns = self.config.get("drop_columns")
if drop_columns:
model_input = model_input.drop(columns=drop_columns)
def execute(self, data: DataContainer) -> DataContainer:
self.logger.debug("Obtaining predictions for XGBoost model.")

target = self.config.get("target")
if target is None:
raise ValueError("Target column not found in model_configs.")
data[DataContainer.TARGET] = target
model_input = data[DataContainer.CLEAN]

predictions = model.predict(model_input.drop(columns=[target]))
else:
predictions = model.predict(model_input)
if self.drop_columns:
model_input = model_input.drop(columns=self.drop_columns)

predictions = self.model.predict(model_input.drop(columns=[self.target]))

predictions_df = pd.DataFrame(predictions, columns=["prediction"])

model_input[DataContainer.PREDICTIONS] = predictions_df

data[DataContainer.MODEL_OUTPUT] = model_input
data[DataContainer.TARGET] = self.target
return data

0 comments on commit b447fc9

Please sign in to comment.