Skip to content

Commit b0c87ee

Browse files
committed
Optimize auto-commit thread
The previous commit optimized the commit thread such that the timer started only when there were messages to be consumed. This commit goes a step further and ensures the following: * Only one timer thread is created * The main app does not block on exit (waiting for timer thread to finish) This is ensured by having a single thread blocking on an event and keeps calling a function. We use events instead of time.sleep() so as to prevent the python interpreter from running every 50ms checking if the timer has expired (logic copied from threading.Timer)
1 parent a4601d3 commit b0c87ee

File tree

2 files changed

+41
-33
lines changed

2 files changed

+41
-33
lines changed

kafka/consumer.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ def __init__(self, client, group, topic, auto_commit=True,
5858
# Set up the auto-commit timer
5959
if auto_commit is True and auto_commit_every_t is not None:
6060
self.commit_timer = ReentrantTimer(auto_commit_every_t,
61-
self._timed_commit)
61+
self.commit)
62+
self.commit_timer.start()
6263

6364
def get_or_init_offset_callback(resp):
6465
if resp.error == ErrorMapping.NO_ERROR:
@@ -149,15 +150,6 @@ def pending(self, partitions=[]):
149150

150151
return total
151152

152-
def _timed_commit(self):
153-
"""
154-
Commit offsets as part of timer
155-
"""
156-
self.commit()
157-
158-
# Once the commit is done, start the timer again
159-
# self.commit_timer.start()
160-
161153
def commit(self, partitions=[]):
162154
"""
163155
Commit offsets for this consumer
@@ -166,11 +158,17 @@ def commit(self, partitions=[]):
166158
all of them
167159
"""
168160

169-
# short circuit if nothing happened
161+
# short circuit if nothing happened. This check is kept outside
162+
# to prevent un-necessarily acquiring a lock for checking the state
170163
if self.count_since_commit == 0:
171164
return
172165

173166
with self.commit_lock:
167+
# Do this check again, just in case the state has changed
168+
# during the lock acquiring timeout
169+
if self.count_since_commit == 0:
170+
return
171+
174172
reqs = []
175173
if len(partitions) == 0: # commit all partitions
176174
partitions = self.offsets.keys()
@@ -200,12 +198,7 @@ def _auto_commit(self):
200198
return
201199

202200
if self.count_since_commit > self.auto_commit_every_n:
203-
if self.commit_timer is not None:
204-
self.commit_timer.stop()
205-
self.commit()
206-
self.commit_timer.start()
207-
else:
208-
self.commit()
201+
self.commit()
209202

210203
def __iter__(self):
211204
"""
@@ -223,10 +216,6 @@ def __iter__(self):
223216
if len(iters) == 0:
224217
break
225218

226-
# Now that we are consuming data, start the commit thread
227-
if self.commit_timer and not self.commit_timer.is_active:
228-
self.commit_timer.start()
229-
230219
for partition, it in iters.items():
231220
try:
232221
yield it.next()

kafka/util.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections import defaultdict
22
from itertools import groupby
33
import struct
4-
from threading import Timer
4+
from threading import Thread, Event
55

66

77
def write_int_string(s):
@@ -81,22 +81,41 @@ class ReentrantTimer(object):
8181
8282
t: timer interval in milliseconds
8383
fn: a callable to invoke
84+
args: tuple of args to be passed to function
85+
kwargs: keyword arguments to be passed to function
8486
"""
85-
def __init__(self, t, fn):
86-
self.timer = None
87-
self.t = t
87+
def __init__(self, t, fn, *args, **kwargs):
88+
89+
if t <= 0:
90+
raise ValueError('Invalid timeout value')
91+
92+
if not callable(fn):
93+
raise ValueError('fn must be callable')
94+
95+
self.thread = None
96+
self.t = t / 1000.0
8897
self.fn = fn
89-
self.is_active = False
98+
self.args = args
99+
self.kwargs = kwargs
100+
self.active = None
101+
102+
def _timer(self, active):
103+
while not active.wait(self.t):
104+
self.fn(*self.args, **self.kwargs)
90105

91106
def start(self):
92-
if self.timer is not None:
93-
self.timer.cancel()
107+
if self.thread is not None:
108+
self.stop()
94109

95-
self.timer = Timer(self.t / 1000., self.fn)
96-
self.is_active = True
97-
self.timer.start()
110+
self.active = Event()
111+
self.thread = Thread(target=self._timer, args=(self.active))
112+
self.thread.daemon = True # So the app exits when main thread exits
113+
self.thread.start()
98114

99115
def stop(self):
100-
self.timer.cancel()
116+
if self.thread is None:
117+
return
118+
119+
self.active.set()
120+
self.thread.join(self.t + 1)
101121
self.timer = None
102-
self.is_active = False

0 commit comments

Comments
 (0)