diff --git a/pipeline_lib/core/data_container.py b/pipeline_lib/core/data_container.py
index 86c64e5..ed895ae 100644
--- a/pipeline_lib/core/data_container.py
+++ b/pipeline_lib/core/data_container.py
@@ -592,6 +592,30 @@ def features(self, value: Any):
         """
         self["features"] = value
 
+    @property
+    def flow(self) -> Any:
+        """
+        Get the flow from the DataContainer.
+
+        Returns
+        -------
+        Any
+            The flow stored in the DataContainer.
+        """
+        return self["flow"]
+
+    @flow.setter
+    def flow(self, value: Any):
+        """
+        Set the flow in the DataContainer.
+
+        Parameters
+        ----------
+        value
+            The flow to be stored in the DataContainer.
+        """
+        self["flow"] = value
+
     def __eq__(self, other) -> bool:
         """
         Compare this DataContainer with another for equality.
diff --git a/pipeline_lib/core/steps/calculate_features.py b/pipeline_lib/core/steps/calculate_features.py
index c65df61..00ca7bb 100644
--- a/pipeline_lib/core/steps/calculate_features.py
+++ b/pipeline_lib/core/steps/calculate_features.py
@@ -70,7 +70,7 @@ def execute(self, data: DataContainer) -> DataContainer:
         """Execute the step."""
         self.logger.info("Calculating features")
 
-        df = data.clean
+        df = data.flow
         created_features = []
 
         if self.datetime_columns:
@@ -97,6 +97,6 @@ def execute(self, data: DataContainer) -> DataContainer:
 
         self.logger.info(f"Created new features: {created_features}")
 
-        data.features = df
+        data.flow = df
 
         return data
diff --git a/pipeline_lib/core/steps/clean.py b/pipeline_lib/core/steps/clean.py
index 7876244..e9fef35 100644
--- a/pipeline_lib/core/steps/clean.py
+++ b/pipeline_lib/core/steps/clean.py
@@ -112,5 +112,6 @@ def execute(self, data: DataContainer) -> DataContainer:
                     self.logger.warning(f"Column '{column}' not found in the DataFrame")
 
         data.clean = df
+        data.flow = df
 
         return data
diff --git a/pipeline_lib/core/steps/explainer_dashboard.py b/pipeline_lib/core/steps/explainer_dashboard.py
index a5612c1..ee1f325 100644
--- a/pipeline_lib/core/steps/explainer_dashboard.py
+++ b/pipeline_lib/core/steps/explainer_dashboard.py
@@ -25,7 +25,7 @@ def execute(self, data: DataContainer) -> DataContainer:
         if target is None:
             raise ValueError("Target column not found in any parameter.")
 
-        df = data.features if data.features is not None else data.clean
+        df = data.flow
 
         if len(df) > self.max_samples:
             # Randomly sample a subset of data points if the dataset is larger than max_samples
diff --git a/pipeline_lib/core/steps/generate.py b/pipeline_lib/core/steps/generate.py
index 24e412f..c4c1ff1 100644
--- a/pipeline_lib/core/steps/generate.py
+++ b/pipeline_lib/core/steps/generate.py
@@ -34,6 +34,7 @@ def execute(self, data: DataContainer) -> DataContainer:
             raise ValueError(f"Unsupported file type: {file_type}")
 
         data.raw = df
+        data.flow = df
 
         self.logger.info(f"Generated DataFrame with shape: {df.shape}")
 
diff --git a/pipeline_lib/core/steps/tabular_split.py b/pipeline_lib/core/steps/tabular_split.py
index 8815929..1e7e31e 100644
--- a/pipeline_lib/core/steps/tabular_split.py
+++ b/pipeline_lib/core/steps/tabular_split.py
@@ -19,7 +19,7 @@ def execute(self, data: DataContainer) -> DataContainer:
         """Execute the random train-validation split."""
         self.logger.info("Splitting tabular data...")
 
-        df = data.features if data.features is not None else data.clean
+        df = data.flow
 
         train_df, validation_df = train_test_split(
             df, train_size=self.train_percentage, random_state=42
diff --git a/pipeline_lib/implementation/tabular/xgboost/predict.py b/pipeline_lib/implementation/tabular/xgboost/predict.py
index e28954d..2759685
--- a/pipeline_lib/implementation/tabular/xgboost/predict.py
+++ b/pipeline_lib/implementation/tabular/xgboost/predict.py
@@ -30,7 +30,7 @@ def __init__(
     def execute(self, data: DataContainer) -> DataContainer:
         self.logger.debug("Obtaining predictions for XGBoost model.")
 
-        model_input = data.features if data.features is not None else data.clean
+        model_input = data.flow
 
         if self.drop_columns:
             self.logger.info(f"Dropping columns: {self.drop_columns}")
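Taken together, these changes route every step through a single working DataFrame: generate and clean seed data.flow, and downstream steps read and overwrite it instead of choosing between data.features and data.clean. A minimal sketch of the contract a new step would follow (not part of this diff; the DropDuplicatesStep class, the PipelineStep base class, and its import path are assumed for illustration):

# Sketch only: illustrates the read-transform-write contract on data.flow.
# DropDuplicatesStep is a hypothetical example step, and the PipelineStep
# import path is assumed from the file layout in this diff.
from pipeline_lib.core.data_container import DataContainer
from pipeline_lib.core.steps.base import PipelineStep  # assumed module path


class DropDuplicatesStep(PipelineStep):
    """Hypothetical step that removes duplicate rows from the working DataFrame."""

    def execute(self, data: DataContainer) -> DataContainer:
        self.logger.info("Dropping duplicate rows")

        df = data.flow               # read the current working DataFrame
        df = df.drop_duplicates()    # transform it
        data.flow = df               # hand it to the next step

        return data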