Skip to content

Commit a4601d3

Browse files
committed
Spawn the commit thread only if necessary
If there are no messages being consumed, the timer keeps creating new threads at the specified intervals. This may not be necessary. We can control this behaviour such that the timer thread is started only when a message is consumed
1 parent 77b8301 commit a4601d3

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

kafka/consumer.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ def __init__(self, client, group, topic, auto_commit=True,
5959
if auto_commit is True and auto_commit_every_t is not None:
6060
self.commit_timer = ReentrantTimer(auto_commit_every_t,
6161
self._timed_commit)
62-
self.commit_timer.start()
6362

6463
def get_or_init_offset_callback(resp):
6564
if resp.error == ErrorMapping.NO_ERROR:
@@ -157,7 +156,7 @@ def _timed_commit(self):
157156
self.commit()
158157

159158
# Once the commit is done, start the timer again
160-
self.commit_timer.start()
159+
# self.commit_timer.start()
161160

162161
def commit(self, partitions=[]):
163162
"""
@@ -224,6 +223,10 @@ def __iter__(self):
224223
if len(iters) == 0:
225224
break
226225

226+
# Now that we are consuming data, start the commit thread
227+
if self.commit_timer and not self.commit_timer.is_active:
228+
self.commit_timer.start()
229+
227230
for partition, it in iters.items():
228231
try:
229232
yield it.next()

kafka/util.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,17 @@ def __init__(self, t, fn):
8686
self.timer = None
8787
self.t = t
8888
self.fn = fn
89+
self.is_active = False
8990

9091
def start(self):
9192
if self.timer is not None:
9293
self.timer.cancel()
9394

9495
self.timer = Timer(self.t / 1000., self.fn)
96+
self.is_active = True
9597
self.timer.start()
9698

9799
def stop(self):
98100
self.timer.cancel()
99101
self.timer = None
102+
self.is_active = False

0 commit comments

Comments
 (0)