# Databricks notebook source
# MAGIC %pip install /Volumes/main/default/file_exchange/denninger/nyc_taxi-0.0.1-py3-none-any.whl
# COMMAND ----------
# MAGIC %restart_python
# COMMAND ----------
import time
import requests
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput,
    ServedEntityInput,
    TrafficConfig,
    Route,
)
from nyctaxi.config import ProjectConfig
from pyspark.sql import SparkSession
workspace = WorkspaceClient()
spark = SparkSession.builder.getOrCreate()

# Load the project configuration and the training set used to build sample requests
config = ProjectConfig.from_yaml(config_path="project_config.yml")
catalog_name = config.catalog_name
schema_name = config.schema_name

train_set = spark.table(f"{catalog_name}.{schema_name}.train_set_an").toPandas()
workspace.serving_endpoints.create(
    name="nyctaxi-model-serving",
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name=f"{catalog_name}.{schema_name}.nyctaxi_model_basic",
                scale_to_zero_enabled=True,
                workload_size="Small",
                entity_version=3,
            )
        ],
        # Optional if only one entity is served
        traffic_config=TrafficConfig(
            routes=[
                Route(
                    served_model_name="nyctaxi_model_basic-3",
                    traffic_percentage=100,
                )
            ]
        ),
    ),
)
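# COMMAND ----------
# The create call above returns before the endpoint has finished provisioning. A minimal
# readiness-polling sketch, assuming the SDK's `serving_endpoints.get` call and the
# `EndpointStateReady.READY` enum value; the retry count and sleep interval are arbitrary:
from databricks.sdk.service.serving import EndpointStateReady

for _ in range(60):  # poll for up to ~10 minutes
    endpoint = workspace.serving_endpoints.get(name="nyctaxi-model-serving")
    if endpoint.state and endpoint.state.ready == EndpointStateReady.READY:
        print("Endpoint is ready")
        break
    time.sleep(10)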
# COMMAND ----------
# MAGIC %md
# MAGIC ## Call the endpoint
# COMMAND ----------
# Grab an API token and the workspace host from the notebook context
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
host = spark.conf.get("spark.databricks.workspaceUrl")
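# COMMAND ----------
# Note: the notebook-context token above is only available inside a Databricks notebook.
# As a hedged alternative (a sketch, assuming the WorkspaceClient resolved token-based
# auth, e.g. a PAT), the SDK's own config can be reused:
# token = workspace.config.token
# host = workspace.config.host.removeprefix("https://")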
# COMMAND ----------
# MAGIC %md
# MAGIC ### Create sample request body
# COMMAND ----------
# The model only needs the trip_distance feature
required_columns = ["trip_distance"]

sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records")
dataframe_records = [[record] for record in sampled_records]
# COMMAND ----------
dataframe_records
# COMMAND ----------
"""
Each body should be list of json with columns
Example
[{'trip_distance': 2.0
}]
"""
# COMMAND ----------
start_time = time.time()
model_serving_endpoint = (
    f"https://{host}/serving-endpoints/nyctaxi-model-serving/invocations"
)

response = requests.post(
    model_serving_endpoint,
    headers={"Authorization": f"Bearer {token}"},
    json={"dataframe_records": dataframe_records[0]},
)

end_time = time.time()
execution_time = end_time - start_time

print("Response status:", response.status_code)
print("Response text:", response.text)
print("Execution time:", execution_time, "seconds")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Load Test
# COMMAND ----------
# Initialize variables
model_serving_endpoint = (
    f"https://{host}/serving-endpoints/nyctaxi-model-serving/invocations"
)

headers = {"Authorization": f"Bearer {token}"}
num_requests = 10


# Function to make a request and record latency
def send_request():
    random_record = random.choice(dataframe_records)
    start_time = time.time()
    response = requests.post(
        model_serving_endpoint,
        headers=headers,
        json={"dataframe_records": random_record},
    )
    end_time = time.time()
    latency = end_time - start_time
    return response.status_code, latency
total_start_time = time.time()
latencies = []

# Send requests concurrently
with ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(send_request) for _ in range(num_requests)]

    for future in as_completed(futures):
        status_code, latency = future.result()
        latencies.append(latency)

total_end_time = time.time()
total_execution_time = total_end_time - total_start_time

# Calculate the average latency
average_latency = sum(latencies) / len(latencies)

print("\nTotal execution time:", total_execution_time, "seconds")
print("Average latency per request:", average_latency, "seconds")