Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions include/paimon/global_index/row_range_global_index_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ class PAIMON_EXPORT RowRangeGlobalIndexScanner {
/// format).
virtual Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
const std::string& field_name, const std::string& index_type) const = 0;

/// Creates the `GlobalIndexReader`s for a specific field within this range.
///
/// Unlike `CreateReader`, the caller does not name an index type; only the field is given.
///
/// @param field_name Name of the indexed column.
/// @return A `Result` that is:
/// - Successful with one or more readers if the indexes exist and load correctly;
/// - Successful with an empty vector if no index was built for the given field;
/// - An error if loading fails (e.g., file corruption, I/O error, unsupported format)
/// or a predicate method was invoked incorrectly (e.g., `VisitTopK` was invoked
/// incorrectly).
///
/// NOTE(review): the predicate-method clause looks like it describes later use of the
/// returned readers rather than this call itself — confirm whether it belongs here.
virtual Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
const std::string& field_name) const = 0;
};

} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,20 @@ Result<std::shared_ptr<GlobalIndexReader>> RowRangeGlobalIndexScannerImpl::Creat
return CreateReader(field, index_type, entries);
}

/// Name-based entry point: looks the column up in the table schema and hands the
/// resulting `DataField` to the `DataField` overload, which builds the readers.
/// A failed lookup is handled by `PAIMON_ASSIGN_OR_RAISE` (presumably by returning the
/// error to the caller — confirm against the macro definition).
Result<std::vector<std::shared_ptr<GlobalIndexReader>>>
RowRangeGlobalIndexScannerImpl::CreateReaders(const std::string& field_name) const {
    PAIMON_ASSIGN_OR_RAISE(DataField named_field, table_schema_->GetField(field_name));
    return CreateReaders(named_field);
}

/// Id-based entry point: looks the field id up in the table schema and hands the
/// resulting `DataField` to the `DataField` overload, which builds the readers.
/// A failed lookup is handled by `PAIMON_ASSIGN_OR_RAISE` (presumably by returning the
/// error to the caller — confirm against the macro definition).
Result<std::vector<std::shared_ptr<GlobalIndexReader>>>
RowRangeGlobalIndexScannerImpl::CreateReaders(int32_t field_id) const {
    PAIMON_ASSIGN_OR_RAISE(DataField id_field, table_schema_->GetField(field_id));
    return CreateReaders(id_field);
}

Result<std::vector<std::shared_ptr<GlobalIndexReader>>>
RowRangeGlobalIndexScannerImpl::CreateReaders(const DataField& field) const {
auto field_iter = grouped_entries_.find(field.Id());
if (field_iter == grouped_entries_.end()) {
return std::vector<std::shared_ptr<GlobalIndexReader>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ class RowRangeGlobalIndexScannerImpl
/// Override of the scanner interface's `CreateReader` (presumably declared on
/// `RowRangeGlobalIndexScanner` — confirm): takes the field name and the index type and
/// yields a single reader wrapped in a `Result`.
Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
const std::string& field_name, const std::string& index_type) const override;

/// Override of the scanner interface's `CreateReaders`: resolves `field_name` through
/// the table schema and forwards to the private `DataField` overload.
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
const std::string& field_name) const override;

private:
/// Resolves `field_id` through the table schema and forwards to the `DataField` overload.
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(int32_t field_id) const;
/// Common implementation behind the name- and id-based overloads. Looks up `field.Id()`
/// in `grouped_entries_` and returns an empty vector when the field has no entry.
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
const DataField& field) const;

Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
const DataField& field, const std::string& index_type,
Expand Down
73 changes: 73 additions & 0 deletions test/inte/global_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,79 @@ TEST_P(GlobalIndexTest, TestDataEvolutionBatchScanWithPartitionWithTwoFields) {
}
}

/// Verifies `RowRangeGlobalIndexScanner::CreateReaders` on a table that carries two
/// different global indexes: a bitmap index on `f0` and a lumina vector index on `f1`.
/// Each indexed field must yield exactly one usable reader, and a field without any
/// index (`f2`) must yield an empty vector rather than an error.
TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) {
    arrow::FieldVector fields = {
        arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::list(arrow::float32())),
        arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())};
    std::map<std::string, std::string> lumina_options = {
        {"lumina.dimension", "4"},
        {"lumina.indextype", "bruteforce"},
        {"lumina.distance.metric", "l2"},
        {"lumina.encoding.type", "encoding.rawf32"},
        {"lumina.search.threadcount", "10"}};
    auto schema = arrow::schema(fields);
    std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"},
                                                  {Options::FILE_FORMAT, GetParam()},
                                                  {Options::FILE_SYSTEM, "local"},
                                                  {Options::ROW_TRACKING_ENABLED, "true"},
                                                  {Options::DATA_EVOLUTION_ENABLED, "true"}};
    CreateTable(/*partition_keys=*/{}, schema, options);

    std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar");
    std::vector<std::string> write_cols = schema->field_names();

    // Nine rows (row ids 0..8). Rows 0 and 7 share f0 == "Alice"; row 7 is the only row
    // whose f1 vector equals the top-k query used below.
    auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1],
["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1],
["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1],
["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1],
["Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1],
["Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1],
["Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1],
["Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1],
["Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1]
])")
            .ValueOrDie());
    // The downcast yields nullptr if the parsed array is not a StructArray; fail here
    // with a clear message instead of dereferencing a null pointer further down.
    ASSERT_TRUE(src_array != nullptr);
    ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array));
    ASSERT_OK(Commit(table_path, commit_msgs));

    // write and commit bitmap global index
    // Range(0, 8) spans all nine rows written above (bounds appear to be inclusive).
    ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f0", "bitmap",
                         /*options=*/{}, Range(0, 8)));

    // write and commit lumina global index
    ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina",
                         /*options=*/lumina_options, Range(0, 8)));

    ASSERT_OK_AND_ASSIGN(
        auto global_index_scan,
        GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt,
                                /*partitions=*/std::nullopt, /*options=*/lumina_options,
                                /*file_system=*/nullptr, pool_));
    ASSERT_OK_AND_ASSIGN(std::vector<Range> ranges, global_index_scan->GetRowRangeList());
    ASSERT_EQ(ranges, std::vector<Range>({Range(0, 8)}));
    ASSERT_OK_AND_ASSIGN(auto range_scanner, global_index_scan->CreateRangeScan(Range(0, 8)));

    // query f0: the bitmap index must report both "Alice" rows.
    ASSERT_OK_AND_ASSIGN(auto index_readers, range_scanner->CreateReaders("f0"));
    // Unsigned literal keeps the comparison with size() free of sign-compare warnings.
    ASSERT_EQ(index_readers.size(), 1U);
    ASSERT_OK_AND_ASSIGN(auto index_result,
                         index_readers[0]->VisitEqual(Literal(FieldType::STRING, "Alice", 5)));
    ASSERT_EQ(index_result->ToString(), "{0,7}");

    // query f1: the query vector equals row 7's vector, so the nearest neighbour under
    // the l2 metric is row 7 with a score of 0.
    ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f1"));
    ASSERT_EQ(index_readers.size(), 1U);
    std::vector<float> query = {11.0f, 11.0f, 11.0f, 11.0f};
    ASSERT_OK_AND_ASSIGN(auto topk_result, index_readers[0]->VisitTopK(1, query, /*filter=*/nullptr,
                                                                       /*predicate=*/nullptr));
    ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0}");

    // query f2: no index was written for this field, so the call succeeds with no readers.
    ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f2"));
    ASSERT_TRUE(index_readers.empty());
}

std::vector<std::string> GetTestValuesForGlobalIndexTest() {
std::vector<std::string> values = {"parquet"};
#ifdef PAIMON_ENABLE_ORC
Expand Down
Loading