Skip to content

Commit 8805d30

Browse files
committed
Fix regression in MessageSet decoding wrt PartialMessages (#716)
1 parent 644a114 commit 8805d30

File tree

2 files changed

+107
-4
lines changed

2 files changed

+107
-4
lines changed

kafka/protocol/message.py

+6 -3
Original file line number | Diff line number | Diff line change
@@ -169,14 +169,17 @@ def decode(cls, data, bytes_to_read=None):
169169
data = io.BytesIO(data)
170170
if bytes_to_read is None:
171171
bytes_to_read = Int32.decode(data)
172-
items = []
173172

174173
# if FetchRequest max_bytes is smaller than the available message set
175174
# the server returns partial data for the final message
175+
# So create an internal buffer to avoid over-reading
176+
raw = io.BytesIO(data.read(bytes_to_read))
177+
178+
items = []
176179
while bytes_to_read:
177180
try:
178-
offset = Int64.decode(data)
179-
msg_bytes = Bytes.decode(data)
181+
offset = Int64.decode(raw)
182+
msg_bytes = Bytes.decode(raw)
180183
bytes_to_read -= 8 + 4 + len(msg_bytes)
181184
items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
182185
except ValueError:

test/test_protocol.py

+101 -1
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,15 @@
11
#pylint: skip-file
2+
import io
23
import struct
34

45
import pytest
56
import six
67

78
from kafka.protocol.api import RequestHeader
89
from kafka.protocol.commit import GroupCoordinatorRequest
9-
from kafka.protocol.message import Message, MessageSet
10+
from kafka.protocol.fetch import FetchResponse
11+
from kafka.protocol.message import Message, MessageSet, PartialMessage
12+
from kafka.protocol.types import Int16, Int32, Int64, String
1013

1114

1215
def test_create_message():
@@ -144,3 +147,100 @@ def test_encode_message_header():
144147
req = GroupCoordinatorRequest[0]('foo')
145148
header = RequestHeader(req, correlation_id=4, client_id='client3')
146149
assert header.encode() == expect
150+
151+
152+
def test_decode_message_set_partial():
153+
encoded = b''.join([
154+
struct.pack('>q', 0), # Msg Offset
155+
struct.pack('>i', 18), # Msg Size
156+
struct.pack('>i', 1474775406), # CRC
157+
struct.pack('>bb', 0, 0), # Magic, flags
158+
struct.pack('>i', 2), # Length of key
159+
b'k1', # Key
160+
struct.pack('>i', 2), # Length of value
161+
b'v1', # Value
162+
163+
struct.pack('>q', 1), # Msg Offset
164+
struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size)
165+
struct.pack('>i', -16383415), # CRC
166+
struct.pack('>bb', 0, 0), # Magic, flags
167+
struct.pack('>i', 2), # Length of key
168+
b'k2', # Key
169+
struct.pack('>i', 8), # Length of value
170+
b'ar', # Value (truncated)
171+
])
172+
173+
msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded))
174+
assert len(msgs) == 2
175+
msg1, msg2 = msgs
176+
177+
returned_offset1, message1_size, decoded_message1 = msg1
178+
returned_offset2, message2_size, decoded_message2 = msg2
179+
180+
assert returned_offset1 == 0
181+
message1 = Message(b'v1', key=b'k1')
182+
message1.encode()
183+
assert decoded_message1 == message1
184+
185+
assert returned_offset2 is None
186+
assert message2_size is None
187+
assert decoded_message2 == PartialMessage()
188+
189+
190+
def test_decode_fetch_response_partial():
191+
encoded = b''.join([
192+
Int32.encode(1), # Num Topics (Array)
193+
String('utf-8').encode('foobar'),
194+
Int32.encode(2), # Num Partitions (Array)
195+
Int32.encode(0), # Partition id
196+
Int16.encode(0), # Error Code
197+
Int64.encode(1234), # Highwater offset
198+
Int32.encode(52), # MessageSet size
199+
Int64.encode(0), # Msg Offset
200+
Int32.encode(18), # Msg Size
201+
struct.pack('>i', 1474775406), # CRC
202+
struct.pack('>bb', 0, 0), # Magic, flags
203+
struct.pack('>i', 2), # Length of key
204+
b'k1', # Key
205+
struct.pack('>i', 2), # Length of value
206+
b'v1', # Value
207+
208+
Int64.encode(1), # Msg Offset
209+
struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size)
210+
struct.pack('>i', -16383415), # CRC
211+
struct.pack('>bb', 0, 0), # Magic, flags
212+
struct.pack('>i', 2), # Length of key
213+
b'k2', # Key
214+
struct.pack('>i', 8), # Length of value
215+
b'ar', # Value (truncated)
216+
Int32.encode(1),
217+
Int16.encode(0),
218+
Int64.encode(2345),
219+
Int32.encode(52), # MessageSet size
220+
Int64.encode(0), # Msg Offset
221+
Int32.encode(18), # Msg Size
222+
struct.pack('>i', 1474775406), # CRC
223+
struct.pack('>bb', 0, 0), # Magic, flags
224+
struct.pack('>i', 2), # Length of key
225+
b'k1', # Key
226+
struct.pack('>i', 2), # Length of value
227+
b'v1', # Value
228+
229+
Int64.encode(1), # Msg Offset
230+
struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size)
231+
struct.pack('>i', -16383415), # CRC
232+
struct.pack('>bb', 0, 0), # Magic, flags
233+
struct.pack('>i', 2), # Length of key
234+
b'k2', # Key
235+
struct.pack('>i', 8), # Length of value
236+
b'ar', # Value (truncated)
237+
])
238+
239+
resp = FetchResponse[0].decode(io.BytesIO(encoded))
240+
assert len(resp.topics) == 1
241+
topic, partitions = resp.topics[0]
242+
assert topic == 'foobar'
243+
assert len(partitions) == 2
244+
m1 = partitions[0][3]
245+
assert len(m1) == 2
246+
assert m1[1] == (None, None, PartialMessage())

0 commit comments

Comments
 (0)