Skip to content

Commit

Permalink
Resolved linting errors
Browse files Browse the repository at this point in the history
  • Loading branch information
a0134m committed Nov 9, 2024
1 parent cee3a53 commit 9794c26
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 90 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM databricksruntime/python:15.4-LTS
FROM databricksruntime/python:15.4-LTS

ARG PROJECT_DIR=/project

Expand All @@ -7,4 +7,4 @@ RUN pip install uv==0.4.20
WORKDIR ${PROJECT_DIR}
COPY dist/wine_quality-0.0.1-py3-none-any.whl ${PROJECT_DIR}/

RUN uv pip install --python /databricks/python3 ${PROJECT_DIR}/wine_quality-0.0.1-py3-none-any.whl
RUN uv pip install --python /databricks/python3 ${PROJECT_DIR}/wine_quality-0.0.1-py3-none-any.whl
25 changes: 16 additions & 9 deletions notebooks/week3/01. feature_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
OnlineTableSpecTriggeredSchedulingPolicy,
)
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

from wine_quality.config import ProjectConfig
Expand All @@ -40,11 +41,12 @@

# MAGIC %md
# MAGIC ## Deploy and query a feature serving endpoint
https://docs.databricks.com/en/machine-learning/feature-store/feature-serving-tutorial.html
# MAGIC https://docs.databricks.com/en/machine-learning/feature-store/feature-serving-tutorial.html

# COMMAND ----------

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

# Initialize Databricks clients
workspace = WorkspaceClient()
Expand Down Expand Up @@ -122,14 +124,14 @@
perform_full_copy=False,
)

#Create the online table in Databricks
# Create the online table in Databricks
try:
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
if "already exists" in str(e):
pass
else:
raise e

online_table_pipeline = workspace.online_tables.get(name=online_table_name)

Expand All @@ -139,7 +141,9 @@
# Define features to look up from the feature table
features = [
FeatureLookup(
table_name=feature_table_name, lookup_key="id", feature_names=[ "volatile_acidity", "alcohol", "sulphates", "quality"]
table_name=feature_table_name,
lookup_key="id",
feature_names=["volatile_acidity", "alcohol", "sulphates", "quality"],
)
]

Expand Down Expand Up @@ -175,7 +179,7 @@
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name, # feature spec name defined in the previous step
scale_to_zero_enabled=True, # Cost saving mechanism where the endpoint scales down to zero when not in use
scale_to_zero_enabled=True, # Cost saving mechanism where the endpoint scales down to zero when not in use
workload_size="Small", # Define the workload size (Small, Medium, Large)
)
]
Expand Down Expand Up @@ -248,6 +252,7 @@
headers = {"Authorization": f"Bearer {token}"}
num_requests = 10


# COMMAND ----------
# Function to make a request and record latency
def send_request():
Expand All @@ -261,6 +266,8 @@ def send_request():
end_time = time.time()
latency = end_time - start_time # Calculate latency for this request
return response.status_code, latency


# COMMAND ----------

# Measure total execution time
Expand All @@ -282,4 +289,4 @@ def send_request():
average_latency = sum(latencies) / len(latencies)

print("\nTotal execution time:", total_execution_time, "seconds")
print("Average latency per request:", average_latency, "seconds")
print("Average latency per request:", average_latency, "seconds")
37 changes: 16 additions & 21 deletions notebooks/week3/02.model_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,24 @@

# COMMAND ----------

import random
import time

import requests
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
EndpointCoreConfigInput,
ServedEntityInput,
TrafficConfig,
Route,
)
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

from wine_quality.config import ProjectConfig
from pyspark.sql import SparkSession

workspace = WorkspaceClient()
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

config = ProjectConfig.from_yaml(config_path="../../project_config.yml")

Expand All @@ -45,13 +44,13 @@
entity_version=3,
)
],
# # Optional if only 1 entity is served
# traffic_config=TrafficConfig(
# routes=[
# Route(served_model_name="wine_quality_model-3",
# traffic_percentage=100)
# ]
# ),
# # Optional if only 1 entity is served
# traffic_config=TrafficConfig(
# routes=[
# Route(served_model_name="wine_quality_model-3",
# traffic_percentage=100)
# ]
# ),
),
)
except Exception as e:
Expand Down Expand Up @@ -88,7 +87,7 @@
"density",
"pH",
"sulphates",
"alcohol"
"alcohol",
]

sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records")
Expand Down Expand Up @@ -118,9 +117,7 @@
# COMMAND ----------
start_time = time.time()

model_serving_endpoint = (
f"https://{host}/serving-endpoints/wine-quality-model-serving/invocations"
)
model_serving_endpoint = f"https://{host}/serving-endpoints/wine-quality-model-serving/invocations"
response = requests.post(
f"{model_serving_endpoint}",
headers={"Authorization": f"Bearer {token}"},
Expand All @@ -142,9 +139,7 @@
# COMMAND ----------

# Initialize variables
model_serving_endpoint = (
f"https://{host}/serving-endpoints/wine-quality-model-serving/invocations"
)
model_serving_endpoint = f"https://{host}/serving-endpoints/wine-quality-model-serving/invocations"

headers = {"Authorization": f"Bearer {token}"}
num_requests = 1000
Expand Down Expand Up @@ -182,4 +177,4 @@ def send_request():
average_latency = sum(latencies) / len(latencies)

print("\nTotal execution time:", total_execution_time, "seconds")
print("Average latency per request:", average_latency, "seconds")
print("Average latency per request:", average_latency, "seconds")
16 changes: 9 additions & 7 deletions notebooks/week3/03.model_serving_feature_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
OnlineTableSpecTriggeredSchedulingPolicy,
)
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

from wine_quality.config import ProjectConfig

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

# Initialize Databricks clients
workspace = WorkspaceClient()
Expand All @@ -47,14 +49,14 @@
perform_full_copy=False,
)

#Create the online table in Databricks
# Create the online table in Databricks
try:
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
if "already exists" in str(e):
pass
else:
raise e

# COMMAND ----------

Expand Down Expand Up @@ -127,7 +129,7 @@

# COMMAND ----------

train_set.dtypes
train_set_dtypes = train_set.dtypes

# COMMAND ----------

Expand Down Expand Up @@ -157,4 +159,4 @@

# COMMAND ----------

wine_features.dtypes
wine_features_dtypes = wine_features.dtypes
48 changes: 19 additions & 29 deletions notebooks/week3/04.AB_test_model_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@
# MAGIC %restart_python

# COMMAND ----------
import hashlib
import time

import mlflow
import pandas as pd
import requests
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
from lightgbm import LGBMRegressor
from mlflow import MlflowClient
from mlflow.models import infer_signature
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
import hashlib
import requests

from wine_quality.config import ProjectConfig

Expand Down Expand Up @@ -58,6 +58,10 @@
"max_depth": ab_test_params["max_depth_b"],
}

# COMMAND ----------
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

# COMMAND ----------

# MAGIC %md
Expand Down Expand Up @@ -108,7 +112,7 @@

# Train the model
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
y_pred = pipeline.predict(X_test)

# Calculate performance metrics
mse = mean_squared_error(y_test, y_pred)
Expand All @@ -124,9 +128,7 @@
signature = infer_signature(model_input=X_train, model_output=y_pred)

# Log the input dataset for tracking reproducibility
dataset = mlflow.data.from_spark(train_set_spark,
table_name=f"{catalog_name}.{schema_name}.train_set",
version="0")
dataset = mlflow.data.from_spark(train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set", version="0")
mlflow.log_input(dataset, context="training")

# Log the pipeline model in MLflow with a unique artifact path
Expand Down Expand Up @@ -178,8 +180,7 @@
mlflow.log_metric("r2_score", r2)
signature = infer_signature(model_input=X_train, model_output=y_pred)

dataset = mlflow.data.from_spark(train_set_spark,
table_name=f"{catalog_name}.{schema_name}.train_set", version="0")
dataset = mlflow.data.from_spark(train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set", version="0")
mlflow.log_input(dataset, context="training")
mlflow.sklearn.log_model(sk_model=pipeline, artifact_path="lightgbm-pipeline-model", signature=signature)

Expand Down Expand Up @@ -233,16 +234,14 @@ def predict(self, context, model_input):

# COMMAND ----------
X_train = train_set[num_features + ["id"]]
X_test = test_set[num_features + ["id"]]
X_test = test_set[num_features + ["id"]]


# COMMAND ----------
models = [model_A, model_B]
wrapped_model = WineQualityModelWrapper(models) # we pass the loaded models to the wrapper
example_input = X_test.iloc[0:1] # Select the first row for prediction as example
example_prediction = wrapped_model.predict(
context=None,
model_input=example_input)
example_prediction = wrapped_model.predict(context=None, model_input=example_input)
print("Example Prediction:", example_prediction)

# COMMAND ----------
Expand All @@ -251,22 +250,16 @@ def predict(self, context, model_input):

with mlflow.start_run() as run:
run_id = run.info.run_id
signature = infer_signature(model_input=X_train,
model_output={"Prediction": 1234.5,
"model": "Model B"})
dataset = mlflow.data.from_spark(train_set_spark,
table_name=f"{catalog_name}.{schema_name}.train_set",
version="0")
signature = infer_signature(model_input=X_train, model_output={"Prediction": 1234.5, "model": "Model B"})
dataset = mlflow.data.from_spark(train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set", version="0")
mlflow.log_input(dataset, context="training")
mlflow.pyfunc.log_model(
python_model=wrapped_model,# passing wrapped model here instead sklearn model
python_model=wrapped_model, # passing wrapped model here instead sklearn model
artifact_path="pyfunc-wine-quality-model-ab",
signature=signature
signature=signature,
)
model_version = mlflow.register_model(
model_uri=f"runs:/{run_id}/pyfunc-wine-quality-model-ab",
name=model_name,
tags={"git_sha": f"{git_sha}"}
model_uri=f"runs:/{run_id}/pyfunc-wine-quality-model-ab", name=model_name, tags={"git_sha": f"{git_sha}"}
)

# COMMAND ----------
Expand All @@ -276,7 +269,7 @@ def predict(self, context, model_input):
predictions = model.predict(X_test.iloc[0:1])

# Display predictions
predictions
# predictions

# COMMAND ----------

Expand Down Expand Up @@ -313,7 +306,6 @@ def predict(self, context, model_input):
# MAGIC ### Call the endpoint

# COMMAND ----------

token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
host = spark.conf.get("spark.databricks.workspaceUrl")

Expand Down Expand Up @@ -342,9 +334,7 @@ def predict(self, context, model_input):

start_time = time.time()

model_serving_endpoint = (
f"https://{host}/serving-endpoints/wine-quality-model-serving-ab-test/invocations"
)
model_serving_endpoint = f"https://{host}/serving-endpoints/wine-quality-model-serving-ab-test/invocations"

response = requests.post(
f"{model_serving_endpoint}",
Expand Down
Loading

0 comments on commit 9794c26

Please sign in to comment.