Skip to content

Commit a940054

Browse files
author
Dana Powers
committed
Add warnings to README, docstring, and logging that async producer does not retry failed messages
1 parent 26042ae commit a940054

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ producer.send_messages("my-topic", "this method", "is variadic")
5151
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
5252

5353
# To send messages asynchronously
54+
# WARNING: current implementation does not guarantee message delivery on failure!
55+
# messages can get dropped! Use at your own risk! Or help us improve with a PR!
5456
producer = SimpleProducer(kafka, async=True)
5557
producer.send_messages("my-topic", "async message")
5658

@@ -63,7 +65,7 @@ producer = SimpleProducer(kafka, async=False,
6365
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
6466
ack_timeout=2000)
6567

66-
response = producer.send_messages("my-topic", "async message")
68+
response = producer.send_messages("my-topic", "another message")
6769

6870
if response:
6971
print(response[0].error)

kafka/producer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class Producer(object):
8787
client - The Kafka client instance to use
8888
async - If set to true, the messages are sent asynchronously via another
8989
thread (process). We will not wait for a response to these
90+
WARNING!!! current implementation of async producer does not
91+
guarantee message delivery. Use at your own risk! Or help us
92+
improve with a PR!
9093
req_acks - A value indicating the acknowledgements that the server must
9194
receive before responding to the request
9295
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
@@ -131,6 +134,9 @@ def __init__(self, client, async=False,
131134
self.codec = codec
132135

133136
if self.async:
137+
log.warning("async producer does not guarantee message delivery!")
138+
log.warning("Current implementation does not retry Failed messages")
139+
log.warning("Use at your own risk! (or help improve with a PR!)")
134140
self.queue = Queue() # Messages are sent through this queue
135141
self.proc = Process(target=_send_upstream,
136142
args=(self.queue,

0 commit comments

Comments
 (0)