Skip to content

Commit 0f4ee69

Browse files
author
Dana Powers
committed
Merge pull request #136 from DataDog/fix-multifetch-buffer-size
fix consumer retry logic (fixes #135) Conflicts: kafka/consumer.py
2 parents 67dd121 + a798c1e commit 0f4ee69

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

kafka/consumer.py

+14-12
Original file line numberDiff line numberDiff line change
@@ -416,47 +416,49 @@ def __iter__(self):
416416

417417
def _fetch(self):
418418
# Create fetch request payloads for all the partitions
419-
requests = []
420-
partitions = self.fetch_offsets.keys()
419+
partitions = dict((p, self.buffer_size)
420+
for p in self.fetch_offsets.keys())
421421
while partitions:
422-
for partition in partitions:
422+
requests = []
423+
for partition, buffer_size in partitions.iteritems():
423424
requests.append(FetchRequest(self.topic, partition,
424425
self.fetch_offsets[partition],
425-
self.buffer_size))
426+
buffer_size))
426427
# Send request
427428
responses = self.client.send_fetch_request(
428429
requests,
429430
max_wait_time=int(self.fetch_max_wait_time),
430431
min_bytes=self.fetch_min_bytes)
431432

432-
retry_partitions = set()
433+
retry_partitions = {}
433434
for resp in responses:
434435
partition = resp.partition
436+
buffer_size = partitions[partition]
435437
try:
436438
for message in resp.messages:
437439
# Put the message in our queue
438440
self.queue.put((partition, message))
439441
self.fetch_offsets[partition] = message.offset + 1
440442
except ConsumerFetchSizeTooSmall:
441443
if (self.max_buffer_size is not None and
442-
self.buffer_size == self.max_buffer_size):
444+
buffer_size == self.max_buffer_size):
443445
log.error("Max fetch size %d too small",
444446
self.max_buffer_size)
445447
raise
446448
if self.max_buffer_size is None:
447-
self.buffer_size *= 2
449+
buffer_size *= 2
448450
else:
449-
self.buffer_size = max(self.buffer_size * 2,
450-
self.max_buffer_size)
451+
buffer_size = max(buffer_size * 2,
452+
self.max_buffer_size)
451453
log.warn("Fetch size too small, increase to %d (2x) "
452-
"and retry", self.buffer_size)
453-
retry_partitions.add(partition)
454+
"and retry", buffer_size)
455+
retry_partitions[partition] = buffer_size
454456
except ConsumerNoMoreData as e:
455457
log.debug("Iteration was ended by %r", e)
456458
except StopIteration:
457459
# Stop iterating through this partition
458460
log.debug("Done iterating over partition %s" % partition)
459-
partitions = retry_partitions
461+
partitions = retry_partitions
460462

461463
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
462464
"""

0 commit comments

Comments
 (0)