Skip to content

Commit a798c1e

Browse files
author
Carlo Cabanilla
committed
fix consumer retry logic (fixes #135)
Fixes a bug in the following condition:
* Starting buffer size is 1024, max buffer size is 2048, both set at the instance level
* Fetch from p0, p1 and receive a response
* p0 has more than 1024 bytes; consumer doubles buffer size to 2048 and marks p0 for retry
* p1 has more than 1024 bytes; consumer tries to double the buffer size, but sees that it's at the max and raises ConsumerFetchSizeTooSmall

The fix changes the logic to the following:
* Starting buffer size is 1024, set at a per-partition level; max buffer size is 2048, set at the instance level
* Fetch from p0, p1 and receive a response
* p0 has more than 1024 bytes; consumer doubles buffer size to 2048 for p0 and marks p0 for retry
* p1 has more than 1024 bytes; consumer doubles buffer size to 2048 for p1 and marks p1 for retry
* Consumer sees that there are partitions to retry and repeats the parsing loop
* p0 sent all the bytes this time; consumer yields these messages
* p1 sent all the bytes this time; consumer yields these messages
1 parent 4abf7ee commit a798c1e

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

kafka/consumer.py

+14-12
Original file line number | Diff line number | Diff line change
@@ -404,47 +404,49 @@ def __iter__(self):
404404

405405
def _fetch(self):
406406
# Create fetch request payloads for all the partitions
407-
requests = []
408-
partitions = self.fetch_offsets.keys()
407+
partitions = dict((p, self.buffer_size)
408+
for p in self.fetch_offsets.keys())
409409
while partitions:
410-
for partition in partitions:
410+
requests = []
411+
for partition, buffer_size in partitions.iteritems():
411412
requests.append(FetchRequest(self.topic, partition,
412413
self.fetch_offsets[partition],
413-
self.buffer_size))
414+
buffer_size))
414415
# Send request
415416
responses = self.client.send_fetch_request(
416417
requests,
417418
max_wait_time=int(self.fetch_max_wait_time),
418419
min_bytes=self.fetch_min_bytes)
419420

420-
retry_partitions = set()
421+
retry_partitions = {}
421422
for resp in responses:
422423
partition = resp.partition
424+
buffer_size = partitions[partition]
423425
try:
424426
for message in resp.messages:
425427
# Put the message in our queue
426428
self.queue.put((partition, message))
427429
self.fetch_offsets[partition] = message.offset + 1
428430
except ConsumerFetchSizeTooSmall, e:
429431
if (self.max_buffer_size is not None and
430-
self.buffer_size == self.max_buffer_size):
432+
buffer_size == self.max_buffer_size):
431433
log.error("Max fetch size %d too small",
432434
self.max_buffer_size)
433435
raise e
434436
if self.max_buffer_size is None:
435-
self.buffer_size *= 2
437+
buffer_size *= 2
436438
else:
437-
self.buffer_size = max(self.buffer_size * 2,
438-
self.max_buffer_size)
439+
buffer_size = max(buffer_size * 2,
440+
self.max_buffer_size)
439441
log.warn("Fetch size too small, increase to %d (2x) "
440-
"and retry", self.buffer_size)
441-
retry_partitions.add(partition)
442+
"and retry", buffer_size)
443+
retry_partitions[partition] = buffer_size
442444
except ConsumerNoMoreData, e:
443445
log.debug("Iteration was ended by %r", e)
444446
except StopIteration:
445447
# Stop iterating through this partition
446448
log.debug("Done iterating over partition %s" % partition)
447-
partitions = retry_partitions
449+
partitions = retry_partitions
448450

449451
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
450452
"""

0 commit comments

Comments (0)