Skip to content

Commit 461ccbd

Browse files
authored
check_version should scan nodes until version found or timeout (#731)
* Mute all connection logging during conn.check_version
* Always process pending MetadataRequest in conn.check_version
* KafkaClient.check_version: Scan all brokers until a version is identified or timeout
1 parent 0b5a49e commit 461ccbd

File tree

2 files changed

+46
-17
lines changed

2 files changed

+46
-17
lines changed

kafka/client_async.py

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -662,20 +662,49 @@ def unschedule(self, task):
662662
self._delayed_tasks.remove(task)
663663

664664
def check_version(self, node_id=None, timeout=2, strict=False):
665-
"""Attempt to guess the broker version"""
666-
if node_id is None:
667-
node_id = self.least_loaded_node()
668-
if node_id is None:
665+
"""Attempt to guess a broker version
666+
667+
Note: it is possible that this method blocks longer than the
668+
specified timeout. This can happen if the entire cluster
669+
is down and the client enters a bootstrap backoff sleep.
670+
This is only possible if node_id is None.
671+
672+
Returns: version str, i.e. '0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'
673+
674+
Raises:
675+
NodeNotReadyError (if node_id is provided)
676+
NoBrokersAvailable (if node_id is None)
677+
UnrecognizedBrokerVersion: please file bug if seen!
678+
AssertionError (if strict=True): please file bug if seen!
679+
"""
680+
end = time.time() + timeout
681+
while time.time() < end:
682+
683+
# It is possible that least_loaded_node falls back to bootstrap,
684+
# which can block for an increasing backoff period
685+
try_node = node_id or self.least_loaded_node()
686+
if try_node is None:
669687
raise Errors.NoBrokersAvailable()
688+
self._maybe_connect(try_node)
689+
conn = self._conns[try_node]
670690

671-
# We will be intentionally causing socket failures
672-
# and should not trigger metadata refresh
673-
self._refresh_on_disconnects = False
674-
self._maybe_connect(node_id)
675-
conn = self._conns[node_id]
676-
version = conn.check_version()
677-
self._refresh_on_disconnects = True
678-
return version
691+
# We will intentionally cause socket failures
692+
# These should not trigger metadata refresh
693+
self._refresh_on_disconnects = False
694+
try:
695+
remaining = end - time.time()
696+
version = conn.check_version(timeout=remaining, strict=strict)
697+
return version
698+
except Errors.NodeNotReadyError:
699+
# Only raise to user if this is a node-specific request
700+
if node_id is not None:
701+
raise
702+
finally:
703+
self._refresh_on_disconnects = True
704+
705+
# Timeout
706+
else:
707+
raise Errors.NoBrokersAvailable()
679708

680709
def wakeup(self):
681710
if self._wake_w.send(b'x') != 1:

kafka/conn.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -561,9 +561,9 @@ def check_version(self, timeout=2, strict=False):
561561

562562
class ConnFilter(Filter):
563563
def filter(self, record):
564-
if record.funcName in ('recv', 'send'):
565-
return False
566-
return True
564+
if record.funcName == 'check_version':
565+
return True
566+
return False
567567
log_filter = ConnFilter()
568568
log.addFilter(log_filter)
569569

@@ -598,11 +598,11 @@ def connect():
598598
# the attempt to write to a disconnected socket should
599599
# immediately fail and allow us to infer that the prior
600600
# request was unrecognized
601-
self.send(MetadataRequest[0]([]))
601+
mr = self.send(MetadataRequest[0]([]))
602602

603603
if self._sock:
604604
self._sock.setblocking(True)
605-
while not f.is_done:
605+
while not (f.is_done and mr.is_done):
606606
self.recv()
607607
if self._sock:
608608
self._sock.setblocking(False)

0 commit comments

Comments
 (0)