Skip to content

Commit 64b7578

Browse files
committed
Merge pull request dpkp#31 from mahendra/lazythread
Optimize auto-commit process
2 parents 883eed1 + 119d411 commit 64b7578

File tree

2 files changed

+40
-26
lines changed

2 files changed

+40
-26
lines changed

kafka/consumer.py

+9-17
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(self, client, group, topic, auto_commit=True,
5858
# Set up the auto-commit timer
5959
if auto_commit is True and auto_commit_every_t is not None:
6060
self.commit_timer = ReentrantTimer(auto_commit_every_t,
61-
self._timed_commit)
61+
self.commit)
6262
self.commit_timer.start()
6363

6464
def get_or_init_offset_callback(resp):
@@ -150,15 +150,6 @@ def pending(self, partitions=[]):
150150

151151
return total
152152

153-
def _timed_commit(self):
154-
"""
155-
Commit offsets as part of timer
156-
"""
157-
self.commit()
158-
159-
# Once the commit is done, start the timer again
160-
self.commit_timer.start()
161-
162153
def commit(self, partitions=[]):
163154
"""
164155
Commit offsets for this consumer
@@ -167,11 +158,17 @@ def commit(self, partitions=[]):
167158
all of them
168159
"""
169160

170-
# short circuit if nothing happened
161+
# short circuit if nothing happened. This check is kept outside
162+
# to prevent un-necessarily acquiring a lock for checking the state
171163
if self.count_since_commit == 0:
172164
return
173165

174166
with self.commit_lock:
167+
# Do this check again, just in case the state has changed
168+
# during the lock acquiring timeout
169+
if self.count_since_commit == 0:
170+
return
171+
175172
reqs = []
176173
if len(partitions) == 0: # commit all partitions
177174
partitions = self.offsets.keys()
@@ -201,12 +198,7 @@ def _auto_commit(self):
201198
return
202199

203200
if self.count_since_commit > self.auto_commit_every_n:
204-
if self.commit_timer is not None:
205-
self.commit_timer.stop()
206-
self.commit()
207-
self.commit_timer.start()
208-
else:
209-
self.commit()
201+
self.commit()
210202

211203
def __iter__(self):
212204
"""

kafka/util.py

+31-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections import defaultdict
22
from itertools import groupby
33
import struct
4-
from threading import Timer
4+
from threading import Thread, Event
55

66

77
def write_int_string(s):
@@ -81,19 +81,41 @@ class ReentrantTimer(object):
8181
8282
t: timer interval in milliseconds
8383
fn: a callable to invoke
84+
args: tuple of args to be passed to function
85+
kwargs: keyword arguments to be passed to function
8486
"""
85-
def __init__(self, t, fn):
86-
self.timer = None
87-
self.t = t
87+
def __init__(self, t, fn, *args, **kwargs):
88+
89+
if t <= 0:
90+
raise ValueError('Invalid timeout value')
91+
92+
if not callable(fn):
93+
raise ValueError('fn must be callable')
94+
95+
self.thread = None
96+
self.t = t / 1000.0
8897
self.fn = fn
98+
self.args = args
99+
self.kwargs = kwargs
100+
self.active = None
101+
102+
def _timer(self, active):
103+
while not active.wait(self.t):
104+
self.fn(*self.args, **self.kwargs)
89105

90106
def start(self):
91-
if self.timer is not None:
92-
self.timer.cancel()
107+
if self.thread is not None:
108+
self.stop()
93109

94-
self.timer = Timer(self.t / 1000., self.fn)
95-
self.timer.start()
110+
self.active = Event()
111+
self.thread = Thread(target=self._timer, args=(self.active,))
112+
self.thread.daemon = True # So the app exits when main thread exits
113+
self.thread.start()
96114

97115
def stop(self):
98-
self.timer.cancel()
116+
if self.thread is None:
117+
return
118+
119+
self.active.set()
120+
self.thread.join(self.t + 1)
99121
self.timer = None

0 commit comments

Comments
 (0)