
Feature/week2 #2

Merged
merged 22 commits on Oct 30, 2024
Changes from 17 commits
Commits (22)
ea58f16
refactor(project_configs.yml-config.py): update project config
Garett601 Oct 24, 2024
a44a2eb
refactor(utils.py,-rf_model.py-data_preprocessor.py-processed_data.py…
Garett601 Oct 24, 2024
0b2660d
test(test_rf_model.py-test_data_preprocessor.py): update tests for re…
Garett601 Oct 24, 2024
4573671
refactor(week1/week_1.ipynb): update repo structure
Garett601 Oct 24, 2024
06445d5
refactor(week_1.py): refactor week 1 notebook
Garett601 Oct 24, 2024
507c239
refactor(week_1.py): refactor week 1 notebook
Garett601 Oct 24, 2024
b35f12f
feat(data_preprocessor.py): add save_to_catalog method
Garett601 Oct 24, 2024
6a62cea
style(data_preprocessor.py): update docstring
Garett601 Oct 24, 2024
b2e8238
test(test_data_preprocessor.py): add test for save_to_catalog method
Garett601 Oct 24, 2024
30d57af
feat(01_prepare_data.py): first task for week2 homework
Garett601 Oct 24, 2024
1107246
feat(02_mlflow_experiment.py): week 2 notebook 2 code
Garett601 Oct 25, 2024
444e285
feat(03_log_and_register_model.py): week2 notebook 3 code
Garett601 Oct 25, 2024
65d3bf3
feat(04_log_and_register_custom_model.py): week2 notebook 4 code
Garett601 Oct 25, 2024
a2c9e31
chore: save latest code - why do I keep doing this?
Garett601 Oct 25, 2024
65b5fe8
refactor(01_prepare_dataset.py): maintain DateTime column when saving…
Garett601 Oct 25, 2024
e7f7120
feat(05_log_and_register_fe_model.py): week2 notebook 5 code
Garett601 Oct 26, 2024
c612a55
docs(README.md): Update readme with changes
Garett601 Oct 26, 2024
7aeeb1d
ci(ci.yml): add PySpark to CI run for tests
Garett601 Oct 26, 2024
beccea3
ci(ci.yml): add databricks-sdk to testing step
Garett601 Oct 26, 2024
6a8146a
ci(ci.yml): add pyspark AND databricks-sdk to tests step
Garett601 Oct 26, 2024
f25a887
test(test_data_preprocessor.py): mock pyspark.sql.functions
Garett601 Oct 26, 2024
ea29214
test(test_data_preprocessor.py): mock all spark behaviour
Garett601 Oct 26, 2024
6 changes: 6 additions & 0 deletions .gitignore
@@ -98,3 +98,9 @@ dmypy.json

.databricks
.ruff_cache/

# JSON files
*.json

# MLFlow
mlruns/
52 changes: 52 additions & 0 deletions README.md
@@ -29,6 +29,10 @@ uv lock
# Updates, Issues, Workarounds, Notes

---
<h1 align="center">
week 1
</h1>

## 15/10/2024 - Workaround for environment setup
> **Note**
- loosened python-version in pyproject.toml to `requires-python = ">=3.11, <3.12"`
@@ -99,3 +103,51 @@ uv lock
- The dataset is not included in the repository to keep the repo small. The loader first attempts to fetch the data from the UCI ML Repository.
- If that fails, the dataset is expected at `data/Tetuan City power consumption.csv`; you can download it from [here](https://www.kaggle.com/datasets/gmkeshav/tetuan-city-power-consumption). A sketch of this fallback is shown below.
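
A minimal sketch of this load-then-fallback behaviour. This is illustrative only, not the project's actual loader; the `ucimlrepo` call and the column handling are assumptions:
```python
# Sketch only: try the UCI ML Repository first, fall back to the local CSV.
import pandas as pd

try:
    from ucimlrepo import fetch_ucirepo

    # 849 is the dataset id used for the Tetouan City power consumption data in the project config
    repo_data = fetch_ucirepo(id=849)
    data = pd.concat([repo_data.data.features, repo_data.data.targets], axis=1)
except Exception:
    # Fallback: local copy downloaded from Kaggle
    data = pd.read_csv("data/Tetuan City power consumption.csv")
```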
---
---
<h1 align="center">
week 2
</h1>

## 26/10/2024 - Feature Engineering
> **Note**
- Dataset is now available in UC as a table
- Updated the DataPreprocessor and separated data loading from preprocessing (see the sketch below)
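
A rough sketch of the new split between loading and preprocessing, mirroring `notebooks/week2/01_prepare_dataset.py` in this PR (paths and config fields as in this repo):
```python
# Sketch: loading happens outside the processor, which now receives a pandas DataFrame.
from pyspark.sql import SparkSession

from power_consumption.config import Config
from power_consumption.preprocessing.data_preprocessor import DataProcessor

spark = SparkSession.builder.getOrCreate()
config = Config.from_yaml("configs/project_configs.yml")

# Load the raw UC table, then hand the DataFrame to the processor
raw = spark.table(f"{config.catalog_name}.{config.schema_name}.{config.dataset.raw_data_table}")
data_processor = DataProcessor(config, raw.toPandas())
data_processor.preprocess_data()
train_set, test_set = data_processor.split_data()
data_processor.save_to_catalog(train_set=train_set, test_set=test_set, spark=spark)
```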

> **Issue**
- Feature engineering was not working when run from within the IDE:
```shell
Exception: {'error_code': 'PERMISSION_DENIED', 'message': "Request failed access control checks. Permission check failed for 'heiaepgah71pwedmld01001.power_consumption.power_consumption_features'."}
```
- In the example code, the features generated at runtime were not used in the fe model
> **Workaround**
- Ran the feature engineering notebook from the Databricks workspace, which resolved the permission issues
- Applied the feature engineering function to both the training and testing sets and included the new features in the fe model
```python
testing_set = fe.create_training_set(
df=test_set,
label=target,
feature_lookups=[
FeatureFunction(
udf_name=function_name,
output_name="weather_interaction",
input_bindings={
"temperature": "Temperature",
"humidity": "Humidity",
"wind_speed": "Wind_Speed"
},
),
],
exclude_columns=["update_timestamp_utc"]
)
```
```python
training_df = training_set.load_df().toPandas()
testing_df = testing_set.load_df().toPandas()

X_train = training_df[num_features + cat_features + ["weather_interaction"]]
y_train = training_df[target]

X_test = testing_df[num_features + cat_features + ["weather_interaction"]]
y_test = testing_df[target]
```
---
26 changes: 22 additions & 4 deletions configs/project_configs.yml
@@ -6,7 +6,7 @@ hyperparameters:
n_estimators: 1000
max_depth: 6

features:
processed_features:
num_features:
- Temperature
- Humidity
@@ -18,8 +18,13 @@ features:
- diffuse_flows

cat_features:
- DayOfWeek
- IsWeekend
- DayOfWeek_1
- DayOfWeek_2
- DayOfWeek_3
- DayOfWeek_4
- DayOfWeek_5
- DayOfWeek_6
- IsWeekend_1

target:
target:
@@ -28,4 +33,17 @@ target:
- Zone_3_Power_Consumption

dataset:
id: 849
raw_data_table: tetuan_city_power_consumption
num_features:
- Temperature
- Humidity
- Wind_Speed
- Hour
- Day
- Month
- general_diffuse_flows
- diffuse_flows

cat_features:
- DayOfWeek
- IsWeekend
Binary file not shown.
68 changes: 68 additions & 0 deletions notebooks/week1/week_1.py
@@ -0,0 +1,68 @@
# Databricks notebook source
from power_consumption.preprocessing.data_preprocessor import DataProcessor
from power_consumption.model.rf_model import ConsumptionModel
from power_consumption.utils import visualise_results, plot_actual_vs_predicted, plot_feature_importance
from power_consumption.config import Config
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


# COMMAND ----------

config = Config.from_yaml("../../configs/project_configs.yml")

# COMMAND ----------
catalog_name = config.catalog_name
schema_name = config.schema_name
raw_data_table = config.dataset.raw_data_table
# COMMAND ----------
data_spark = spark.table(f"{catalog_name}.{schema_name}.{raw_data_table}")
# COMMAND ----------
data_pandas = data_spark.toPandas()
# COMMAND ----------
data_processor = DataProcessor(config, data_pandas)
# COMMAND ----------
data_processor.preprocess_data()
# COMMAND ----------
train_set, test_set = data_processor.split_data()

# COMMAND ----------
target_columns = config.target.target
feature_columns = config.processed_features.num_features + config.processed_features.cat_features

X_train = train_set[feature_columns]
y_train = train_set[target_columns]
X_test = test_set[feature_columns]
y_test = test_set[target_columns]

# COMMAND ----------
model = ConsumptionModel(config)
model.train(X_train, y_train)

# COMMAND ----------

# Make predictions and evaluate the model
y_pred = model.predict(X_test)
mse, r2 = model.evaluate(X_test, y_test)

# COMMAND ----------

# Visualize results as time series
visualise_results(y_test, y_pred, target_columns)

# COMMAND ----------

# Get feature importance
feature_importance, feature_names = model.get_feature_importance()
# COMMAND ----------

# Plot actual vs predicted values
plot_actual_vs_predicted(y_test.values, y_pred, target_columns)

# COMMAND ----------

# Plot feature importance
plot_feature_importance(feature_importance, feature_names, top_n=15)

# COMMAND ----------
32 changes: 32 additions & 0 deletions notebooks/week2/01_prepare_dataset.py
@@ -0,0 +1,32 @@
# Databricks notebook source
from power_consumption.preprocessing.data_preprocessor import DataProcessor
from power_consumption.config import Config
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


# COMMAND ----------

config = Config.from_yaml("../../configs/project_configs.yml")

# COMMAND ----------
catalog_name = config.catalog_name
schema_name = config.schema_name
raw_data_table = config.dataset.raw_data_table
# COMMAND ----------
data_spark = spark.table(f"{catalog_name}.{schema_name}.{raw_data_table}")
# COMMAND ----------
data_pandas = data_spark.toPandas()
# COMMAND ----------
data_processor = DataProcessor(config, data_pandas)
# COMMAND ----------
data_processor.preprocess_data()
# COMMAND ----------
train_set, test_set = data_processor.split_data()
# COMMAND ----------
train_set.reset_index(inplace=True)
test_set.reset_index(inplace=True)
# COMMAND ----------
data_processor.save_to_catalog(train_set=train_set, test_set=test_set, spark=spark)
# COMMAND ----------
53 changes: 53 additions & 0 deletions notebooks/week2/02_mlfow_experiment.py
@@ -0,0 +1,53 @@
# Databricks notebook source
import json

import mlflow

mlflow.set_tracking_uri("databricks")

mlflow.set_experiment(experiment_name="/Shared/power-consumption")
mlflow.set_experiment_tags({"repository_name": "power-consumption"})

# COMMAND ----------
experiments = mlflow.search_experiments(
filter_string="tags.repository_name='power-consumption'"
)

print(experiments)

# COMMAND ----------
with open("mlflow_experiment.json", "w") as json_file:
json.dump(experiments[0].__dict__, json_file, indent=4)

# COMMAND ----------
with mlflow.start_run(
run_name="test-run",
tags={
"git_sha": "30d57afb2efca70cede3061d00f2a553c2b4779b"
}
) as run:
mlflow.log_params({"type": "demo"})
mlflow.log_metrics(
{
"metric_1": 1.0,
"metric_2": 2.0
}
)
# COMMAND ----------
run_id = mlflow.search_runs(
experiment_names=["/Shared/power-consumption"],
filter_string="tags.git_sha='30d57afb2efca70cede3061d00f2a553c2b4779b'",
).run_id[0]
run_info = mlflow.get_run(run_id=f"{run_id}").to_dictionary()
print(run_info)

# COMMAND ----------
with open("run_info.json", "w") as json_file:
json.dump(run_info, json_file, indent=4)

# COMMAND ----------
print(run_info["data"]["metrics"])

# COMMAND ----------
print(run_info["data"]["params"])
# COMMAND ----------
104 changes: 104 additions & 0 deletions notebooks/week2/03_log_and_register_model.py
@@ -0,0 +1,104 @@
# Databricks notebook source
import mlflow
from mlflow.models import infer_signature

from pyspark.sql import SparkSession
from power_consumption.config import Config

from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from lightgbm import LGBMRegressor
from sklearn.multioutput import MultiOutputRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

mlflow.set_tracking_uri("databricks")
mlflow.set_registry_uri("databricks-uc")
# COMMAND ----------
config = Config.from_yaml("../../configs/project_configs.yml")
# COMMAND ----------
num_features = config.processed_features.num_features
cat_features = config.processed_features.cat_features
target = config.target.target
parameters = config.hyperparameters.__dict__

catalog_name = config.catalog_name
schema_name = config.schema_name
# COMMAND ----------
spark = SparkSession.builder.getOrCreate()

train_set_spark = spark.table(f"{catalog_name}.{schema_name}.train_set")
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()
# COMMAND ----------
X_train = train_set[num_features + cat_features]
y_train = train_set[target]

X_test = test_set[num_features + cat_features]
y_test = test_set[target]
# COMMAND ----------
# Define the preprocessor for categorical features
preprocessor = ColumnTransformer(
transformers=[('cat', OneHotEncoder(handle_unknown='ignore'), cat_features)],
remainder='passthrough'
)

# Create the pipeline with preprocessing and the multi-output LightGBM regressor
pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('regressor', MultiOutputRegressor(LGBMRegressor(**parameters)))
])
# COMMAND ----------
mlflow.set_experiment(experiment_name='/Shared/power-consumption')
git_sha = "30d57afb2efca70cede3061d00f2a553c2b4779b"

# Start an MLflow run to track the training process
with mlflow.start_run(
tags={"git_sha": f"{git_sha}",
"branch": "feature/week2"},
) as run:
run_id = run.info.run_id

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

# Evaluate the model performance
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"Mean Squared Error: {mse}")
print(f"Mean Absolute Error: {mae}")
print(f"R2 Score: {r2}")

# Log parameters, metrics, and the model to MLflow
mlflow.log_param("model_type", "LightGBM with preprocessing")
mlflow.log_params(parameters)
mlflow.log_metric("mse", mse)
mlflow.log_metric("mae", mae)
mlflow.log_metric("r2_score", r2)
signature = infer_signature(model_input=X_train, model_output=y_pred)

dataset = mlflow.data.from_spark(
train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set",
version="0")
mlflow.log_input(dataset, context="training")

mlflow.sklearn.log_model(
sk_model=pipeline,
artifact_path="lightgbm-pipeline-model",
signature=signature
)
# COMMAND ----------
model_version = mlflow.register_model(
model_uri=f'runs:/{run_id}/lightgbm-pipeline-model',
name=f"{catalog_name}.{schema_name}.power_consumption_model",
tags={"git_sha": f"{git_sha}"})

# COMMAND ----------
run = mlflow.get_run(run_id)
dataset_info = run.inputs.dataset_inputs[0].dataset
dataset_source = mlflow.data.get_source(dataset_info)
dataset_source.load()

# COMMAND ----------