Skip to content

Commit 7c83059

Browse files
committed
Merge pull request #213 from dpkp/improve_failover_tests
Warn users about async producer. Refactor producer failover tests (5x speedup). Skip async producer failover test for now, because it is broken.
2 parents 99a4f2b + a940054 commit 7c83059

File tree

3 files changed

+76
-46
lines changed

3 files changed

+76
-46
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ producer.send_messages("my-topic", "this method", "is variadic")
5454
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
5555

5656
# To send messages asynchronously
57+
# WARNING: current implementation does not guarantee message delivery on failure!
58+
# messages can get dropped! Use at your own risk! Or help us improve with a PR!
5759
producer = SimpleProducer(kafka, async=True)
5860
producer.send_messages("my-topic", "async message")
5961

@@ -66,7 +68,7 @@ producer = SimpleProducer(kafka, async=False,
6668
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
6769
ack_timeout=2000)
6870

69-
response = producer.send_messages("my-topic", "async message")
71+
response = producer.send_messages("my-topic", "another message")
7072

7173
if response:
7274
print(response[0].error)

kafka/producer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class Producer(object):
8787
client - The Kafka client instance to use
8888
async - If set to true, the messages are sent asynchronously via another
8989
thread (process). We will not wait for a response to these
90+
WARNING!!! current implementation of async producer does not
91+
guarantee message delivery. Use at your own risk! Or help us
92+
improve with a PR!
9093
req_acks - A value indicating the acknowledgements that the server must
9194
receive before responding to the request
9295
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
@@ -131,6 +134,9 @@ def __init__(self, client, async=False,
131134
self.codec = codec
132135

133136
if self.async:
137+
log.warning("async producer does not guarantee message delivery!")
138+
log.warning("Current implementation does not retry Failed messages")
139+
log.warning("Use at your own risk! (or help improve with a PR!)")
134140
self.queue = Queue() # Messages are sent through this queue
135141
self.proc = Process(target=_send_upstream,
136142
args=(self.queue,

test/test_failover_integration.py

Lines changed: 67 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
import logging
12
import os
23
import time
4+
import unittest2
35

46
from kafka import * # noqa
57
from kafka.common import * # noqa
8+
from kafka.producer import Producer
69
from fixtures import ZookeeperFixture, KafkaFixture
710
from testutil import *
811

12+
913
class TestFailover(KafkaIntegrationTestCase):
1014
create_client = False
1115

@@ -39,82 +43,100 @@ def tearDownClass(cls):
3943
@kafka_versions("all")
4044
def test_switch_leader(self):
4145
key, topic, partition = random_string(5), self.topic, 0
42-
producer = SimpleProducer(self.client)
4346

44-
for i in range(1, 4):
47+
# Test the base class Producer -- send_messages to a specific partition
48+
producer = Producer(self.client, async=False)
4549

46-
# XXX unfortunately, the conns dict needs to be warmed for this to work
47-
# XXX unfortunately, for warming to work, we need at least as many partitions as brokers
48-
self._send_random_messages(producer, self.topic, 10)
50+
# Send 10 random messages
51+
self._send_random_messages(producer, topic, partition, 10)
4952

50-
# kil leader for partition 0
51-
broker = self._kill_leader(topic, partition)
53+
# kill leader for partition
54+
broker = self._kill_leader(topic, partition)
5255

53-
# expect failure, reload meta data
54-
with self.assertRaises(FailedPayloadsError):
55-
producer.send_messages(self.topic, 'part 1')
56-
producer.send_messages(self.topic, 'part 2')
57-
time.sleep(1)
56+
# expect failure, but don't wait more than 60 secs to recover
57+
recovered = False
58+
started = time.time()
59+
timeout = 60
60+
while not recovered and (time.time() - started) < timeout:
61+
try:
62+
logging.debug("attempting to send 'success' message after leader killed")
63+
producer.send_messages(topic, partition, 'success')
64+
logging.debug("success!")
65+
recovered = True
66+
except (FailedPayloadsError, ConnectionError):
67+
logging.debug("caught exception sending message -- will retry")
68+
continue
5869

59-
# send to new leader
60-
self._send_random_messages(producer, self.topic, 10)
70+
# Verify we successfully sent the message
71+
self.assertTrue(recovered)
6172

62-
broker.open()
63-
time.sleep(3)
73+
# send some more messages to new leader
74+
self._send_random_messages(producer, topic, partition, 10)
6475

65-
# count number of messages
66-
count = self._count_messages('test_switch_leader group %s' % i, topic)
67-
self.assertIn(count, range(20 * i, 22 * i + 1))
76+
# count number of messages
77+
count = self._count_messages('test_switch_leader group', topic, partition)
6878

69-
producer.stop()
79+
# Should be equal to 10 before + 1 recovery + 10 after
80+
self.assertEquals(count, 21)
7081

71-
@kafka_versions("all")
82+
83+
#@kafka_versions("all")
84+
@unittest2.skip("async producer does not support reliable failover yet")
7285
def test_switch_leader_async(self):
7386
key, topic, partition = random_string(5), self.topic, 0
74-
producer = SimpleProducer(self.client, async=True)
75-
76-
for i in range(1, 4):
7787

78-
self._send_random_messages(producer, self.topic, 10)
88+
# Test the base class Producer -- send_messages to a specific partition
89+
producer = Producer(self.client, async=True)
7990

80-
# kil leader for partition 0
81-
broker = self._kill_leader(topic, partition)
91+
# Send 10 random messages
92+
self._send_random_messages(producer, topic, partition, 10)
8293

83-
# expect failure, reload meta data
84-
producer.send_messages(self.topic, 'part 1')
85-
producer.send_messages(self.topic, 'part 2')
86-
time.sleep(1)
94+
# kill leader for partition
95+
broker = self._kill_leader(topic, partition)
8796

88-
# send to new leader
89-
self._send_random_messages(producer, self.topic, 10)
97+
logging.debug("attempting to send 'success' message after leader killed")
9098

91-
broker.open()
92-
time.sleep(3)
99+
# in async mode, this should return immediately
100+
producer.send_messages(topic, partition, 'success')
93101

94-
# count number of messages
95-
count = self._count_messages('test_switch_leader_async group %s' % i, topic)
96-
self.assertIn(count, range(20 * i, 22 * i + 1))
102+
# send to new leader
103+
self._send_random_messages(producer, topic, partition, 10)
97104

105+
# wait until producer queue is empty
106+
while not producer.queue.empty():
107+
time.sleep(0.1)
98108
producer.stop()
99109

100-
def _send_random_messages(self, producer, topic, n):
110+
# count number of messages
111+
count = self._count_messages('test_switch_leader_async group', topic, partition)
112+
113+
# Should be equal to 10 before + 1 recovery + 10 after
114+
self.assertEquals(count, 21)
115+
116+
117+
def _send_random_messages(self, producer, topic, partition, n):
101118
for j in range(n):
102-
resp = producer.send_messages(topic, random_string(10))
119+
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
120+
resp = producer.send_messages(topic, partition, random_string(10))
103121
if len(resp) > 0:
104122
self.assertEquals(resp[0].error, 0)
105-
time.sleep(1) # give it some time
123+
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
106124

107125
def _kill_leader(self, topic, partition):
108126
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
109127
broker = self.brokers[leader.nodeId]
110128
broker.close()
111-
time.sleep(1) # give it some time
112129
return broker
113130

114-
def _count_messages(self, group, topic):
115-
hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
131+
def _count_messages(self, group, topic, timeout=1):
132+
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
133+
for broker in self.brokers])
134+
116135
client = KafkaClient(hosts)
117-
consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
136+
consumer = SimpleConsumer(client, group, topic,
137+
auto_commit=False,
138+
iter_timeout=timeout)
139+
118140
all_messages = []
119141
for message in consumer:
120142
all_messages.append(message)

0 commit comments

Comments
 (0)