diff --git a/include/paimon/global_index/row_range_global_index_scanner.h b/include/paimon/global_index/row_range_global_index_scanner.h index cd6e637a..f598138f 100644 --- a/include/paimon/global_index/row_range_global_index_scanner.h +++ b/include/paimon/global_index/row_range_global_index_scanner.h @@ -52,6 +52,18 @@ class PAIMON_EXPORT RowRangeGlobalIndexScanner { /// format). virtual Result> CreateReader( const std::string& field_name, const std::string& index_type) const = 0; + + /// Creates several `GlobalIndexReader`s for a specific field within this range. + /// + /// @param field_name Name of the indexed column. + /// @return A `Result` that is: + /// - Successful with several readers if the indexes exist and load correctly; + /// - Successful with an empty vector if no index was built for the given field; + /// - Error returns when loading fails (e.g., file corruption, I/O error, unsupported + /// format) or the predicate method was incorrectly invoked (e.g., VisitTopK was invoked + /// incorrectly). + virtual Result>> CreateReaders( + const std::string& field_name) const = 0; }; } // namespace paimon diff --git a/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp b/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp index e5d57eb1..93f6c27b 100644 --- a/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp +++ b/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp @@ -64,9 +64,20 @@ Result> RowRangeGlobalIndexScannerImpl::Creat return CreateReader(field, index_type, entries); } +Result>> +RowRangeGlobalIndexScannerImpl::CreateReaders(const std::string& field_name) const { + PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name)); + return CreateReaders(field); +} + Result>> RowRangeGlobalIndexScannerImpl::CreateReaders(int32_t field_id) const { PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_id)); + return CreateReaders(field); +} + +Result>> +RowRangeGlobalIndexScannerImpl::CreateReaders(const DataField& field) const { auto field_iter = grouped_entries_.find(field.Id()); if (field_iter == grouped_entries_.end()) { return std::vector>(); diff --git a/src/paimon/core/global_index/row_range_global_index_scanner_impl.h b/src/paimon/core/global_index/row_range_global_index_scanner_impl.h index b127e414..a9d1f522 100644 --- a/src/paimon/core/global_index/row_range_global_index_scanner_impl.h +++ b/src/paimon/core/global_index/row_range_global_index_scanner_impl.h @@ -47,8 +47,13 @@ class RowRangeGlobalIndexScannerImpl Result> CreateReader( const std::string& field_name, const std::string& index_type) const override; + Result>> CreateReaders( + const std::string& field_name) const override; + private: Result>> CreateReaders(int32_t field_id) const; + Result>> CreateReaders( + const DataField& field) const; Result> CreateReader( const DataField& field, const std::string& index_type, diff --git a/test/inte/global_index_test.cpp b/test/inte/global_index_test.cpp index dc3db76a..820b97da 100644 --- a/test/inte/global_index_test.cpp +++ b/test/inte/global_index_test.cpp @@ -1601,6 +1601,79 @@ TEST_P(GlobalIndexTest, TestDataEvolutionBatchScanWithPartitionWithTwoFields) { } } +TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::list(arrow::float32())), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + std::map lumina_options = { + {"lumina.dimension", "4"}, + {"lumina.indextype", "bruteforce"}, + {"lumina.distance.metric", "l2"}, + {"lumina.encoding.type", "encoding.rawf32"}, + {"lumina.search.threadcount", "10"}}; + auto schema = arrow::schema(fields); + std::map options = {{Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_SYSTEM, "local"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}}; + CreateTable(/*partition_keys=*/{}, schema, options); + + std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + std::vector write_cols = schema->field_names(); + + auto src_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1], +["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], +["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], +["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1], +["Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1], +["Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1], +["Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1], +["Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1], +["Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array)); + ASSERT_OK(Commit(table_path, commit_msgs)); + + // write and commit bitmap global index + ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f0", "bitmap", + /*options=*/{}, Range(0, 8))); + + // write and commit lumina global index + ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina", + /*options=*/lumina_options, Range(0, 8))); + + ASSERT_OK_AND_ASSIGN( + auto global_index_scan, + GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt, + /*partitions=*/std::nullopt, /*options=*/lumina_options, + /*file_system=*/nullptr, pool_)); + ASSERT_OK_AND_ASSIGN(std::vector ranges, global_index_scan->GetRowRangeList()); + ASSERT_EQ(ranges, std::vector({Range(0, 8)})); + ASSERT_OK_AND_ASSIGN(auto range_scanner, global_index_scan->CreateRangeScan(Range(0, 8))); + // query f0 + ASSERT_OK_AND_ASSIGN(auto index_readers, range_scanner->CreateReaders("f0")); + ASSERT_EQ(index_readers.size(), 1); + ASSERT_OK_AND_ASSIGN(auto index_result, + index_readers[0]->VisitEqual(Literal(FieldType::STRING, "Alice", 5))); + ASSERT_EQ(index_result->ToString(), "{0,7}"); + + // query f1 + ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f1")); + ASSERT_EQ(index_readers.size(), 1); + std::vector query = {11.0f, 11.0f, 11.0f, 11.0f}; + ASSERT_OK_AND_ASSIGN(auto topk_result, index_readers[0]->VisitTopK(1, query, /*filter=*/nullptr, + /*predicate*/ nullptr)); + ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0}"); + + // query f2 + ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f2")); + ASSERT_EQ(index_readers.size(), 0); +} + std::vector GetTestValuesForGlobalIndexTest() { std::vector values = {"parquet"}; #ifdef PAIMON_ENABLE_ORC