
Commit eb2c173

Merge pull request dpkp#55 from quixey/fault-tolerance
Improve fault tolerance by handling leadership election and other metadata changes. Thanks, @jimjh!
2 parents: cfd9f86 + a6c99b2

7 files changed: +173, -30

kafka/client.py (+18, -6)

@@ -2,9 +2,11 @@
 from functools import partial
 from itertools import count
 import logging
+import socket
 import time
 
 from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol
 
@@ -70,7 +72,7 @@ def _load_metadata_for_topics(self, *topics):
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
 
-        self.brokers.update(brokers)
+        self.brokers = brokers
         self.topics_to_brokers = {}
 
         for topic, partitions in topics.items():
@@ -146,13 +148,15 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
 
         # Accumulate the responses in a dictionary
         acc = {}
 
+        # keep a list of payloads that failed to be sent to brokers
+        failed_payloads = []
+
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
@@ -161,15 +165,23 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                                  correlation_id=requestId, payloads=payloads)
 
             # Send the request, recv the response
-            conn.send(requestId, request)
-
-            if decoder_fn is None:
+            try:
+                conn.send(requestId, request)
+                if decoder_fn is None:
+                    continue
+                response = conn.recv(requestId)
+            except ConnectionError, e:  # ignore BufferUnderflow for now
+                log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+                failed_payloads += payloads
+                self.topics_to_brokers = {}  # reset metadata
                 continue
 
-            response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response
 
+        if failed_payloads:
+            raise FailedPayloadsException(failed_payloads)
+
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys) if acc else ()
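With this change, _send_broker_aware_request no longer aborts on the first dead socket: requests to healthy brokers still go out, payloads that could not be delivered are collected, the cached topics_to_brokers map is cleared, and a FailedPayloadsException carrying those payloads is raised once every broker has been tried. A minimal caller-side sketch of that contract; the helper below is hypothetical and not part of this commit (it also reaches into a private method purely for illustration):

import time
from kafka.common import FailedPayloadsException

def send_with_retry(client, payloads, encoder_fn, decoder_fn, retries=3):
    # Hypothetical helper: retry only the payloads that failed. The client
    # has already reset its metadata, so the next attempt re-discovers the
    # current partition leaders.
    for attempt in range(retries):
        try:
            return client._send_broker_aware_request(payloads, encoder_fn, decoder_fn)
        except FailedPayloadsException as e:
            payloads = e.args[0]  # the exception carries the failed payloads
            time.sleep(1)         # give the cluster time to elect a new leader
    raise FailedPayloadsException(payloads)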

kafka/common.py (+5)

@@ -69,6 +69,11 @@ class ErrorMapping(object):
 # Exceptions #
 #################
 
+class FailedPayloadsException(Exception):
+    pass
+
+class ConnectionError(Exception):
+    pass
 
 class BufferUnderflowError(Exception):
     pass

kafka/conn.py (+18, -10)

@@ -4,7 +4,7 @@
 from threading import local
 
 from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError
 
 log = logging.getLogger("kafka")
 
@@ -26,6 +26,7 @@ def __init__(self, host, port, bufsize=4096):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(10)
+        self._dirty = False
 
     def __str__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -53,7 +54,7 @@ def _consume_response_iter(self):
         # Read the size off of the header
         resp = self._sock.recv(4)
         if resp == "":
-            raise Exception("Got no response from Kafka")
+            self._raise_connection_error()
         (size,) = struct.unpack('>i', resp)
 
         messagesize = size - 4
@@ -71,6 +72,10 @@ def _consume_response_iter(self):
             total += len(resp)
             yield resp
 
+    def _raise_connection_error(self):
+        self._dirty = True
+        raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
     ##################
     # Public API #
     ##################
@@ -79,14 +84,16 @@ def _consume_response_iter(self):
 
     def send(self, request_id, payload):
         "Send a request to Kafka"
-
-        log.debug(
-            "About to send %d bytes to Kafka, request %d" %
-            (len(payload), request_id))
-
-        sent = self._sock.sendall(payload)
-        if sent is not None:
-            raise RuntimeError("Kafka went away")
+        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+        try:
+            if self._dirty:
+                self.reinit()
+            sent = self._sock.sendall(payload)
+            if sent is not None:
+                self._raise_connection_error()
+        except socket.error:
+            log.exception('Unable to send payload to Kafka')
+            self._raise_connection_error()
 
     def recv(self, request_id):
         """
@@ -110,3 +117,4 @@ def reinit(self):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
+        self._dirty = False
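The pattern introduced here is lazy reconnection: on a socket error the connection only marks itself dirty, and the next send() repairs it via reinit(), rather than the error handler reconnecting eagerly. A standalone sketch of the same idea, stripped of the Kafka protocol details (the class and names below are illustrative, not from this commit):

import socket

class LazyReconnectingConnection(object):
    """Sketch of the dirty-flag pattern KafkaConnection now uses."""

    def __init__(self, host, port, timeout=10):
        self.host, self.port, self.timeout = host, port, timeout
        self._sock = None
        self._dirty = True  # forces a connect on first use

    def _reinit(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self.host, self.port))
        self._sock.settimeout(self.timeout)
        self._dirty = False

    def send(self, payload):
        if self._dirty:
            self._reinit()  # reconnect lazily, only when actually needed
        try:
            self._sock.sendall(payload)
        except socket.error:
            self._dirty = True  # next send() reconnects; surface the failure
            raise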

kafka/producer.py (+8, -3)

@@ -7,6 +7,7 @@
 import sys
 
 from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
 from kafka.protocol import create_message
 from kafka.partitioner import HashedPartitioner
 
@@ -113,7 +114,7 @@ def _send_upstream(self, queue):
                 self.client.send_produce_request(reqs, acks=self.req_acks,
                                                  timeout=self.ack_timeout)
             except Exception:
-                log.error("Error sending message", exc_info=sys.exc_info())
+                log.exception("Unable to send message")
 
     def send_messages(self, partition, *msg):
         """
@@ -126,8 +127,12 @@ def send_messages(self, partition, *msg):
         else:
             messages = [create_message(m) for m in msg]
             req = ProduceRequest(self.topic, partition, messages)
-            resp = self.client.send_produce_request([req], acks=self.req_acks,
-                                                    timeout=self.ack_timeout)
+            try:
+                resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                        timeout=self.ack_timeout)
+            except Exception as e:
+                log.exception("Unable to send messages")
+                raise e
         return resp
 
     def stop(self, timeout=1):
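For callers, the visible change is that a synchronous send_messages now logs and re-raises instead of failing quietly, while the async path (_send_upstream) still swallows the error after logging, since no caller is waiting on the background thread. A hedged usage sketch; host, port, and topic below are placeholders:

import time
from kafka.client import KafkaClient
from kafka.common import FailedPayloadsException
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost", 9092)  # placeholder broker address
producer = SimpleProducer(kafka, "my-topic")

try:
    producer.send_messages("hello")
except FailedPayloadsException:
    # The client already reset its metadata; wait out the leader
    # election, then retry against the newly elected leader.
    time.sleep(1)
    producer.send_messages("hello")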

test/fixtures.py (+15, -10)

@@ -242,7 +242,7 @@ def close(self):
 
 class KafkaFixture(object):
     @staticmethod
-    def instance(broker_id, zk_host, zk_port, zk_chroot=None):
+    def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
         if "KAFKA_URI" in os.environ:
@@ -251,11 +251,11 @@ def instance(broker_id, zk_host, zk_port, zk_chroot=None):
             fixture = ExternalService(host, port)
         else:
             (host, port) = ("127.0.0.1", get_open_port())
-            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
+            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
         fixture.open()
         return fixture
 
-    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
+    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
         self.host = host
         self.port = port
 
@@ -265,19 +265,24 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
         self.zk_port = zk_port
         self.zk_chroot = zk_chroot
 
+        self.replicas = replicas
+        self.partitions = partitions
+
         self.tmp_dir = None
         self.child = None
 
     def open(self):
         self.tmp_dir = tempfile.mkdtemp()
         print("*** Running local Kafka instance")
-        print("  host = %s" % self.host)
-        print("  port = %s" % self.port)
-        print("  broker_id = %s" % self.broker_id)
-        print("  zk_host = %s" % self.zk_host)
-        print("  zk_port = %s" % self.zk_port)
-        print("  zk_chroot = %s" % self.zk_chroot)
-        print("  tmp_dir = %s" % self.tmp_dir)
+        print("  host       = %s" % self.host)
+        print("  port       = %s" % self.port)
+        print("  broker_id  = %s" % self.broker_id)
+        print("  zk_host    = %s" % self.zk_host)
+        print("  zk_port    = %s" % self.zk_port)
+        print("  zk_chroot  = %s" % self.zk_chroot)
+        print("  replicas   = %s" % self.replicas)
+        print("  partitions = %s" % self.partitions)
+        print("  tmp_dir    = %s" % self.tmp_dir)
 
         # Create directories
         os.mkdir(os.path.join(self.tmp_dir, "logs"))

test/resources/kafka.properties (+2, -1)

@@ -32,7 +32,8 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 log.dir={tmp_dir}/data
-num.partitions=2
+num.partitions={partitions}
+default.replication.factor={replicas}
 
 ############################# Log Flush Policy #############################
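{tmp_dir}, {partitions}, and {replicas} are not Kafka syntax; they are placeholders the test fixture fills in before the broker starts. A minimal sketch of that substitution, assuming plain str.format-style templating (the fixture's actual rendering code is outside this diff):

# Illustrative values only; the fixture supplies its own tmp_dir,
# partitions, and replicas when it writes the real properties file.
template = (
    "log.dir={tmp_dir}/data\n"
    "num.partitions={partitions}\n"
    "default.replication.factor={replicas}\n"
)
print(template.format(tmp_dir="/tmp/kafka-fixture", partitions=2, replicas=2))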

test/test_integration.py (+107)

@@ -770,6 +770,113 @@ def test_large_messages(self):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
+class TestFailover(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+
+        zk_chroot = random_string(10)
+        replicas = 2
+        partitions = 2
+
+        # mini zookeeper, 2 kafka brokers
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
+            broker.close()
+        cls.zk.close()
+
+    def test_switch_leader(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader', 0
+        producer = SimpleProducer(self.client, topic)
+
+        for i in range(1, 4):
+
+            # XXX unfortunately, the conns dict needs to be warmed for this to work
+            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+            self._send_random_messages(producer, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            with self.assertRaises(FailedPayloadsException):
+                producer.send_messages('part 1')
+                producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def test_switch_leader_async(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader_async', 0
+        producer = SimpleProducer(self.client, topic, async=True)
+
+        for i in range(1, 4):
+
+            self._send_random_messages(producer, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            producer.send_messages('part 1')
+            producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def _send_random_messages(self, producer, n):
+        for j in range(n):
+            resp = producer.send_messages(random_string(10))
+            if len(resp) > 0:
+                self.assertEquals(resp[0].error, 0)
+        time.sleep(1)  # give it some time
+
+    def _kill_leader(self, topic, partition):
+        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        broker = self.brokers[leader.nodeId]
+        broker.close()
+        time.sleep(1)  # give it some time
+        return broker
+
+    def _count_messages(self, group, topic):
+        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+        consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+        consumer.stop()
+        client.close()
+        return len(all_messages)
+
 
 def random_string(l):
     s = "".join(random.choice(string.letters) for i in xrange(l))
