
Feature/week3 #2

Open
wants to merge 15 commits into main
Binary file modified notebooks/mlops_with_databricks-0.0.1-py3-none-any.whl
Binary file not shown.
10 changes: 5 additions & 5 deletions notebooks/week1&2/05.log_and_register_fe_model.py
@@ -1,10 +1,10 @@
# Databricks notebook source
# The two cells below are only needed when running from the Databricks UI, because this possibly does not work locally in VS Code
# %pip install mlops_with_databricks-0.0.1-py3-none-any.whl
%pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl

💡 Codebase verification

The relative path is incorrect and should be updated to ./mlops_with_databricks-0.0.1-py3-none-any.whl

The wheel file is actually located in the same directory as the notebook (notebooks/), not in the parent directory. The current relative path ../mlops_with_databricks-0.0.1-py3-none-any.whl is incorrect and would fail to find the package.

🔗 Analysis chain

Consider using an absolute path for package installation.

The relative path ../mlops_with_databricks-0.0.1-py3-none-any.whl might break if the notebook is run from a different working directory.

Let's verify the package location:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find the wheel file location
fd -e whl

Length of output: 66

🧰 Tools
🪛 Ruff

3-3: SyntaxError: Expected a statement
3-3: SyntaxError: Simple statements must be separated by newlines or semicolons
3-3: SyntaxError: Expected an identifier
3-3: SyntaxError: Expected an identifier
3-3: SyntaxError: Simple statements must be separated by newlines or semicolons

# COMMAND ----------

# dbutils.library.restartPython()
dbutils.library.restartPython()

# COMMAND ----------

@@ -176,12 +176,12 @@
testing_df = testing_set.load_df().toPandas()

# Split features and target
X_train = training_df[num_features + cat_features]
X_train = training_df[num_features + cat_features + ["AverageTemperature"]]
# Don't use sleep_hours_duration, because it's covered by sleep_duration; it was just an example of using the feature function option
# X_train = training_df[num_features + cat_features + ["sleep_hours_duration"]]

y_train = training_df[target]
X_test = testing_df[num_features + cat_features]
X_test = testing_df[num_features + cat_features + ["AverageTemperature"]]
# Don't use sleep_hours_duration, because it's covered by sleep_duration; it was just an example of using the feature function option
# X_test= testing_df[num_features + cat_features + ["sleep_hours_duration"]]
y_test = testing_df[target]
@@ -226,5 +226,5 @@
signature=signature,
)
mlflow.register_model(
model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe", name=f"{catalog_name}.{schema_name}.house_prices_model_fe"
model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe", name=f"{catalog_name}.{schema_name}.sleep_efficiencies_model_fe"
)
263 changes: 263 additions & 0 deletions notebooks/week3/01.feature_serving.py
@@ -0,0 +1,263 @@
# Databricks notebook source
# MAGIC %pip install ../mlops_with_databricks-0.0.1-py3-none-any.whl

# COMMAND ----------

# MAGIC %restart_python

# COMMAND ----------

"""
Create feature table in unity catalog, it will be a delta table
Create online table which uses the feature delta table created in the previous step
Create a feature spec. When you create a feature spec,
you specify the source Delta table.
This allows the feature spec to be used in both offline and online scenarios.
For online lookups, the serving endpoint automatically uses the online table to perform low-latency feature lookups.
The source Delta table and the online table must use the same primary key.

"""

import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import mlflow
import pandas as pd
import requests
from databricks import feature_engineering
from databricks.feature_engineering import FeatureLookup
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
OnlineTableSpec,
OnlineTableSpecTriggeredSchedulingPolicy,
)
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
from pyspark.sql import SparkSession

from sleep_efficiency.config import ProjectConfig

spark = SparkSession.builder.getOrCreate()

# Initialize Databricks clients
workspace = WorkspaceClient()
fe = feature_engineering.FeatureEngineeringClient()

# Set the MLflow registry URI
mlflow.set_registry_uri("databricks-uc")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Load config, train and test tables

# COMMAND ----------

# Load config
config = ProjectConfig.from_yaml(config_path="../../project_config.yml")

# Get feature columns details
num_features = config.num_features
cat_features = config.cat_features
target = config.target
catalog_name = config.catalog_name
schema_name = config.schema_name

# Define table names
feature_table_name = f"{catalog_name}.{schema_name}.sleep_efficiencies_preds"
online_table_name = f"{catalog_name}.{schema_name}.sleep_efficiencies_preds_online"

# Load training and test sets from Catalog
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()

df = pd.concat([train_set, test_set])

# COMMAND ----------

# MAGIC %md
# MAGIC ## Load a registered model

# COMMAND ----------

# Load the MLflow model for predictions
pipeline = mlflow.sklearn.load_model(f"models:/{catalog_name}.{schema_name}.sleep_efficiency_model_basic/3")

🛠️ Refactor suggestion

Consider parameterizing the model version

The model version is hardcoded to "3". Consider moving this to configuration to make it more maintainable and flexible.

-pipeline = mlflow.sklearn.load_model(f"models:/{catalog_name}.{schema_name}.sleep_efficiency_model_basic/3")
+model_version = config.model_version  # Add this to your config
+pipeline = mlflow.sklearn.load_model(f"models:/{catalog_name}.{schema_name}.sleep_efficiency_model_basic/{model_version}")

Committable suggestion skipped: line range outside the PR's diff.
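
A minimal sketch of what that could look like, assuming a hypothetical model_version entry is added to project_config.yml and exposed as a field on ProjectConfig:

# project_config.yml (hypothetical addition):
#   model_version: 3

config = ProjectConfig.from_yaml(config_path="../../project_config.yml")
model_version = config.model_version  # hypothetical config field
pipeline = mlflow.sklearn.load_model(
    f"models:/{catalog_name}.{schema_name}.sleep_efficiency_model_basic/{model_version}"
)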


# COMMAND ----------

# Prepare the DataFrame for predictions and feature table creation - these features are the ones we want to serve.
preds_df = df[["id", "sleep_duration", "awakenings", "sleep_month"]]
preds_df["Predicted_SleepEfficiency"] = pipeline.predict(df[cat_features + num_features])

preds_df = spark.createDataFrame(preds_df)

# 1. Create the feature table in Databricks

fe.create_table(
name=feature_table_name,
primary_keys=["id"],
df=preds_df,
description="Sleep efficiencies predictions feature table",
)

# Enable Change Data Feed
spark.sql(f"""
ALTER TABLE {feature_table_name}
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# COMMAND ----------

# 2. Create the online table using the feature table

spec = OnlineTableSpec(
primary_key_columns=["id"],
source_table_full_name=feature_table_name,
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),
perform_full_copy=False,
)

# Create the online table in Databricks
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)

# COMMAND ----------

# 3. Create feature lookup and feature spec for the feature table

# Define features to look up from the feature table
features = [
FeatureLookup(
table_name=feature_table_name,
lookup_key="Id",
feature_names=["id", "sleep_duration", "awakenings", "sleep_month", "Predicted_SleepEfficiency"],

⚠️ Potential issue

Inconsistent use of 'Id' vs 'id' could lead to errors

There is inconsistency in the use of 'Id' (uppercase I) and 'id' (lowercase i) as key identifiers in your code. This can cause KeyError exceptions or unexpected behavior when accessing DataFrame columns or performing lookups. Ensure consistent use of 'id' (lowercase) throughout the code to match the DataFrame column names.

Apply the following changes to resolve the inconsistency:

At line 132:

-            lookup_key="Id",
+            lookup_key="id",

At line 179:

- id_list = preds_df["Id"]
+ id_list = preds_df["id"]

At line 197:

-        json={"dataframe_records": [{"Id": "2"}]},
+        json={"dataframe_records": [{"id": "2"}]},

At line 215:

-        json={"dataframe_split": {"columns": ["Id"], "data": [["2"]]}},
+        json={"dataframe_split": {"columns": ["id"], "data": [["2"]]}},

At line 237:

-            json={"dataframe_records": [{"Id": random_id}]},
+            json={"dataframe_records": [{"id": random_id}]},

Also applies to: 179-179, 197-197, 215-215, 237-237

)
]

# Create the feature spec for serving
feature_spec_name = f"{catalog_name}.{schema_name}.return_predictions"

fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Deploy Feature Serving Endpoint

# COMMAND ----------

# 4. Create endpoint using the feature spec

# Create a serving endpoint for the sleep efficiencies predictions
workspace.serving_endpoints.create(
name="sleep-efficiencies-feature-serving",
config=EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name, # feature spec name defined in the previous step
scale_to_zero_enabled=True,
workload_size="Small", # Define the workload size (Small, Medium, Large)
)
]
),
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Call The Endpoint

# COMMAND ----------


# COMMAND ----------

token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
host = spark.conf.get("spark.databricks.workspaceUrl")

# COMMAND ----------

id_list = preds_df["Id"]

# COMMAND ----------

display(id_list)

# COMMAND ----------

# MAGIC %md
# MAGIC

# COMMAND ----------

start_time = time.time()
serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-feature-serving/invocations"
response = requests.post(
f"{serving_endpoint}",
headers={"Authorization": f"Bearer {token}"},
json={"dataframe_records": [{"Id": "2"}]},
)

end_time = time.time()
execution_time = end_time - start_time

print("Response status:", response.status_code)
print("Reponse text:", response.text)
print("Execution time:", execution_time, "seconds")


# COMMAND ----------

# another way to call the endpoint

response = requests.post(
f"{serving_endpoint}",
headers={"Authorization": f"Bearer {token}"},
json={"dataframe_split": {"columns": ["Id"], "data": [["2"]]}},
)


## Load Test

# COMMAND ----------

# Initialize variables
serving_endpoint = f"https://{host}/serving-endpoints/sleep-efficiencies-feature-serving/invocations"
id_list = preds_df.select("Id").rdd.flatMap(lambda x: x).collect()
headers = {"Authorization": f"Bearer {token}"}
num_requests = 10


# Function to make a request and record latency
def send_request():
random_id = random.choice(id_list)
start_time = time.time()
response = requests.post(
serving_endpoint,
headers=headers,
json={"dataframe_records": [{"Id": random_id}]},
)
end_time = time.time()
latency = end_time - start_time # Calculate latency for this request
return response.status_code, latency
Comment on lines +231 to +238

🛠️ Refactor suggestion

Add exception handling to API requests in send_request function

The send_request function lacks error handling for potential exceptions during the API request. This could cause the program to crash during the load test if a network error occurs.

Consider adding exception handling to make the code more robust:

def send_request():
    random_id = random.choice(id_list)
    start_time = time.time()
    try:
        response = requests.post(
            serving_endpoint,
            headers=headers,
            json={"dataframe_records": [{"id": random_id}]},
        )
        response.raise_for_status()
        status_code = response.status_code
    except requests.exceptions.RequestException as e:
        status_code = None
        print(f"Request failed: {e}")
    end_time = time.time()
    latency = end_time - start_time  # Calculate latency for this request
    return status_code, latency



# Measure total execution time
total_start_time = time.time()
latencies = []

# Send requests concurrently
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(send_request) for _ in range(num_requests)]

for future in as_completed(futures):
status_code, latency = future.result()
latencies.append(latency)

total_end_time = time.time()
total_execution_time = total_end_time - total_start_time

# Calculate the average latency
average_latency = sum(latencies) / len(latencies)

print("\nTotal execution time:", total_execution_time, "seconds")
print("Average latency per request:", average_latency, "seconds")