@@ -46,7 +46,18 @@ class HostDistance(object):
46
46
connections opened to it.
47
47
"""
48
48
49
- LOCAL = 0
49
+ LOCAL_RACK = 0
50
+ """
51
+ Nodes with ``LOCAL_RACK`` distance will be preferred for operations
52
+ under some load balancing policies (such as :class:`.RackAwareRoundRobinPolicy`)
53
+ and will have a greater number of connections opened against
54
+ them by default.
55
+
56
+ This distance is typically used for nodes within the same
57
+ datacenter and the same rack as the client.
58
+ """
59
+
60
+ LOCAL = 1
50
61
"""
51
62
Nodes with ``LOCAL`` distance will be preferred for operations
52
63
under some load balancing policies (such as :class:`.DCAwareRoundRobinPolicy`)
@@ -57,12 +68,12 @@ class HostDistance(object):
57
68
datacenter as the client.
58
69
"""
59
70
60
- REMOTE = 1
71
+ REMOTE = 2
61
72
"""
62
73
Nodes with ``REMOTE`` distance will be treated as a last resort
63
- by some load balancing policies (such as :class:`.DCAwareRoundRobinPolicy`)
64
- and will have a smaller number of connections opened against
65
- them by default.
74
+ by some load balancing policies (such as :class:`.DCAwareRoundRobinPolicy`
75
+ and :class:`.RackAwareRoundRobinPolicy`) and will have a smaller number of
76
+ connections opened against them by default.
66
77
67
78
This distance is typically used for nodes outside of the
68
79
datacenter that the client is running in.
@@ -316,6 +327,113 @@ def on_add(self, host):
316
327
def on_remove (self , host ):
317
328
self .on_down (host )
318
329
330
class RackAwareRoundRobinPolicy(LoadBalancingPolicy):
    """
    Similar to :class:`.DCAwareRoundRobinPolicy`, but prefers hosts in the
    local rack, before hosts in the local datacenter but a different rack,
    before hosts in all other datacenters.
    """

    # Name of the datacenter considered local; set in __init__().
    local_dc = None
    # Name of the rack considered local; set in __init__().
    local_rack = None
    # How many hosts per remote DC get HostDistance.REMOTE (the rest are IGNORED).
    used_hosts_per_remote_dc = 0

    def __init__(self, local_dc, local_rack, used_hosts_per_remote_dc=0):
        """
        The `local_dc` and `local_rack` parameters should be the name of the
        datacenter and rack (such as is reported by ``nodetool ring``) that
        should be considered local.

        `used_hosts_per_remote_dc` controls how many nodes in
        each remote datacenter will have connections opened
        against them. In other words, `used_hosts_per_remote_dc` hosts
        will be considered :attr:`~.HostDistance.REMOTE` and the
        rest will be considered :attr:`~.HostDistance.IGNORED`.
        By default, all remote hosts are ignored.
        """
        self.local_rack = local_rack
        self.local_dc = local_dc
        self.used_hosts_per_remote_dc = used_hosts_per_remote_dc
        # (dc, rack) -> list of live hosts in that rack
        self._live_hosts = {}
        # dc -> list of live hosts in that datacenter
        self._dc_live_hosts = {}
        LoadBalancingPolicy.__init__(self)

    def _rack(self, host):
        # Hosts that do not report a rack are treated as being in the local rack.
        return host.rack or self.local_rack

    def _dc(self, host):
        # Hosts that do not report a datacenter are treated as local.
        return host.datacenter or self.local_dc

    def populate(self, cluster, hosts):
        """Build the per-rack and per-DC host indexes from the initial host list."""
        # NOTE: a previous version used itertools.groupby() on the unsorted
        # host list; groupby only groups *consecutive* equal keys, so hosts
        # from the same rack/DC that were not adjacent in `hosts` silently
        # overwrote each other's entry.  Accumulate explicitly instead.
        for host in hosts:
            dc = self._dc(host)
            rack = self._rack(host)
            self._live_hosts.setdefault((dc, rack), []).append(host)
            self._dc_live_hosts.setdefault(dc, []).append(host)

    def distance(self, host):
        """
        Return :attr:`~.HostDistance.LOCAL_RACK` for hosts in the local DC and
        rack, :attr:`~.HostDistance.LOCAL` for other hosts in the local DC,
        :attr:`~.HostDistance.REMOTE` for the first `used_hosts_per_remote_dc`
        live hosts of each other DC, and :attr:`~.HostDistance.IGNORED`
        for everything else.
        """
        dc = self._dc(host)
        if dc == self.local_dc:
            if self._rack(host) == self.local_rack:
                return HostDistance.LOCAL_RACK
            return HostDistance.LOCAL

        if not self.used_hosts_per_remote_dc:
            return HostDistance.IGNORED

        dc_hosts = self._dc_live_hosts.get(dc, ())
        if host in dc_hosts[:self.used_hosts_per_remote_dc]:
            return HostDistance.REMOTE
        return HostDistance.IGNORED

    def make_query_plan(self, working_keyspace=None, query=None):
        """
        Yield candidate hosts in preference order: the local rack first, then
        the rest of the local datacenter, then up to `used_hosts_per_remote_dc`
        hosts from each remote datacenter.
        """
        # Iterate over snapshots (copies) throughout: on_up()/on_down() can
        # mutate these lists and dicts concurrently.
        for host in list(self._live_hosts.get((self.local_dc, self.local_rack), [])):
            yield host

        # Use self._rack() here (not host.rack) so a host with rack=None --
        # which distance() and populate() treat as local-rack -- is not
        # yielded a second time in this pass.
        for host in list(self._dc_live_hosts.get(self.local_dc, [])):
            if self._rack(host) != self.local_rack:
                yield host

        for dc, remote_live in self._dc_live_hosts.copy().items():
            if dc != self.local_dc:
                for host in remote_live[:self.used_hosts_per_remote_dc]:
                    yield host

    def on_up(self, host):
        """Register a host that has come up in both indexes."""
        dc = self._dc(host)
        rack = self._rack(host)
        with self._hosts_lock:
            # Guard against duplicate registration on repeated on_up() calls.
            rack_hosts = self._live_hosts.setdefault((dc, rack), [])
            if host not in rack_hosts:
                rack_hosts.append(host)
            dc_hosts = self._dc_live_hosts.setdefault(dc, [])
            if host not in dc_hosts:
                dc_hosts.append(host)

    def on_down(self, host):
        """Remove a downed host; tolerates hosts that were never registered."""
        dc = self._dc(host)
        rack = self._rack(host)
        with self._hosts_lock:
            rack_hosts = self._live_hosts.get((dc, rack), [])
            if host in rack_hosts:
                rack_hosts.remove(host)
                # Drop empty buckets so stale keys don't accumulate.
                if not rack_hosts:
                    del self._live_hosts[(dc, rack)]
            dc_hosts = self._dc_live_hosts.get(dc, [])
            if host in dc_hosts:
                dc_hosts.remove(host)
                if not dc_hosts:
                    del self._dc_live_hosts[dc]

    def on_add(self, host):
        # A newly added host is treated the same as one coming up.
        self.on_up(host)

    def on_remove(self, host):
        # A removed host is treated the same as one going down.
        self.on_down(host)
319
437
320
438
class TokenAwarePolicy (LoadBalancingPolicy ):
321
439
"""
@@ -396,7 +514,7 @@ def make_query_plan(self, working_keyspace=None, query=None):
396
514
shuffle (replicas )
397
515
for replica in replicas :
398
516
if replica .is_up and \
399
- child .distance (replica ) == HostDistance .LOCAL :
517
+ ( child .distance (replica ) == HostDistance .LOCAL or child . distance ( replica ) == HostDistance . LOCAL_RACK ) :
400
518
yield replica
401
519
402
520
for host in child .make_query_plan (keyspace , query ):
0 commit comments