Skip to content

Commit c9d9d0a

Browse files
committed
Resolve conflicts for dpkp#106
2 parents 6db14de + bcd5539 commit c9d9d0a

File tree

1 file changed

+70
-39
lines changed

1 file changed

+70
-39
lines changed

kafka/consumer.py

+70-39
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,5 @@
11
from __future__ import absolute_import
22

3-
from collections import defaultdict
43
from itertools import izip_longest, repeat
54
import logging
65
import time
@@ -235,6 +234,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
235234
buffer_size=FETCH_BUFFER_SIZE_BYTES,
236235
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
237236
iter_timeout=None):
237+
super(SimpleConsumer, self).__init__(
238+
client, group, topic,
239+
partitions=partitions,
240+
auto_commit=auto_commit,
241+
auto_commit_every_n=auto_commit_every_n,
242+
auto_commit_every_t=auto_commit_every_t)
238243

239244
if max_buffer_size is not None and buffer_size > max_buffer_size:
240245
raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
245250
self.partition_info = False # Do not return partition info in msgs
246251
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
247252
self.fetch_min_bytes = fetch_size_bytes
248-
self.fetch_started = defaultdict(bool) # defaults to false
253+
self.fetch_offsets = self.offsets.copy()
249254
self.iter_timeout = iter_timeout
250255
self.queue = Queue()
251256

252-
super(SimpleConsumer, self).__init__(
253-
client, group, topic,
254-
partitions=partitions,
255-
auto_commit=auto_commit,
256-
auto_commit_every_n=auto_commit_every_n,
257-
auto_commit_every_t=auto_commit_every_t)
258-
259257
def __repr__(self):
260258
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
261259
(self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ def seek(self, offset, whence):
305303
else:
306304
raise ValueError("Unexpected value for `whence`, %d" % whence)
307305

306+
# Reset queue and fetch offsets since they are invalid
307+
self.fetch_offsets = self.offsets.copy()
308+
self.queue = Queue()
309+
308310
def get_messages(self, count=1, block=True, timeout=0.1):
309311
"""
310312
Fetch the specified number of messages
@@ -316,33 +318,69 @@ def get_messages(self, count=1, block=True, timeout=0.1):
316318
it will block forever.
317319
"""
318320
messages = []
319-
if timeout:
321+
if timeout is not None:
320322
max_time = time.time() + timeout
321323

324+
new_offsets = {}
322325
while count > 0 and (timeout is None or timeout > 0):
323-
message = self.get_message(block, timeout)
324-
if message:
325-
messages.append(message)
326+
result = self._get_message(block, timeout, get_partition_info=True,
327+
update_offset=False)
328+
if result:
329+
partition, message = result
330+
if self.partition_info:
331+
messages.append(result)
332+
else:
333+
messages.append(message)
334+
new_offsets[partition] = message.offset + 1
326335
count -= 1
327336
else:
328337
# Ran out of messages for the last request.
329338
if not block:
330339
# If we're not blocking, break.
331340
break
332-
if timeout:
341+
if timeout is not None:
333342
# If we're blocking and have a timeout, reduce it to the
334343
# appropriate value
335344
timeout = max_time - time.time()
336345

346+
# Update and commit offsets if necessary
347+
self.offsets.update(new_offsets)
348+
self.count_since_commit += len(messages)
349+
self._auto_commit()
337350
return messages
338351

339-
def get_message(self, block=True, timeout=0.1):
352+
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
353+
return self._get_message(block, timeout, get_partition_info)
354+
355+
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
356+
update_offset=True):
357+
"""
358+
If no messages can be fetched, returns None.
359+
If get_partition_info is None, it defaults to self.partition_info
360+
If get_partition_info is True, returns (partition, message)
361+
If get_partition_info is False, returns message
362+
"""
340363
if self.queue.empty():
341364
# We're out of messages, go grab some more.
342365
with FetchContext(self, block, timeout):
343366
self._fetch()
344367
try:
345-
return self.queue.get_nowait()
368+
partition, message = self.queue.get_nowait()
369+
370+
if update_offset:
371+
# Update partition offset
372+
self.offsets[partition] = message.offset + 1
373+
374+
# Count, check and commit messages if necessary
375+
self.count_since_commit += 1
376+
self._auto_commit()
377+
378+
if get_partition_info is None:
379+
get_partition_info = self.partition_info
380+
if get_partition_info:
381+
return partition, message
382+
else:
383+
return message
346384
except Empty:
347385
return None
348386

@@ -367,11 +405,11 @@ def __iter__(self):
367405
def _fetch(self):
368406
# Create fetch request payloads for all the partitions
369407
requests = []
370-
partitions = self.offsets.keys()
408+
partitions = self.fetch_offsets.keys()
371409
while partitions:
372410
for partition in partitions:
373411
requests.append(FetchRequest(self.topic, partition,
374-
self.offsets[partition],
412+
self.fetch_offsets[partition],
375413
self.buffer_size))
376414
# Send request
377415
responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@ def _fetch(self):
384422
partition = resp.partition
385423
try:
386424
for message in resp.messages:
387-
# Update partition offset
388-
self.offsets[partition] = message.offset + 1
389-
390-
# Count, check and commit messages if necessary
391-
self.count_since_commit += 1
392-
self._auto_commit()
393-
394425
# Put the message in our queue
395-
if self.partition_info:
396-
self.queue.put((partition, message))
397-
else:
398-
self.queue.put(message)
426+
self.queue.put((partition, message))
427+
self.fetch_offsets[partition] = message.offset + 1
399428
except ConsumerFetchSizeTooSmall, e:
400429
if (self.max_buffer_size is not None and
401430
self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ def __iter__(self):
585614
break
586615

587616
# Count, check and commit messages if necessary
588-
self.offsets[partition] = message.offset
617+
self.offsets[partition] = message.offset + 1
589618
self.start.clear()
590-
yield message
591-
592619
self.count_since_commit += 1
593620
self._auto_commit()
621+
yield message
594622

595623
self.start.clear()
596624

@@ -613,9 +641,10 @@ def get_messages(self, count=1, block=True, timeout=10):
613641
self.size.value = count
614642
self.pause.clear()
615643

616-
if timeout:
644+
if timeout is not None:
617645
max_time = time.time() + timeout
618646

647+
new_offsets = {}
619648
while count > 0 and (timeout is None or timeout > 0):
620649
# Trigger consumption only if the queue is empty
621650
# By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@ def get_messages(self, count=1, block=True, timeout=10):
630659
break
631660

632661
messages.append(message)
633-
634-
# Count, check and commit messages if necessary
635-
self.offsets[partition] = message.offset
636-
self.count_since_commit += 1
637-
self._auto_commit()
662+
new_offsets[partition] = message.offset + 1
638663
count -= 1
639-
timeout = max_time - time.time()
664+
if timeout is not None:
665+
timeout = max_time - time.time()
640666

641667
self.size.value = 0
642668
self.start.clear()
643669
self.pause.set()
644670

671+
# Update and commit offsets if necessary
672+
self.offsets.update(new_offsets)
673+
self.count_since_commit += len(messages)
674+
self._auto_commit()
675+
645676
return messages

0 commit comments

Comments (0)