"""Inference benchmark tool for TensorFlow Serving and Triton."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import base64
import csv
import grpc
import distribution
import functools
import io
import numbers
import os
import pandas as pd
import queue as Queue
import requests as r
import threading
import multiprocessing
import time
import numpy as np
# Disable GPUs so that TensorFlow initializes faster.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import tensorflow.compat.v1 as tf
from google.protobuf.json_format import Parse as ProtoParseJson
from tensorflow.core.framework import types_pb2
from tensorflow.python.platform import gfile
from itertools import cycle, islice
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
from clients import base_client
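# Example invocation (flag values below are illustrative):
#   python benchmark.py --mode=grpc --host=localhost --port=8500 \
#     --model_name=my_model --tfrecord_dataset_path=data.tfrecord.gz \
#     --qps_range=10,100,10 --num_seconds=30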
tf.app.flags.DEFINE_integer(
"num_requests",
0,
"Total # of requests to send for each benchmark. "
"Either num_requests or num_seconds has to be specified.")
tf.app.flags.DEFINE_float(
"num_seconds",
0,
"Number of seconds to run each benchmark for. "
"Either num_requests or num_seconds has to be specified.")
tf.app.flags.DEFINE_integer(
    "num_warmup_requests", 0,
    "Number of requests to send before starting the benchmark.")
tf.app.flags.DEFINE_integer(
"num_warmup_delay_seconds", 2,
"Number of seconds to wait after warmup requests are sent.")
tf.app.flags.DEFINE_string(
    "qps_range",
    "",
    "Desired client side request QPS in "
    "one of the following formats:"
    " - qps - benchmark at one QPS"
    " - start, stop - benchmark at QPS range [start, stop)"
    " - start, stop, step - benchmark at QPS range [start, stop) with step"
    " - [qps1, qps2] - benchmark at the given QPS values",
)
tf.app.flags.DEFINE_float("request_timeout", 300.0,
"Timeout for inference request.")
tf.app.flags.DEFINE_string("model_name", "",
"Name of the model being served on the ModelServer")
tf.app.flags.DEFINE_string("model_version", "",
                           "Version of the model being served on the ModelServer")
tf.app.flags.DEFINE_string(
"signature_name",
"serving_default",
"Name of the model signature on the ModelServer",
)
tf.app.flags.DEFINE_string(
    "host", "localhost",
    "Host name to connect to, localhost by default. Alternatively you can "
    "specify a full URL to send requests to.")
tf.app.flags.DEFINE_integer("port", None, "Port to connect to.")
tf.app.flags.DEFINE_enum(
    "mode",
    "grpc",
    ["grpc", "sync_grpc", "rest", "triton_grpc", "triton_rest", "vertex_gapic",
     "vertex_rest"],
    "Benchmark mode: gRPC, synchronous gRPC, REST, Triton (gRPC or REST), "
    "or Vertex AI (GAPIC or REST).",
)
tf.app.flags.DEFINE_enum("distribution", "uniform",
                         ["uniform", "poisson", "pareto"],
                         "Distribution of inter-request intervals.")
tf.app.flags.DEFINE_string(
"tfrecord_dataset_path", "",
"The path to data in tfrecord or tfrecord.gz format.")
tf.app.flags.DEFINE_string(
    "requests_file_path",
    "",
    "The path to the predict_pb2.PredictRequest requests file serialized in "
    "JSON format.",
)
tf.app.flags.DEFINE_string("jsonl_file_path", "",
                           "The path to the dataset file in JSONL format.")
tf.app.flags.DEFINE_string("input_name", "input",
"The name of the model input tensor.")
tf.app.flags.DEFINE_integer("batch_size", None, "Per request batch size.")
tf.app.flags.DEFINE_integer("workers", 1, "Number of workers.")
tf.app.flags.DEFINE_string(
"api_key", "",
"API Key for ESP service if authenticating external requests.")
tf.app.flags.DEFINE_string("csv_report_filename", "",
                           "Optional filename for the generated CSV report.")
tf.app.flags.DEFINE_string("title", "", "Optional title for the report")
tf.app.flags.DEFINE_enum("grpc_compression", "none",
                         ["none", "deflate", "gzip"], "gRPC compression.")
tf.app.flags.DEFINE_string("authorization_header", "",
"Authorization header for REST requests.")
tf.app.flags.DEFINE_string("grpc_destination", "",
"gRPC destination metadata header.")
tf.app.flags.DEFINE_string("default_int_type", "",
"Default type to use for integer values.")
tf.app.flags.DEFINE_string("default_float_type", "",
"Default type to use for fractional values.")
tf.app.flags.DEFINE_bool("busy_sleep", False,
"Use busy sleep instead of time.sleep().")
tf.app.flags.DEFINE_bool(
"bail_on_error", False,
"Stop sending more requests for the current QPS if any error occurs.")
tf.app.flags.DEFINE_integer(
"goodput_search_p99_ms", 0,
"Run with high-to-low QPS. Skip lower QPS if all requests succeeded and p99 is less than the given value for the current QPS."
)
FLAGS = tf.app.flags.FLAGS
def get_client_class():
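  """Returns the client class implementing the benchmark mode selected via --mode."""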
if FLAGS.mode == "grpc":
from clients import tensorflow_serving_grpc
return tensorflow_serving_grpc.TensorflowServingGrpc
elif FLAGS.mode == "sync_grpc":
from clients import tensorflow_serving_sync_grpc
return tensorflow_serving_sync_grpc.TensorflowServingSyncGrpc
elif FLAGS.mode == "rest":
from clients import tensorflow_serving_rest
return tensorflow_serving_rest.TensorflowServingRest
elif FLAGS.mode == "triton_rest":
from clients import triton_rest
return triton_rest.TritonRest
elif FLAGS.mode == "triton_grpc":
from clients import triton_grpc
return triton_grpc.TritonGrpc
elif FLAGS.mode == "vertex_gapic":
from clients import vertex_gapic
return vertex_gapic.VertexGapic
elif FLAGS.mode == "vertex_rest":
from clients import vertex_rest
return vertex_rest.VertexRest
else:
raise ValueError("Invalid mode")
def get_grpc_compression():
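  """Maps the --grpc_compression flag to a grpc.Compression value, or None."""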
if FLAGS.grpc_compression == "gzip":
return grpc.Compression.Gzip
elif FLAGS.grpc_compression == "deflate":
return grpc.Compression.Deflate
else:
return None
def get_qps_range(qps_range_string):
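  """Parses --qps_range into a list of QPS values.

  Accepts "qps", "start,stop", "start,stop,step", or "[qps1, qps2, ...]".
  """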
qps_range_string = qps_range_string.strip()
if qps_range_string.startswith("[") and qps_range_string.endswith("]"):
qps_range_string = qps_range_string.lstrip("[").rstrip("]")
    qps_range_list = list(map(float, qps_range_string.split(",")))
return qps_range_list
  qps_range_list = list(map(float, qps_range_string.split(",")))
  if len(qps_range_list) == 1:
    return [qps_range_list[0]]
  elif len(qps_range_list) == 2:
    return list(np.arange(qps_range_list[0], qps_range_list[1]))
  elif len(qps_range_list) == 3:
    return list(
        np.arange(qps_range_list[0], qps_range_list[1], qps_range_list[2]))
  else:
    raise ValueError("Invalid argument qps_range: " + qps_range_string)
def merge_results(results, result):
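  """Appends each non-private field of `result` to the lists in `results`."""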
for key, value in result.items():
if not key.startswith("_"):
if key not in results:
results[key] = [value]
else:
results[key].append(value)
def merge_worker_results(worker_results):
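  """Merges per-worker result dicts into a single aggregate result.

  The "reqested_qps" key spelling mirrors the result dicts produced by the
  clients and is kept as-is for compatibility.
  """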
success = 0
error = 0
reqested_qps = 0
start_time = []
end_time = []
latency = []
avg_miss_rate_percent = []
for worker_result in worker_results:
success += worker_result["success"]
error += worker_result["error"]
reqested_qps += worker_result["reqested_qps"]
avg_miss_rate_percent.append(worker_result["avg_miss_rate_percent"])
latency.extend(worker_result["_latency"])
start_time.append(worker_result["_start_time"])
end_time.append(worker_result["_end_time"])
  elapsed = np.max(end_time) - np.min(start_time)  # avoid shadowing the time module
  return {
      "reqested_qps": reqested_qps,
      "actual_qps": (success + error) / elapsed,
      "success": success,
      "error": error,
      "time": elapsed,
"avg_latency": np.average(latency) * 1000 if latency else [],
"p50": np.percentile(latency, 50) * 1000 if latency else [],
"p90": np.percentile(latency, 90) * 1000 if latency else [],
"p99": np.percentile(latency, 99) * 1000 if latency else [],
"avg_miss_rate_percent": np.average(avg_miss_rate_percent),
}
def print_result(result):
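  """Logs all non-private fields of a result dict as one tab-separated line."""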
v = []
  for key, value in result.items():
    if not key.startswith("_"):
      if isinstance(value, (float, np.floating)):
        v.append("{}: {:.2f}".format(key, value))
      else:
        v.append("{}: {}".format(key, value))
tf.logging.info("\t".join(v))
def get_requests_for_qps(requests_list, qps, num_requests=None, num_seconds=None, worker_index=0):
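  """Returns (request iterator, request count) for one benchmark run at `qps`.

  If num_requests is not set, it is derived as max(qps * num_seconds, 10).
  Each worker gets its own contiguous slice of the cycled request list.
  """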
if num_requests:
pass
elif num_seconds:
num_requests = int(max(qps * num_seconds, 10))
else:
raise ValueError("Neither num_requests, nor num_seconds are specified")
  tf.logging.info("num_requests: %d", num_requests)
return islice(cycle(requests_list),
worker_index * num_requests,
(worker_index + 1) * num_requests), num_requests
def is_goodput_found(result: dict) -> bool:
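  """Returns True if goodput search is enabled, no request failed, and p99 is below the target."""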
return (FLAGS.goodput_search_p99_ms > 0 and result["error"] == 0 and
result["p99"] < FLAGS.goodput_search_p99_ms)
def main(argv):
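  """Runs the benchmark over the configured QPS range and reports results."""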
del argv
tf.disable_v2_behavior()
if FLAGS.qps_range is None or FLAGS.qps_range == "":
tf.logging.error("Please specify qps_range")
exit(1)
  if not FLAGS.num_requests and not FLAGS.num_seconds:
    tf.logging.error("Either num_requests or num_seconds has to be specified")
    exit(1)
request_path = None
request_format = None
if FLAGS.tfrecord_dataset_path != "":
request_format = base_client.RequestFormat.TFRECORD
request_path = FLAGS.tfrecord_dataset_path
elif FLAGS.requests_file_path != "":
request_format = base_client.RequestFormat.FILE
request_path = FLAGS.requests_file_path
elif FLAGS.jsonl_file_path != "":
request_format = base_client.RequestFormat.DICTIONARY
request_path = FLAGS.jsonl_file_path
else:
    raise ValueError(
        "One of tfrecord_dataset_path, requests_file_path, or "
        "jsonl_file_path has to be specified")
http_headers = {}
if FLAGS.authorization_header:
http_headers["authorization"] = FLAGS.authorization_header
grpc_metadata = []
if FLAGS.api_key:
grpc_metadata.append(("x-api-key", FLAGS.api_key))
if FLAGS.grpc_destination:
grpc_metadata.append(("grpc-destination", FLAGS.grpc_destination))
client_class = get_client_class()
client = client_class(FLAGS.host, FLAGS.port, FLAGS.model_name,
FLAGS.model_version, FLAGS.signature_name,
FLAGS.distribution, FLAGS.input_name,
FLAGS.default_int_type,
FLAGS.default_float_type, http_headers, grpc_metadata,
get_grpc_compression(), FLAGS.request_timeout,
FLAGS.busy_sleep, FLAGS.bail_on_error)
tf.logging.info("Loading data")
requests_list = client.get_requests(request_format, request_path,
FLAGS.num_warmup_requests,
FLAGS.batch_size)
qps_range = get_qps_range(FLAGS.qps_range)
if FLAGS.goodput_search_p99_ms:
qps_range = sorted(qps_range, reverse=True)
results = {}
if FLAGS.num_warmup_requests > 0:
tf.logging.info("Sending {} warmup requests".format(
FLAGS.num_warmup_requests))
warmup_qps = qps_range[0]
warmup_requests, num_requests = get_requests_for_qps(
requests_list, warmup_qps, num_requests=FLAGS.num_warmup_requests)
client.bail_on_error = False
_ = client.run(warmup_requests, num_requests, warmup_qps)
client.bail_on_error = FLAGS.bail_on_error
if FLAGS.num_warmup_delay_seconds:
tf.logging.info("Waiting for %d seconds after warmup", FLAGS.num_warmup_delay_seconds)
time.sleep(FLAGS.num_warmup_delay_seconds)
tf.logging.info("Warmup complete")
if FLAGS.workers == 1:
for qps in qps_range:
worker_requests, num_requests = get_requests_for_qps(
requests_list,
qps,
FLAGS.num_requests,
FLAGS.num_seconds)
result = client.run(worker_requests, num_requests, qps)
print_result(result)
merge_results(results, result)
if is_goodput_found(result):
break
else:
def _worker_load_test_func(qps, worker_results, worker_index):
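      """Runs one worker's slice of the load test and stores its result."""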
worker_requests, num_requests = get_requests_for_qps(
requests_list,
qps,
FLAGS.num_requests,
FLAGS.num_seconds,
          worker_index=worker_index)
worker_results[worker_index] = client.run(worker_requests,
num_requests, qps)
for qps in qps_range:
client.bail_event.clear()
worker_processes = []
with multiprocessing.Manager() as manager:
worker_results = manager.list()
for worker_index in range(FLAGS.workers):
worker_process = multiprocessing.Process(
target=_worker_load_test_func,
args=(qps, worker_results, worker_index))
worker_processes.append(worker_process)
worker_results.append({})
for worker_process in worker_processes:
# Offset each worker's startup time.
# Otherwise, the uniform distribution would become a uniform-burst.
time.sleep(1 / qps / len(worker_processes))
worker_process.start()
for worker_process in worker_processes:
worker_process.join()
result = merge_worker_results(worker_results)
print_result(result)
merge_results(results, result)
if is_goodput_found(result):
break
if FLAGS.title and "reqested_qps" in results and len(
results["reqested_qps"]) > 0:
results["title"] = [""] * len(results["reqested_qps"])
results["title"][0] = FLAGS.title
df = pd.DataFrame.from_dict(results)
tf.logging.info("\n" + df.to_string(
columns=[
"reqested_qps", "actual_qps", "success", "error", "avg_latency",
"p99", "avg_miss_rate_percent"
],
index=False))
  if FLAGS.csv_report_filename:
df.to_csv(FLAGS.csv_report_filename)
import matplotlib.pyplot as plt
base_image_file_name = FLAGS.csv_report_filename.replace(".csv",
"").replace(
".", "_")
fig, (p1, p2) = plt.subplots(2, 1, figsize=(10, 12), facecolor=(1, 1, 1))
p1.set_title(FLAGS.title + " Requested QPS")
p1.plot("reqested_qps", "p50", data=results, label="p50")
p1.plot("reqested_qps", "p90", data=results, label="p90")
p1.plot("reqested_qps", "p99", data=results, label="p99")
p1.plot("reqested_qps", "avg_latency", data=results, label="avg_latency")
p1.legend()
    p2.set_title(FLAGS.title + " Actual QPS")
p2.plot("actual_qps", "p50", data=results, label="p50")
p2.plot("actual_qps", "p90", data=results, label="p90")
p2.plot("actual_qps", "p99", data=results, label="p99")
p2.plot("actual_qps", "avg_latency", data=results, label="avg_latency")
p2.legend()
plt.savefig(base_image_file_name + ".png")
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
logger = tf.get_logger()
logger.propagate = False
tf.app.run(main)