diff --git a/notebooks/mlops_with_databricks-0.0.1-py3-none-any.whl b/notebooks/mlops_with_databricks-0.0.1-py3-none-any.whl
index b996980..142ab4d 100644
Binary files a/notebooks/mlops_with_databricks-0.0.1-py3-none-any.whl and b/notebooks/mlops_with_databricks-0.0.1-py3-none-any.whl differ
diff --git a/notebooks/week1&2/05.log_and_register_fe_model.py b/notebooks/week1&2/05.log_and_register_fe_model.py
index 29d1dcf..209a07d 100644
--- a/notebooks/week1&2/05.log_and_register_fe_model.py
+++ b/notebooks/week1&2/05.log_and_register_fe_model.py
@@ -1,10 +1,10 @@
 # Databricks notebook source
 # The two cells below are only needed when you are running from the Databricks UI, because they possibly do not work locally in VS Code
-# %pip install mlops_with_databricks-0.0.1-py3-none-any.whl
+# MAGIC %pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl

 # COMMAND ----------

-# dbutils.library.restartPython()
+# MAGIC dbutils.library.restartPython()

 # COMMAND ----------

@@ -176,12 +176,12 @@ testing_df = testing_set.load_df().toPandas()

 # Split features and target
-X_train = training_df[num_features + cat_features]
+X_train = training_df[num_features + cat_features + ["AverageTemperature"]]
 # Don't use sleep_hours_duration, because it's covered by sleep_duration; it was only an example of the feature function option
 # X_train = training_df[num_features + cat_features + ["sleep_hours_duration"]]
 y_train = training_df[target]

-X_test = testing_df[num_features + cat_features]
+X_test = testing_df[num_features + cat_features + ["AverageTemperature"]]
 # Don't use sleep_hours_duration, because it's covered by sleep_duration; it was only an example of the feature function option
 # X_test= testing_df[num_features + cat_features + ["sleep_hours_duration"]]
 y_test = testing_df[target]

@@ -226,5 +226,6 @@ signature=signature,
 )
 mlflow.register_model(
-    model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe", name=f"{catalog_name}.{schema_name}.house_prices_model_fe"
+    model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe",
+    name=f"{catalog_name}.{schema_name}.sleep_efficiencies_model_fe",
 )
diff --git a/notebooks/week3/01.feature_serving.py b/notebooks/week3/01.feature_serving.py
new file mode 100644
index 0000000..9cfbf7a
--- /dev/null
+++ b/notebooks/week3/01.feature_serving.py
@@ -0,0 +1,260 @@
+# Databricks notebook source
+# MAGIC %pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl
+
+# COMMAND ----------
+
+# MAGIC %restart_python
+
+# COMMAND ----------
+
+"""
+Create a feature table in Unity Catalog; it will be a Delta table.
+Create an online table that uses the feature Delta table created in the previous step.
+Create a feature spec. When you create a feature spec,
+you specify the source Delta table.
+This allows the feature spec to be used in both offline and online scenarios.
+For online lookups, the serving endpoint automatically uses the online table to perform low-latency feature lookups.
+The source Delta table and the online table must use the same primary key.
+ +""" + +import random +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import mlflow +import pandas as pd +import requests +from databricks import feature_engineering +from databricks.feature_engineering import FeatureLookup +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.catalog import ( + OnlineTableSpec, + OnlineTableSpecTriggeredSchedulingPolicy, +) +from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput +from pyspark.sql import SparkSession + +from sleep_efficiency.config import ProjectConfig + +spark = SparkSession.builder.getOrCreate() + +# Initialize Databricks clients +workspace = WorkspaceClient() +fe = feature_engineering.FeatureEngineeringClient() + +# Set the MLflow registry URI +mlflow.set_registry_uri("databricks-uc") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load config, train and test tables + +# COMMAND ---------- + +# Load config +config = ProjectConfig.from_yaml(config_path="../../project_config.yml") + +# Get feature columns details +num_features = config.num_features +cat_features = config.cat_features +target = config.target +catalog_name = config.catalog_name +schema_name = config.schema_name + +# Define table names +feature_table_name = f"{catalog_name}.{schema_name}.sleep_efficiencies_preds" +online_table_name = f"{catalog_name}.{schema_name}.sleep_efficiencies_preds_online" + +# Load training and test sets from Catalog +train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas() +test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas() + +df = pd.concat([train_set, test_set]) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load a registered model + +# COMMAND ---------- + +# Load the MLflow model for predictions +pipeline = mlflow.sklearn.load_model(f"models:/{catalog_name}.{schema_name}.sleep_efficiency_model_basic/3") + +# COMMAND ---------- + +# Prepare the DataFrame for predictions and feature table creation - these features are the ones we want to serve. +preds_df = df[["id", "sleep_duration", "awakenings", "sleep_month"]] +preds_df["Predicted_SleepEfficiency"] = pipeline.predict(df[cat_features + num_features]) + +preds_df = spark.createDataFrame(preds_df) + +# 1. Create the feature table in Databricks + +fe.create_table( + name=feature_table_name, + primary_keys=["id"], + df=preds_df, + description="Sleep efficiencies predictions feature table", +) + +# Enable Change Data Feed +spark.sql(f""" + ALTER TABLE {feature_table_name} + SET TBLPROPERTIES (delta.enableChangeDataFeed = true) +""") + +# COMMAND ---------- + +# 2. Create the online table using feature table + +spec = OnlineTableSpec( + primary_key_columns=["id"], + source_table_full_name=feature_table_name, + run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}), + perform_full_copy=False, +) + +# Create the online table in Databricks +online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec) + +# COMMAND ---------- + +# 3. 
Create feature lookup and feature spec using the feature table
+
+# Define features to look up from the feature table
+features = [
+    FeatureLookup(
+        table_name=feature_table_name,
+        lookup_key="id",
+        feature_names=["id", "sleep_duration", "awakenings", "sleep_month", "Predicted_SleepEfficiency"],
+    )
+]
+
+# Create the feature spec for serving
+feature_spec_name = f"{catalog_name}.{schema_name}.return_predictions"
+
+fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Deploy Feature Serving Endpoint
+
+# COMMAND ----------
+
+# 4. Create endpoint using feature spec
+
+# Create a serving endpoint for the sleep efficiencies predictions
+workspace.serving_endpoints.create(
+    name="sleep-efficiencies-feature-serving",
+    config=EndpointCoreConfigInput(
+        served_entities=[
+            ServedEntityInput(
+                entity_name=feature_spec_name,  # feature spec name defined in the previous step
+                scale_to_zero_enabled=True,
+                workload_size="Small",  # Define the workload size (Small, Medium, Large)
+            )
+        ]
+    ),
+)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Call the endpoint
+
+# COMMAND ----------
+
+token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()  # type: ignore # noqa: F821
+host = spark.conf.get("spark.databricks.workspaceUrl")
+
+# COMMAND ----------
+
+# Collect the ids available for lookup (preds_df is a Spark DataFrame at this point)
+id_list = preds_df.select("id").rdd.flatMap(lambda x: x).collect()
+
+# COMMAND ----------
+
+start_time = time.time()
+serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-feature-serving/invocations"
+response = requests.post(
+    f"{serving_endpoint}",
+    headers={"Authorization": f"Bearer {token}"},
+    json={"dataframe_records": [{"id": "2"}]},
+)
+
+end_time = time.time()
+execution_time = end_time - start_time
+
+print("Response status:", response.status_code)
+print("Response text:", response.text)
+print("Execution time:", execution_time, "seconds")
+
+
+# COMMAND ----------
+
+# another way to call the endpoint
+
+response = requests.post(
+    f"{serving_endpoint}",
+    headers={"Authorization": f"Bearer {token}"},
+    json={"dataframe_split": {"columns": ["id"], "data": [["2"]]}},
+)
+
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Load Test
+
+# COMMAND ----------
+
+# Initialize variables
+serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-feature-serving/invocations"
+id_list = preds_df.select("id").rdd.flatMap(lambda x: x).collect()
+headers = {"Authorization": f"Bearer {token}"}
+num_requests = 10
+
+
+# Function to make a request and record latency
+def send_request():
+    random_id = random.choice(id_list)
+    start_time = time.time()
+    response = requests.post(
+        serving_endpoint,
+        headers=headers,
+        json={"dataframe_records": [{"id": random_id}]},
+    )
+    end_time = time.time()
+    latency = end_time - start_time  # Calculate latency for this request
+    return response.status_code, latency
+
+
+# Measure total execution time
+total_start_time = time.time()
+latencies = []
+
+# Send requests concurrently
+with ThreadPoolExecutor(max_workers=100) as executor:
+    futures = [executor.submit(send_request) for _ in range(num_requests)]
+
+    for future in as_completed(futures):
+        status_code, latency = future.result()
+        latencies.append(latency)
+
+total_end_time = time.time()
+total_execution_time = total_end_time - total_start_time
+
+# Calculate the average latency
+average_latency = sum(latencies) / len(latencies)
+
+print("\nTotal execution time:", total_execution_time, "seconds")
+print("Average latency per request:", average_latency, "seconds")
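+
+# COMMAND ----------
+
+# Beyond the average, tail latencies are usually more telling for a serving
+# endpoint. A minimal sketch using only the stdlib and the `latencies` list
+# collected above:
+import statistics
+
+percentiles = statistics.quantiles(latencies, n=100)
+print("p50 latency:", statistics.median(latencies), "seconds")
+print("p95 latency:", percentiles[94], "seconds")
+print("p99 latency:", percentiles[98], "seconds")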
latency per request:", average_latency, "seconds") diff --git a/notebooks/week3/02.model_serving.py b/notebooks/week3/02.model_serving.py new file mode 100644 index 0000000..f44fc16 --- /dev/null +++ b/notebooks/week3/02.model_serving.py @@ -0,0 +1,185 @@ +# Databricks notebook source +# MAGIC %pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +# MAGIC %restart_python + +# COMMAND ---------- + +import random +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import pandas as pd +import requests +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.serving import ( + EndpointCoreConfigInput, + Route, + ServedEntityInput, + TrafficConfig, +) +from pyspark.sql import SparkSession + +from sleep_efficiency.config import ProjectConfig + +workspace = WorkspaceClient() +spark = SparkSession.builder.getOrCreate() + +config = ProjectConfig.from_yaml(config_path="../../project_config.yml") + +catalog_name = config.catalog_name +schema_name = config.schema_name + +train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas() + +workspace.serving_endpoints.create( + name="sleep-efficiencies-model-serving", + config=EndpointCoreConfigInput( + served_entities=[ + ServedEntityInput( + entity_name=f"{catalog_name}.{schema_name}.sleep_efficiency_model_basic", + scale_to_zero_enabled=True, + workload_size="Small", + entity_version=3, + ) + ], + # Optional if only 1 entity is served + traffic_config=TrafficConfig( + routes=[Route(served_model_name="sleep_efficiency_model_basic-3", traffic_percentage=100)] + ), + ), +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Call the endpoint + +# COMMAND ---------- + +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() # type: ignore # noqa: F821 +host = spark.conf.get("spark.databricks.workspaceUrl") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Create sample request body + +# COMMAND ---------- + +required_columns = [ + "age", + "sleep_duration", + "rem_sleep_percentage", + "deep_sleep_percentage", + "light_sleep_percentage", + "awakenings", + "caffeine_consumption", + "alcohol_consumption", + "exercise_frequency", + "gender", + "smoking_status", + "bedtime", + "wakeup_time", +] + +sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records") +dataframe_records = [[record] for record in sampled_records] + +# COMMAND ---------- + +""" +Each body should be list of json with columns + +[{'LotFrontage': 78.0, + 'LotArea': 9317, + 'OverallQual': 6, + 'OverallCond': 5, + 'YearBuilt': 2006, + 'Exterior1st': 'VinylSd', + 'Exterior2nd': 'VinylSd', + 'MasVnrType': 'None', + 'Foundation': 'PConc', + 'Heating': 'GasA', + 'CentralAir': 'Y', + 'SaleType': 'WD', + 'SaleCondition': 'Normal'}] +""" + +# COMMAND ---------- + +start_time = time.time() + +model_serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-model-serving/invocations" + +# Convert Timestamp to string +dataframe_records[0] = [ + {k: (v.isoformat() if isinstance(v, pd.Timestamp) else v) for k, v in record.items()} + for record in dataframe_records[0] +] + +response = requests.post( + f"{model_serving_endpoint}", + headers={"Authorization": f"Bearer {token}"}, + json={"dataframe_records": dataframe_records[0]}, +) + +end_time = time.time() +execution_time = end_time - start_time + +print("Response status:", response.status_code) +print("Reponse text:", response.text) +print("Execution time:", execution_time, 
"seconds") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load Test + +# COMMAND ---------- + +# Initialize variables +model_serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-model-serving/invocations" + +headers = {"Authorization": f"Bearer {token}"} +num_requests = 1000 + + +# Function to make a request and record latency +def send_request(): + random_record = [ + {k: (v.isoformat() if isinstance(v, pd.Timestamp) else v) for k, v in record.items()} + for record in random.choice(dataframe_records) + ] + start_time = time.time() + response = requests.post( + model_serving_endpoint, + headers=headers, + json={"dataframe_records": random_record}, + ) + end_time = time.time() + latency = end_time - start_time + return response.status_code, latency + + +total_start_time = time.time() +latencies = [] + +# Send requests concurrently +with ThreadPoolExecutor(max_workers=100) as executor: + futures = [executor.submit(send_request) for _ in range(num_requests)] + + for future in as_completed(futures): + status_code, latency = future.result() + latencies.append(latency) + +total_end_time = time.time() +total_execution_time = total_end_time - total_start_time + +# Calculate the average latency +average_latency = sum(latencies) / len(latencies) + +print("\nTotal execution time:", total_execution_time, "seconds") +print("Average latency per request:", average_latency, "seconds") diff --git a/notebooks/week3/03.model_serving_feature_lookup.py b/notebooks/week3/03.model_serving_feature_lookup.py new file mode 100644 index 0000000..a5153bd --- /dev/null +++ b/notebooks/week3/03.model_serving_feature_lookup.py @@ -0,0 +1,152 @@ +# Databricks notebook source +# MAGIC %pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +# MAGIC %restart_python + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Create Online Table for sleep efficiency features +# MAGIC We already created sleep_features table as feature look up table. 
+
+# COMMAND ----------
+
+import time
+
+import pandas as pd
+import requests
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service.catalog import (
+    OnlineTableSpec,
+    OnlineTableSpecTriggeredSchedulingPolicy,
+)
+from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
+from pyspark.sql import SparkSession
+
+from sleep_efficiency.config import ProjectConfig
+
+spark = SparkSession.builder.getOrCreate()
+
+# Initialize Databricks clients
+workspace = WorkspaceClient()
+
+# COMMAND ----------
+
+# Load config
+config = ProjectConfig.from_yaml(config_path="../../project_config.yml")
+catalog_name = config.catalog_name
+schema_name = config.schema_name
+
+# COMMAND ----------
+
+online_table_name = f"{catalog_name}.{schema_name}.temperature_features_online"
+spec = OnlineTableSpec(
+    primary_key_columns=["Month"],
+    source_table_full_name=f"{catalog_name}.{schema_name}.temperature_features",
+    run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),
+    perform_full_copy=False,
+)
+
+online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ### Create endpoint
+
+# COMMAND ----------
+
+workspace.serving_endpoints.create(
+    name="sleep-efficiencies-model-serving-fe",
+    config=EndpointCoreConfigInput(
+        served_entities=[
+            ServedEntityInput(
+                entity_name=f"{catalog_name}.{schema_name}.sleep_efficiencies_model_fe",
+                scale_to_zero_enabled=True,
+                workload_size="Small",
+                entity_version=1,
+            )
+        ]
+    ),
+)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ### Call the endpoint
+
+# COMMAND ----------
+
+token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()  # type: ignore # noqa: F821
+host = spark.conf.get("spark.databricks.workspaceUrl")
+
+# COMMAND ----------
+
+# Excluding "AverageTemperature" because it will be taken from the feature lookup table;
+# "id" and "sleep_month" are included as lookup keys
+required_columns = [
+    "age",
+    "sleep_duration",
+    "rem_sleep_percentage",
+    "deep_sleep_percentage",
+    "light_sleep_percentage",
+    "awakenings",
+    "caffeine_consumption",
+    "alcohol_consumption",
+    "exercise_frequency",
+    "gender",
+    "smoking_status",
+    "bedtime",
+    "wakeup_time",
+    "id",
+    "sleep_month",
+]
+
+train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
+
+sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records")
+dataframe_records = [[record] for record in sampled_records]
+
+# COMMAND ----------
+
+dataframe_records[0]
+
+# COMMAND ----------
+
+start_time = time.time()
+
+model_serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-model-serving-fe/invocations"
+
+dataframe_records[0] = [
+    {k: (v.isoformat() if isinstance(v, pd.Timestamp) else v) for k, v in record.items()}
+    for record in dataframe_records[0]
+]
+
+response = requests.post(
+    f"{model_serving_endpoint}",
+    headers={"Authorization": f"Bearer {token}"},
+    json={"dataframe_records": dataframe_records[0]},
+)
+
+end_time = time.time()
+execution_time = end_time - start_time
+
+print("Response status:", response.status_code)
+print("Response text:", response.text)
+print("Execution time:", execution_time, "seconds")
+
+# COMMAND ----------
+
+temperature_features = 
spark.table(f"{catalog_name}.{schema_name}.temperature_features").toPandas() + + +# COMMAND ---------- diff --git a/notebooks/week3/04.AB_test_model_serving.py b/notebooks/week3/04.AB_test_model_serving.py new file mode 100644 index 0000000..d26ad2c --- /dev/null +++ b/notebooks/week3/04.AB_test_model_serving.py @@ -0,0 +1,350 @@ +# Databricks notebook source +# MAGIC %pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +# MAGIC %restart_python + +# COMMAND ---------- + +import hashlib +import time + +import mlflow +import pandas as pd +import requests +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput +from lightgbm import LGBMRegressor +from mlflow import MlflowClient +from mlflow.models import infer_signature +from pyspark.sql import SparkSession +from sklearn.compose import ColumnTransformer +from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import OneHotEncoder + +from sleep_efficiency.config import ProjectConfig + +# Set up MLflow for tracking and model registry +mlflow.set_tracking_uri("databricks") +mlflow.set_registry_uri("databricks-uc") + +# Initialize the MLflow client for model management +client = MlflowClient() + +# Load configuration +config = ProjectConfig.from_yaml(config_path="../../project_config.yml") + +# Extract key configuration details +num_features = config.num_features +cat_features = config.cat_features +target = config.target +catalog_name = config.catalog_name +schema_name = config.schema_name +ab_test_params = config.ab_test + +# COMMAND ---------- + +# Set up specific parameters for model A and model B as part of the A/B test +parameters_a = { + "learning_rate": ab_test_params["learning_rate_a"], + "n_estimators": ab_test_params["n_estimators"], + "max_depth": ab_test_params["max_depth_a"], +} + +parameters_b = { + "learning_rate": ab_test_params["learning_rate_b"], + "n_estimators": ab_test_params["n_estimators"], + "max_depth": ab_test_params["max_depth_b"], +} + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load and Prepare Training and Testing Datasets + +# COMMAND ---------- + +# Initialize a Databricks session for Spark operations +spark = SparkSession.builder.getOrCreate() + +# Load the training and testing sets from Databricks tables +train_set_spark = spark.table(f"{catalog_name}.{schema_name}.train_set") +train_set = train_set_spark.toPandas() +test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas() + +# Define features and target variables +X_train = train_set[num_features + cat_features] +y_train = train_set[target] +X_test = test_set[num_features + cat_features] +y_test = test_set[target] + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Train Model A and Log with MLflow + +# COMMAND ---------- + +# Define a preprocessor for categorical features, which will one-hot encode categorical variables +preprocessor = ColumnTransformer( + transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), cat_features)], remainder="passthrough" +) + +# Build a pipeline combining preprocessing and model training steps +pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("regressor", LGBMRegressor(**parameters_a))]) + +# Set the MLflow experiment to track this A/B testing project +mlflow.set_experiment(experiment_name="/Shared/sleep-efficiencies-ab") +model_name = f"{catalog_name}.{schema_name}.sleep_efficiencies_model_ab" + 
+# Git commit hash for tracking model version +git_sha = "week3" + +# Start MLflow run to track training of Model A +with mlflow.start_run(tags={"model_class": "A", "git_sha": git_sha}) as run: + run_id = run.info.run_id + + # Train the model + pipeline.fit(X_train, y_train) + y_pred = pipeline.predict(X_test) + + # Calculate performance metrics + mse = mean_squared_error(y_test, y_pred) + mae = mean_absolute_error(y_test, y_pred) + r2 = r2_score(y_test, y_pred) + + # Log model parameters, metrics, and other artifacts in MLflow + mlflow.log_param("model_type", "LightGBM with preprocessing") + mlflow.log_params(parameters_a) + mlflow.log_metric("mse", mse) + mlflow.log_metric("mae", mae) + mlflow.log_metric("r2_score", r2) + signature = infer_signature(model_input=X_train, model_output=y_pred) + + # Log the input dataset for tracking reproducibility + dataset = mlflow.data.from_spark(train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set", version="0") + mlflow.log_input(dataset, context="training") + + # Log the pipeline model in MLflow with a unique artifact path + mlflow.sklearn.log_model(sk_model=pipeline, artifact_path="lightgbm-pipeline-model", signature=signature) + +model_version = mlflow.register_model( + model_uri=f"runs:/{run_id}/lightgbm-pipeline-model", name=model_name, tags={"git_sha": f"{git_sha}"} +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Register Model A and Assign Alias + +# COMMAND ---------- + +# Assign alias for easy reference in future A/B tests +model_version_alias = "model_A" + +client.set_registered_model_alias(model_name, model_version_alias, f"{model_version.version}") +model_uri = f"models:/{model_name}@{model_version_alias}" +model_A = mlflow.sklearn.load_model(model_uri) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Train Model B and Log with MLflow + +# COMMAND ---------- + +# Repeat the training and logging steps for Model B using parameters for B +pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("regressor", LGBMRegressor(**parameters_b))]) + +# Start MLflow run for Model B +with mlflow.start_run(tags={"model_class": "B", "git_sha": git_sha}) as run: + run_id = run.info.run_id + + pipeline.fit(X_train, y_train) + y_pred = pipeline.predict(X_test) + + mse = mean_squared_error(y_test, y_pred) + mae = mean_absolute_error(y_test, y_pred) + r2 = r2_score(y_test, y_pred) + + mlflow.log_param("model_type", "LightGBM with preprocessing") + mlflow.log_params(parameters_b) + mlflow.log_metric("mse", mse) + mlflow.log_metric("mae", mae) + mlflow.log_metric("r2_score", r2) + signature = infer_signature(model_input=X_train, model_output=y_pred) + + dataset = mlflow.data.from_spark(train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set", version="0") + mlflow.log_input(dataset, context="training") + mlflow.sklearn.log_model(sk_model=pipeline, artifact_path="lightgbm-pipeline-model", signature=signature) + +model_version = mlflow.register_model( + model_uri=f"runs:/{run_id}/lightgbm-pipeline-model", name=model_name, tags={"git_sha": f"{git_sha}"} +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Register Model B and Assign Alias + +# COMMAND ---------- + +# Assign alias for Model B +model_version_alias = "model_B" + +client.set_registered_model_alias(model_name, model_version_alias, f"{model_version.version}") +model_uri = f"models:/{model_name}@{model_version_alias}" +model_B = mlflow.sklearn.load_model(model_uri) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Define Custom A/B Test Model + +# COMMAND 
---------- + + +class SleepEfficiencyModelWrapper(mlflow.pyfunc.PythonModel): + def __init__(self, models): + self.models = models + self.model_a = models[0] + self.model_b = models[1] + + def predict(self, context, model_input): + if isinstance(model_input, pd.DataFrame): + sleep_person_id = str(model_input["id"].values[0]) + hashed_id = hashlib.md5(sleep_person_id.encode(encoding="UTF-8")).hexdigest() + # convert a hexadecimal (base-16) string into an integer + if int(hashed_id, 16) % 2: + predictions = self.model_a.predict(model_input.drop(["id"], axis=1)) + return {"Prediction": predictions[0], "model": "Model A"} + else: + predictions = self.model_b.predict(model_input.drop(["id"], axis=1)) + return {"Prediction": predictions[0], "model": "Model B"} + else: + raise ValueError("Input must be a pandas DataFrame.") + + +# COMMAND ---------- + +X_train = train_set[num_features + cat_features + ["id"]] +X_test = test_set[num_features + cat_features + ["id"]] + + +# COMMAND ---------- + +models = [model_A, model_B] +wrapped_model = SleepEfficiencyModelWrapper(models) # we pass the loaded models to the wrapper +example_input = X_test.iloc[0:1] # Select the first row for prediction as example +example_prediction = wrapped_model.predict(context=None, model_input=example_input) +print("Example Prediction:", example_prediction) + +# COMMAND ---------- + +mlflow.set_experiment(experiment_name="/Shared/sleep-efficiencies-ab-testing") +model_name = f"{catalog_name}.{schema_name}.sleep_efficiencies_model_pyfunc_ab_test" + +with mlflow.start_run() as run: + run_id = run.info.run_id + signature = infer_signature(model_input=X_train, model_output={"Prediction": 1234.5, "model": "Model B"}) + dataset = mlflow.data.from_spark(train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set", version="0") + mlflow.log_input(dataset, context="training") + mlflow.pyfunc.log_model( + python_model=wrapped_model, artifact_path="pyfunc-sleep-efficiencies-model-ab", signature=signature + ) +model_version = mlflow.register_model( + model_uri=f"runs:/{run_id}/pyfunc-sleep-efficiencies-model-ab", name=model_name, tags={"git_sha": f"{git_sha}"} +) + +# COMMAND ---------- + +model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version.version}") + +# Run prediction +predictions = model.predict(X_test.iloc[0:1]) + + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Create serving endpoint + +# COMMAND ---------- + +workspace = WorkspaceClient() + +workspace.serving_endpoints.create( + name="sleep-efficiencies-model-serving-ab-test", + config=EndpointCoreConfigInput( + served_entities=[ + ServedEntityInput( + entity_name=f"{catalog_name}.{schema_name}.sleep_efficiencies_model_pyfunc_ab_test", + scale_to_zero_enabled=True, + workload_size="Small", + entity_version=model_version.version, + ) + ] + ), +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Call the endpoint + +# COMMAND ---------- + +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() # type: ignore # noqa: F821 +host = spark.conf.get("spark.databricks.workspaceUrl") + +# COMMAND ---------- + +required_columns = [ + "age", + "sleep_duration", + "rem_sleep_percentage", + "deep_sleep_percentage", + "light_sleep_percentage", + "awakenings", + "caffeine_consumption", + "alcohol_consumption", + "exercise_frequency", + "gender", + "smoking_status", + "bedtime", + "wakeup_time", + "id", +] + +train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas() +sampled_records = 
train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records")
+dataframe_records = [[record] for record in sampled_records]
+
+# COMMAND ----------
+
+start_time = time.time()
+
+model_serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-model-serving-ab-test/invocations"
+
+# Convert Timestamp to string
+dataframe_records[0] = [
+    {k: (v.isoformat() if isinstance(v, pd.Timestamp) else v) for k, v in record.items()}
+    for record in dataframe_records[0]
+]
+
+response = requests.post(
+    f"{model_serving_endpoint}",
+    headers={"Authorization": f"Bearer {token}"},
+    json={"dataframe_records": dataframe_records[0]},
+)
+
+end_time = time.time()
+execution_time = end_time - start_time
+
+print("Response status:", response.status_code)
+print("Response text:", response.text)
+print("Execution time:", execution_time, "seconds")
+
+# COMMAND ----------
diff --git a/notebooks/week3/README.md b/notebooks/week3/README.md
new file mode 100644
index 0000000..c450d63
--- /dev/null
+++ b/notebooks/week3/README.md
@@ -0,0 +1,70 @@
+# Week 3 Materials and Code Overview
+
+Hello students,
+
+Before diving into the Week 3 materials and code, let's clarify the key concepts and implementations covered in this lecture.
+
+## Overview
+
+Last week we demonstrated model training and registering for different use cases.
+This week, we show how to create serving endpoints for three different scenarios: feature serving, model serving, and model serving with feature lookup.
+
+## Code Structure and Implementations
+
+### 1. Feature Serving
+```
+01.feature_serving.py
+```
+Steps:
+* The process begins by loading both the training and testing datasets, which are then concatenated into a single DataFrame. Subsequently, we load a pre-registered scikit-learn model for generating predictions and select the features to be used for serving.
+
+* Using the loaded model, we generate predictions for our dataset, resulting in a final DataFrame that includes the id key, three input features, and the predicted column. This DataFrame is then used to create a feature table in the form of a Delta table.
+
+* Next, we establish an online feature table by using the previously created offline feature table as the source, which is also a Delta table. This setup enables the creation of an online table that relies on the feature Delta table crafted in the preceding steps.
+
+* To create serving endpoints, it's essential to create a feature spec based on the feature table. This specification defines the source feature Delta table, allowing the feature spec to support both offline and online scenarios. For online lookups, the serving endpoint automatically utilizes the online table to execute low-latency feature retrievals. Both the source Delta table and the online table share the same primary key.
+
+* Finally, we create a serving endpoint by using the feature spec as the serving entity. This endpoint can be used for online feature lookups. The subsequent code examples show how to invoke this endpoint and get responses; a minimal request is also sketched below.
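+
+For example, a lookup request to the feature serving endpoint might look like this
+(a sketch; `host` and `token` are obtained in the notebook, and `id` is the primary
+key of the feature table):
+
+```python
+import requests
+
+response = requests.post(
+    f"https://{host}/serving-endpoints/sleep-efficiencies-feature-serving/invocations",
+    headers={"Authorization": f"Bearer {token}"},
+    json={"dataframe_records": [{"id": "2"}]},
+)
+print(response.json())
+```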
+
+### 2. Model Serving
+```
+02.model_serving.py
+```
+Model serving is the process of creating a model serving endpoint that can be used for inference. The endpoint creation process is similar to feature serving, except that we don't need to create a feature table; we simply create a serving endpoint that relies on the model we trained.
+
+Steps:
+* We start by loading the trained and registered model.
+* Then we create a model serving endpoint using the model. Note that the entity name we pass is a registered model name and the version is an existing model version.
+* We also show an example of traffic split, which is a feature of model serving that allows us to split traffic between multiple model versions.
+* Finally, we invoke the endpoint and get the predictions. The payload should be a JSON object that includes the same features (and their values) used for training. We need to provide all the features required for prediction.
+* We also added an example piece of code for a simple load test to measure average latency.
+
+### 3. Model Serving with Feature Look Up
+```
+03.model_serving_feature_lookup.py
+```
+
+This is a combination of the previous two examples. We load a pre-trained model and create a feature table for lookup. Then we create a model serving endpoint that uses the feature table. Last week, we trained a model with a feature lookup and a feature function. Now we will create a serving endpoint for that model.
+
+Steps:
+- We start by creating an online table for the existing offline feature table, temperature_features. This is the table we created last week in the *week 2 - 05.log_and_register_fe_model.py* notebook.
+- This online table is required for our model to look up features at the serving endpoint.
+- Next, as in the previous notebook, we create an endpoint using the model we registered in *week 2 - 05.log_and_register_fe_model.py*. This is the model we registered using the feature lookup and feature function.
+- When we send a request to the model endpoint, this time we won't need to provide all the features: "AverageTemperature" is taken from the feature lookup table, and the feature function can compute a derived feature such as "sleep_hours_duration".
+
+
+### 4. A/B Testing
+```
+04.AB_test_model_serving.py
+```
+In this notebook, we show the setup of A/B testing for two different model versions using pyfunc. We'll train, register, and implement a serving endpoint that uses a pyfunc model as a wrapper for these versions.
+
+Steps:
+- We start by loading the configuration, the parameters for model A and model B, and the training and testing datasets.
+- We use the same approach as we did in *week 2 - 03.log_and_register_model.py*.
+- We train model A and model B, and register them.
+- After training, we create aliases for each model version, referred to as `model_A` and `model_B`.
+- These registered model versions are then loaded and used within a wrapper class.
+- The wrapper class is where we define the A/B testing logic, controlling which data points receive predictions from which model. For this, we use a hash function, as sketched below.
+- We run an MLflow experiment with this wrapper model and register it.
+- The next steps involve creating a serving endpoint, similar to our previous examples, and show how to invoke it.
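+
+The routing itself boils down to a deterministic hash of the record `id` (a
+simplified sketch of the wrapper logic in `04.AB_test_model_serving.py`):
+
+```python
+import hashlib
+
+def choose_model(sleep_person_id: str) -> str:
+    hashed_id = hashlib.md5(sleep_person_id.encode("UTF-8")).hexdigest()
+    # Odd hash values route to model A, even ones to model B (~50/50 split)
+    return "Model A" if int(hashed_id, 16) % 2 else "Model B"
+
+assert choose_model("2") == choose_model("2")  # the same id always gets the same model
+```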
diff --git a/project_config.yml b/project_config.yml index 4cdd34a..a4ab23c 100644 --- a/project_config.yml +++ b/project_config.yml @@ -6,6 +6,13 @@ parameters: n_estimators: 1000 max_depth: 6 +ab_test: + learning_rate_a: 0.02 + learning_rate_b: 0.02 + n_estimators: 1000 + max_depth_a: 6 + max_depth_b: 10 + num_features: - age - sleep_duration diff --git a/src/sleep_efficiency/config.py b/src/sleep_efficiency/config.py index f639733..a50e7f1 100644 --- a/src/sleep_efficiency/config.py +++ b/src/sleep_efficiency/config.py @@ -12,6 +12,7 @@ class ProjectConfig(BaseModel): catalog_name: str schema_name: str parameters: Dict[str, Any] # Dictionary to hold model-related parameters + ab_test: Dict[str, Any] # Dictionary to hold A/B test parameters @classmethod def from_yaml(cls, config_path: str):