1
- import binascii
1
+ import os
2
+ import random
3
+ import struct
2
4
import unittest
3
5
4
6
from kafka import KafkaClient
7
+ from kafka import gzip_encode , gzip_decode , length_prefix_message
8
+
9
+ ITERATIONS = 1000
10
+ STRLEN = 100
11
+
12
+ def random_string ():
13
+ return os .urandom (random .randint (0 , STRLEN ))
14
+
15
+ class TestMisc (unittest .TestCase ):
16
+ def test_length_prefix (self ):
17
+ for i in xrange (ITERATIONS ):
18
+ s1 = random_string ()
19
+ s2 = length_prefix_message (s1 )
20
+ self .assertEquals (struct .unpack ('>i' , s2 [0 :4 ])[0 ], len (s1 ))
21
+
22
+ class TestCodec (unittest .TestCase ):
23
+ def test_gzip (self ):
24
+ for i in xrange (ITERATIONS ):
25
+ s1 = random_string ()
26
+ s2 = gzip_decode (gzip_encode (s1 ))
27
+ self .assertEquals (s1 ,s2 )
5
28
6
29
class TestMessage (unittest .TestCase ):
30
+ def test_create (self ):
31
+ msg = KafkaClient .create_message ("testing" )
32
+ self .assertEquals (msg .payload , "testing" )
33
+ self .assertEquals (msg .magic , 1 )
34
+ self .assertEquals (msg .attributes , 0 )
35
+ self .assertEquals (msg .crc , - 386704890 )
36
+
37
+ def test_create_gzip (self ):
38
+ msg = KafkaClient .create_gzip_message ("testing" )
39
+ self .assertEquals (msg .magic , 1 )
40
+ self .assertEquals (msg .attributes , 1 )
41
+ # Can't check the crc or payload for gzip since it's non-deterministic
42
+ (messages , _ ) = KafkaClient .read_message_set (gzip_decode (msg .payload ))
43
+ inner = messages [0 ]
44
+ self .assertEquals (inner .magic , 1 )
45
+ self .assertEquals (inner .attributes , 0 )
46
+ self .assertEquals (inner .payload , "testing" )
47
+ self .assertEquals (inner .crc , - 386704890 )
48
+
7
49
def test_message_simple (self ):
8
50
msg = KafkaClient .create_message ("testing" )
9
51
enc = KafkaClient .encode_message (msg )
@@ -28,7 +70,6 @@ def test_message_list(self):
28
70
self .assertEquals (messages [0 ].payload , "one" )
29
71
self .assertEquals (messages [1 ].payload , "two" )
30
72
self .assertEquals (messages [2 ].payload , "three" )
31
-
32
73
33
74
def test_message_gzip (self ):
34
75
msg = KafkaClient .create_gzip_message ("one" , "two" , "three" )
@@ -40,5 +81,26 @@ def test_message_gzip(self):
40
81
self .assertEquals (messages [1 ].payload , "two" )
41
82
self .assertEquals (messages [2 ].payload , "three" )
42
83
84
+ def test_message_simple_random (self ):
85
+ for i in xrange (ITERATIONS ):
86
+ n = random .randint (0 , 10 )
87
+ msgs = [KafkaClient .create_message (random_string ()) for j in range (n )]
88
+ enc = KafkaClient .encode_message_set (msgs )
89
+ (messages , read ) = KafkaClient .read_message_set (enc )
90
+ self .assertEquals (len (messages ), n )
91
+ for j in range (n ):
92
+ self .assertEquals (messages [j ], msgs [j ])
93
+
94
+ def test_message_gzip_random (self ):
95
+ for i in xrange (ITERATIONS ):
96
+ n = random .randint (0 , 10 )
97
+ strings = [random_string () for j in range (n )]
98
+ msg = KafkaClient .create_gzip_message (* strings )
99
+ enc = KafkaClient .encode_message (msg )
100
+ (messages , read ) = KafkaClient .read_message_set (enc )
101
+ self .assertEquals (len (messages ), n )
102
+ for j in range (n ):
103
+ self .assertEquals (messages [j ].payload , strings [j ])
104
+
43
105
if __name__ == '__main__' :
44
106
unittest .main ()
0 commit comments