Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ struct PAIMON_EXPORT Options {
static const char BLOB_AS_DESCRIPTOR[];
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
/// "global-index.external-path" - Global index root directory, if not set, the global index
/// files will be stored under the index directory.
static const char GLOBAL_INDEX_EXTERNAL_PATH[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
6 changes: 3 additions & 3 deletions include/paimon/global_index/global_index_io_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
namespace paimon {
/// Metadata describing a single file entry in a global index.
struct PAIMON_EXPORT GlobalIndexIOMeta {
GlobalIndexIOMeta(const std::string& _file_name, int64_t _file_size, int64_t _range_end,
GlobalIndexIOMeta(const std::string& _file_path, int64_t _file_size, int64_t _range_end,
const std::shared_ptr<Bytes>& _metadata)
: file_name(_file_name),
: file_path(_file_path),
file_size(_file_size),
range_end(_range_end),
metadata(_metadata) {}

std::string file_name;
std::string file_path;
int64_t file_size;
/// The inclusive range end covered by this file (i.e., the last local row id).
int64_t range_end;
Expand Down
3 changes: 3 additions & 0 deletions include/paimon/global_index/io/global_index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class PAIMON_EXPORT GlobalIndexFileWriter {

/// Get the file size of input file name.
virtual Result<int64_t> GetFileSize(const std::string& file_name) const = 0;

/// Get the index file path of input file name.
virtual std::string ToPath(const std::string& file_name) const = 0;
};

} // namespace paimon
1 change: 1 addition & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,5 @@ const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled";
const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Result<std::shared_ptr<GlobalIndexReader>> BitmapGlobalIndex::CreateReader(
}
const auto& meta = files[0];
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> in,
file_reader->GetInputStream(meta.file_name));
file_reader->GetInputStream(meta.file_path));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<FileIndexReader> reader,
index_->CreateReader(arrow_schema, /*start=*/0, meta.file_size, in, pool));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ class BitmapGlobalIndexTest : public ::testing::Test {
PAIMON_ASSIGN_OR_RAISE(auto result_metas, global_writer->Finish());
// check meta
EXPECT_EQ(result_metas.size(), 1);
EXPECT_TRUE(StringUtils::StartsWith(result_metas[0].file_name, "bitmap-global-index-"));
EXPECT_TRUE(StringUtils::EndsWith(result_metas[0].file_name, ".index"));
auto file_name = PathUtil::GetName(result_metas[0].file_path);
EXPECT_TRUE(StringUtils::StartsWith(file_name, "bitmap-global-index-"));
EXPECT_TRUE(StringUtils::EndsWith(file_name, ".index"));
EXPECT_EQ(result_metas[0].range_end, expected_range.to);
EXPECT_FALSE(result_metas[0].metadata);
return result_metas[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class FileIndexWriterWrapper : public GlobalIndexWriter {
}
PAIMON_RETURN_NOT_OK(out->Flush());
PAIMON_RETURN_NOT_OK(out->Close());
GlobalIndexIOMeta meta(file_name, /*file_size=*/bytes->size(), /*range_end=*/count_ - 1,
GlobalIndexIOMeta meta(file_manager_->ToPath(file_name), /*file_size=*/bytes->size(),
/*range_end=*/count_ - 1,
/*metadata=*/nullptr);
return std::vector<GlobalIndexIOMeta>({meta});
}
Expand Down
29 changes: 29 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ struct CoreOptions::Impl {
bool data_evolution_enabled = false;
bool legacy_partition_name_enabled = true;
bool global_index_enabled = true;
std::optional<std::string> global_index_external_path;
};

// Parse configurations from a map and return a populated CoreOptions object
Expand Down Expand Up @@ -470,6 +471,15 @@ Result<CoreOptions> CoreOptions::FromMap(
// Parse global-index.enabled
PAIMON_RETURN_NOT_OK(
parser.Parse<bool>(Options::GLOBAL_INDEX_ENABLED, &impl->global_index_enabled));

// Parse global_index.external-path
std::string global_index_external_path;
PAIMON_RETURN_NOT_OK(
parser.ParseString(Options::GLOBAL_INDEX_EXTERNAL_PATH, &global_index_external_path));
if (!global_index_external_path.empty()) {
impl->global_index_external_path = global_index_external_path;
}

return options;
}

Expand Down Expand Up @@ -746,4 +756,23 @@ bool CoreOptions::LegacyPartitionNameEnabled() const {
bool CoreOptions::GlobalIndexEnabled() const {
return impl_->global_index_enabled;
}

std::optional<std::string> CoreOptions::GetGlobalIndexExternalPath() const {
return impl_->global_index_external_path;
}

Result<std::optional<std::string>> CoreOptions::CreateGlobalIndexExternalPath() const {
std::optional<std::string> global_index_external_path = GetGlobalIndexExternalPath();
if (global_index_external_path == std::nullopt) {
return global_index_external_path;
}
std::string tmp_path = global_index_external_path.value();
StringUtils::Trim(&tmp_path);
PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(tmp_path));
if (path.scheme.empty()) {
return Status::Invalid(fmt::format("scheme is null, path is {}", tmp_path));
}
return std::optional<std::string>(path.ToString());
}

} // namespace paimon
7 changes: 6 additions & 1 deletion src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ class PAIMON_EXPORT CoreOptions {
std::optional<std::string> GetScanFallbackBranch() const;
std::string GetBranch() const;

std::optional<std::string> GetDataFileExternalPaths() const;
ExternalPathStrategy GetExternalPathStrategy() const;
Result<std::vector<std::string>> CreateExternalPaths() const;
bool EnableAdaptivePrefetchStrategy() const;
Expand All @@ -117,8 +116,14 @@ class PAIMON_EXPORT CoreOptions {
bool LegacyPartitionNameEnabled() const;

bool GlobalIndexEnabled() const;
Result<std::optional<std::string>> CreateGlobalIndexExternalPath() const;

const std::map<std::string, std::string>& ToMap() const;

private:
std::optional<std::string> GetDataFileExternalPaths() const;
std::optional<std::string> GetGlobalIndexExternalPath() const;

private:
struct Impl;

Expand Down
23 changes: 23 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_FALSE(core_options.DataEvolutionEnabled());
ASSERT_TRUE(core_options.LegacyPartitionNameEnabled());
ASSERT_TRUE(core_options.GlobalIndexEnabled());
ASSERT_FALSE(core_options.GetGlobalIndexExternalPath());
}

TEST(CoreOptionsTest, TestFromMap) {
Expand Down Expand Up @@ -144,6 +145,7 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::DATA_EVOLUTION_ENABLED, "true"},
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
{Options::GLOBAL_INDEX_ENABLED, "false"},
{Options::GLOBAL_INDEX_EXTERNAL_PATH, "FILE:///tmp/global_index/"},
};

ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
Expand Down Expand Up @@ -212,6 +214,8 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_TRUE(core_options.DataEvolutionEnabled());
ASSERT_FALSE(core_options.LegacyPartitionNameEnabled());
ASSERT_FALSE(core_options.GlobalIndexEnabled());
ASSERT_TRUE(core_options.GetGlobalIndexExternalPath());
ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/");
}

TEST(CoreOptionsTest, TestInvalidCase) {
Expand Down Expand Up @@ -273,6 +277,25 @@ TEST(CoreOptionsTest, TestInvalidCreateExternalPath) {
}
}

TEST(CoreOptionsTest, TestCreateGlobalIndexExternalPath) {
std::map<std::string, std::string> options = {
{Options::GLOBAL_INDEX_EXTERNAL_PATH, " FILE:///tmp/index1"},
};
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
ASSERT_OK_AND_ASSIGN(std::optional<std::string> external_path,
core_options.CreateGlobalIndexExternalPath());
ASSERT_EQ("FILE:/tmp/index1", external_path.value());
}

TEST(CoreOptionsTest, TestInvalidCreateGlobalIndexExternalPath) {
std::map<std::string, std::string> options = {
{Options::GLOBAL_INDEX_EXTERNAL_PATH, "/tmp/index1"},
};
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
ASSERT_NOK_WITH_MSG(core_options.CreateGlobalIndexExternalPath(),
"scheme is null, path is /tmp/index1");
}

TEST(CoreOptionsTest, TestFileSystem) {
{
auto mock_fs = std::make_shared<MockFileSystem>();
Expand Down
20 changes: 16 additions & 4 deletions src/paimon/core/global_index/global_index_file_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class GlobalIndexFileManager : public GlobalIndexFileReader, public GlobalIndexF
: fs_(fs), path_factory_(path_factory) {}

Result<std::unique_ptr<InputStream>> GetInputStream(
const std::string& file_name) const override {
return fs_->Open(path_factory_->ToPath(file_name));
const std::string& file_path) const override {
return fs_->Open(file_path);
}

Result<std::string> NewFileName(const std::string& prefix) const override {
Expand All @@ -46,17 +46,29 @@ class GlobalIndexFileManager : public GlobalIndexFileReader, public GlobalIndexF
return prefix + "-" + "global-index-" + uuid + ".index";
}

std::string ToPath(const std::string& file_name) const override {
return path_factory_->ToPath(file_name);
}

std::string ToPath(const std::shared_ptr<IndexFileMeta>& file) const {
return path_factory_->ToPath(file);
}

Result<std::unique_ptr<OutputStream>> NewOutputStream(
const std::string& file_name) const override {
return fs_->Create(path_factory_->ToPath(file_name), /*overwrite=*/false);
return fs_->Create(ToPath(file_name), /*overwrite=*/false);
}

Result<int64_t> GetFileSize(const std::string& file_name) const override {
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> file_status,
fs_->GetFileStatus(path_factory_->ToPath(file_name)));
fs_->GetFileStatus(ToPath(file_name)));
return file_status->GetLen();
}

bool IsExternalPath() const {
return path_factory_->IsExternalPath();
}

private:
std::shared_ptr<FileSystem> fs_;
std::shared_ptr<IndexPathFactory> path_factory_;
Expand Down
4 changes: 3 additions & 1 deletion src/paimon/core/global_index/global_index_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ Status GlobalIndexScanImpl::Scan() {
}
auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema_->Fields());
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> external_paths, options_.CreateExternalPaths());
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> global_index_external_path,
options_.CreateGlobalIndexExternalPath());
PAIMON_ASSIGN_OR_RAISE(
path_factory_,
FileStorePathFactory::Create(
root_path_, arrow_schema, table_schema_->PartitionKeys(),
options_.GetPartitionDefaultName(), options_.GetWriteFileFormat()->Identifier(),
options_.DataFilePrefix(), options_.LegacyPartitionNameEnabled(), external_paths,
options_.IndexFileInDataFileDir(), pool_));
global_index_external_path, options_.IndexFileInDataFileDir(), pool_));

PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<IndexManifestFile> index_manifest_file,
IndexManifestFile::Create(
Expand Down
18 changes: 14 additions & 4 deletions src/paimon/core/global_index/global_index_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ Result<std::shared_ptr<GlobalIndexFileManager>> CreateGlobalIndexFileManager(
auto all_arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> external_paths,
core_options.CreateExternalPaths());
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> global_index_external_path,
core_options.CreateGlobalIndexExternalPath());
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<FileStorePathFactory> path_factory,
FileStorePathFactory::Create(
table_path, all_arrow_schema, table_schema->PartitionKeys(),
core_options.GetPartitionDefaultName(), core_options.GetWriteFileFormat()->Identifier(),
core_options.DataFilePrefix(), core_options.LegacyPartitionNameEnabled(),
external_paths, core_options.IndexFileInDataFileDir(), pool));
external_paths, global_index_external_path, core_options.IndexFileInDataFileDir(),
pool));
std::shared_ptr<IndexPathFactory> index_path_factory =
path_factory->CreateGlobalIndexFileFactory();
return std::make_shared<GlobalIndexFileManager>(core_options.GetFileSystem(),
Expand Down Expand Up @@ -118,17 +121,24 @@ Result<std::vector<GlobalIndexIOMeta>> BuildIndex(const std::string& field_name,
Result<std::shared_ptr<CommitMessage>> ToCommitMessage(
const std::string& index_type, int32_t field_id, const Range& range,
const std::vector<GlobalIndexIOMeta>& global_index_io_metas, const BinaryRow& partition,
int32_t bucket) {
int32_t bucket, const std::shared_ptr<GlobalIndexFileManager>& file_manager) {
std::vector<std::shared_ptr<IndexFileMeta>> index_file_metas;
index_file_metas.reserve(global_index_io_metas.size());
bool is_external_path = file_manager->IsExternalPath();
for (const auto& io_meta : global_index_io_metas) {
if (range.Count() != io_meta.range_end + 1) {
return Status::Invalid(
fmt::format("specified range length {} mismatch indexed range length {}",
range.Count(), io_meta.range_end + 1));
}
std::optional<std::string> external_path;
if (is_external_path) {
PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(io_meta.file_path));
external_path = path.ToString();
}
index_file_metas.push_back(std::make_shared<IndexFileMeta>(
index_type, io_meta.file_name, io_meta.file_size, io_meta.range_end + 1,
index_type, PathUtil::GetName(io_meta.file_path), io_meta.file_size,
io_meta.range_end + 1, /*dv_ranges=*/std::nullopt, external_path,
GlobalIndexMeta(range.from, io_meta.range_end + range.from, field_id,
/*extra_field_ids=*/std::nullopt, io_meta.metadata)));
}
Expand Down Expand Up @@ -192,7 +202,7 @@ Result<std::shared_ptr<CommitMessage>> GlobalIndexWriteTask::WriteIndex(

// generate commit message
return ToCommitMessage(index_type, field.Id(), range, global_index_io_metas,
data_split->Partition(), data_split->Bucket());
data_split->Partition(), data_split->Bucket(), index_file_manager);
}

} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Result<std::shared_ptr<GlobalIndexReader>> RowRangeGlobalIndexScannerImpl::Creat
}

std::vector<GlobalIndexIOMeta> RowRangeGlobalIndexScannerImpl::ToGlobalIndexIOMetas(
const std::vector<IndexManifestEntry>& entries) {
const std::vector<IndexManifestEntry>& entries) const {
std::vector<GlobalIndexIOMeta> index_io_metas;
index_io_metas.reserve(entries.size());
for (const auto& entry : entries) {
Expand All @@ -127,11 +127,11 @@ std::vector<GlobalIndexIOMeta> RowRangeGlobalIndexScannerImpl::ToGlobalIndexIOMe
}

GlobalIndexIOMeta RowRangeGlobalIndexScannerImpl::ToGlobalIndexIOMeta(
const IndexManifestEntry& entry) {
const IndexManifestEntry& entry) const {
const auto& index_file = entry.index_file;
assert(index_file->GetGlobalIndexMeta());
const auto& global_index_meta = index_file->GetGlobalIndexMeta().value();
return {index_file->FileName(), index_file->FileSize(),
return {index_file_manager_->ToPath(index_file), index_file->FileSize(),
/*range_end=*/global_index_meta.row_range_end - global_index_meta.row_range_start,
global_index_meta.index_meta};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class RowRangeGlobalIndexScannerImpl
const DataField& field, const std::string& index_type,
const std::vector<IndexManifestEntry>& entries) const;

static std::vector<GlobalIndexIOMeta> ToGlobalIndexIOMetas(
const std::vector<IndexManifestEntry>& entries);
std::vector<GlobalIndexIOMeta> ToGlobalIndexIOMetas(
const std::vector<IndexManifestEntry>& entries) const;

static GlobalIndexIOMeta ToGlobalIndexIOMeta(const IndexManifestEntry& entry);
GlobalIndexIOMeta ToGlobalIndexIOMeta(const IndexManifestEntry& entry) const;

private:
std::shared_ptr<MemoryPool> pool_;
Expand Down
4 changes: 4 additions & 0 deletions src/paimon/core/index/index_file_handler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ class IndexFileHandlerTest : public testing::Test {
auto schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> external_paths,
core_options.CreateExternalPaths());
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> global_index_external_path,
core_options.CreateGlobalIndexExternalPath());

PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<FileStorePathFactory> path_factory,
FileStorePathFactory::Create(
table_path, schema, table_schema->PartitionKeys(),
core_options.GetPartitionDefaultName(),
/*identifier=*/"orc", core_options.DataFilePrefix(),
core_options.LegacyPartitionNameEnabled(), external_paths,
global_index_external_path,
/*index_file_in_data_file_dir=*/core_options.IndexFileInDataFileDir(),
memory_pool_));
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<IndexManifestFile> index_manifest_file,
Expand Down
5 changes: 0 additions & 5 deletions src/paimon/core/index/index_file_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ class IndexFileMeta {
: IndexFileMeta(index_type, file_name, file_size, row_count, dv_ranges, external_path,
/*global_index_meta=*/std::nullopt) {}

IndexFileMeta(const std::string& index_type, const std::string& file_name, int64_t file_size,
int64_t row_count, const std::optional<GlobalIndexMeta>& global_index_meta)
: IndexFileMeta(index_type, file_name, file_size, row_count, /*dv_ranges=*/std::nullopt,
/*external_path=*/std::nullopt, global_index_meta) {}

IndexFileMeta(const std::string& index_type, const std::string& file_name, int64_t file_size,
int64_t row_count,
const std::optional<LinkedHashMap<std::string, DeletionVectorMeta>>& dv_ranges,
Expand Down
Loading
Loading