Skip to content

Commit 8811298

Browse files
committed
Merge pull request dpkp#127 from GregBowyer/master
Make it possible to read and write xerial snappy
2 parents 4abf7ee + 96c9ce0 commit 8811298

File tree

2 files changed

+138
-3
lines changed

2 files changed

+138
-3
lines changed

kafka/codec.py

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
from cStringIO import StringIO
22
import gzip
3+
import struct
4+
5+
_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
6+
_XERIAL_V1_FORMAT = 'bccccccBii'
37

48
try:
59
import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
3640
return result
3741

3842

39-
def snappy_encode(payload):
43+
def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
    """Compress ``payload`` with snappy, optionally using xerial framing.

    If ``xerial_compatible`` is set, the stream is encoded in a fashion
    compatible with the xerial snappy library; otherwise the payload is
    compressed as a single raw snappy buffer.

    The block size (``xerial_blocksize``) controls how frequently the
    blocking occurs; 32k is the default in the xerial library.

    The format winds up being
        +-------------+------------+--------------+------------+--------------+
        |   Header    | Block1 len | Block1 data  | Blockn len | Blockn data  |
        |-------------+------------+--------------+------------+--------------|
        |  16 bytes   |  BE int32  | snappy bytes |  BE int32  | snappy bytes |
        +-------------+------------+--------------+------------+--------------+

    It is important to note that the blocksize is the amount of
    uncompressed data presented to snappy at each block, whereas the
    block len is the number of compressed bytes actually written to the
    stream for that block. The block len is typically smaller than the
    blocksize, but that is not guaranteed for incompressible input.

    Args:
        payload: the data to compress.
        xerial_compatible: emit the xerial blocked stream format instead
            of a single raw snappy buffer.
        xerial_blocksize: number of uncompressed bytes per block; only
            consulted when ``xerial_compatible`` is set.

    Returns:
        The compressed bytes.

    Raises:
        NotImplementedError: if the snappy module is not available.
        ValueError: if ``xerial_compatible`` is set and
            ``xerial_blocksize`` is not a positive number.
    """
    if not _has_snappy:
        raise NotImplementedError("Snappy codec is not available")

    if not xerial_compatible:
        return snappy.compress(payload)

    # A zero step makes xrange raise, but a negative step yields an empty
    # range, which would silently emit a header-only stream and drop the
    # payload. Reject both up front.
    if xerial_blocksize <= 0:
        raise ValueError("xerial_blocksize must be a positive integer")

    out = StringIO()

    # Network byte order ('!') disables alignment padding, so packing the
    # whole header in one call yields the same 16 bytes as packing each
    # field separately.
    out.write(struct.pack('!' + _XERIAL_V1_FORMAT, *_XERIAL_V1_HEADER))

    for start in xrange(0, len(payload), xerial_blocksize):
        block = snappy.compress(payload[start:start + xerial_blocksize])
        # Each block is prefixed with its compressed length (BE int32).
        out.write(struct.pack('!i', len(block)))
        out.write(block)

    return out.getvalue()
88+
89+
90+
def _detect_xerial_stream(payload):
    r"""Detect whether ``payload`` starts with the xerial blocking-mode header.

    The blocking mode of the xerial snappy library writes a magic header
    of the format:
        +--------+--------------+------------+---------+--------+
        | Marker | Magic String | Null / Pad | Version | Compat |
        |--------+--------------+------------+---------+--------|
        |  byte  |   c-string   |    byte    |  int32  | int32  |
        |--------+--------------+------------+---------+--------|
        |  -126  |   'SNAPPY'   |     \0     |         |        |
        +--------+--------------+------------+---------+--------+

    The pad appears to be there to ensure that SNAPPY is a valid cstring.
    The version is the version of this format as written by xerial;
    in the wild this is currently 1, as such we only support v1.

    Compat is there to claim the minimum supported version that
    can read a xerial block stream; presently in the wild this is 1.

    Args:
        payload: the possibly xerial-framed data to inspect.

    Returns:
        True if the first 16 bytes equal the v1 xerial header, False
        otherwise (including when the payload is shorter than 16 bytes).
    """
    # A stream holding no blocks is exactly the 16-byte header, so the
    # header-only case must be accepted as well as longer payloads.
    if len(payload) < 16:
        return False

    header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
    return header == _XERIAL_V1_HEADER
43116

44117

45118
def snappy_decode(payload):
    """Decompress snappy data, transparently handling xerial framing.

    If ``payload`` is recognised by ``_detect_xerial_stream`` it is
    treated as a 16-byte header followed by a sequence of blocks, each
    prefixed with a big-endian int32 giving its compressed length; every
    block is decompressed and the results are concatenated. Otherwise
    the payload is decompressed as a single raw snappy buffer.

    Args:
        payload: the compressed data.

    Returns:
        The decompressed bytes.

    Raises:
        NotImplementedError: if the snappy module is not available.
    """
    if not _has_snappy:
        raise NotImplementedError("Snappy codec is not available")

    if not _detect_xerial_stream(payload):
        return snappy.decompress(payload)

    # TODO ? Should become a fileobj ?
    out = StringIO()
    body = buffer(payload[16:])  # everything after the 16-byte header
    total = len(body)
    cursor = 0

    while cursor < total:
        # Read the length prefix in place via the offset argument; slicing
        # the buffer here would copy the remaining data on every iteration.
        block_size = struct.unpack_from('!i', body, cursor)[0]
        # Skip the block size
        cursor += 4
        end = cursor + block_size
        out.write(snappy.decompress(body[cursor:end]))
        cursor = end

    return out.getvalue()

test/test_unit.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,49 @@ def test_snappy(self):
7070
s2 = snappy_decode(snappy_encode(s1))
7171
self.assertEquals(s1, s2)
7272

73+
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_detect_xerial(self):
    """Check that only payloads starting with the xerial header are detected."""
    import kafka as kafka1
    detect = kafka1.codec._detect_xerial_stream

    # A valid header followed by trailing data must be recognised.
    xerial_payload = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'

    # None of these may be recognised: empty input, a single byte, a
    # header-like payload with the wrong marker byte, a plain snappy
    # buffer and a short arbitrary byte string.
    rejected_payloads = (
        b'',
        b'\x00',
        b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01',
        snappy_encode('SNAPPY' * 50),
        b'\x01\x02\x03\x04',
    )

    self.assertTrue(detect(xerial_payload))
    for candidate in rejected_payloads:
        self.assertFalse(detect(candidate))
89+
90+
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_decode_xerial(self):
    """Decode a hand-built two-block xerial stream back to its plain text."""
    header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
    random_snappy = snappy_encode('SNAPPY' * 50)
    block_len = len(random_snappy)
    random_snappy2 = snappy_encode('XERIAL' * 50)
    block_len2 = len(random_snappy2)

    # Header, then each block prefixed with its big-endian int32 length.
    # Parentheses replace the backslash continuations, the last of which
    # dangled into a blank line.
    to_test = (
        header
        + struct.pack('!i', block_len) + random_snappy
        + struct.pack('!i', block_len2) + random_snappy2
    )

    self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
103+
104+
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
    """Encode 600 bytes with a 300-byte block size and compare the exact stream."""
    plain = ('SNAPPY' * 50) + ('XERIAL' * 50)

    # Expected stream: the xerial header, then two blocks, each prefixed
    # with its length (0x18) as a big-endian int32.
    expected = (
        b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
        + '\x00\x00\x00\x18'
        + '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
        + '\x00\x00\x00\x18'
        + '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
    )

    actual = snappy_encode(plain, xerial_compatible=True, xerial_blocksize=300)
    self.assertEquals(actual, expected)
73116

74117
class TestProtocol(unittest.TestCase):
75118

0 commit comments

Comments
 (0)