# Databricks notebook source
# MAGIC %pip install /Volumes/main/default/file_exchange/denninger/nyc_taxi-0.0.1-py3-none-any.whl
# COMMAND ----------
# MAGIC %restart_python
# COMMAND ----------
"""
Create feature table in unity catalog, it will be a delta table
Create online table which uses the feature delta table created in the previous step
Create a feature spec. When you create a feature spec,
you specify the source Delta table.
This allows the feature spec to be used in both offline and online scenarios.
For online lookups, the serving endpoint automatically uses the online table to perform low-latency feature lookups.
The source Delta table and the online table must use the same primary key.
"""
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import mlflow
import pandas as pd
import requests
from databricks import feature_engineering
from databricks.feature_engineering import FeatureLookup
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
OnlineTableSpec,
OnlineTableSpecTriggeredSchedulingPolicy,
)
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
from pyspark.sql import SparkSession
from nyctaxi.config import ProjectConfig
spark = SparkSession.builder.getOrCreate()
# Initialize Databricks clients
workspace = WorkspaceClient()
fe = feature_engineering.FeatureEngineeringClient()
# Set the MLflow registry URI
mlflow.set_registry_uri("databricks-uc")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Load config, train and test tables
# COMMAND ----------
# Load config
config = ProjectConfig.from_yaml(config_path="project_config.yml")
# Get feature columns details
num_features = config.num_features
cat_features = config.cat_features
target = config.target
catalog_name = config.catalog_name
schema_name = config.schema_name
# Define table names
feature_table_name = f"{catalog_name}.{schema_name}.nyctaxi_preds"
online_table_name = f"{catalog_name}.{schema_name}.nyctaxi_preds_online"
# Load training and test sets from Catalog
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set_an").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set_an").toPandas()
df = pd.concat([train_set, test_set])
# COMMAND ----------
# MAGIC %md
# MAGIC ## Load a registered model
# COMMAND ----------
# Load the MLflow model for predictions
pipeline = mlflow.sklearn.load_model(f"models:/{catalog_name}.{schema_name}.nyctaxi_model_basic/3")
# COMMAND ----------
# Prepare the DataFrame for predictions and feature table creation; these are the features we want to serve.
preds_df = df[["pickup_zip", "trip_distance"]].copy()  # copy to avoid pandas SettingWithCopyWarning
preds_df["predicted_fare_amount"] = pipeline.predict(df[num_features])
preds_df = spark.createDataFrame(preds_df)
# COMMAND ----------
# Deduplicate so that the primary key pickup_zip has exactly one row, keeping the row with the smallest trip_distance per zip
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
window = Window.partitionBy("pickup_zip").orderBy("trip_distance")
preds_df = preds_df.withColumn("row_number", row_number().over(window))
preds_df = preds_df.filter(preds_df.row_number == 1).drop("row_number")
display(preds_df)
# COMMAND ----------
# 1. Create the feature table in Databricks
fe.create_table(
name=feature_table_name, primary_keys=["pickup_zip"], df=preds_df, description="New York City Taxi predictions feature table"
)
# Enable Change Data Feed
spark.sql(f"""
ALTER TABLE {feature_table_name}
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# COMMAND ----------
# 2. Create the online table using feature table
spec = OnlineTableSpec(
primary_key_columns=["pickup_zip"],
source_table_full_name=feature_table_name,
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),
perform_full_copy=False,
)
# Create the online table in Databricks
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
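# The create call above is asynchronous. A minimal polling sketch to wait until the
# online table finishes provisioning before creating the feature spec; the
# status.detailed_state field is an assumption based on the databricks-sdk OnlineTable model,
# and the startswith("ONLINE") check is a rough readiness test for this sketch.
while True:
    online_table = workspace.online_tables.get(online_table_name)
    state = online_table.status.detailed_state if online_table.status else None
    print("Online table state:", state)
    if state is not None and str(state.name).startswith("ONLINE"):
        break
    time.sleep(30)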
# COMMAND ----------
# 3. Create a feature lookup and a feature spec based on the feature table
# Define features to look up from the feature table
features = [
FeatureLookup(
table_name=feature_table_name, lookup_key="pickup_zip", feature_names=["trip_distance"]
)
]
# Create the feature spec for serving
feature_spec_name = f"{catalog_name}.{schema_name}.return_nyctaxi_predictions"
fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
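# COMMAND ----------
# The same lookups also work offline. A minimal sketch of a batch lookup through the
# feature engineering client (label=None because we only want the looked-up feature,
# not a training label); the single-zip input DataFrame is purely illustrative.
lookup_keys_df = spark.createDataFrame([("10119",)], ["pickup_zip"])
training_set = fe.create_training_set(df=lookup_keys_df, feature_lookups=features, label=None)
display(training_set.load_df())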
# COMMAND ----------
# MAGIC %md
# MAGIC ## Deploy Feature Serving Endpoint
# COMMAND ----------
# 4. Create endpoint using feature spec
# Create a serving endpoint for the NYC taxi predictions
workspace.serving_endpoints.create(
name="nyctaxi-feature-serving",
config=EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name, # feature spec name defined in the previous step
scale_to_zero_enabled=True,
workload_size="Small", # Define the workload size (Small, Medium, Large)
)
]
),
)
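# COMMAND ----------
# Endpoint provisioning takes a few minutes. A minimal polling sketch that waits for
# the endpoint to report READY; EndpointStateReady is assumed to be importable from
# databricks.sdk.service.serving alongside the classes imported at the top.
from databricks.sdk.service.serving import EndpointStateReady

while workspace.serving_endpoints.get("nyctaxi-feature-serving").state.ready != EndpointStateReady.READY:
    print("Waiting for endpoint to become ready...")
    time.sleep(30)
print("Endpoint is ready")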
# COMMAND ----------
# MAGIC %md
# MAGIC ## Call The Endpoint
# COMMAND ----------
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
host = spark.conf.get("spark.databricks.workspaceUrl")
# COMMAND ----------
id_list = [row["pickup_zip"] for row in preds_df.select("pickup_zip").collect()]  # collect keys from the Spark DataFrame
# COMMAND ----------
start_time = time.time()
serving_endpoint = f"https://{host}/serving-endpoints/nyctaxi-feature-serving/invocations"
response = requests.post(
f"{serving_endpoint}",
headers={"Authorization": f"Bearer {token}"},
json={"dataframe_records": [{"pickup_zip": "10119"}]},
)
end_time = time.time()
execution_time = end_time - start_time
print("Response status:", response.status_code)
print("Reponse text:", response.text)
print("Execution time:", execution_time, "seconds")
# COMMAND ----------
# Another way to call the endpoint, using the dataframe_split format
start_time = time.time()
response = requests.post(
    serving_endpoint,
    headers={"Authorization": f"Bearer {token}"},
    json={"dataframe_split": {"columns": ["pickup_zip"], "data": [["10119"]]}},
)
execution_time = time.time() - start_time
print("Response status:", response.status_code)
print("Response text:", response.text)
print("Execution time:", execution_time, "seconds")
# COMMAND ----------
# Initialize variables for a small load test
serving_endpoint = f"https://{host}/serving-endpoints/nyctaxi-feature-serving/invocations"
id_list = [row["pickup_zip"] for row in preds_df.select("pickup_zip").collect()]
headers = {"Authorization": f"Bearer {token}"}
num_requests = 10
# Function to make a request and record latency
def send_request():
random_id = random.choice(id_list)
start_time = time.time()
response = requests.post(
serving_endpoint,
headers=headers,
json={"dataframe_records": [{"pickup_zip": random_id}]},
)
end_time = time.time()
latency = end_time - start_time # Calculate latency for this request
return response.status_code, latency
# Measure total execution time
total_start_time = time.time()
latencies = []
# Send requests concurrently
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(send_request) for _ in range(num_requests)]
for future in as_completed(futures):
status_code, latency = future.result()
latencies.append(latency)
total_end_time = time.time()
total_execution_time = total_end_time - total_start_time
# Calculate the average latency
average_latency = sum(latencies) / len(latencies)
print("\nTotal execution time:", total_execution_time, "seconds")
print("Average latency per request:", average_latency, "seconds")
# COMMAND ----------
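# A small follow-up sketch: tail latency is usually more informative than the mean
# for serving endpoints, so also report an approximate 95th percentile.
sorted_latencies = sorted(latencies)
p95 = sorted_latencies[max(0, int(0.95 * len(sorted_latencies)) - 1)]
print("Approximate p95 latency:", p95, "seconds")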