Commit f9cf628 (2 parents: 75de0f0 + eb2c173)

Merge branch 'master' into prod-windows

Conflicts:
	kafka/producer.py

7 files changed, +173 -30 lines


kafka/client.py (+18 -6)

@@ -3,9 +3,11 @@
 from functools import partial
 from itertools import count
 import logging
+import socket
 import time

 from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol

@@ -71,7 +73,7 @@ def _load_metadata_for_topics(self, *topics):
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)

-        self.brokers.update(brokers)
+        self.brokers = brokers
         self.topics_to_brokers = {}

         for topic, partitions in topics.items():
@@ -147,13 +149,15 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))

         # Accumulate the responses in a dictionary
         acc = {}

+        # keep a list of payloads that failed to be sent to brokers
+        failed_payloads = []
+
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
@@ -162,15 +166,23 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                 correlation_id=requestId, payloads=payloads)

             # Send the request, recv the response
-            conn.send(requestId, request)
-
-            if decoder_fn is None:
+            try:
+                conn.send(requestId, request)
+                if decoder_fn is None:
+                    continue
+                response = conn.recv(requestId)
+            except ConnectionError, e:  # ignore BufferUnderflow for now
+                log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+                failed_payloads += payloads
+                self.topics_to_brokers = {}  # reset metadata
                 continue

-            response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response

+        if failed_payloads:
+            raise FailedPayloadsException(failed_payloads)
+
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys) if acc else ()

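Taken together, the client.py changes mean a broker-aware request no longer dies on the first dead connection: payloads that could not be sent are collected, the topic-to-broker metadata is reset, and a single FailedPayloadsException carrying those payloads is raised after all brokers have been tried. A caller can use that to retry against freshly elected leaders; a minimal sketch (the retry wrapper below is illustrative, not part of this commit):

    from kafka.common import FailedPayloadsException

    def send_with_retry(client, requests, retries=3):
        # Hypothetical helper: each retry re-resolves leaders, because the
        # client cleared topics_to_brokers when the send failed.
        for attempt in range(retries):
            try:
                return client.send_produce_request(requests, acks=1, timeout=1000)
            except FailedPayloadsException as e:
                requests = e.args[0]  # retry only the payloads that failed
        raise FailedPayloadsException(requests)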
kafka/common.py (+5)

@@ -69,6 +69,11 @@ class ErrorMapping(object):
 # Exceptions #
 #################

+class FailedPayloadsException(Exception):
+    pass
+
+class ConnectionError(Exception):
+    pass

 class BufferUnderflowError(Exception):
     pass

kafka/conn.py (+18 -10)

@@ -5,7 +5,7 @@
 from threading import local

 from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError

 log = logging.getLogger("kafka")

@@ -27,6 +27,7 @@ def __init__(self, host, port, bufsize=4096):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(10)
+        self._dirty = False

     def __str__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -54,7 +55,7 @@ def _consume_response_iter(self):
         # Read the size off of the header
         resp = self._sock.recv(4)
         if resp == "":
-            raise Exception("Got no response from Kafka")
+            self._raise_connection_error()
         (size,) = struct.unpack('>i', resp)

         messagesize = size - 4
@@ -72,6 +73,10 @@ def _consume_response_iter(self):
             total += len(resp)
             yield resp

+    def _raise_connection_error(self):
+        self._dirty = True
+        raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
     ##################
     # Public API #
     ##################
@@ -80,14 +85,16 @@ def _consume_response_iter(self):

     def send(self, request_id, payload):
         "Send a request to Kafka"
-
-        log.debug(
-            "About to send %d bytes to Kafka, request %d" %
-            (len(payload), request_id))
-
-        sent = self._sock.sendall(payload)
-        if sent is not None:
-            raise RuntimeError("Kafka went away")
+        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+        try:
+            if self._dirty:
+                self.reinit()
+            sent = self._sock.sendall(payload)
+            if sent is not None:
+                self._raise_connection_error()
+        except socket.error:
+            log.exception('Unable to send payload to Kafka')
+            self._raise_connection_error()

     def recv(self, request_id):
         """
@@ -121,3 +128,4 @@ def reinit(self):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
+        self._dirty = False

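The conn.py half of the change is a lazy-reconnect pattern: any failure flags the socket as dirty, and the next send() rebuilds it via reinit() before writing. Stripped of the Kafka specifics, the pattern looks roughly like this (an illustrative sketch, not code from this commit):

    import socket

    class ReconnectingConnection(object):
        """Illustrative version of the _dirty/reinit() pattern used above."""
        def __init__(self, host, port):
            self.host, self.port = host, port
            self._dirty = True   # force a connect on first use
            self._sock = None

        def reinit(self):
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._sock.connect((self.host, self.port))
            self._dirty = False

        def send(self, payload):
            if self._dirty:
                self.reinit()
            try:
                self._sock.sendall(payload)
            except socket.error:
                self._dirty = True  # next call reconnects before sending
                raise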
kafka/producer.py (+8 -3)

@@ -7,6 +7,7 @@
 import sys

 from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
 from kafka.protocol import create_message
 from kafka.partitioner import HashedPartitioner

@@ -67,7 +68,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
                                         acks=req_acks,
                                         timeout=ack_timeout)
         except Exception as exp:
-            log.error("Error sending message", exc_info=sys.exc_info())
+            log.exception("Unable to send message")


 class Producer(object):
@@ -140,8 +141,12 @@ def send_messages(self, partition, *msg):
         else:
             messages = [create_message(m) for m in msg]
         req = ProduceRequest(self.topic, partition, messages)
-        resp = self.client.send_produce_request([req], acks=self.req_acks,
-                                                timeout=self.ack_timeout)
+        try:
+            resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                    timeout=self.ack_timeout)
+        except Exception as e:
+            log.exception("Unable to send messages")
+            raise e
         return resp

     def stop(self, timeout=1):

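Two details worth noting in producer.py: log.exception() is equivalent to log.error() with exc_info set, so the traceback is still recorded with less ceremony; and the synchronous send path now logs before re-raising, so callers still see the failure. A hedged usage sketch (the broker address and topic are placeholders, not from this commit):

    from kafka.client import KafkaClient
    from kafka.common import FailedPayloadsException
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost", 9092)  # placeholder address
    producer = SimpleProducer(client, "test-topic")  # placeholder topic
    try:
        producer.send_messages("hello kafka")
    except FailedPayloadsException:
        # the leader likely moved; metadata was reset, so a retry
        # may land on the newly elected leader
        producer.send_messages("hello kafka")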
test/fixtures.py (+15 -10)

@@ -242,7 +242,7 @@ def close(self):

 class KafkaFixture(object):
     @staticmethod
-    def instance(broker_id, zk_host, zk_port, zk_chroot=None):
+    def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
         if "KAFKA_URI" in os.environ:
@@ -251,11 +251,11 @@ def instance(broker_id, zk_host, zk_port, zk_chroot=None):
             fixture = ExternalService(host, port)
         else:
             (host, port) = ("127.0.0.1", get_open_port())
-            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
+            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
             fixture.open()
         return fixture

-    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
+    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
         self.host = host
         self.port = port

@@ -265,19 +265,24 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
         self.zk_port = zk_port
         self.zk_chroot = zk_chroot

+        self.replicas = replicas
+        self.partitions = partitions
+
         self.tmp_dir = None
         self.child = None

     def open(self):
         self.tmp_dir = tempfile.mkdtemp()
         print("*** Running local Kafka instance")
-        print("  host      = %s" % self.host)
-        print("  port      = %s" % self.port)
-        print("  broker_id = %s" % self.broker_id)
-        print("  zk_host   = %s" % self.zk_host)
-        print("  zk_port   = %s" % self.zk_port)
-        print("  zk_chroot = %s" % self.zk_chroot)
-        print("  tmp_dir   = %s" % self.tmp_dir)
+        print("  host       = %s" % self.host)
+        print("  port       = %s" % self.port)
+        print("  broker_id  = %s" % self.broker_id)
+        print("  zk_host    = %s" % self.zk_host)
+        print("  zk_port    = %s" % self.zk_port)
+        print("  zk_chroot  = %s" % self.zk_chroot)
+        print("  replicas   = %s" % self.replicas)
+        print("  partitions = %s" % self.partitions)
+        print("  tmp_dir    = %s" % self.tmp_dir)

         # Create directories
         os.mkdir(os.path.join(self.tmp_dir, "logs"))

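With the new keyword arguments a test can stand up a multi-broker cluster, which is exactly what TestFailover below does via positional args. A minimal sketch of the call (omitting zk_chroot lets the fixture generate a unique one):

    zk = ZookeeperFixture.instance()
    brokers = [KafkaFixture.instance(i, zk.host, zk.port,
                                     replicas=2, partitions=2)
               for i in range(2)]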
test/resources/kafka.properties (+2 -1)

@@ -32,7 +32,8 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################

 log.dir={tmp_dir}/data
-num.partitions=2
+num.partitions={partitions}
+default.replication.factor={replicas}

 ############################# Log Flush Policy #############################

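The properties file doubles as a template: {tmp_dir}, {partitions}, and {replicas} are filled in by the fixture before the broker starts. Presumably the rendering is a plain str.format over the file contents, along these lines (a sketch; the fixture's actual rendering helper is not shown in this diff):

    with open("test/resources/kafka.properties") as f:
        template = f.read()
    rendered = template.format(tmp_dir="/tmp/kafka-test",  # placeholder values
                               partitions=2, replicas=2)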
test/test_integration.py (+107)

@@ -770,6 +770,113 @@ def test_large_messages(self):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)

+class TestFailover(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+
+        zk_chroot = random_string(10)
+        replicas = 2
+        partitions = 2
+
+        # mini zookeeper, 2 kafka brokers
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
+            broker.close()
+        cls.zk.close()
+
+    def test_switch_leader(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader', 0
+        producer = SimpleProducer(self.client, topic)
+
+        for i in range(1, 4):
+
+            # XXX unfortunately, the conns dict needs to be warmed for this to work
+            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+            self._send_random_messages(producer, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            with self.assertRaises(FailedPayloadsException):
+                producer.send_messages('part 1')
+                producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def test_switch_leader_async(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader_async', 0
+        producer = SimpleProducer(self.client, topic, async=True)
+
+        for i in range(1, 4):
+
+            self._send_random_messages(producer, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            producer.send_messages('part 1')
+            producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def _send_random_messages(self, producer, n):
+        for j in range(n):
+            resp = producer.send_messages(random_string(10))
+            if len(resp) > 0:
+                self.assertEquals(resp[0].error, 0)
+        time.sleep(1)  # give it some time
+
+    def _kill_leader(self, topic, partition):
+        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        broker = self.brokers[leader.nodeId]
+        broker.close()
+        time.sleep(1)  # give it some time
+        return broker
+
+    def _count_messages(self, group, topic):
+        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+        consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+        consumer.stop()
+        client.close()
+        return len(all_messages)
+

 def random_string(l):
     s = "".join(random.choice(string.letters) for i in xrange(l))

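A note on the assertion bounds: after round i the topic should hold 20 * i random messages, plus up to two 'part N' probes per round, some of which may be duplicated when a failed send is retried against the new leader. The check assertIn(count, range(20 * i, 22 * i + 1)) appears to allow exactly that slack rather than demanding an exact count.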