84
84
# internal limit.
85
85
MAX_COLLECT_DURATION = 30
86
86
87
- # Maximum number of consecutive rounds in which the surveyor neither sent
88
- # requests to nor received responses from any nodes. A round contains a batch of
89
- # requests sent to select nodes, followed by a wait period of 15 seconds,
90
- # followed by checking for responses and building up the next batch of requests
91
- # to send. Therefore, a setting of `8` is roughly 2 minutes of inactivity
92
- # before the script considers the survey complete. This is necessary because
93
- # it's very likely that not all surveyed nodes will respond to the survey.
94
- # Therefore, we need some cutoff after we which we assume those nodes will never
95
- # respond.
87
+ # Maximum number of consecutive rounds in which the surveyor does not receive
88
+ # responses from any nodes. A round contains a batch of requests sent to select
89
+ # nodes, followed by a wait period of 15 seconds, followed by checking for
90
+ # responses and building up the next batch of requests to send. Therefore, a
91
+ # setting of `8` is roughly 2 minutes of inactivity before the script considers
92
+ # the survey complete. This is necessary because it's very likely that not all
93
+ # surveyed nodes will respond to the survey. Therefore, we need some cutoff
94
+ # after which we assume those nodes will never respond.
96
95
MAX_INACTIVE_ROUNDS = 8
97
96
98
97
def get_request (url , params = None ):
@@ -129,15 +128,14 @@ def get_next_peers(topology):
129
128
130
129
def update_node (graph , node_info , node_key , results , field_names ):
131
130
"""
132
- For each `(info_field, node_field)` pair in `field_names`, if `info_field`
133
- is in `node_info`, modify the node in `graph` with key `node_key` to store
134
- the value of `info_field` in `node_field`.
131
+ For each `field_name` in `field_names`, if `field_name` is in `node_info`,
132
+ modify `graph` and `results` to contain the field.
135
133
"""
136
- for ( info_field , node_field ) in field_names :
137
- if info_field in node_info :
138
- val = node_info [info_field ]
139
- results [node_field ] = val
140
- graph .add_node (node_key , ** {node_field : val })
134
+ for field_name in field_names :
135
+ if field_name in node_info :
136
+ val = node_info [field_name ]
137
+ results [field_name ] = val
138
+ graph .add_node (node_key , ** {field_name : val })
141
139
142
140
def update_results (graph , parent_info , parent_key , results , is_inbound ):
143
141
direction_tag = "inboundPeers" if is_inbound else "outboundPeers"
@@ -158,16 +156,16 @@ def update_results(graph, parent_info, parent_key, results, is_inbound):
158
156
graph .add_edge (parent_key , other_key , ** edge_properties )
159
157
160
158
# Add survey results to parent node (if available)
161
- field_names = [( "numTotalInboundPeers" , "totalInbound" ) ,
162
- ( "numTotalOutboundPeers" , "totalOutbound" ) ,
163
- ( "maxInboundPeerCount" , "maxInboundPeerCount" ) ,
164
- ( "maxOutboundPeerCount" , "maxOutboundPeerCount" ) ,
165
- ( "addedAuthenticatedPeers" , "addedAuthenticatedPeers" ) ,
166
- ( "droppedAuthenticatedPeers" , "droppedAuthenticatedPeers" ) ,
167
- ( "p75SCPFirstToSelfLatencyMs" , "p75SCPFirstToSelfLatencyMs" ) ,
168
- ( "p75SCPSelfToOtherLatencyMs" , "p75SCPSelfToOtherLatencyMs" ) ,
169
- ( "lostSyncCount" , "lostSyncCount" ) ,
170
- ( "isValidator" , "isValidator" ) ]
159
+ field_names = ["numTotalInboundPeers" ,
160
+ "numTotalOutboundPeers" ,
161
+ "maxInboundPeerCount" ,
162
+ "maxOutboundPeerCount" ,
163
+ "addedAuthenticatedPeers" ,
164
+ "droppedAuthenticatedPeers" ,
165
+ "p75SCPFirstToSelfLatencyMs" ,
166
+ "p75SCPSelfToOtherLatencyMs" ,
167
+ "lostSyncCount" ,
168
+ "isValidator" ]
171
169
update_node (graph , parent_info , parent_key , results , field_names )
172
170
173
171
@@ -187,8 +185,18 @@ def send_survey_requests(peer_list, url_base):
187
185
util .SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_START ):
188
186
logger .debug ("Send request to %s" , nodeid )
189
187
else :
190
- logger .error ("Failed to send survey request to %s: %s" ,
191
- nodeid , response .text )
188
+ try :
189
+ exception = response .json ()["exception" ]
190
+ if exception == \
191
+ util .SURVEY_TOPOLOGY_TIME_SLICED_ALREADY_IN_BACKLOG_OR_SELF :
192
+ logger .debug ("Node %s is already in backlog or is self" ,
193
+ nodeid )
194
+ else :
195
+ logger .error ("Failed to send survey request to %s: %s" ,
196
+ nodeid , exception )
197
+ except (requests .exceptions .JSONDecodeError , KeyError ):
198
+ logger .error ("Failed to send survey request to %s: %s" ,
199
+ nodeid , response .text )
192
200
193
201
logger .info ("Done sending survey requests" )
194
202
@@ -309,8 +317,8 @@ def augment(args):
309
317
def run_survey (args ):
310
318
graph = nx .DiGraph ()
311
319
merged_results = defaultdict (lambda : {
312
- "totalInbound " : 0 ,
313
- "totalOutbound " : 0 ,
320
+ "numTotalInboundPeers " : 0 ,
321
+ "numTotalOutboundPeers " : 0 ,
314
322
"maxInboundPeerCount" : 0 ,
315
323
"maxOutboundPeerCount" : 0 ,
316
324
"inboundPeers" : {},
@@ -324,6 +332,7 @@ def run_survey(args):
324
332
logger .critical ("%s" , e )
325
333
sys .exit (1 )
326
334
335
+ skip_sleep = args .simulate and args .fast
327
336
url = args .node
328
337
329
338
peers = url + "/peers"
@@ -339,10 +348,11 @@ def run_survey(args):
339
348
logger .critical ("Failed to start survey: %s" , response .text )
340
349
sys .exit (1 )
341
350
342
- # Sleep for duration of collecting phase
343
- logger .info ("Sleeping for collecting phase (%i minutes)" ,
344
- args .collect_duration )
345
- time .sleep (args .collect_duration * 60 )
351
+ if not skip_sleep :
352
+ # Sleep for duration of collecting phase
353
+ logger .info ("Sleeping for collecting phase (%i minutes)" ,
354
+ args .collect_duration )
355
+ time .sleep (args .collect_duration * 60 )
346
356
347
357
# Stop survey recording
348
358
logger .info ("Stopping survey collecting" )
@@ -351,12 +361,13 @@ def run_survey(args):
351
361
logger .critical ("Failed to stop survey: %s" , response .text )
352
362
sys .exit (1 )
353
363
354
- # Allow time for stop message to propagate
355
- sleep_time = 60
356
- logger .info (
357
- "Waiting %i seconds for 'stop collecting' message to propagate" ,
358
- sleep_time )
359
- time .sleep (sleep_time )
364
+ if not skip_sleep :
365
+ # Allow time for stop message to propagate
366
+ sleep_time = 60
367
+ logger .info (
368
+ "Waiting %i seconds for 'stop collecting' message to propagate" ,
369
+ sleep_time )
370
+ time .sleep (sleep_time )
360
371
361
372
peer_list = set ()
362
373
if args .nodeList :
@@ -387,16 +398,14 @@ def run_survey(args):
387
398
388
399
sent_requests = set ()
389
400
heard_from = set ()
401
+ incomplete_responses = set ()
390
402
391
403
# Number of consecutive rounds in which the surveyor did not receive any new
392
404
# responses or additional data from surveyed nodes
393
405
inactive_rounds = 0
394
406
395
407
while True :
396
- if peer_list :
397
- inactive_rounds = 0
398
- else :
399
- inactive_rounds += 1
408
+ inactive_rounds += 1
400
409
401
410
send_survey_requests (peer_list , url )
402
411
@@ -405,25 +414,33 @@ def run_survey(args):
405
414
406
415
peer_list = set ()
407
416
408
- # allow time for results. Stellar-core sends out a batch of requests
409
- # every 15 seconds, so there's not much benefit in checking more
410
- # frequently than that
411
- sleep_time = 15
412
- logger .info ("Waiting %i seconds for survey results" , sleep_time )
413
- time .sleep (sleep_time )
417
+ if not skip_sleep :
418
+ # allow time for results. Stellar-core sends out a batch of requests
419
+ # every 15 seconds, so there's not much benefit in checking more
420
+ # frequently than that
421
+ sleep_time = 15
422
+ logger .info ("Waiting %i seconds for survey results" , sleep_time )
423
+ time .sleep (sleep_time )
414
424
415
425
logger .info ("Fetching survey result" )
416
426
data = get_request (url = survey_result ).json ()
417
427
logger .info ("Done fetching result" )
418
428
419
429
if "topology" in data :
420
430
for key in data ["topology" ]:
421
- if data ["topology" ][key ] is not None :
431
+ node_data = data ["topology" ][key ]
432
+ if node_data is not None :
422
433
if key not in heard_from :
423
434
# Received a new response!
424
435
logger .debug ("Received response from %s" , key )
425
436
inactive_rounds = 0
426
437
heard_from .add (key )
438
+ elif key in incomplete_responses and len (node_data ) > 0 :
439
+ # Received additional data for a node that previously
440
+ # responded
441
+ logger .debug ("Received additional data for %s" , key )
442
+ inactive_rounds = 0
443
+ incomplete_responses .remove (key )
427
444
428
445
waiting_to_hear = set ()
429
446
for node in sent_requests :
@@ -455,11 +472,11 @@ def run_survey(args):
455
472
node = merged_results [key ]
456
473
have_inbound = len (node ["inboundPeers" ])
457
474
have_outbound = len (node ["outboundPeers" ])
458
- if (node ["totalInbound " ] > have_inbound or
459
- node ["totalOutbound " ] > have_outbound ):
460
- peer_list .add (util . PendingRequest ( key ,
461
- have_inbound ,
462
- have_outbound ) )
475
+ if (node ["numTotalInboundPeers " ] > have_inbound or
476
+ node ["numTotalOutboundPeers " ] > have_outbound ):
477
+ incomplete_responses .add (key )
478
+ req = util . PendingRequest ( key , have_inbound , have_outbound )
479
+ peer_list . add ( req )
463
480
logger .info ("New nodes: %s Gathering additional peer data: %s" ,
464
481
new_peers , len (peer_list )- new_peers )
465
482
@@ -554,6 +571,10 @@ def main():
554
571
"--simRoot" ,
555
572
required = True ,
556
573
help = "node to start simulation from" )
574
+ parser_simulate .add_argument ("-f" ,
575
+ "--fast" ,
576
+ action = "store_true" ,
577
+ help = "Skip sleep calls during simulation." )
557
578
parser_simulate .set_defaults (simulate = True )
558
579
559
580
parser_analyze = subparsers .add_parser ('analyze' ,
0 commit comments