-
Notifications
You must be signed in to change notification settings - Fork 40
refactor: improve RowCompactedSerializer by using string_view to avoid data copies #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -144,11 +144,11 @@ Status BinarySerializerUtils::WriteBinaryData(const std::shared_ptr<arrow::DataT | |||||
| break; | ||||||
| } | ||||||
| case arrow::Type::type::STRING: { | ||||||
| writer->WriteString(pos, getter->GetString(pos)); | ||||||
| writer->WriteStringView(pos, getter->GetStringView(pos)); | ||||||
| break; | ||||||
| } | ||||||
| case arrow::Type::type::BINARY: { | ||||||
| writer->WriteBinary(pos, *(getter->GetBinary(pos))); | ||||||
| writer->WriteStringView(pos, getter->GetStringView(pos)); | ||||||
|
||||||
| writer->WriteStringView(pos, getter->GetStringView(pos)); | |
| writer->WriteBinary(pos, getter->GetBinary(pos)); |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |||||||||||||||||||
| #pragma once | ||||||||||||||||||||
|
|
||||||||||||||||||||
| #include "paimon/core/compact/compact_unit.h" | ||||||||||||||||||||
| #include "paimon/core/deletionvectors/bucketed_dv_maintainer.h" | ||||||||||||||||||||
|
||||||||||||||||||||
| #include "paimon/core/io/data_file_meta.h" | ||||||||||||||||||||
| #include "paimon/core/mergetree/level_sorted_run.h" | ||||||||||||||||||||
| namespace paimon { | ||||||||||||||||||||
|
|
@@ -33,9 +34,9 @@ class CompactStrategy { | |||||||||||||||||||
| const std::vector<LevelSortedRun>& runs) = 0; | ||||||||||||||||||||
| /// Pick a compaction unit consisting of all existing files. | ||||||||||||||||||||
| // TODO(xinyu.lxy): support RecordLevelExpire and BucketedDvMaintainer | ||||||||||||||||||||
| static std::optional<CompactUnit> PickFullCompaction(int32_t num_levels, | ||||||||||||||||||||
| const std::vector<LevelSortedRun>& runs, | ||||||||||||||||||||
| bool force_rewrite_all_files) { | ||||||||||||||||||||
| static std::optional<CompactUnit> PickFullCompaction( | ||||||||||||||||||||
| int32_t num_levels, const std::vector<LevelSortedRun>& runs, | ||||||||||||||||||||
| const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer, bool force_rewrite_all_files) { | ||||||||||||||||||||
|
||||||||||||||||||||
| const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer, bool force_rewrite_all_files) { | |
| bool force_rewrite_all_files) { | |
| return PickFullCompaction(num_levels, runs, nullptr, force_rewrite_all_files); | |
| } | |
| static std::optional<CompactUnit> PickFullCompaction( | |
| int32_t num_levels, const std::vector<LevelSortedRun>& runs, | |
| const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer, | |
| bool force_rewrite_all_files) { |
Copilot
AI
Mar 24, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says ‘check deletion vector for large files’, but the condition does not check file size—only whether a deletion vector exists. To avoid misleading future readers, update the comment to reflect the actual logic (e.g., ‘rewrite files with deletion vectors’) or add an explicit size-based condition if that was intended.
| // check deletion vector for large files | |
| // rewrite files that have deletion vectors |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -55,22 +55,25 @@ TEST_F(CompactStrategyTest, TestPickFullCompaction) { | |||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| // no sorted run, no need to compact | ||||||||||||||||||||||||||||||||||||||||||||||
| auto runs = CreateRunsWithLevelAndSize({}, {}); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/3, runs, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/false); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = | ||||||||||||||||||||||||||||||||||||||||||||||
| CompactStrategy::PickFullCompaction(/*num_levels=*/3, runs, /*dv_maintainer=*/nullptr, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/false); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_FALSE(unit); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| // only max level files, not rewrite | ||||||||||||||||||||||||||||||||||||||||||||||
| auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/false); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = | ||||||||||||||||||||||||||||||||||||||||||||||
| CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, /*dv_maintainer=*/nullptr, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/false); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_FALSE(unit); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| // only max level files, force rewrite | ||||||||||||||||||||||||||||||||||||||||||||||
| auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/true); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = | ||||||||||||||||||||||||||||||||||||||||||||||
| CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, /*dv_maintainer=*/nullptr, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/true); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_TRUE(unit); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_EQ(unit.value().output_level, 3); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_EQ(unit.value().files.size(), 1); | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -79,12 +82,30 @@ TEST_F(CompactStrategyTest, TestPickFullCompaction) { | |||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| // full compaction | ||||||||||||||||||||||||||||||||||||||||||||||
| auto runs = CreateRunsWithLevelAndSize(/*levels=*/{0, 3}, /*sizes*/ {1, 10}); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/false); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto unit = | ||||||||||||||||||||||||||||||||||||||||||||||
| CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, /*dv_maintainer=*/nullptr, | ||||||||||||||||||||||||||||||||||||||||||||||
| /*force_rewrite_all_files=*/false); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_TRUE(unit); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_EQ(unit.value().output_level, 3); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_EQ(unit.value().files.size(), 2); | ||||||||||||||||||||||||||||||||||||||||||||||
| ASSERT_FALSE(unit.value().file_rewrite); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| // test with dv maintainer | ||||||||||||||||||||||||||||||||||||||||||||||
| std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors = { | ||||||||||||||||||||||||||||||||||||||||||||||
| {"fake.data", std::make_shared<BitmapDeletionVector>(RoaringBitmap32())}}; | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+95
to
+96
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| auto dv_maintainer = std::make_shared<BucketedDvMaintainer>( | ||||||||||||||||||||||||||||||||||||||||||||||
| std::make_shared<DeletionVectorsIndexFile>(nullptr, nullptr, /*bitmap64=*/false, | ||||||||||||||||||||||||||||||||||||||||||||||
| GetDefaultPool()), | ||||||||||||||||||||||||||||||||||||||||||||||
| deletion_vectors); | ||||||||||||||||||||||||||||||||||||||||||||||
| auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+95
to
+102
|
||||||||||||||||||||||||||||||||||||||||||||||
| std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors = { | |
| {"fake.data", std::make_shared<BitmapDeletionVector>(RoaringBitmap32())}}; | |
| auto dv_maintainer = std::make_shared<BucketedDvMaintainer>( | |
| std::make_shared<DeletionVectorsIndexFile>(nullptr, nullptr, /*bitmap64=*/false, | |
| GetDefaultPool()), | |
| deletion_vectors); | |
| auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); | |
| auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); | |
| // Derive the file name from the created runs to ensure the DV map key matches. | |
| ASSERT_FALSE(runs.empty()); | |
| ASSERT_TRUE(runs[0].run); | |
| ASSERT_FALSE(runs[0].run->files.empty()); | |
| const std::string& file_name = runs[0].run->files[0]->file_name; | |
| std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors = { | |
| {file_name, std::make_shared<BitmapDeletionVector>(RoaringBitmap32())}}; | |
| auto dv_maintainer = std::make_shared<BucketedDvMaintainer>( | |
| std::make_shared<DeletionVectorsIndexFile>(nullptr, nullptr, /*bitmap64=*/false, | |
| GetDefaultPool()), | |
| deletion_vectors); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,17 +16,39 @@ | |
|
|
||
| #include "paimon/core/mergetree/lookup/persist_processor.h" | ||
|
|
||
| #include "arrow/ipc/json_simple.h" | ||
|
||
| #include "gtest/gtest.h" | ||
| #include "paimon/common/data/columnar/columnar_row.h" | ||
| #include "paimon/core/mergetree/lookup/default_lookup_serializer_factory.h" | ||
| #include "paimon/core/mergetree/lookup/persist_empty_processor.h" | ||
| #include "paimon/core/mergetree/lookup/persist_position_processor.h" | ||
| #include "paimon/core/mergetree/lookup/persist_value_and_pos_processor.h" | ||
| #include "paimon/core/mergetree/lookup/persist_value_processor.h" | ||
| #include "paimon/testing/utils/binary_row_generator.h" | ||
| #include "paimon/testing/utils/testharness.h" | ||
| namespace paimon::test { | ||
| class PersistProcessorTest : public testing::Test { | ||
| public: | ||
| void SetUp() override { | ||
| auto key_type = arrow::struct_({arrow::field("f1", arrow::int32())}); | ||
| auto key_array = std::dynamic_pointer_cast<arrow::StructArray>( | ||
| arrow::ipc::internal::json::ArrayFromJSON(key_type, R"([[10]])").ValueOrDie()); | ||
|
Comment on lines
+33
to
+34
|
||
|
|
||
| auto value_type = arrow::struct_( | ||
| {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), | ||
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}); | ||
| auto value_array = std::dynamic_pointer_cast<arrow::StructArray>( | ||
| arrow::ipc::internal::json::ArrayFromJSON(value_type, R"([["Alice", 10, null, 10.1]])") | ||
| .ValueOrDie()); | ||
|
Comment on lines
+39
to
+41
|
||
|
|
||
| auto key_row = std::make_shared<ColumnarRow>( | ||
| /*struct_array=*/key_array, key_array->fields(), pool_, /*row_id=*/0); | ||
| auto value_row = std::make_unique<ColumnarRow>( | ||
| /*struct_array=*/value_array, value_array->fields(), pool_, /*row_id=*/0); | ||
|
|
||
| kv_ = KeyValue(RowKind::Insert(), /*sequence_number=*/500, /*level=*/4, std::move(key_row), | ||
| std::move(value_row)); | ||
| } | ||
|
|
||
| void CheckResult(const KeyValue& kv) { | ||
| ASSERT_EQ(kv_.key, kv.key); | ||
|
|
||
|
|
@@ -44,11 +66,7 @@ class PersistProcessorTest : public testing::Test { | |
|
|
||
| private: | ||
| std::shared_ptr<MemoryPool> pool_ = GetDefaultPool(); | ||
| KeyValue kv_ = KeyValue(RowKind::Insert(), /*sequence_number=*/500, /*level=*/4, /*key=*/ | ||
| BinaryRowGenerator::GenerateRowPtr({10}, pool_.get()), | ||
| /*value=*/ | ||
| BinaryRowGenerator::GenerateRowPtr( | ||
| {std::string("Alice"), 10, NullType(), 10.1}, pool_.get())); | ||
| KeyValue kv_; | ||
| std::shared_ptr<arrow::Schema> file_schema_ = | ||
| arrow::schema({arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), | ||
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For
arrow::Type::BINARY, this switches fromgetter->GetBinary(pos)togetter->GetStringView(pos)and fromWriteBinarytoWriteStringView. IfGetStringViewis implemented with STRING semantics (e.g., expecting UTF-8, or only supported for STRING arrays), this can mis-serialize BINARY data or fail at runtime. Prefer a binary-specific view API (e.g.,GetBinaryView/WriteBinaryView) or keep usingGetBinary+WriteBinaryfor the BINARY case while still avoiding allocations.