@@ -31,24 +31,25 @@ def __str__(self):
 ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
 OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])

-def gzip_compress(payload):
+def gzip_encode(payload):
     buf = StringIO()
-    f = gzip.GzipFile(fileobj=buf, mode='w')
+    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
     f.write(payload)
     f.close()
     buf.seek(0)
     out = buf.read()
     buf.close()
     return out

-def gzip_decompress(payload):
+def gzip_decode(payload):
     buf = StringIO(payload)
     f = gzip.GzipFile(fileobj=buf, mode='r')
     out = f.read()
     f.close()
     buf.close()
     return out

+
 def length_prefix_message(msg):
     """
     Prefix a message with its length as an int
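The renamed gzip helpers above round-trip bytes through an in-memory buffer. A minimal sketch of exercising them, assuming the module-level names shown in this hunk; the payload string is only an illustration:

    # Hypothetical round trip through the helpers defined above
    payload = "hello kafka"
    compressed = gzip_encode(payload)      # gzip via an in-memory StringIO buffer
    assert gzip_decode(compressed) == payload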
@@ -84,9 +85,10 @@ class KafkaClient(object):

     ATTRIBUTE_CODEC_MASK = 0x03

-    def __init__(self, host, port):
+    def __init__(self, host, port, bufsize=1024):
         self.host = host
         self.port = port
+        self.bufsize = bufsize
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(10)
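The new bufsize argument controls how many bytes each socket read requests (see the recv call in the next hunk). A minimal construction sketch; the host, port, and 4096 value are assumptions for illustration, not values from this commit:

    # Hypothetical: override the 1024-byte default read size
    client = KafkaClient("localhost", 9092, bufsize=4096)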
@@ -117,7 +119,7 @@ def _consume_response_iter(self):
         # Response iterator
         total = 0
         while total < (size - 2):
-            resp = self._sock.recv(1024)
+            resp = self._sock.recv(self.bufsize)
             log.debug("Read %d bytes from Kafka", len(resp))
             if resp == "":
                 raise Exception("Underflow")
@@ -133,7 +135,8 @@ def _consume_response(self):
             data += chunk
         return data

-    def encode_message(self, message):
+    @classmethod
+    def encode_message(cls, message):
         """
         Encode a Message from a Message tuple

@@ -163,20 +166,26 @@ def encode_message(self, message):
             msg = struct.pack('>BBi%ds' % len(message.payload),
                               message.magic, message.attributes, message.crc, message.payload)
         else:
-            raise Exception("Unknown message version: %d" % message.magic)
+            raise Exception("Unexpected magic number: %d" % message.magic)
         msg = length_prefix_message(msg)
         log.debug("Encoded %s as %r" % (message, msg))
         return msg

-    def encode_message_set(self, messages):
-        # TODO document
+    @classmethod
+    def encode_message_set(cls, messages):
+        """
+        Encode a MessageSet
+
+        One or more concatenated Messages
+        """
         message_set = ""
         for message in messages:
-            encoded_message = self.encode_message(message)
+            encoded_message = cls.encode_message(message)
             message_set += encoded_message
         return message_set

-    def encode_produce_request(self, produceRequest):
+    @classmethod
+    def encode_produce_request(cls, produceRequest):
         """
         Encode a ProduceRequest

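Since the encode helpers are now classmethods, a MessageSet can be built without opening a socket. A sketch using create_message, which this commit adds further down; the payload strings are made up:

    # Hypothetical: build a MessageSet with no KafkaClient instance
    msgs = [KafkaClient.create_message("m1"), KafkaClient.create_message("m2")]
    message_set = KafkaClient.encode_message_set(msgs)  # concatenated, length-prefixed Messages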
@@ -198,16 +207,41 @@ def encode_produce_request(self, produceRequest):
                           KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set)
         return req

-    def encode_multi_produce_request(self, produceRequests):
-        # TODO document
+    @classmethod
+    def encode_multi_produce_request(cls, produceRequests):
+        """
+        Encode a MultiProducerRequest
+
+        Params
+        ======
+        produceRequests: list of ProduceRequest objects
+
+        Returns
+        =======
+        Encoded request
+
+        Wire Format
+        ===========
+        <MultiProducerRequest> ::= <request-key> <num> <ProduceRequests>
+        <num> ::= <int16>
+        <ProduceRequests> ::= <ProduceRequest> [ <ProduceRequests> ]
+        <ProduceRequest> ::= <topic> <partition> <len> <MessageSet>
+        <topic> ::= <topic-length><string>
+        <topic-length> ::= <int16>
+        <partition> ::= <int32>
+        <len> ::= <int32>
+
+        num is the number of ProduceRequests being encoded
+        """
         req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests))
         for (topic, partition, messages) in produceRequests:
-            message_set = self.encode_message_set(messages)
+            message_set = cls.encode_message_set(messages)
             req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
                                len(topic), topic, partition, len(message_set), message_set)
         return req

-    def encode_fetch_request(self, fetchRequest):
+    @classmethod
+    def encode_fetch_request(cls, fetchRequest):
         """
         Encode a FetchRequest message

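A sketch of driving the wire format documented above through the new classmethod; the topic names, partitions, and payloads are invented for illustration:

    # Hypothetical: two ProduceRequests packed into one MultiProduceRequest
    reqs = [
        ProduceRequest("topic-a", 0, [KafkaClient.create_message("hello")]),
        ProduceRequest("topic-b", 3, [KafkaClient.create_message("world")]),
    ]
    encoded = KafkaClient.encode_multi_produce_request(reqs)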
@@ -228,7 +262,8 @@ def encode_fetch_request(self, fetchRequest):
                           KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size)
         return req

-    def encode_multi_fetch_request(self, fetchRequests):
+    @classmethod
+    def encode_multi_fetch_request(cls, fetchRequests):
         """
         Encode the MultiFetchRequest message from a list of FetchRequest objects

@@ -260,7 +295,8 @@ def encode_multi_fetch_request(self, fetchRequests):
             req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size)
         return req

-    def encode_offset_request(self, offsetRequest):
+    @classmethod
+    def encode_offset_request(cls, offsetRequest):
         """
         Encode an OffsetRequest message

@@ -281,43 +317,57 @@ def encode_offset_request(self, offsetRequest):
         req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.OFFSET_KEY, len(topic), topic, partition, offset, maxOffsets)
         return req

-    def decode_message(self, data):
+    @classmethod
+    def decode_message(cls, data):
         """
         Decode a Message

-        Since a Message can actually contained a compressed payload of multiple nested Messages,
-        this method returns a generator.
+        Verify crc and decode the Message. A compressed Message's payload is actually
+        an encoded MessageSet. This allows Messages to be nested within Messages and
+        as such, this method will recurse.
+
+        Params
+        ======
+        data, bytes
+
+        Returns
+        =======
+        Generator of Messages (depth-first)
         """
-        # TODO document
         N = len(data)
         (magic,) = struct.unpack('>B', data[0:1])
-        if magic == 0: # v0 Message
-            # Read crc; check the crc; append the message
+        if magic == 0:
+            # version 0
             (crc,) = struct.unpack('>i', data[1:5])
             payload = data[5:N]
             assert zlib.crc32(payload) == crc
             msg = Message(magic, None, crc, payload)
             log.debug("Got v0 Message, %s", msg)
             yield msg
-        elif magic == 1: # v1 Message
-            # Read attributes, crc; check the crc; append the message
+        elif magic == 1:
+            # version 1
             (att, crc) = struct.unpack('>Bi', data[1:6])
             payload = data[6:N]
             assert zlib.crc32(payload) == crc
-            # Uncompressed, just a single Message
             if att & KafkaClient.ATTRIBUTE_CODEC_MASK == 0:
+                # Uncompressed, just a single Message
                 msg = Message(magic, att, crc, payload)
                 log.debug("Got v1 Message, %s", msg)
                 yield msg
             elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 1:
-                gz = gzip_decompress(payload)
-                (msgs, _) = self.read_message_set(gz)
+                # Gzip encoded Message
+                gz = gzip_decode(payload)
+                (msgs, _) = cls.read_message_set(gz)
                 for msg in msgs:
                     yield msg
+            elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
+                # Snappy encoded Message
+                raise NotImplementedError("Snappy codec is not yet supported")
             else:
                 raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))

-    def read_message_set(self, data):
+    @classmethod
+    def read_message_set(cls, data):
         """
         Read a MessageSet

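An encode/decode round trip for an uncompressed v1 Message, as a sketch of the generator behavior described above. It assumes the length prefix added by length_prefix_message is a 4-byte int that must be stripped before calling decode_message:

    # Hypothetical round trip through the classmethods above
    msg = KafkaClient.create_message("hello")
    data = KafkaClient.encode_message(msg)                 # length-prefixed bytes
    decoded = list(KafkaClient.decode_message(data[4:]))   # skip the int length prefix
    assert decoded[0].payload == "hello"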
@@ -363,7 +413,7 @@ def read_message_set(self, data):
             cur += 4

             # Decode the message(s)
-            for m in self.decode_message(data[cur:cur + N]):
+            for m in cls.decode_message(data[cur:cur + N]):
                 msgs.append(m)

             # Advance the cursor
@@ -376,15 +426,37 @@ def read_message_set(self, data):
     # Advanced User API #
     #########################

-    def create_message_from_string(self, payload):
-        #TODO document
+    @classmethod
+    def create_message(cls, payload):
+        """
+        Create a standard Message
+
+        Params
+        ======
+        payload, bytes
+
+        Returns
+        =======
+        A Message tuple
+        """
         return Message(1, 0, zlib.crc32(payload), payload)

-    def create_gzipped_message(self, *payloads):
-        #TODO document
-        messages = [self.create_message_from_string(payload) for payload in payloads]
-        message_set = self.encode_message_set(messages)
-        gzipped = gzip_compress(message_set)
+    @classmethod
+    def create_gzip_message(cls, *payloads):
+        """
+        Create a Gzip encoded Message
+
+        Params
+        ======
+        payloads, list of messages (bytes) to be encoded
+
+        Returns
+        =======
+        A Message tuple
+        """
+        messages = [cls.create_message(payload) for payload in payloads]
+        message_set = cls.encode_message_set(messages)
+        gzipped = gzip_encode(message_set)
         return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)

     def send_message_set(self, produceRequest):
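The gzip path wraps an entire MessageSet inside one Message, which decode_message then unwraps recursively. A sketch of that nesting, again assuming a 4-byte length prefix and relying on read_message_set (not fully shown in this diff):

    # Hypothetical: compress two payloads into one Message, then recover them
    gz_msg = KafkaClient.create_gzip_message("m1", "m2")
    encoded = KafkaClient.encode_message(gz_msg)
    recovered = [m.payload for m in KafkaClient.decode_message(encoded[4:])]
    assert recovered == ["m1", "m2"]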
@@ -522,7 +594,7 @@ def send_messages_simple(self, topic, partition, *payloads):
         partition: int
         payloads: strings
         """
-        messages = tuple([create_message_from_string(payload) for payload in payloads])
+        messages = tuple([create_message(payload) for payload in payloads])
         self.send_message_set(ProduceRequest(topic, partition, messages))

     def iter_messages(self, topic, partition, offset, size, auto=True):