Skip to content

Commit 40d8e9e

Browse files
committed
Fixed dpkp#42, make fetch size configurable
Was hard-coded to 1024 bytes, which meant that larger messages were unconsumable: they would always get split, causing the consumer to stop. It would probably be best to automatically retry truncated messages with a larger request size, so you don't have to know your maximum message size ahead of time.
1 parent c3bce13 commit 40d8e9e

File tree

2 files changed

+41
-4
lines changed

2 files changed

+41
-4
lines changed

kafka/consumer.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ class SimpleConsumer(Consumer):
206206
auto_commit_every_t: default 5000. How much time (in milliseconds) to
207207
wait before commit
208208
209+
fetch_size_bytes: number of bytes to request in a FetchRequest
210+
209211
Auto commit details:
210212
If both auto_commit_every_n and auto_commit_every_t are set, they will
211213
reset one another when one is triggered. These triggers simply call the
@@ -214,11 +216,12 @@ class SimpleConsumer(Consumer):
214216
"""
215217
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
216218
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
217-
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
219+
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
220+
fetch_size_bytes=FETCH_MIN_BYTES):
218221

219222
self.partition_info = False # Do not return partition info in msgs
220223
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
221-
self.fetch_min_bytes = FETCH_MIN_BYTES
224+
self.fetch_min_bytes = fetch_size_bytes
222225
self.fetch_started = defaultdict(bool) # defaults to false
223226

224227
super(SimpleConsumer, self).__init__(client, group, topic,
@@ -243,6 +246,7 @@ def seek(self, offset, whence):
243246
1 is relative to the current offset
244247
2 is relative to the latest known offset (tail)
245248
"""
249+
246250
if whence == 1: # relative to current position
247251
for partition, _offset in self.offsets.items():
248252
self.offsets[partition] = _offset + offset
@@ -354,8 +358,7 @@ def __iter_partition__(self, partition, offset):
354358
offset += 1
355359

356360
while True:
357-
# TODO: configure fetch size
358-
req = FetchRequest(self.topic, partition, offset, 1024)
361+
req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
359362

360363
(resp,) = self.client.send_fetch_request([req],
361364
max_wait_time=self.fetch_max_wait_time,

test/test_integration.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import unittest
33
import time
44
from datetime import datetime
5+
import string
6+
import random
57

68
from kafka import * # noqa
79
from kafka.common import * # noqa
@@ -738,6 +740,38 @@ def test_multi_proc_pending(self):
738740

739741
consumer.stop()
740742

743+
def test_large_messages(self):
744+
# Produce 10 "normal" size messages
745+
messages1 = [create_message(random_string(1024)) for i in range(10)]
746+
produce1 = ProduceRequest("test_large_messages", 0, messages1)
747+
748+
for resp in self.client.send_produce_request([produce1]):
749+
self.assertEquals(resp.error, 0)
750+
self.assertEquals(resp.offset, 0)
751+
752+
# Produce 10 messages that are too large (bigger than default fetch size)
753+
messages2=[create_message(random_string(5000)) for i in range(10)]
754+
produce2 = ProduceRequest("test_large_messages", 0, messages2)
755+
756+
for resp in self.client.send_produce_request([produce2]):
757+
self.assertEquals(resp.error, 0)
758+
self.assertEquals(resp.offset, 10)
759+
760+
consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
761+
it = consumer.__iter__()
762+
for i in range(10):
763+
self.assertEquals(messages1[i], it.next().message)
764+
765+
consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
766+
it = consumer.__iter__()
767+
for i in range(10):
768+
self.assertEquals(messages1[i], it.next().message)
769+
for i in range(10):
770+
self.assertEquals(messages2[i], it.next().message)
771+
772+
def random_string(l):
773+
s = "".join(random.choice(string.printable) for i in xrange(l))
774+
return s
741775

742776
if __name__ == "__main__":
743777
logging.basicConfig(level=logging.DEBUG)

0 commit comments

Comments
 (0)