@@ -10,43 +10,47 @@
 
 from kafka.common import *
 from kafka.conn import KafkaConnection
-from kafka.protocol import KafkaProtocol
+from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
 
+
 class KafkaClient(object):
 
     CLIENT_ID = "kafka-python"
-    ID_GEN = count()
+    ID_GEN = count()
 
     def __init__(self, host, port, bufsize=4096):
-        # We need one connection to bootstrap
+        # We need one connection to bootstrap
         self.bufsize = bufsize
-        self.conns = {              # (host, port) -> KafkaConnection
+        self.conns = {  # (host, port) -> KafkaConnection
             (host, port): KafkaConnection(host, port, bufsize)
-        }
-        self.brokers = {}           # broker_id -> BrokerMetadata
-        self.topics_to_brokers = {} # topic_id -> broker_id
-        self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
+        }
+        self.brokers = {}  # broker_id -> BrokerMetadata
+        self.topics_to_brokers = {}  # topic_id -> broker_id
+        self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
         self._load_metadata_for_topics()
 
     ##################
     #   Private API  #
     ##################
 
-
     def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
+            self.conns[(broker.host, broker.port)] = \
+                KafkaConnection(broker.host, broker.port, self.bufsize)
+
         return self.conns[(broker.host, broker.port)]
 
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
         if key not in self.topics_to_brokers:
             self._load_metadata_for_topics(topic)
+
         if key not in self.topics_to_brokers:
             raise Exception("Partition does not exist: %s" % str(key))
+
         return self.topics_to_brokers[key]
 
     def _load_metadata_for_topics(self, *topics):
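The `_get_conn_for_broker` change above is a pure line-split, but the get-or-create idiom it wraps has a design point worth noting: the explicit membership test keeps `KafkaConnection` construction (which opens a socket) off the cache-hit path, something `dict.setdefault` could not do, since its default argument is evaluated eagerly. A minimal sketch of the same idiom outside the class, with the bufsize default taken from `__init__` above:

    from kafka.conn import KafkaConnection

    conns = {}  # (host, port) -> KafkaConnection

    def get_conn(host, port, bufsize=4096):
        # Only construct (and connect) when the key is missing.
        if (host, port) not in conns:
            conns[(host, port)] = KafkaConnection(host, port, bufsize)
        return conns[(host, port)]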
@@ -55,13 +59,18 @@ def _load_metadata_for_topics(self, *topics):
         recurse in the event of a retry.
         """
         requestId = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
+        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID,
+                                                        requestId, topics)
+
         response = self._send_broker_unaware_request(requestId, request)
         if response is None:
             raise Exception("All servers failed to process request")
+
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
+
         self.brokers.update(brokers)
         self.topics_to_brokers = {}
         for topic, partitions in topics.items():
@@ -77,7 +86,8 @@ def _load_metadata_for_topics(self, *topics):
                     time.sleep(1)
                     self._load_metadata_for_topics(topic)
                 else:
-                    self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
+                    topic_part = TopicAndPartition(topic, partition)
+                    self.topics_to_brokers[topic_part] = brokers[meta.leader]
                     self.topic_partitions[topic].append(partition)
 
     def _next_id(self):
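After `_load_metadata_for_topics` completes, the three structures populated above fit together as follows. A runnable sketch of the resulting shapes, assuming `BrokerMetadata` is a (nodeId, host, port) namedtuple as in `kafka.common` (only `host` and `port` are actually dereferenced in this file), with hypothetical topic and broker values:

    from collections import namedtuple

    BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
    TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

    # Two brokers, one topic ("events") with a partition led by each.
    brokers = {0: BrokerMetadata(0, "kafka-0", 9092),
               1: BrokerMetadata(1, "kafka-1", 9092)}
    topics_to_brokers = {TopicAndPartition("events", 0): brokers[0],
                         TopicAndPartition("events", 1): brokers[1]}
    topic_partitions = {"events": [0, 1]}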
@@ -86,41 +96,52 @@ def _next_id(self):
 
     def _send_broker_unaware_request(self, requestId, request):
         """
-        Attempt to send a broker-agnostic request to one of the available brokers.
-        Keep trying until you succeed.
+        Attempt to send a broker-agnostic request to one of the available
+        brokers. Keep trying until you succeed.
         """
         for conn in self.conns.values():
             try:
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
             except Exception, e:
-                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
+                log.warning("Could not send request [%r] to server %s, "
+                            "trying next server: %s" % (request, conn, e))
                 continue
+
         return None
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
-        Group a list of request payloads by topic+partition and send them to the
-        leader broker for that partition using the supplied encode/decode functions
+        Group a list of request payloads by topic+partition and send them to
+        the leader broker for that partition using the supplied encode/decode
+        functions
 
         Params
         ======
-        payloads: list of object-like entities with a topic and partition attribute
-        encode_fn: a method to encode the list of payloads to a request body, must accept
-                   client_id, correlation_id, and payloads as keyword arguments
-        decode_fn: a method to decode a response body into response objects. The response
-                   objects must be object-like and have topic and partition attributes
+        payloads: list of object-like entities with a topic and
+                  partition attribute
+        encode_fn: a method to encode the list of payloads to a request body,
+                   must accept client_id, correlation_id, and payloads as
+                   keyword arguments
+        decode_fn: a method to decode a response body into response objects.
+                   The response objects must be object-like and have topic
+                   and partition attributes
 
         Return
         ======
         List of response objects in the same order as the supplied payloads
         """
+
         # Group the requests by topic+partition
         original_keys = []
         payloads_by_broker = defaultdict(list)
+
         for payload in payloads:
-            payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            leader = self._get_leader_for_partition(payload.topic,
+                                                    payload.partition)
+
+            payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
 
         # Accumulate the responses in a dictionary
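The docstring above pins down the plug-in contract for `encoder_fn` and `decoder_fn`: the caller pre-binds any request-specific parameters so that `_send_broker_aware_request` can always invoke the encoder with exactly `client_id`, `correlation_id`, and `payloads`. A sketch of that pre-binding with `functools.partial`, using a hypothetical encoder rather than the real `KafkaProtocol` one:

    from functools import partial

    def encode_example(client_id=None, correlation_id=None, payloads=None,
                       acks=1, timeout=1000):
        # Stand-in for a real encoder; returns the wire-format request body.
        return "<request body>"

    # acks/timeout are bound up front, leaving only the three contract kwargs.
    encoder_fn = partial(encode_example, acks=0, timeout=500)
    body = encoder_fn(client_id="kafka-python", correlation_id=7, payloads=[])

This is exactly the shape `send_produce_request` adopts below with `KafkaProtocol.encode_produce_request`.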
@@ -130,7 +151,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
             requestId = self._next_id()
-            request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+            request = encoder_fn(client_id=KafkaClient.CLIENT_ID,
+                                 correlation_id=requestId, payloads=payloads)
 
             # Send the request, recv the response
             conn.send(requestId, request)
@@ -149,100 +171,127 @@ def close(self):
         for conn in self.conns.values():
             conn.close()
 
-    def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None):
+    def send_produce_request(self, payloads=[], acks=1, timeout=1000,
+                             fail_on_error=True, callback=None):
         """
         Encode and send some ProduceRequests
 
-        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
-        broker. Output is a list of responses in the same order as the list of payloads
-        specified
+        ProduceRequests will be grouped by (topic, partition) and then
+        sent to a specific broker. Output is a list of responses in the
+        same order as the list of payloads specified
 
         Params
         ======
         payloads: list of ProduceRequest
-        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
-        callback: function, instead of returning the ProduceResponse, first pass it through this function
+        fail_on_error: boolean, should we raise an Exception if we
+                       encounter an API error?
+        callback: function, instead of returning the ProduceResponse,
+                  first pass it through this function
 
         Return
         ======
-        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        list of ProduceResponse or callback(ProduceResponse), in the
+        order of input payloads
         """
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
-                KafkaProtocol.decode_produce_response)
+
+        encoder = partial(KafkaProtocol.encode_produce_request,
+                          acks=acks, timeout=timeout)
+        decoder = KafkaProtocol.decode_produce_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
             # Check for errors
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
-                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("ProduceRequest for %s failed with "
+                                "errorcode=%d" % (
+                                TopicAndPartition(resp.topic, resp.partition),
+                                resp.error))
+
             # Run the callback
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
+    def send_fetch_request(self, payloads=[], fail_on_error=True,
+                           callback=None):
         """
         Encode and send a FetchRequest
-
-        Payloads are grouped by topic and partition so they can be pipelined to the same
-        brokers.
+
+        Payloads are grouped by topic and partition so they can be pipelined
+        to the same brokers.
         """
         resps = self._send_broker_aware_request(payloads,
                 KafkaProtocol.encode_fetch_request,
                 KafkaProtocol.decode_fetch_response)
+
         out = []
         for resp in resps:
             # Check for errors
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("FetchRequest for %s failed with errorcode=%d" %
-                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("FetchRequest for %s failed with "
+                                "errorcode=%d" % (
+                                TopicAndPartition(resp.topic, resp.partition),
+                                resp.error))
+
             # Run the callback
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-
-    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+    def send_offset_request(self, payloads=[], fail_on_error=True,
+                            callback=None):
         resps = self._send_broker_aware_request(payloads,
                 KafkaProtocol.encode_offset_request,
                 KafkaProtocol.decode_offset_response)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetRequest failed with errorcode=%s",
+                                resp.error)
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_offset_commit_request, group=group),
-                KafkaProtocol.decode_offset_commit_response)
+    def send_offset_commit_request(self, group, payloads=[],
+                                   fail_on_error=True, callback=None):
+        encoder = partial(KafkaProtocol.encode_offset_commit_request,
+                          group=group)
+        decoder = KafkaProtocol.decode_offset_commit_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with "
+                                "errorcode=%s", resp.error)
+
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_offset_fetch_request, group=group),
-                KafkaProtocol.decode_offset_fetch_response)
+    def send_offset_fetch_request(self, group, payloads=[],
+                                  fail_on_error=True, callback=None):
+
+        encoder = partial(KafkaProtocol.encode_offset_fetch_request,
+                          group=group)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s",
+                                resp.error)
             if callback is not None:
                 out.append(callback(resp))
297
else :
0 commit comments
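Taken together, the public methods in the last hunk are the client's whole surface area. A minimal end-to-end sketch of driving it, not part of this commit: `ProduceRequest` and `Message` are assumed to be the namedtuples from `kafka.common` with the fields shown, `ProduceResponse` is assumed to expose `topic`, `partition`, `error`, and `offset`, and the host, port, and topic names are placeholders:

    from kafka.client import KafkaClient
    from kafka.common import ProduceRequest, Message

    client = KafkaClient("localhost", 9092)

    # magic=0, attributes=0 (no compression), no key, one payload.
    messages = [Message(0, 0, None, "hello world")]
    req = ProduceRequest("my-topic", 0, messages)

    # acks=1: wait for the partition leader to acknowledge the write;
    # fail_on_error (the default) raises on any non-zero error code.
    for resp in client.send_produce_request(payloads=[req], acks=1,
                                            timeout=1000):
        print resp.topic, resp.partition, resp.offset

    client.close()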