Skip to content

Commit 0b5a49e

Browse files
authored
Update KafkaClient.least_loaded_node (#730)
- The main node loop should check all known brokers, not just existing connection objects; this is consistent with the official Java client.
- This fixes a bug that could cause least_loaded_node to always return the same unavailable node.
1 parent 6271c02 commit 0b5a49e

File tree

1 file changed

+17
-23
lines changed

1 file changed

+17
-23
lines changed

kafka/client_async.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -525,44 +525,38 @@ def least_loaded_node(self):
525525
Returns:
526526
node_id or None if no suitable node was found
527527
"""
528-
nodes = list(self._conns.keys())
528+
nodes = [broker.nodeId for broker in self.cluster.brokers()]
529529
random.shuffle(nodes)
530530

531-
# If there's a lingering bootstrap node, always try it last
532-
# really we should just kill this connection
533-
if 'bootstrap' in nodes:
534-
nodes.remove('bootstrap')
535-
nodes.append('bootstrap')
536-
537531
inflight = float('inf')
538532
found = None
539533
for node_id in nodes:
540-
conn = self._conns[node_id]
541-
curr_inflight = len(conn.in_flight_requests)
542-
if curr_inflight == 0 and conn.connected():
543-
# if we find an established connection with no in-flight requests we can stop right away
534+
conn = self._conns.get(node_id)
535+
connected = conn is not None and conn.connected()
536+
blacked_out = conn is not None and conn.blacked_out()
537+
curr_inflight = len(conn.in_flight_requests) if conn else 0
538+
if connected and curr_inflight == 0:
539+
# if we find an established connection
540+
# with no in-flight requests, we can stop right away
544541
return node_id
545-
elif not conn.blacked_out() and curr_inflight < inflight:
542+
elif not blacked_out and curr_inflight < inflight:
546543
# otherwise if this is the best we have found so far, record that
547544
inflight = curr_inflight
548545
found = node_id
549546

550547
if found is not None:
551548
return found
552549

553-
# if we found no connected node, return a disconnected one
554-
log.debug("No connected nodes found. Trying disconnected nodes.")
555-
for node_id in nodes:
556-
if not self._conns[node_id].blacked_out():
557-
return node_id
558-
559-
# if still no luck, look for a node not in self._conns yet
560-
log.debug("No luck. Trying all broker metadata")
561-
for broker in self.cluster.brokers():
562-
if broker.nodeId not in self._conns:
563-
return broker.nodeId
550+
# some broker versions return an empty list of broker metadata
551+
# if there are no topics created yet. the bootstrap process
552+
# should detect this and keep a 'bootstrap' node alive until
553+
# a non-bootstrap node is connected and non-empty broker
554+
# metadata is available
555+
elif 'bootstrap' in self._conns:
556+
return 'bootstrap'
564557

565558
# Last option: try to bootstrap again
559+
# this should only happen if no prior bootstrap has been successful
566560
log.error('No nodes found in metadata -- retrying bootstrap')
567561
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
568562
return None

0 commit comments

Comments
 (0)