@@ -216,14 +216,17 @@ def __init__(self, conf):
216
216
self .swift_dir = conf .get ('swift_dir' , '/etc/swift' )
217
217
self .port = int (conf .get ('bind_port' , 6000 ))
218
218
self .concurrency = int (conf .get ('concurrency' , 1 ))
219
- self .timeout = conf .get ('timeout' , '5' )
220
- self .stats_interval = int (conf .get ('stats_interval' , '3600' ))
219
+ self .stats_interval = int (conf .get ('stats_interval' , '300' ))
221
220
self .object_ring = Ring (join (self .swift_dir , 'object.ring.gz' ))
222
221
self .ring_check_interval = int (conf .get ('ring_check_interval' , 15 ))
223
222
self .next_check = time .time () + self .ring_check_interval
224
223
self .reclaim_age = int (conf .get ('reclaim_age' , 86400 * 7 ))
225
224
self .partition_times = []
226
225
self .run_pause = int (conf .get ('run_pause' , 30 ))
226
+ self .rsync_timeout = int (conf .get ('rsync_timeout' , 300 ))
227
+ self .rsync_io_timeout = conf .get ('rsync_io_timeout' , '10' )
228
+ self .http_timeout = int (conf .get ('http_timeout' , 60 ))
229
+ self .lockup_timeout = int (conf .get ('lockup_timeout' , 900 ))
227
230
228
231
def _rsync (self , args ):
229
232
"""
@@ -234,14 +237,15 @@ def _rsync(self, args):
234
237
start_time = time .time ()
235
238
ret_val = None
236
239
try :
237
- with Timeout (120 ):
240
+ with Timeout (self . rsync_timeout ):
238
241
proc = subprocess .Popen (args , stdout = subprocess .PIPE ,
239
242
stderr = subprocess .STDOUT )
240
243
results = proc .stdout .read ()
241
244
ret_val = proc .wait ()
242
- finally :
243
- if ret_val is None :
244
- proc .kill ()
245
+ except Timeout :
246
+ self .logger .error ("Killing long-running rsync: %s" % str (args ))
247
+ proc .kill ()
248
+ return 1 # failure response code
245
249
total_time = time .time () - start_time
246
250
if results :
247
251
for result in results .split ('\n ' ):
@@ -259,7 +263,7 @@ def _rsync(self, args):
259
263
args [- 2 ], args [- 1 ], total_time , ret_val ))
260
264
if ret_val :
261
265
self .logger .error ('Bad rsync return code: %d' % ret_val )
262
- return ret_val , results
266
+ return ret_val
263
267
264
268
def rsync (self , node , job , suffixes ):
265
269
"""
@@ -282,8 +286,8 @@ def rsync(self, node, job, suffixes):
282
286
'--xattrs' ,
283
287
'--itemize-changes' ,
284
288
'--ignore-existing' ,
285
- '--timeout=%s' % self .timeout ,
286
- '--contimeout=%s' % self .timeout ,
289
+ '--timeout=%s' % self .rsync_io_timeout ,
290
+ '--contimeout=%s' % self .rsync_io_timeout ,
287
291
]
288
292
if self .vm_test_mode :
289
293
rsync_module = '%s::object%s' % (node ['ip' ], node ['port' ])
@@ -299,8 +303,7 @@ def rsync(self, node, job, suffixes):
299
303
return False
300
304
args .append (join (rsync_module , node ['device' ],
301
305
'objects' , job ['partition' ]))
302
- ret_val , results = self ._rsync (args )
303
- return ret_val == 0
306
+ return self ._rsync (args ) == 0
304
307
305
308
def check_ring (self ):
306
309
"""
@@ -334,7 +337,7 @@ def tpool_get_suffixes(path):
334
337
for node in job ['nodes' ]:
335
338
success = self .rsync (node , job , suffixes )
336
339
if success :
337
- with Timeout (60 ):
340
+ with Timeout (self . http_timeout ):
338
341
http_connect (node ['ip' ],
339
342
node ['port' ],
340
343
node ['device' ], job ['partition' ], 'REPLICATE' ,
@@ -371,7 +374,7 @@ def update(self, job):
371
374
node = next (nodes )
372
375
attempts_left -= 1
373
376
try :
374
- with Timeout (60 ):
377
+ with Timeout (self . http_timeout ):
375
378
resp = http_connect (node ['ip' ], node ['port' ],
376
379
node ['device' ], job ['partition' ], 'REPLICATE' ,
377
380
'' , headers = {'Content-Length' : '0' }).getresponse ()
@@ -394,7 +397,7 @@ def update(self, job):
394
397
self .rsync (node , job , suffixes )
395
398
recalculate_hashes (job ['path' ], suffixes ,
396
399
reclaim_age = self .reclaim_age )
397
- with Timeout (60 ):
400
+ with Timeout (self . http_timeout ):
398
401
conn = http_connect (node ['ip' ], node ['port' ],
399
402
node ['device' ], job ['partition' ], 'REPLICATE' ,
400
403
'/' + '-' .join (suffixes ),
@@ -448,16 +451,24 @@ def kill_coros(self):
448
451
def heartbeat (self ):
449
452
"""
450
453
Loop that runs in the background during replication. It periodically
451
- logs progress and attempts to detect lockups, killing any running
452
- coroutines if the replicator hasn't made progress since last hearbeat.
454
+ logs progress.
453
455
"""
454
456
while True :
457
+ eventlet .sleep (self .stats_interval )
458
+ self .stats_line ()
459
+
460
+ def detect_lockups (self ):
461
+ """
462
+ In testing, the pool.waitall() call very occasionally failed to return.
463
+ This is an attempt to make sure the replicator finishes its replication
464
+ pass in some eventuality.
465
+ """
466
+ while True :
467
+ eventlet .sleep (self .lockup_timeout )
455
468
if self .replication_count == self .last_replication_count :
456
469
self .logger .error ("Lockup detected.. killing live coros." )
457
470
self .kill_coros ()
458
471
self .last_replication_count = self .replication_count
459
- eventlet .sleep (300 )
460
- self .stats_line ()
461
472
462
473
def replicate (self ):
463
474
"""Run a replication pass"""
@@ -470,6 +481,7 @@ def replicate(self):
470
481
self .partition_times = []
471
482
jobs = []
472
483
stats = eventlet .spawn (self .heartbeat )
484
+ lockup_detector = eventlet .spawn (self .detect_lockups )
473
485
try :
474
486
ips = whataremyips ()
475
487
self .run_pool = GreenPool (size = self .concurrency )
@@ -508,13 +520,15 @@ def replicate(self):
508
520
self .run_pool .spawn (self .update_deleted , job )
509
521
else :
510
522
self .run_pool .spawn (self .update , job )
511
- with Timeout (120 ):
523
+ with Timeout (self . lockup_timeout ):
512
524
self .run_pool .waitall ()
513
525
except (Exception , Timeout ):
514
- self .logger .exception ("Exception while replicating " )
526
+ self .logger .exception ("Exception in top-level replication loop " )
515
527
self .kill_coros ()
516
- self .stats_line ()
517
- stats .kill ()
528
+ finally :
529
+ stats .kill ()
530
+ lockup_detector .kill ()
531
+ self .stats_line ()
518
532
519
533
def run_once (self ):
520
534
start = time .time ()
0 commit comments