Skip to content

Commit 96c9ce0

Browse files
committed
Make it possible to read and write xerial snappy
Fixes dpkp#126 TL;DR ===== This makes it possible to read and write snappy compressed streams that are compatible with the java and scala kafka clients (the xerial blocking format)) Xerial Details ============== Kafka supports transparent compression of data (both in transit and at rest) of messages, one of the allowable compression algorithms is Google's snappy, an algorithm which has excellent performance at the cost of efficiency. The specific implementation of snappy used in kafka is the xerial-snappy implementation, this is a readily available java library for snappy. As part of this implementation, there is a specialised blocking format that is somewhat none standard in the snappy world. Xerial Format ------------- The blocking mode of the xerial snappy library is fairly simple, using a magic header to identify itself and then a size + block scheme, unless otherwise noted all items in xerials blocking format are assumed to be big-endian. A block size (```xerial_blocksize``` in implementation) controls how frequent the blocking occurs 32k is the default in the xerial library, this blocking controls the size of the uncompressed chunks that will be fed to snappy to be compressed. The format winds up being | Header | Block1 len | Block1 data | Blockn len | Blockn data | | ----------- | ---------- | ------------ | ---------- | ------------ | | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes | It is important to not that the blocksize is the amount of uncompressed data presented to snappy at each block, whereas the blocklen is the number of bytes that will be present in the stream, that is the length will always be <= blocksize. Xerial blocking header ---------------------- Marker | Magic String | Null / Pad | Version | Compat ------ | ------------ | ---------- | -------- | -------- byte | c-string | byte | int32 | int32 ------ | ------------ | ---------- | -------- | -------- -126 | 'SNAPPY' | \0 | variable | variable The pad appears to be to ensure that SNAPPY is a valid cstring, and to align the header on a word boundary. The version is the version of this format as written by xerial, in the wild this is currently 1 as such we only support v1. Compat is there to claim the minimum supported version that can read a xerial block stream, presently in the wild this is 1. Implementation specific details =============================== The implementation presented here follows the Xerial implementation as of its v1 blocking format, no attempts are made to check for future versions. Since none-xerial aware clients might have persisted snappy compressed messages to kafka brokers we allow clients to turn on xerial compatibility for message sending, and perform header sniffing to detect xerial vs plain snappy payloads.
1 parent 4abf7ee commit 96c9ce0

File tree

2 files changed

+138
-3
lines changed

2 files changed

+138
-3
lines changed

kafka/codec.py

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
from cStringIO import StringIO
22
import gzip
3+
import struct
4+
5+
_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
6+
_XERIAL_V1_FORMAT = 'bccccccBii'
37

48
try:
59
import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
3640
return result
3741

3842

39-
def snappy_encode(payload):
43+
def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
44+
"""Encodes the given data with snappy if xerial_compatible is set then the
45+
stream is encoded in a fashion compatible with the xerial snappy library
46+
47+
The block size (xerial_blocksize) controls how frequent the blocking occurs
48+
32k is the default in the xerial library.
49+
50+
The format winds up being
51+
+-------------+------------+--------------+------------+--------------+
52+
| Header | Block1 len | Block1 data | Blockn len | Blockn data |
53+
|-------------+------------+--------------+------------+--------------|
54+
| 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
55+
+-------------+------------+--------------+------------+--------------+
56+
57+
It is important to not that the blocksize is the amount of uncompressed
58+
data presented to snappy at each block, whereas the blocklen is the
59+
number of bytes that will be present in the stream, that is the
60+
length will always be <= blocksize.
61+
"""
62+
4063
if not _has_snappy:
4164
raise NotImplementedError("Snappy codec is not available")
42-
return snappy.compress(payload)
65+
66+
if xerial_compatible:
67+
def _chunker():
68+
for i in xrange(0, len(payload), xerial_blocksize):
69+
yield payload[i:i+xerial_blocksize]
70+
71+
out = StringIO()
72+
73+
header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
74+
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
75+
76+
out.write(header)
77+
for chunk in _chunker():
78+
block = snappy.compress(chunk)
79+
block_size = len(block)
80+
out.write(struct.pack('!i', block_size))
81+
out.write(block)
82+
83+
out.seek(0)
84+
return out.read()
85+
86+
else:
87+
return snappy.compress(payload)
88+
89+
90+
def _detect_xerial_stream(payload):
91+
"""Detects if the data given might have been encoded with the blocking mode
92+
of the xerial snappy library.
93+
94+
This mode writes a magic header of the format:
95+
+--------+--------------+------------+---------+--------+
96+
| Marker | Magic String | Null / Pad | Version | Compat |
97+
|--------+--------------+------------+---------+--------|
98+
| byte | c-string | byte | int32 | int32 |
99+
|--------+--------------+------------+---------+--------|
100+
| -126 | 'SNAPPY' | \0 | | |
101+
+--------+--------------+------------+---------+--------+
102+
103+
The pad appears to be to ensure that SNAPPY is a valid cstring
104+
The version is the version of this format as written by xerial,
105+
in the wild this is currently 1 as such we only support v1.
106+
107+
Compat is there to claim the miniumum supported version that
108+
can read a xerial block stream, presently in the wild this is
109+
1.
110+
"""
111+
112+
if len(payload) > 16:
113+
header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
114+
return header == _XERIAL_V1_HEADER
115+
return False
43116

44117

45118
def snappy_decode(payload):
46119
if not _has_snappy:
47120
raise NotImplementedError("Snappy codec is not available")
48-
return snappy.decompress(payload)
121+
122+
if _detect_xerial_stream(payload):
123+
# TODO ? Should become a fileobj ?
124+
out = StringIO()
125+
byt = buffer(payload[16:])
126+
length = len(byt)
127+
cursor = 0
128+
129+
while cursor < length:
130+
block_size = struct.unpack_from('!i', byt[cursor:])[0]
131+
# Skip the block size
132+
cursor += 4
133+
end = cursor + block_size
134+
out.write(snappy.decompress(byt[cursor:end]))
135+
cursor = end
136+
137+
out.seek(0)
138+
return out.read()
139+
else:
140+
return snappy.decompress(payload)

test/test_unit.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,49 @@ def test_snappy(self):
7070
s2 = snappy_decode(snappy_encode(s1))
7171
self.assertEquals(s1, s2)
7272

73+
@unittest.skipUnless(has_snappy(), "Snappy not available")
74+
def test_snappy_detect_xerial(self):
75+
import kafka as kafka1
76+
_detect_xerial_stream = kafka1.codec._detect_xerial_stream
77+
78+
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
79+
false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
80+
random_snappy = snappy_encode('SNAPPY' * 50)
81+
short_data = b'\x01\x02\x03\x04'
82+
83+
self.assertTrue(_detect_xerial_stream(header))
84+
self.assertFalse(_detect_xerial_stream(b''))
85+
self.assertFalse(_detect_xerial_stream(b'\x00'))
86+
self.assertFalse(_detect_xerial_stream(false_header))
87+
self.assertFalse(_detect_xerial_stream(random_snappy))
88+
self.assertFalse(_detect_xerial_stream(short_data))
89+
90+
@unittest.skipUnless(has_snappy(), "Snappy not available")
91+
def test_snappy_decode_xerial(self):
92+
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
93+
random_snappy = snappy_encode('SNAPPY' * 50)
94+
block_len = len(random_snappy)
95+
random_snappy2 = snappy_encode('XERIAL' * 50)
96+
block_len2 = len(random_snappy2)
97+
98+
to_test = header \
99+
+ struct.pack('!i', block_len) + random_snappy \
100+
+ struct.pack('!i', block_len2) + random_snappy2 \
101+
102+
self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
103+
104+
@unittest.skipUnless(has_snappy(), "Snappy not available")
105+
def test_snappy_encode_xerial(self):
106+
to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
107+
'\x00\x00\x00\x18' + \
108+
'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
109+
'\x00\x00\x00\x18' + \
110+
'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
111+
112+
to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
113+
114+
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
115+
self.assertEquals(compressed, to_ensure)
73116

74117
class TestProtocol(unittest.TestCase):
75118

0 commit comments

Comments
 (0)