Skip to content

Commit a96bc9c

Browse files
authored
Call default_offset_commit_callback after _maybe_auto_commit_offsets_async (#2546)
1 parent 3493380 commit a96bc9c

File tree

1 file changed

+11
-12
lines changed

1 file changed

+11
-12
lines changed

Diff for: kafka/coordinator/consumer.py

+11-12
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__(self, client, subscription, metrics, **configs):
5454
auto_commit_interval_ms (int): milliseconds between automatic
5555
offset commits, if enable_auto_commit is True. Default: 5000.
5656
default_offset_commit_callback (callable): called as
57-
callback(offsets, exception) response will be either an Exception
57+
callback(offsets, response) response will be either an Exception
5858
or None. This callback can be used to trigger custom actions when
5959
a commit request completes.
6060
assignors (list): List of objects to use to distribute partition
@@ -453,8 +453,8 @@ def close(self, autocommit=True, timeout_ms=None):
453453

454454
def _invoke_completed_offset_commit_callbacks(self):
455455
while self.completed_offset_commits:
456-
callback, offsets, exception = self.completed_offset_commits.popleft()
457-
callback(offsets, exception)
456+
callback, offsets, res_or_exc = self.completed_offset_commits.popleft()
457+
callback(offsets, res_or_exc)
458458

459459
def commit_offsets_async(self, offsets, callback=None):
460460
"""Commit specific offsets asynchronously.
@@ -859,20 +859,19 @@ def _handle_offset_fetch_response(self, future, response):
859859
" %s", self.group_id, tp)
860860
future.success(offsets)
861861

862-
def _default_offset_commit_callback(self, offsets, exception):
863-
if exception is not None:
864-
log.error("Offset commit failed: %s", exception)
865-
866-
def _commit_offsets_async_on_complete(self, offsets, exception):
867-
if exception is not None:
862+
def _default_offset_commit_callback(self, offsets, res_or_exc):
863+
if isinstance(res_or_exc, Exception):
868864
log.warning("Auto offset commit failed for group %s: %s",
869-
self.group_id, exception)
870-
if getattr(exception, 'retriable', False):
871-
self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
865+
self.group_id, res_or_exc)
872866
else:
873867
log.debug("Completed autocommit of offsets %s for group %s",
874868
offsets, self.group_id)
875869

870+
def _commit_offsets_async_on_complete(self, offsets, res_or_exc):
871+
if isinstance(res_or_exc, Exception) and getattr(res_or_exc, 'retriable', False):
872+
self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
873+
self.config['default_offset_commit_callback'](offsets, res_or_exc)
874+
876875
def _maybe_auto_commit_offsets_async(self):
877876
if self.config['enable_auto_commit']:
878877
if self.coordinator_unknown():

0 commit comments

Comments
 (0)