Skip to content

Commit cbcb4a6 — "Avoid busy poll during metadata refresh failure with retry_backoff_ms" (#733).
Parent commit: 461ccbd.

File tree

3 files changed: +147 additions, -32 deletions.

kafka/client_async.py

+43-30
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(self, **configs):
126126
self.cluster = ClusterMetadata(**self.config)
127127
self._topics = set() # empty set will fetch all topic metadata
128128
self._metadata_refresh_in_progress = False
129+
self._last_no_node_available_ms = 0
129130
self._selector = selectors.DefaultSelector()
130131
self._conns = {}
131132
self._connecting = set()
@@ -600,38 +601,50 @@ def _maybe_refresh_metadata(self):
600601
int: milliseconds until next refresh
601602
"""
602603
ttl = self.cluster.ttl()
603-
if ttl > 0:
604-
return ttl
604+
next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
605+
next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
606+
wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
607+
timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
608+
609+
if timeout == 0:
610+
node_id = self.least_loaded_node()
611+
if node_id is None:
612+
log.debug("Give up sending metadata request since no node is available")
613+
# mark the timestamp for no node available to connect
614+
self._last_no_node_available_ms = time.time() * 1000
615+
return timeout
616+
617+
topics = list(self._topics)
618+
if self.cluster.need_all_topic_metadata:
619+
topics = []
605620

606-
if self._metadata_refresh_in_progress:
607-
return 9999999999
608-
609-
node_id = self.least_loaded_node()
610-
if node_id is None:
611-
return 0
612-
613-
topics = list(self._topics)
614-
if self.cluster.need_all_topic_metadata:
615-
topics = []
616-
617-
if self._can_send_request(node_id):
618-
request = MetadataRequest[0](topics)
619-
log.debug("Sending metadata request %s to node %s", request, node_id)
620-
future = self.send(node_id, request)
621-
future.add_callback(self.cluster.update_metadata)
622-
future.add_errback(self.cluster.failed_update)
623-
624-
self._metadata_refresh_in_progress = True
625-
def refresh_done(val_or_error):
626-
self._metadata_refresh_in_progress = False
627-
future.add_callback(refresh_done)
628-
future.add_errback(refresh_done)
629-
630-
elif self._can_connect(node_id):
631-
log.debug("Initializing connection to node %s for metadata request", node_id)
632-
self._maybe_connect(node_id)
621+
if self._can_send_request(node_id):
622+
request = MetadataRequest[0](topics)
623+
log.debug("Sending metadata request %s to node %s", request, node_id)
624+
future = self.send(node_id, request)
625+
future.add_callback(self.cluster.update_metadata)
626+
future.add_errback(self.cluster.failed_update)
627+
628+
self._metadata_refresh_in_progress = True
629+
def refresh_done(val_or_error):
630+
self._metadata_refresh_in_progress = False
631+
future.add_callback(refresh_done)
632+
future.add_errback(refresh_done)
633+
634+
elif self._can_connect(node_id):
635+
log.debug("Initializing connection to node %s for metadata request", node_id)
636+
self._maybe_connect(node_id)
637+
# If initiateConnect failed immediately, this node will be put into blackout and we
638+
# should allow immediately retrying in case there is another candidate node. If it
639+
# is still connecting, the worst case is that we end up setting a longer timeout
640+
# on the next round and then wait for the response.
641+
else:
642+
# connected, but can't send more OR connecting
643+
# In either case, we just need to wait for a network event to let us know the selected
644+
# connection might be usable again.
645+
self._last_no_node_available_ms = time.time() * 1000
633646

634-
return 0
647+
return timeout
635648

636649
def schedule(self, task, at):
637650
"""Schedule a new task to be executed at the given time.

kafka/cluster.py

+4
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ def ttl(self):
131131

132132
return max(ttl, next_retry, 0)
133133

134+
def refresh_backoff(self):
135+
"""Return milliseconds to wait before attempting to retry after failure"""
136+
return self.config['retry_backoff_ms']
137+
134138
def request_update(self):
135139
"""Flags metadata for update, return Future()
136140

test/test_client_async.py

+100-2
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,106 @@ def test_set_topics():
293293
pass
294294

295295

296-
def test_maybe_refresh_metadata():
297-
pass
296+
def test_maybe_refresh_metadata_ttl(mocker):
297+
mocker.patch.object(KafkaClient, '_bootstrap')
298+
_poll = mocker.patch.object(KafkaClient, '_poll')
299+
300+
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
301+
302+
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
303+
tasks.return_value = 9999999
304+
305+
ttl = mocker.patch.object(cli.cluster, 'ttl')
306+
ttl.return_value = 1234
307+
308+
cli.poll(timeout_ms=9999999, sleep=True)
309+
_poll.assert_called_with(1.234, sleep=True)
310+
311+
312+
def test_maybe_refresh_metadata_backoff(mocker):
313+
mocker.patch.object(KafkaClient, '_bootstrap')
314+
_poll = mocker.patch.object(KafkaClient, '_poll')
315+
316+
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
317+
318+
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
319+
tasks.return_value = 9999999
320+
321+
ttl = mocker.patch.object(cli.cluster, 'ttl')
322+
ttl.return_value = 0
323+
324+
now = time.time()
325+
t = mocker.patch('time.time')
326+
t.return_value = now
327+
cli._last_no_node_available_ms = now * 1000
328+
329+
cli.poll(timeout_ms=9999999, sleep=True)
330+
_poll.assert_called_with(2.222, sleep=True)
331+
332+
333+
def test_maybe_refresh_metadata_in_progress(mocker):
334+
mocker.patch.object(KafkaClient, '_bootstrap')
335+
_poll = mocker.patch.object(KafkaClient, '_poll')
336+
337+
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
338+
339+
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
340+
tasks.return_value = 9999999
341+
342+
ttl = mocker.patch.object(cli.cluster, 'ttl')
343+
ttl.return_value = 0
344+
345+
cli._metadata_refresh_in_progress = True
346+
347+
cli.poll(timeout_ms=9999999, sleep=True)
348+
_poll.assert_called_with(9999.999, sleep=True)
349+
350+
351+
def test_maybe_refresh_metadata_update(mocker):
352+
mocker.patch.object(KafkaClient, '_bootstrap')
353+
_poll = mocker.patch.object(KafkaClient, '_poll')
354+
355+
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
356+
357+
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
358+
tasks.return_value = 9999999
359+
360+
ttl = mocker.patch.object(cli.cluster, 'ttl')
361+
ttl.return_value = 0
362+
363+
mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
364+
mocker.patch.object(cli, '_can_send_request', return_value=True)
365+
send = mocker.patch.object(cli, 'send')
366+
367+
cli.poll(timeout_ms=9999999, sleep=True)
368+
_poll.assert_called_with(0, sleep=True)
369+
assert cli._metadata_refresh_in_progress
370+
request = MetadataRequest[0]([])
371+
send.assert_called_with('foobar', request)
372+
373+
374+
def test_maybe_refresh_metadata_failure(mocker):
375+
mocker.patch.object(KafkaClient, '_bootstrap')
376+
_poll = mocker.patch.object(KafkaClient, '_poll')
377+
378+
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
379+
380+
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
381+
tasks.return_value = 9999999
382+
383+
ttl = mocker.patch.object(cli.cluster, 'ttl')
384+
ttl.return_value = 0
385+
386+
mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
387+
388+
now = time.time()
389+
t = mocker.patch('time.time')
390+
t.return_value = now
391+
392+
cli.poll(timeout_ms=9999999, sleep=True)
393+
_poll.assert_called_with(0, sleep=True)
394+
assert cli._last_no_node_available_ms == now * 1000
395+
assert not cli._metadata_refresh_in_progress
298396

299397

300398
def test_schedule():

0 commit comments.