Skip to content

Commit 2b557d4

Browse files
[python] Fix BlockHandle varlen decoding in SstFileIterator.read_batch (#7597)
read_batch() incorrectly used fixed-width struct.unpack to decode BlockHandle, while the SST format uses variable-length encoding. Extract shared _parse_block_handle() to ensure both seek_to() and read_batch() use consistent varlen decoding.
1 parent 9227cf7 commit 2b557d4

File tree

3 files changed

+242
-19
lines changed

3 files changed

+242
-19
lines changed

paimon-python/pypaimon/globalindex/btree/sst_file_reader.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from typing import Optional, Callable
3131
from typing import BinaryIO
3232

33-
from pypaimon.globalindex.btree.btree_file_footer import BlockHandle
33+
from pypaimon.globalindex.btree.block_handle import BlockHandle
3434
from pypaimon.globalindex.btree.block_entry import BlockEntry
3535
from pypaimon.globalindex.btree.block_reader import BlockReader, BlockIterator
3636
from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
@@ -48,27 +48,28 @@ def __init__(self, read_block: Callable[[BlockHandle], BlockReader], index_block
4848
self.index_iterator = index_block_iterator
4949
self.sought_data_block: Optional[BlockIterator] = None
5050

51+
@staticmethod
def _parse_block_handle(block_handle_bytes: bytes) -> BlockHandle:
    """Decode a BlockHandle from its variable-length on-disk encoding.

    The SST index stores each handle as two varints (offset, then size),
    so both seek_to() and read_batch() must decode through this helper
    rather than a fixed-width struct layout.

    Args:
        block_handle_bytes: Raw index-entry value containing the handle.

    Returns:
        The decoded BlockHandle(offset, size).
    """
    slice_input = MemorySliceInput(block_handle_bytes)
    offset = slice_input.read_var_len_long()
    size = slice_input.read_var_len_int()
    return BlockHandle(offset, size)
58+
5159
def seek_to(self, key: bytes) -> None:
5260
"""
5361
Seek to the position of the record whose key is exactly equal to or
5462
greater than the specified key.
55-
63+
5664
Args:
5765
key: The key to seek to
5866
"""
5967
self.index_iterator.seek_to(key)
60-
68+
6169
if self.index_iterator.has_next():
6270
index_entry: BlockEntry = self.index_iterator.__next__()
63-
block_handle_bytes = index_entry.__getattribute__("value")
64-
handle_input = MemorySliceInput(block_handle_bytes)
65-
66-
# Parse block handle
67-
block_handle = BlockHandle(
68-
handle_input.read_var_len_long(),
69-
handle_input.read_var_len_int()
70-
)
71-
71+
block_handle = self._parse_block_handle(index_entry.value)
72+
7273
# Create data block reader and seek
7374
data_block_reader = self.read_block(block_handle)
7475
self.sought_data_block = data_block_reader.iterator()
@@ -93,13 +94,7 @@ def read_batch(self) -> Optional[BlockIterator]:
9394
return None
9495

9596
index_entry = self.index_iterator.__next__()
96-
block_handle_bytes = index_entry.value
97-
98-
# Parse block handle
99-
block_handle = BlockHandle(
100-
struct.unpack('<Q', block_handle_bytes[0:8])[0],
101-
struct.unpack('<I', block_handle_bytes[8:12])[0]
102-
)
97+
block_handle = self._parse_block_handle(index_entry.value)
10398

10499
# Create data block reader
105100
data_block_reader = self.read_block(block_handle)

paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,13 @@ def _test_read_btree_index_large(self):
452452
})
453453
self.assertEqual(expected, actual)
454454

455+
# read is_not_null index (full scan across all data blocks)
456+
read_builder.with_filter(predicate_builder.is_not_null('k'))
457+
table_read = read_builder.new_read()
458+
splits = read_builder.new_scan().plan().splits()
459+
actual = table_read.to_arrow(splits)
460+
self.assertEqual(len(actual), 2000)
461+
455462
def _test_read_btree_index_null(self):
456463
table = self.catalog.get_table('default.test_btree_index_null')
457464

New test file (path not captured in this diff view — presumably under paimon-python/pypaimon/tests/; verify against the commit) — Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
"""Tests for SstFileIterator BlockHandle varlen decoding."""
20+
21+
import unittest
22+
from unittest.mock import MagicMock
23+
24+
from pypaimon.globalindex.btree.block_handle import BlockHandle
25+
from pypaimon.globalindex.btree.block_entry import BlockEntry
26+
from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
27+
from pypaimon.globalindex.btree.sst_file_reader import SstFileIterator
28+
29+
30+
def _encode_var_len(value):
31+
result = bytearray()
32+
while value > 0x7F:
33+
result.append((value & 0x7F) | 0x80)
34+
value >>= 7
35+
result.append(value & 0x7F)
36+
return bytes(result)
37+
38+
39+
def _encode_block_handle(offset, size):
40+
return _encode_var_len(offset) + _encode_var_len(size)
41+
42+
43+
def _mock_block_iterator(entries):
44+
"""Mock a BlockIterator with has_next/next/seek_to over a list of BlockEntry."""
45+
state = {'pos': 0}
46+
entry_list = list(entries)
47+
48+
mock = MagicMock()
49+
mock.has_next = lambda: state['pos'] < len(entry_list)
50+
51+
def next_entry(_self=None):
52+
if state['pos'] >= len(entry_list):
53+
raise StopIteration
54+
entry = entry_list[state['pos']]
55+
state['pos'] += 1
56+
return entry
57+
mock.__next__ = next_entry
58+
mock.__iter__ = lambda _self=None: mock
59+
60+
def seek_to(target_key):
61+
for i, entry in enumerate(entry_list):
62+
if entry.key >= target_key:
63+
state['pos'] = i
64+
return entry.key == target_key
65+
state['pos'] = len(entry_list)
66+
return False
67+
mock.seek_to = seek_to
68+
69+
return mock
70+
71+
72+
class SstFileIteratorTest(unittest.TestCase):
    """Checks that SstFileIterator decodes index values as varint BlockHandles."""

    def _make_iterator(self, index_entries, data_blocks):
        """Build an SstFileIterator over mocked index and data blocks.

        Args:
            index_entries: List of (key, BlockHandle) pairs for the index block.
            data_blocks: Mapping of (offset, size) -> list of BlockEntry.
        """
        # Encode each handle the way the SST writer would: the index value is
        # the varint-encoded handle, not a fixed-width struct layout.
        encoded = []
        for key, handle in index_entries:
            encoded.append(
                BlockEntry(key, _encode_block_handle(handle.offset, handle.size)))

        index_iterator = _mock_block_iterator(encoded)

        def read_block(block_handle):
            # A miss here means the handle was decoded incorrectly; fail loudly.
            block = data_blocks.get((block_handle.offset, block_handle.size))
            if block is None:
                raise ValueError(
                    "Unexpected BlockHandle(offset={}, size={})".format(
                        block_handle.offset, block_handle.size))
            reader = MagicMock()
            reader.iterator = lambda e=block: _mock_block_iterator(e)
            return reader

        return SstFileIterator(read_block, index_iterator)

    def test_read_batch_varlen_small_values(self):
        iterator = self._make_iterator(
            [(b"k2", BlockHandle(100, 50))],
            {(100, 50): [BlockEntry(b"k1", b"v1"), BlockEntry(b"k2", b"v2")]},
        )

        batch = iterator.read_batch()
        self.assertIsNotNone(batch)
        first = batch.__next__()
        second = batch.__next__()
        self.assertEqual(first.key, b"k1")
        self.assertEqual(second.key, b"k2")
        self.assertIsNone(iterator.read_batch())

    def test_read_batch_varlen_large_offset(self):
        # Offset 300 requires a two-byte varint; fixed-width decoding misreads it.
        iterator = self._make_iterator(
            [(b"a", BlockHandle(300, 200))],
            {(300, 200): [BlockEntry(b"a", b"1")]},
        )

        batch = iterator.read_batch()
        self.assertIsNotNone(batch)
        self.assertEqual(batch.__next__().key, b"a")

    def test_read_batch_varlen_very_large_offset(self):
        iterator = self._make_iterator(
            [(b"big", BlockHandle(1000000, 65535))],
            {(1000000, 65535): [BlockEntry(b"big", b"val")]},
        )

        batch = iterator.read_batch()
        self.assertIsNotNone(batch)
        self.assertEqual(batch.__next__().key, b"big")

    def test_read_batch_multiple_blocks(self):
        iterator = self._make_iterator(
            [(b"b", BlockHandle(0, 100)),
             (b"d", BlockHandle(200, 150)),
             (b"f", BlockHandle(500, 80))],
            {
                (0, 100): [BlockEntry(b"a", b"1"), BlockEntry(b"b", b"2")],
                (200, 150): [BlockEntry(b"c", b"3"), BlockEntry(b"d", b"4")],
                (500, 80): [BlockEntry(b"e", b"5"), BlockEntry(b"f", b"6")],
            },
        )

        collected = []
        batch = iterator.read_batch()
        while batch is not None:
            while batch.has_next():
                collected.append(batch.__next__())
            batch = iterator.read_batch()

        self.assertEqual(len(collected), 6)
        self.assertEqual([e.key for e in collected],
                         [b"a", b"b", b"c", b"d", b"e", b"f"])

    def test_seek_then_read_batch_crosses_blocks(self):
        iterator = self._make_iterator(
            [(b"b", BlockHandle(0, 100)), (b"d", BlockHandle(256, 128))],
            {
                (0, 100): [BlockEntry(b"a", b"1"), BlockEntry(b"b", b"2")],
                (256, 128): [BlockEntry(b"c", b"3"), BlockEntry(b"d", b"4")],
            },
        )

        iterator.seek_to(b"a")
        self.assertIsNotNone(iterator.sought_data_block)

        first_batch = iterator.read_batch()
        self.assertIsNotNone(first_batch)
        self.assertEqual(first_batch.__next__().key, b"a")

        second_batch = iterator.read_batch()
        self.assertIsNotNone(second_batch)
        remaining = []
        while second_batch.has_next():
            remaining.append(second_batch.__next__())
        self.assertEqual(len(remaining), 2)
        self.assertEqual(remaining[0].key, b"c")
        self.assertEqual(remaining[1].key, b"d")

        self.assertIsNone(iterator.read_batch())

    def test_read_batch_empty_index(self):
        self.assertIsNone(self._make_iterator([], {}).read_batch())

    def test_varlen_encoding_roundtrip(self):
        # Cover 1-byte, boundary, multi-byte, and max-int varint widths.
        cases = [
            (0, 0),
            (127, 127),
            (128, 128),
            (300, 200),
            (16384, 255),
            (1000000, 65535),
            (2**31 - 1, 2**31 - 1),
        ]
        for offset, size in cases:
            reader = MemorySliceInput(_encode_block_handle(offset, size))
            self.assertEqual(reader.read_var_len_long(), offset,
                             "offset mismatch for ({}, {})".format(offset, size))
            self.assertEqual(reader.read_var_len_int(), size,
                             "size mismatch for ({}, {})".format(offset, size))
218+
219+
220+
if __name__ == "__main__":
    # Allow running this test module directly from the command line.
    unittest.main()

0 commit comments

Comments
 (0)