Skip to content

Commit bd24486

Browse files
committed
More timeout_ms args in coordinator + heartbeat close
1 parent 23d21f5 commit bd24486

File tree

2 files changed

+12
-10
lines changed

2 files changed

+12
-10
lines changed

kafka/coordinator/base.py

+11-9
Original file line numberDiff line numberDiff line change
@@ -776,12 +776,12 @@ def _disable_heartbeat_thread(self):
776776
if self._heartbeat_thread is not None:
777777
self._heartbeat_thread.disable()
778778

779-
def _close_heartbeat_thread(self):
779+
def _close_heartbeat_thread(self, timeout_ms=None):
780780
with self._lock:
781781
if self._heartbeat_thread is not None:
782782
log.info('Stopping heartbeat thread')
783783
try:
784-
self._heartbeat_thread.close()
784+
self._heartbeat_thread.close(timeout_ms=timeout_ms)
785785
except ReferenceError:
786786
pass
787787
self._heartbeat_thread = None
@@ -790,13 +790,13 @@ def __del__(self):
790790
if hasattr(self, '_heartbeat_thread'):
791791
self._close_heartbeat_thread()
792792

793-
def close(self):
793+
def close(self, timeout_ms=None):
794794
"""Close the coordinator, leave the current group,
795795
and reset local generation / member_id"""
796-
self._close_heartbeat_thread()
797-
self.maybe_leave_group()
796+
self._close_heartbeat_thread(timeout_ms=timeout_ms)
797+
self.maybe_leave_group(timeout_ms=timeout_ms)
798798

799-
def maybe_leave_group(self):
799+
def maybe_leave_group(self, timeout_ms=None):
800800
"""Leave the current group and reset local generation/memberId."""
801801
with self._client._lock, self._lock:
802802
if (not self.coordinator_unknown()
@@ -811,7 +811,7 @@ def maybe_leave_group(self):
811811
future = self._client.send(self.coordinator_id, request)
812812
future.add_callback(self._handle_leave_group_response)
813813
future.add_errback(log.error, "LeaveGroup request failed: %s")
814-
self._client.poll(future=future)
814+
self._client.poll(future=future, timeout_ms=timeout_ms)
815815

816816
self.reset_generation()
817817

@@ -957,7 +957,7 @@ def disable(self):
957957
log.debug('Disabling heartbeat thread')
958958
self.enabled = False
959959

960-
def close(self):
960+
def close(self, timeout_ms=None):
961961
if self.closed:
962962
return
963963
self.closed = True
@@ -972,7 +972,9 @@ def close(self):
972972
self.coordinator._lock.notify()
973973

974974
if self.is_alive():
975-
self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
975+
if timeout_ms is None:
976+
timeout_ms = self.coordinator.config['heartbeat_interval_ms']
977+
self.join(timeout_ms / 1000)
976978
if self.is_alive():
977979
log.warning("Heartbeat thread did not fully terminate during close")
978980

kafka/coordinator/consumer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ def close(self, autocommit=True, timeout_ms=None):
449449
if autocommit:
450450
self._maybe_auto_commit_offsets_sync(timeout_ms=timeout_ms)
451451
finally:
452-
super(ConsumerCoordinator, self).close()
452+
super(ConsumerCoordinator, self).close(timeout_ms=timeout_ms)
453453

454454
def _invoke_completed_offset_commit_callbacks(self):
455455
while self.completed_offset_commits:

0 commit comments

Comments
 (0)