diff --git a/pipeline_lib/core/data_container.py b/pipeline_lib/core/data_container.py
index 1f50718..86c64e5 100644
--- a/pipeline_lib/core/data_container.py
+++ b/pipeline_lib/core/data_container.py
@@ -6,7 +6,7 @@
 import logging
 import pickle
 import sys
-from typing import Optional, Union
+from typing import Any, Optional, Union
 
 import yaml
 
@@ -21,23 +21,6 @@ class DataContainer:
     A dictionary to store data items.
     """
 
-    RAW = "raw"
-    CLEAN = "clean"
-    TRAIN = "train"
-    VALIDATION = "validation"
-    TEST = "test"
-    MODEL = "model"
-    MODEL_INPUT = "model_input"
-    MODEL_OUTPUT = "model_output"
-    METRICS = "metrics"
-    PREDICTIONS = "predictions"
-    EXPLAINER = "explainer"
-    TUNING_PARAMS = "tuning_params"
-    TARGET = "target"
-    IMPORTANCE = "importance"
-    DROP_COLUMNS = "drop_columns"
-    FEATURES = "features"
-
     def __init__(self, initial_data: Optional[dict] = None):
         """
         Initialize the DataContainer with an empty dictionary or provided data.
@@ -296,6 +279,318 @@ def from_yaml(cls, file_path: str) -> DataContainer:
         # The loaded data is used as the initial data for the DataContainer instance
         return cls(initial_data=data)
 
+    @property
+    def clean(self) -> Any:
+        """
+        Get the clean data from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The clean data stored in the DataContainer.
+        """
+        return self["clean"]
+
+    @clean.setter
+    def clean(self, value: Any):
+        """
+        Set the clean data in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The clean data to be stored in the DataContainer.
+        """
+        self["clean"] = value
+
+    @property
+    def raw(self) -> Any:
+        """
+        Get the raw data from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The raw data stored in the DataContainer.
+        """
+        return self["raw"]
+
+    @raw.setter
+    def raw(self, value: Any):
+        """
+        Set the raw data in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The raw data to be stored in the DataContainer.
+        """
+        self["raw"] = value
+
+    @property
+    def train(self) -> Any:
+        """
+        Get the train data from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The train data stored in the DataContainer.
+        """
+        return self["train"]
+
+    @train.setter
+    def train(self, value: Any):
+        """
+        Set the train data in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The train data to be stored in the DataContainer.
+        """
+        self["train"] = value
+
+    @property
+    def validation(self) -> Any:
+        """
+        Get the validation data from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The validation data stored in the DataContainer.
+        """
+        return self["validation"]
+
+    @validation.setter
+    def validation(self, value: Any):
+        """
+        Set the validation data in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The validation data to be stored in the DataContainer.
+        """
+        self["validation"] = value
+
+    @property
+    def model(self) -> Any:
+        """
+        Get the model from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The model stored in the DataContainer.
+        """
+        return self["model"]
+
+    @model.setter
+    def model(self, value: Any):
+        """
+        Set the model in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The model to be stored in the DataContainer.
+        """
+        self["model"] = value
+
+    @property
+    def model_input(self) -> Any:
+        """
+        Get the model input from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The model input stored in the DataContainer.
+        """
+        return self["model_input"]
+
+    @model_input.setter
+    def model_input(self, value: Any):
+        """
+        Set the model input in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The model input to be stored in the DataContainer.
+        """
+        self["model_input"] = value
+
+    @property
+    def model_output(self) -> Any:
+        """
+        Get the model output from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The model output stored in the DataContainer.
+        """
+        return self["model_output"]
+
+    @model_output.setter
+    def model_output(self, value: Any):
+        """
+        Set the model output in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The model output to be stored in the DataContainer.
+        """
+        self["model_output"] = value
+
+    @property
+    def metrics(self) -> Any:
+        """
+        Get the metrics from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The metrics stored in the DataContainer.
+        """
+        return self["metrics"]
+
+    @metrics.setter
+    def metrics(self, value: Any):
+        """
+        Set the metrics in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The metrics to be stored in the DataContainer.
+        """
+        self["metrics"] = value
+
+    @property
+    def predictions(self) -> Any:
+        """
+        Get the predictions from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The predictions stored in the DataContainer.
+        """
+        return self["predictions"]
+
+    @predictions.setter
+    def predictions(self, value: Any):
+        """
+        Set the predictions in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The predictions to be stored in the DataContainer.
+        """
+        self["predictions"] = value
+
+    @property
+    def explainer(self) -> Any:
+        """
+        Get the explainer from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The explainer stored in the DataContainer.
+        """
+        return self["explainer"]
+
+    @explainer.setter
+    def explainer(self, value: Any):
+        """
+        Set the explainer in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The explainer to be stored in the DataContainer.
+        """
+        self["explainer"] = value
+
+    @property
+    def tuning_params(self) -> Any:
+        """
+        Get the tuning parameters from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The tuning parameters stored in the DataContainer.
+        """
+        return self["tuning_params"]
+
+    @tuning_params.setter
+    def tuning_params(self, value: Any):
+        """
+        Set the tuning parameters in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The tuning parameters to be stored in the DataContainer.
+        """
+        self["tuning_params"] = value
+
+    @property
+    def target(self) -> Any:
+        """
+        Get the target from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The target stored in the DataContainer.
+        """
+        return self["target"]
+
+    @target.setter
+    def target(self, value: Any):
+        """
+        Set the target in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The target to be stored in the DataContainer.
+        """
+        self["target"] = value
+
+    @property
+    def features(self) -> Any:
+        """
+        Get the features from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The features stored in the DataContainer.
+        """
+        return self["features"]
+
+    @features.setter
+    def features(self, value: Any):
+        """
+        Set the features in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The features to be stored in the DataContainer.
+        """
+        self["features"] = value
+
     def __eq__(self, other) -> bool:
         """
         Compare this DataContainer with another for equality.
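For reviewers, a quick usage sketch of the new attribute-style API. The import path and class are from this repo; the toy DataFrame is illustrative, and the identity assertion assumes `DataContainer.__getitem__` returns the stored object directly (the class docstring describes a plain dict of data items):

```python
import pandas as pd

from pipeline_lib.core.data_container import DataContainer

container = DataContainer()
container.raw = pd.DataFrame({"target": [1.0, 2.0], "x": [3.0, 4.0]})

# Property access and dict-style access hit the same underlying storage,
# so string-key call sites keep working during the migration.
assert container.raw is container["raw"]

# The old class constants (DataContainer.RAW, DataContainer.CLEAN, ...) are
# gone; external callers still using them will now raise AttributeError.
container.clean = container.raw.dropna()
```

Note that 16 constants are removed but only 13 properties are added: `TEST`, `IMPORTANCE`, and `DROP_COLUMNS` get no attribute replacement, so any code that still reads those keys must now use dict-style access with the literal strings.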
diff --git a/pipeline_lib/core/steps/calculate_features.py b/pipeline_lib/core/steps/calculate_features.py
index d6e0227..c65df61 100644
--- a/pipeline_lib/core/steps/calculate_features.py
+++ b/pipeline_lib/core/steps/calculate_features.py
@@ -70,7 +70,7 @@ def execute(self, data: DataContainer) -> DataContainer:
         """Execute the step."""
         self.logger.info("Calculating features")
 
-        df = data[DataContainer.CLEAN]
+        df = data.clean
         created_features = []
 
         if self.datetime_columns:
@@ -97,6 +97,6 @@ def execute(self, data: DataContainer) -> DataContainer:
 
         self.logger.info(f"Created new features: {created_features}")
 
-        data[DataContainer.FEATURES] = df
+        data.features = df
 
         return data
diff --git a/pipeline_lib/core/steps/calculate_metrics.py b/pipeline_lib/core/steps/calculate_metrics.py
index 8589283..c3c917a 100644
--- a/pipeline_lib/core/steps/calculate_metrics.py
+++ b/pipeline_lib/core/steps/calculate_metrics.py
@@ -15,20 +15,20 @@ def __init__(self) -> None:
 
     def execute(self, data: DataContainer) -> DataContainer:
         self.logger.debug("Starting metric calculation")
-        model_output = data[DataContainer.MODEL_OUTPUT]
+        model_output = data.model_output
 
-        target_column_name = data.get(DataContainer.TARGET)
+        target_column_name = data.target
         if target_column_name is None:
             raise ValueError("Target column not found on any configuration.")
 
         true_values = model_output[target_column_name]
-        predictions = model_output[DataContainer.PREDICTIONS]
+        predictions = model_output["predictions"]
 
         mae = mean_absolute_error(true_values, predictions)
         rmse = np.sqrt(mean_squared_error(true_values, predictions))
 
         results = {"MAE": str(mae), "RMSE": str(rmse)}
         self.logger.info(results)
 
-        data[DataContainer.METRICS] = results
+        data.metrics = results
         return data
diff --git a/pipeline_lib/core/steps/clean.py b/pipeline_lib/core/steps/clean.py
index 64a1576..7876244 100644
--- a/pipeline_lib/core/steps/clean.py
+++ b/pipeline_lib/core/steps/clean.py
@@ -23,7 +23,7 @@ def __init__(
     def execute(self, data: DataContainer) -> DataContainer:
         self.logger.info("Cleaning tabular data...")
 
-        df = data[DataContainer.RAW]
+        df = data.raw
 
         if self.fill_missing:
             for column, fill_value in self.fill_missing.items():
@@ -111,6 +111,6 @@ def execute(self, data: DataContainer) -> DataContainer:
             else:
                 self.logger.warning(f"Column '{column}' not found in the DataFrame")
 
-        data[DataContainer.CLEAN] = df
+        data.clean = df
 
         return data
diff --git a/pipeline_lib/core/steps/explainer_dashboard.py b/pipeline_lib/core/steps/explainer_dashboard.py
index 6863b95..a5612c1 100644
--- a/pipeline_lib/core/steps/explainer_dashboard.py
+++ b/pipeline_lib/core/steps/explainer_dashboard.py
@@ -17,19 +17,15 @@ def __init__(
     def execute(self, data: DataContainer) -> DataContainer:
         self.logger.debug("Starting explainer dashboard")
 
-        model = data.get(DataContainer.MODEL)
+        model = data.model
         if model is None:
             raise ValueError("Model not found in data container.")
 
-        target = data.get(DataContainer.TARGET)
+        target = data.target
         if target is None:
             raise ValueError("Target column not found in any parameter.")
 
-        df = (
-            data[DataContainer.FEATURES]
-            if DataContainer.FEATURES in data
-            else data[DataContainer.CLEAN]
-        )
+        df = data.features if data.features is not None else data.clean
 
         if len(df) > self.max_samples:
             # Randomly sample a subset of data points if the dataset is larger than max_samples
@@ -53,6 +49,6 @@ def execute(self, data: DataContainer) -> DataContainer:
             y_test,
         )
 
-        data[DataContainer.EXPLAINER] = explainer
+        data.explainer = explainer
 
         return data
diff --git a/pipeline_lib/core/steps/generate.py b/pipeline_lib/core/steps/generate.py
index 6c5173b..24e412f 100644
--- a/pipeline_lib/core/steps/generate.py
+++ b/pipeline_lib/core/steps/generate.py
@@ -33,7 +33,7 @@ def execute(self, data: DataContainer) -> DataContainer:
         else:
             raise ValueError(f"Unsupported file type: {file_type}")
 
-        data[DataContainer.RAW] = df
+        data.raw = df
 
         self.logger.info(f"Generated DataFrame with shape: {df.shape}")
 
diff --git a/pipeline_lib/core/steps/tabular_split.py b/pipeline_lib/core/steps/tabular_split.py
index 68b78ce..8815929 100644
--- a/pipeline_lib/core/steps/tabular_split.py
+++ b/pipeline_lib/core/steps/tabular_split.py
@@ -19,11 +19,7 @@ def execute(self, data: DataContainer) -> DataContainer:
         """Execute the random train-validation split."""
         self.logger.info("Splitting tabular data...")
 
-        df = (
-            data[DataContainer.FEATURES]
-            if DataContainer.FEATURES in data
-            else data[DataContainer.CLEAN]
-        )
+        df = data.features if data.features is not None else data.clean
 
         train_df, validation_df = train_test_split(
             df, train_size=self.train_percentage, random_state=42
@@ -41,7 +37,7 @@ def execute(self, data: DataContainer) -> DataContainer:
             f" {validation_rows/total_rows:.2%}"
         )
 
-        data[DataContainer.TRAIN] = train_df
-        data[DataContainer.VALIDATION] = validation_df
+        data.train = train_df
+        data.validation = validation_df
 
         return data
diff --git a/pipeline_lib/implementation/tabular/xgboost/fit_model.py b/pipeline_lib/implementation/tabular/xgboost/fit_model.py
index 6200ee0..170c787 100644
--- a/pipeline_lib/implementation/tabular/xgboost/fit_model.py
+++ b/pipeline_lib/implementation/tabular/xgboost/fit_model.py
@@ -52,11 +52,8 @@ def execute(self, data: DataContainer) -> DataContainer:
 
         start_time = time.time()
 
-        data[DataContainer.TARGET] = self.target
-        data[DataContainer.DROP_COLUMNS] = self.drop_columns
-
-        df_train = data[DataContainer.TRAIN]
-        df_valid = data[DataContainer.VALIDATION]
+        df_train = data.train
+        df_valid = data.validation
 
         if self.drop_columns:
             df_train = df_train.drop(columns=self.drop_columns)
@@ -75,7 +72,7 @@ def execute(self, data: DataContainer) -> DataContainer:
             params = self.optimize_with_optuna(
                 X_train, y_train, X_valid, y_valid, self.optuna_params
             )
-            data[DataContainer.TUNING_PARAMS] = params
+            data.tuning_params = params
 
             model = xgb.XGBRegressor(**params)
 
@@ -93,7 +90,8 @@ def execute(self, data: DataContainer) -> DataContainer:
         self.logger.info(f"XGBoost model fitting took {minutes} minutes and {seconds} seconds.")
 
         # Save the model to the data container
-        data[DataContainer.MODEL] = model
+        data.model = model
+        data.target = self.target
 
         if self.save_path:
             self.logger.info(f"Saving the model to {self.save_path}")
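Two behavioural changes in the `fit_model.py` hunks above are easy to miss: `drop_columns` is no longer published to the container at all, and `target` is now written only after the model has been fitted rather than before training starts. A toy sketch of what downstream steps can, and can no longer, rely on; the values are illustrative stand-ins, only the attribute names and the metrics-step contract come from this diff:

```python
import pandas as pd

from pipeline_lib.core.data_container import DataContainer

data = DataContainer()

# What the fit step now publishes (toy stand-ins for the real objects):
data.model = "trained-xgb-regressor"  # set only after a successful fit
data.target = "target"                # written after fitting, not before

# What it no longer publishes: drop_columns. Steps that used to read
# data[DataContainer.DROP_COLUMNS] must carry the list in their own config.

# CalculateMetricsStep's contract still holds on the new attributes:
model_output = pd.DataFrame({"target": [1.0, 2.0], "predictions": [0.9, 2.1]})
data.model_output = model_output
mae = (model_output[data.target] - model_output["predictions"]).abs().mean()
print(f"MAE: {mae}")
```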
diff --git a/pipeline_lib/implementation/tabular/xgboost/predict.py b/pipeline_lib/implementation/tabular/xgboost/predict.py
index e52e9a1..e28954d 100644
--- a/pipeline_lib/implementation/tabular/xgboost/predict.py
+++ b/pipeline_lib/implementation/tabular/xgboost/predict.py
@@ -30,11 +30,7 @@ def __init__(
     def execute(self, data: DataContainer) -> DataContainer:
         self.logger.debug("Obtaining predictions for XGBoost model.")
 
-        model_input = (
-            data[DataContainer.FEATURES]
-            if DataContainer.FEATURES in data
-            else data[DataContainer.CLEAN]
-        )
+        model_input = data.features if data.features is not None else data.clean
 
         if self.drop_columns:
             self.logger.info(f"Dropping columns: {self.drop_columns}")
@@ -44,9 +40,8 @@ def execute(self, data: DataContainer) -> DataContainer:
 
         predictions_df = pd.DataFrame(predictions, columns=["prediction"])
 
-        model_input[DataContainer.PREDICTIONS] = predictions_df
-        data[DataContainer.MODEL] = self.model
-        data[DataContainer.MODEL_OUTPUT] = model_input
-        data[DataContainer.TARGET] = self.target
-        data[DataContainer.DROP_COLUMNS] = self.drop_columns
+        model_input["predictions"] = predictions_df
+        data.model = self.model
+        data.model_output = model_input
+        data.target = self.target
 
         return data
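Finally, the predict step's output contract in miniature: predictions are attached to `model_input` under the literal key `"predictions"` (previously `DataContainer.PREDICTIONS`), and the whole frame then becomes `data.model_output`, which is where `CalculateMetricsStep` reads them. A standalone sketch with a fake prediction array standing in for `self.model.predict(...)`:

```python
import numpy as np
import pandas as pd

model_input = pd.DataFrame({"x": [1.0, 2.0, 3.0], "target": [1.1, 1.9, 3.2]})
fake_predictions = np.array([0.9, 2.1, 2.8])  # stand-in for model.predict(...)

# Mirrors the step: assigning a one-column DataFrame to a single column key
# aligns on the index, so "predictions" receives the "prediction" values.
predictions_df = pd.DataFrame(fake_predictions, columns=["prediction"])
model_input["predictions"] = predictions_df

print(model_input)
#      x  target  predictions
# 0  1.0     1.1          0.9
# 1  2.0     1.9          2.1
# 2  3.0     3.2          2.8
```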