@@ -199,23 +199,31 @@ def run_multiple_clients(
199
199
200
200
# Calculate container timeout
201
201
container_timeout = 300 # 5 minutes default
202
- buffer_timeout = args .container_timeout_buffer # Configurable buffer from command line
202
+ buffer_timeout = (
203
+ args .container_timeout_buffer
204
+ ) # Configurable buffer from command line
203
205
if "test-time" in benchmark_command_str :
204
206
# Try to extract test time and add buffer
205
207
import re
208
+
206
209
# Handle both --test-time (memtier) and -test-time (pubsub-sub-bench)
207
- test_time_match = re .search (r'--?test-time[=\s]+(\d+)' , benchmark_command_str )
210
+ test_time_match = re .search (
211
+ r"--?test-time[=\s]+(\d+)" , benchmark_command_str
212
+ )
208
213
if test_time_match :
209
214
test_time = int (test_time_match .group (1 ))
210
215
container_timeout = test_time + buffer_timeout
211
- logging .info (f"Client { client_index } : Set container timeout to { container_timeout } s (test-time: { test_time } s + { buffer_timeout } s buffer)" )
216
+ logging .info (
217
+ f"Client { client_index } : Set container timeout to { container_timeout } s (test-time: { test_time } s + { buffer_timeout } s buffer)"
218
+ )
212
219
213
220
logging .info (
214
221
f"Starting client { client_index } with docker image { client_image } (cpuset={ client_cpuset_cpus } ) with args: { benchmark_command_str } "
215
222
)
216
223
217
224
# Start container (detached)
218
225
import os
226
+
219
227
container = docker_client .containers .run (
220
228
image = client_image ,
221
229
volumes = {
@@ -234,14 +242,16 @@ def run_multiple_clients(
234
242
user = f"{ os .getuid ()} :{ os .getgid ()} " , # Run as current user to fix permissions
235
243
)
236
244
237
- containers .append ({
238
- "container" : container ,
239
- "client_index" : client_index ,
240
- "client_tool" : client_tool ,
241
- "client_image" : client_image ,
242
- "benchmark_command_str" : benchmark_command_str ,
243
- "timeout" : container_timeout ,
244
- })
245
+ containers .append (
246
+ {
247
+ "container" : container ,
248
+ "client_index" : client_index ,
249
+ "client_tool" : client_tool ,
250
+ "client_image" : client_image ,
251
+ "benchmark_command_str" : benchmark_command_str ,
252
+ "timeout" : container_timeout ,
253
+ }
254
+ )
245
255
246
256
except Exception as e :
247
257
error_msg = f"Error starting client { client_index } : { e } "
@@ -264,30 +274,38 @@ def run_multiple_clients(
264
274
try :
265
275
# Wait for container to complete
266
276
exit_code = container .wait (timeout = container_info ["timeout" ])
267
- client_stdout = container .logs ().decode (' utf-8' )
277
+ client_stdout = container .logs ().decode (" utf-8" )
268
278
269
279
# Check if container succeeded
270
- if exit_code .get ('StatusCode' , 1 ) != 0 :
271
- logging .error (f"Client { client_index } failed with exit code: { exit_code } " )
280
+ if exit_code .get ("StatusCode" , 1 ) != 0 :
281
+ logging .error (
282
+ f"Client { client_index } failed with exit code: { exit_code } "
283
+ )
272
284
logging .error (f"Client { client_index } stdout/stderr:" )
273
285
logging .error (client_stdout )
274
286
# Fail fast on container execution errors
275
- raise RuntimeError (f"Client { client_index } ({ client_tool } ) failed with exit code { exit_code } " )
287
+ raise RuntimeError (
288
+ f"Client { client_index } ({ client_tool } ) failed with exit code { exit_code } "
289
+ )
276
290
277
- logging .info (f"Client { client_index } completed successfully with exit code: { exit_code } " )
291
+ logging .info (
292
+ f"Client { client_index } completed successfully with exit code: { exit_code } "
293
+ )
278
294
279
- results .append ({
280
- "client_index" : client_index ,
281
- "stdout" : client_stdout ,
282
- "config" : client_configs [client_index ],
283
- "tool" : client_tool ,
284
- "image" : client_image ,
285
- })
295
+ results .append (
296
+ {
297
+ "client_index" : client_index ,
298
+ "stdout" : client_stdout ,
299
+ "config" : client_configs [client_index ],
300
+ "tool" : client_tool ,
301
+ "image" : client_image ,
302
+ }
303
+ )
286
304
287
305
except Exception as e :
288
306
# Get logs even if wait failed
289
307
try :
290
- client_stdout = container .logs ().decode (' utf-8' )
308
+ client_stdout = container .logs ().decode (" utf-8" )
291
309
logging .error (f"Client { client_index } logs:" )
292
310
logging .error (client_stdout )
293
311
except :
@@ -327,26 +345,36 @@ def run_multiple_clients(
327
345
328
346
if os .path .exists (json_filepath ):
329
347
try :
330
- with open (json_filepath , 'r' ) as f :
348
+ with open (json_filepath , "r" ) as f :
331
349
client_json = json .load (f )
332
350
333
351
if "memtier_benchmark" in tool :
334
352
# Store memtier JSON
335
353
memtier_json = client_json
336
- logging .info (f"Successfully read memtier JSON output from client { client_index } " )
354
+ logging .info (
355
+ f"Successfully read memtier JSON output from client { client_index } "
356
+ )
337
357
elif "pubsub-sub-bench" in tool :
338
358
# Store pubsub JSON
339
359
pubsub_json = client_json
340
- logging .info (f"Successfully read pubsub-sub-bench JSON output from client { client_index } " )
360
+ logging .info (
361
+ f"Successfully read pubsub-sub-bench JSON output from client { client_index } "
362
+ )
341
363
342
- logging .info (f"Successfully read JSON output from client { client_index } ({ tool } )" )
364
+ logging .info (
365
+ f"Successfully read JSON output from client { client_index } ({ tool } )"
366
+ )
343
367
344
368
except Exception as e :
345
- logging .warning (f"Failed to read JSON from client { client_index } : { e } " )
369
+ logging .warning (
370
+ f"Failed to read JSON from client { client_index } : { e } "
371
+ )
346
372
# Fall back to stdout
347
373
pass
348
374
else :
349
- logging .warning (f"JSON output file not found for client { client_index } : { json_filepath } " )
375
+ logging .warning (
376
+ f"JSON output file not found for client { client_index } : { json_filepath } "
377
+ )
350
378
351
379
# Merge JSON outputs from both tools
352
380
if memtier_json and pubsub_json :
@@ -355,7 +383,9 @@ def run_multiple_clients(
355
383
# Add pubsub metrics to the aggregated result
356
384
aggregated_json .update (pubsub_json )
357
385
aggregated_stdout = json .dumps (aggregated_json , indent = 2 )
358
- logging .info ("Using merged JSON results from memtier and pubsub-sub-bench clients" )
386
+ logging .info (
387
+ "Using merged JSON results from memtier and pubsub-sub-bench clients"
388
+ )
359
389
elif memtier_json :
360
390
# Only memtier available
361
391
aggregated_json = memtier_json
@@ -369,7 +399,9 @@ def run_multiple_clients(
369
399
else :
370
400
# Fall back to concatenated stdout
371
401
aggregated_stdout = "\n " .join ([r ["stdout" ] for r in successful_results ])
372
- logging .warning ("No JSON results found, falling back to concatenated stdout" )
402
+ logging .warning (
403
+ "No JSON results found, falling back to concatenated stdout"
404
+ )
373
405
374
406
return aggregated_stdout , results
375
407
@@ -657,7 +689,7 @@ def prepare_pubsub_sub_bench_parameters(
657
689
arbitrary_command = False
658
690
659
691
benchmark_command = [
660
- #full_benchmark_path,
692
+ # full_benchmark_path,
661
693
"-json-out-file" ,
662
694
local_benchmark_output_filename ,
663
695
]
@@ -666,7 +698,9 @@ def prepare_pubsub_sub_bench_parameters(
666
698
if unix_socket != "" :
667
699
# pubsub-sub-bench doesn't support unix sockets directly
668
700
# Fall back to host/port
669
- logging .warning ("pubsub-sub-bench doesn't support unix sockets, using host/port" )
701
+ logging .warning (
702
+ "pubsub-sub-bench doesn't support unix sockets, using host/port"
703
+ )
670
704
benchmark_command .extend (["-host" , server , "-port" , str (port )])
671
705
else :
672
706
benchmark_command .extend (["-host" , server , "-port" , str (port )])
@@ -706,9 +740,12 @@ def prepare_pubsub_sub_bench_parameters(
706
740
if override_test_time and override_test_time > 0 :
707
741
# Remove any existing -test-time from user arguments
708
742
import re
709
- user_arguments = re .sub (r'-test-time\s+\d+' , '' , user_arguments )
743
+
744
+ user_arguments = re .sub (r"-test-time\s+\d+" , "" , user_arguments )
710
745
# Add our override test time
711
- benchmark_command_str = benchmark_command_str + " -test-time " + str (override_test_time )
746
+ benchmark_command_str = (
747
+ benchmark_command_str + " -test-time " + str (override_test_time )
748
+ )
712
749
logging .info (f"Applied test-time override: { override_test_time } s" )
713
750
714
751
# Add cleaned user arguments
@@ -1288,17 +1325,25 @@ def delete_temporary_files(
1288
1325
# Wait for container and get output
1289
1326
try :
1290
1327
exit_code = container .wait ()
1291
- client_container_stdout = container .logs ().decode ('utf-8' )
1292
- logging .info (f"Single client completed with exit code: { exit_code } " )
1328
+ client_container_stdout = container .logs ().decode (
1329
+ "utf-8"
1330
+ )
1331
+ logging .info (
1332
+ f"Single client completed with exit code: { exit_code } "
1333
+ )
1293
1334
except Exception as wait_error :
1294
1335
logging .error (f"Single client wait error: { wait_error } " )
1295
- client_container_stdout = container .logs ().decode ('utf-8' )
1336
+ client_container_stdout = container .logs ().decode (
1337
+ "utf-8"
1338
+ )
1296
1339
finally :
1297
1340
# Clean up container
1298
1341
try :
1299
1342
container .remove (force = True )
1300
1343
except Exception as cleanup_error :
1301
- logging .warning (f"Single client cleanup error: { cleanup_error } " )
1344
+ logging .warning (
1345
+ f"Single client cleanup error: { cleanup_error } "
1346
+ )
1302
1347
1303
1348
benchmark_end_time = datetime .datetime .now ()
1304
1349
benchmark_duration_seconds = (
@@ -1350,9 +1395,14 @@ def delete_temporary_files(
1350
1395
None ,
1351
1396
)
1352
1397
# Check if we have multi-client results with aggregated JSON
1353
- if is_multiple_clients and client_container_stdout .strip ().startswith ('{' ):
1398
+ if (
1399
+ is_multiple_clients
1400
+ and client_container_stdout .strip ().startswith ("{" )
1401
+ ):
1354
1402
# Use aggregated JSON from multi-client runner
1355
- logging .info ("Using aggregated JSON results from multi-client execution" )
1403
+ logging .info (
1404
+ "Using aggregated JSON results from multi-client execution"
1405
+ )
1356
1406
results_dict = json .loads (client_container_stdout )
1357
1407
# Print results table for multi-client
1358
1408
print_results_table_stdout (
@@ -1481,7 +1531,10 @@ def delete_temporary_files(
1481
1531
try :
1482
1532
# Try to get redis connection to display server info
1483
1533
import redis as redis_module
1484
- r = redis_module .StrictRedis (host = 'localhost' , port = 6379 , decode_responses = True )
1534
+
1535
+ r = redis_module .StrictRedis (
1536
+ host = "localhost" , port = 6379 , decode_responses = True
1537
+ )
1485
1538
r .ping () # Test connection
1486
1539
print_redis_info_section ([r ])
1487
1540
except Exception as e :
@@ -1635,24 +1688,35 @@ def print_redis_info_section(redis_conns):
1635
1688
["GCC Version" , redis_info .get ("gcc_version" , "unknown" )],
1636
1689
["Process ID" , str (redis_info .get ("process_id" , "unknown" ))],
1637
1690
["TCP Port" , str (redis_info .get ("tcp_port" , "unknown" ))],
1638
- ["Uptime (seconds)" , str (redis_info .get ("uptime_in_seconds" , "unknown" ))],
1691
+ [
1692
+ "Uptime (seconds)" ,
1693
+ str (redis_info .get ("uptime_in_seconds" , "unknown" )),
1694
+ ],
1639
1695
]
1640
1696
1641
1697
from pytablewriter import MarkdownTableWriter
1698
+
1642
1699
writer = MarkdownTableWriter (
1643
1700
table_name = "" ,
1644
1701
headers = ["Property" , "Value" ],
1645
1702
value_matrix = redis_info_data ,
1646
1703
)
1647
1704
writer .write_table ()
1648
1705
1649
- logging .info (f"Displayed Redis server information: Redis { redis_info .get ('redis_version' , 'unknown' )} " )
1706
+ logging .info (
1707
+ f"Displayed Redis server information: Redis { redis_info .get ('redis_version' , 'unknown' )} "
1708
+ )
1650
1709
except Exception as e :
1651
1710
logging .warning (f"Failed to collect Redis server information: { e } " )
1652
1711
1653
1712
1654
1713
def prepare_overall_total_test_results (
1655
- benchmark_config , default_metrics , results_dict , test_name , overall_results_matrix , redis_conns = None
1714
+ benchmark_config ,
1715
+ default_metrics ,
1716
+ results_dict ,
1717
+ test_name ,
1718
+ overall_results_matrix ,
1719
+ redis_conns = None ,
1656
1720
):
1657
1721
# check which metrics to extract
1658
1722
(
@@ -1793,11 +1857,11 @@ def data_prepopulation_step(
1793
1857
# Wait for preload container and get output
1794
1858
try :
1795
1859
exit_code = container .wait ()
1796
- client_container_stdout = container .logs ().decode (' utf-8' )
1860
+ client_container_stdout = container .logs ().decode (" utf-8" )
1797
1861
logging .info (f"Preload tool completed with exit code: { exit_code } " )
1798
1862
except Exception as wait_error :
1799
1863
logging .error (f"Preload tool wait error: { wait_error } " )
1800
- client_container_stdout = container .logs ().decode (' utf-8' )
1864
+ client_container_stdout = container .logs ().decode (" utf-8" )
1801
1865
finally :
1802
1866
# Clean up container
1803
1867
try :
0 commit comments