Skip to content

Commit 28f8fde

Browse files
committed
fix lance GetPreviousBatchFirstRowNumber
1 parent 0b7782f commit 28f8fde

File tree

3 files changed

+76
-6
lines changed

3 files changed

+76
-6
lines changed

src/paimon/format/lance/lance_file_batch_reader.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ Status LanceFileBatchReader::SetReadSchema(::ArrowSchema* read_schema,
7070
arrow::ImportSchema(read_schema));
7171
read_field_names_ = arrow_schema->field_names();
7272
assert(!read_field_names_.empty());
73+
read_row_ids_.clear();
7374
if (selection_bitmap) {
74-
read_row_ids_.clear();
7575
read_row_ids_.reserve(selection_bitmap.value().Cardinality());
7676
for (auto iter = selection_bitmap.value().Begin(); iter != selection_bitmap.value().End();
7777
++iter) {
@@ -84,6 +84,8 @@ Status LanceFileBatchReader::SetReadSchema(::ArrowSchema* read_schema,
8484
release_stream_reader(stream_reader_, error_message_.data(), error_message_.size());
8585
PAIMON_RETURN_NOT_OK(LanceToPaimonStatus(err_code, error_message_));
8686
stream_reader_ = nullptr;
87+
previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
88+
last_batch_row_num_ = 0;
8789
}
8890
return Status::OK();
8991
}
@@ -102,6 +104,12 @@ Result<BatchReader::ReadBatch> LanceFileBatchReader::NextBatch() {
102104
PAIMON_RETURN_NOT_OK(LanceToPaimonStatus(err_code, error_message_));
103105
assert(stream_reader_);
104106
}
107+
if (previous_batch_first_row_num_ == std::numeric_limits<uint64_t>::max()) {
108+
// first read
109+
previous_batch_first_row_num_ = 0;
110+
} else {
111+
previous_batch_first_row_num_ += last_batch_row_num_;
112+
}
105113
auto c_array = std::make_unique<ArrowArray>();
106114
auto c_schema = std::make_unique<ArrowSchema>();
107115
bool is_eof = false;
@@ -111,6 +119,7 @@ Result<BatchReader::ReadBatch> LanceFileBatchReader::NextBatch() {
111119
if (is_eof) {
112120
return BatchReader::MakeEofBatch();
113121
}
122+
last_batch_row_num_ = c_array->length;
114123
return std::make_pair(std::move(c_array), std::move(c_schema));
115124
}
116125

src/paimon/format/lance/lance_file_batch_reader.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ class LanceFileBatchReader : public FileBatchReader {
4242
Result<ReadBatch> NextBatch() override;
4343

4444
Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
45-
// TODO(xinyu.lxy): support function
46-
return Status::Invalid(
47-
"Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after "
48-
"bitmap pushdown, rows in the array returned by NextBatch are no longer "
49-
"contiguous.");
45+
if (!read_row_ids_.empty() && read_row_ids_.size() != num_rows_) {
46+
// TODO(xinyu.lxy): support function
47+
return Status::Invalid(
48+
"Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after "
49+
"bitmap pushdown, rows in the array returned by NextBatch are no longer "
50+
"contiguous.");
51+
}
52+
return previous_batch_first_row_num_;
5053
}
5154

5255
Result<uint64_t> GetNumberOfRows() const override {
@@ -76,6 +79,9 @@ class LanceFileBatchReader : public FileBatchReader {
7679
int32_t batch_size_ = -1;
7780
int32_t batch_readahead_ = -1;
7881
uint64_t num_rows_ = 0;
82+
// only validate when there is no bitmap pushdown
83+
uint64_t previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
84+
uint64_t last_batch_row_num_ = 0;
7985
mutable std::string error_message_;
8086
LanceFileReader* file_reader_ = nullptr;
8187
LanceReaderAdapter* stream_reader_ = nullptr;

src/paimon/format/lance/lance_format_reader_writer_test.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,4 +419,59 @@ TEST_F(LanceFileReaderWriterTest, TestTimestampType) {
419419
CheckResult(src_chunk_array, schema, /*enable_tz=*/false);
420420
}
421421

422+
TEST_F(LanceFileReaderWriterTest, TestPreviousBatchFirstRowNumber) {
423+
arrow::FieldVector fields = {arrow::field("f1", arrow::int32()),
424+
arrow::field("f2", arrow::utf8())};
425+
auto schema = arrow::schema(fields);
426+
auto array = std::dynamic_pointer_cast<arrow::StructArray>(
427+
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields}), R"([
428+
[1, "Hello"],
429+
[2, "World"],
430+
[3, "apple"],
431+
[4, "Alice"],
432+
[5, "Bob"],
433+
[6, "Lucy"]
434+
])")
435+
.ValueOrDie());
436+
auto src_chunk_array = std::make_shared<arrow::ChunkedArray>(arrow::ArrayVector({array}));
437+
438+
auto dir = paimon::test::UniqueTestDirectory::Create();
439+
ASSERT_TRUE(dir);
440+
std::string file_path = dir->Str() + "/test.lance";
441+
WriteFile(file_path, src_chunk_array, schema);
442+
ASSERT_OK_AND_ASSIGN(
443+
std::unique_ptr<LanceFileBatchReader> reader,
444+
LanceFileBatchReader::Create(file_path, /*batch_size=*/4, /*batch_readahead=*/2));
445+
ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
446+
reader->GetPreviousBatchFirstRowNumber().value());
447+
448+
// first batch row 0-3
449+
ASSERT_OK_AND_ASSIGN(auto read_batch, reader->NextBatch());
450+
ASSERT_OK_AND_ASSIGN(auto read_array,
451+
paimon::test::ReadResultCollector::GetArray(std::move(read_batch)));
452+
ASSERT_TRUE(read_array->Equals(array->Slice(0, 4)));
453+
ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value());
454+
455+
// second batch 4-5
456+
ASSERT_OK_AND_ASSIGN(read_batch, reader->NextBatch());
457+
ASSERT_OK_AND_ASSIGN(read_array,
458+
paimon::test::ReadResultCollector::GetArray(std::move(read_batch)));
459+
ASSERT_TRUE(read_array->Equals(array->Slice(4, 2)));
460+
ASSERT_EQ(4, reader->GetPreviousBatchFirstRowNumber().value());
461+
462+
// eof
463+
ASSERT_OK_AND_ASSIGN(read_batch, reader->NextBatch());
464+
ASSERT_TRUE(BatchReader::IsEofBatch(read_batch));
465+
ASSERT_EQ(6, reader->GetPreviousBatchFirstRowNumber().value());
466+
467+
// test with bitmap pushdown
468+
ArrowSchema c_read_schema;
469+
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_read_schema).ok());
470+
ASSERT_OK(reader->SetReadSchema(&c_read_schema, /*predicate=*/nullptr,
471+
/*selection_bitmap=*/RoaringBitmap32::From({0, 3})));
472+
ASSERT_NOK_WITH_MSG(
473+
reader->GetPreviousBatchFirstRowNumber(),
474+
"Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after bitmap "
475+
"pushdown, rows in the array returned by NextBatch are no longer contiguous.");
476+
}
422477
} // namespace paimon::lance::test

0 commit comments

Comments
 (0)