Skip to content

Commit a28120a

Browse files
author
Dana Powers
committed
Raise TypeError in kafka.producer.send_messages if any msg is not a str (or subclass); document
1 parent f7be23f commit a28120a

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,14 @@ kafka = KafkaClient("localhost:9092")
4242

4343
# To send messages synchronously
4444
producer = SimpleProducer(kafka)
45+
46+
# Note that the application is responsible for encoding messages to type str
4547
producer.send_messages("my-topic", "some message")
4648
producer.send_messages("my-topic", "this method", "is variadic")
4749

50+
# Send unicode message
51+
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
52+
4853
# To send messages asynchronously
4954
producer = SimpleProducer(kafka, async=True)
5055
producer.send_messages("my-topic", "async message")
@@ -78,6 +83,8 @@ producer = SimpleProducer(kafka, batch_send=True,
7883
# To consume messages
7984
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
8085
for message in consumer:
86+
# message is raw byte string -- decode if necessary!
87+
# e.g., for unicode: `message.decode('utf-8')`
8188
print(message)
8289

8390
kafka.close()

kafka/producer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,28 @@ def __init__(self, client, async=False,
148148
def send_messages(self, topic, partition, *msg):
149149
"""
150150
Helper method to send produce requests
151+
@param: topic, name of topic for produce request -- type str
152+
@param: partition, partition number for produce request -- type int
153+
@param: *msg, one or more message payloads -- type str
154+
@returns: ResponseRequest returned by server
155+
raises on error
156+
157+
Note that msg type *must* be encoded to str by user.
158+
Passing unicode message will not work, for example
159+
you should encode before calling send_messages via
160+
something like `unicode_message.encode('utf-8')`
161+
162+
All messages produced via this method will set the message 'key' to Null
151163
"""
164+
165+
# Guarantee that msg is actually a list or tuple (should always be true)
166+
if not isinstance(msg, (list, tuple)):
167+
raise TypeError("msg is not a list or tuple!")
168+
169+
# Raise TypeError if any message is not encoded as a str
170+
if any(not isinstance(m, str) for m in msg):
171+
raise TypeError("all produce message payloads must be type str")
172+
152173
if self.async:
153174
for m in msg:
154175
self.queue.put((TopicAndPartition(topic, partition), m))

0 commit comments

Comments
 (0)