Skip to content

Commit 8ad3a28

Browse files
authored
feat(compaction): add MergeTreeCompactRewriter for compacting files in MOR (#161)
1 parent bd2e7dc commit 8ad3a28

34 files changed

+1008
-143
lines changed

src/paimon/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ set(PAIMON_CORE_SRCS
221221
core/mergetree/compact/aggregate/field_sum_agg.cpp
222222
core/mergetree/compact/interval_partition.cpp
223223
core/mergetree/compact/loser_tree.cpp
224+
core/mergetree/compact/merge_tree_compact_rewriter.cpp
224225
core/mergetree/compact/partial_update_merge_function.cpp
225226
core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp
226227
core/mergetree/compact/sort_merge_reader_with_min_heap.cpp
@@ -540,6 +541,7 @@ if(PAIMON_BUILD_TESTS)
540541
core/mergetree/compact/universal_compaction_test.cpp
541542
core/mergetree/compact/force_up_level0_compaction_test.cpp
542543
core/mergetree/compact/compact_strategy_test.cpp
544+
core/mergetree/compact/merge_tree_compact_rewriter_test.cpp
543545
core/mergetree/drop_delete_reader_test.cpp
544546
core/mergetree/merge_tree_writer_test.cpp
545547
core/mergetree/sorted_run_test.cpp

src/paimon/common/data/columnar/columnar_array.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {
8585
std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const {
8686
auto struct_array = arrow::internal::checked_cast<const arrow::StructArray*>(array_);
8787
assert(struct_array);
88-
auto row_ctx = std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), pool_);
88+
auto row_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), pool_);
8989
return std::make_shared<ColumnarRowRef>(std::move(row_ctx), offset_ + pos);
9090
}
9191

src/paimon/common/data/columnar/columnar_batch_context.h

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,11 @@ namespace paimon {
2929
class MemoryPool;
3030

3131
struct ColumnarBatchContext {
32-
ColumnarBatchContext(const std::shared_ptr<arrow::StructArray>& struct_array_in,
33-
const arrow::ArrayVector& array_vec_in,
32+
ColumnarBatchContext(const arrow::ArrayVector& array_vec_in,
3433
const std::shared_ptr<MemoryPool>& pool_in)
35-
: struct_array(struct_array_in), pool(pool_in) {
36-
array_ptrs.reserve(array_vec_in.size());
37-
for (const auto& array : array_vec_in) {
38-
array_ptrs.push_back(array.get());
39-
}
40-
}
34+
: pool(pool_in), array_vec(array_vec_in) {}
4135

42-
/// @note `struct_array` is the data holder for columnar row, ensure that the data life
43-
/// cycle is consistent with the columnar row, `array_ptrs` maybe a subset of
44-
/// `struct_array`, so `struct_array` cannot be used for `GetXXX()`
45-
std::shared_ptr<arrow::StructArray> struct_array;
4636
std::shared_ptr<MemoryPool> pool;
47-
std::vector<const arrow::Array*> array_ptrs;
37+
arrow::ArrayVector array_vec;
4838
};
4939
} // namespace paimon

src/paimon/common/data/columnar/columnar_row_ref.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
namespace paimon {
3232
Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
3333
using ArrayType = typename arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
34-
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
34+
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_vec[pos].get());
3535
assert(array);
3636
arrow::Decimal128 decimal(array->GetValue(row_id_));
3737
return Decimal(precision, scale,
@@ -40,7 +40,7 @@ Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale
4040

4141
Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {
4242
using ArrayType = typename arrow::TypeTraits<arrow::TimestampType>::ArrayType;
43-
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
43+
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_vec[pos].get());
4444
assert(array);
4545
int64_t data = array->Value(row_id_);
4646
auto timestamp_type =
@@ -55,23 +55,24 @@ Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {
5555

5656
std::shared_ptr<InternalRow> ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const {
5757
auto struct_array =
58-
arrow::internal::checked_cast<const arrow::StructArray*>(ctx_->array_ptrs[pos]);
58+
arrow::internal::checked_cast<const arrow::StructArray*>(ctx_->array_vec[pos].get());
5959
assert(struct_array);
60-
auto nested_ctx =
61-
std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), ctx_->pool);
60+
auto nested_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), ctx_->pool);
6261
return std::make_shared<ColumnarRowRef>(std::move(nested_ctx), row_id_);
6362
}
6463

6564
std::shared_ptr<InternalArray> ColumnarRowRef::GetArray(int32_t pos) const {
66-
auto list_array = arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_ptrs[pos]);
65+
auto list_array =
66+
arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_vec[pos].get());
6767
assert(list_array);
6868
int32_t offset = list_array->value_offset(row_id_);
6969
int32_t length = list_array->value_length(row_id_);
7070
return std::make_shared<ColumnarArray>(list_array->values(), ctx_->pool, offset, length);
7171
}
7272

7373
std::shared_ptr<InternalMap> ColumnarRowRef::GetMap(int32_t pos) const {
74-
auto map_array = arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_ptrs[pos]);
74+
auto map_array =
75+
arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_vec[pos].get());
7576
assert(map_array);
7677
int32_t offset = map_array->value_offset(row_id_);
7778
int32_t length = map_array->value_length(row_id_);

src/paimon/common/data/columnar/columnar_row_ref.h

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,69 +51,69 @@ class ColumnarRowRef : public InternalRow {
5151
}
5252

5353
int32_t GetFieldCount() const override {
54-
return static_cast<int32_t>(ctx_->array_ptrs.size());
54+
return static_cast<int32_t>(ctx_->array_vec.size());
5555
}
5656

5757
bool IsNullAt(int32_t pos) const override {
58-
return ctx_->array_ptrs[pos]->IsNull(row_id_);
58+
return ctx_->array_vec[pos]->IsNull(row_id_);
5959
}
6060

6161
bool GetBoolean(int32_t pos) const override {
62-
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_ptrs[pos],
62+
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_vec[pos].get(),
6363
row_id_);
6464
}
6565

6666
char GetByte(int32_t pos) const override {
67-
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_ptrs[pos],
67+
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_vec[pos].get(),
6868
row_id_);
6969
}
7070

7171
int16_t GetShort(int32_t pos) const override {
72-
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_ptrs[pos],
72+
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_vec[pos].get(),
7373
row_id_);
7474
}
7575

7676
int32_t GetInt(int32_t pos) const override {
77-
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_ptrs[pos],
77+
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_vec[pos].get(),
7878
row_id_);
7979
}
8080

8181
int32_t GetDate(int32_t pos) const override {
82-
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(ctx_->array_ptrs[pos],
83-
row_id_);
82+
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(
83+
ctx_->array_vec[pos].get(), row_id_);
8484
}
8585

8686
int64_t GetLong(int32_t pos) const override {
87-
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_ptrs[pos],
87+
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_vec[pos].get(),
8888
row_id_);
8989
}
9090

9191
float GetFloat(int32_t pos) const override {
92-
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_ptrs[pos],
92+
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_vec[pos].get(),
9393
row_id_);
9494
}
9595

9696
double GetDouble(int32_t pos) const override {
97-
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_ptrs[pos],
97+
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_vec[pos].get(),
9898
row_id_);
9999
}
100100

101101
BinaryString GetString(int32_t pos) const override {
102-
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_ptrs[pos], row_id_,
102+
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_vec[pos].get(), row_id_,
103103
ctx_->pool.get());
104104
return BinaryString::FromBytes(bytes);
105105
}
106106

107107
std::string_view GetStringView(int32_t pos) const override {
108-
return ColumnarUtils::GetView(ctx_->array_ptrs[pos], row_id_);
108+
return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_);
109109
}
110110

111111
Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;
112112

113113
Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;
114114

115115
std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
116-
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_ptrs[pos], row_id_,
116+
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_vec[pos].get(), row_id_,
117117
ctx_->pool.get());
118118
}
119119

src/paimon/common/data/columnar/columnar_row_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ TEST(ColumnarRowRefTest, TestSimple) {
8282
.ValueOrDie();
8383
auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie();
8484

85-
auto ctx = std::make_shared<ColumnarBatchContext>(data, data->fields(), pool);
85+
auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);
8686
ColumnarRowRef row(ctx, 1);
8787
ASSERT_EQ(row.GetFieldCount(), 2);
8888
ASSERT_EQ(row.GetInt(0), 2);

src/paimon/common/table/special_fields.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
namespace paimon {
2828

2929
struct SpecialFields {
30+
SpecialFields() = delete;
31+
~SpecialFields() = delete;
32+
3033
static constexpr char KEY_FIELD_PREFIX[] = "_KEY_";
3134
static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2;
3235

@@ -62,6 +65,15 @@ struct SpecialFields {
6265
return false;
6366
}
6467
// TODO(xinyu.lxy): add a func to complete row-tracking fields
68+
69+
static std::shared_ptr<arrow::Schema> CompleteSequenceAndValueKindField(
70+
const std::shared_ptr<arrow::Schema>& schema) {
71+
arrow::FieldVector target_fields;
72+
target_fields.push_back(DataField::ConvertDataFieldToArrowField(SequenceNumber()));
73+
target_fields.push_back(DataField::ConvertDataFieldToArrowField(ValueKind()));
74+
target_fields.insert(target_fields.end(), schema->fields().begin(), schema->fields().end());
75+
return arrow::schema(target_fields);
76+
}
6577
};
6678

6779
} // namespace paimon

src/paimon/common/utils/arrow/arrow_utils.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ Result<std::shared_ptr<arrow::Schema>> ArrowUtils::DataTypeToSchema(
2727
}
2828

2929
Result<std::vector<int32_t>> ArrowUtils::CreateProjection(
30-
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
30+
const std::shared_ptr<arrow::Schema>& src_schema, const arrow::FieldVector& target_fields) {
3131
std::vector<int32_t> target_to_src_mapping;
32-
target_to_src_mapping.reserve(read_fields.size());
33-
for (const auto& field : read_fields) {
34-
auto src_field_idx = file_schema->GetFieldIndex(field->name());
32+
target_to_src_mapping.reserve(target_fields.size());
33+
for (const auto& field : target_fields) {
34+
auto src_field_idx = src_schema->GetFieldIndex(field->name());
3535
if (src_field_idx < 0) {
3636
return Status::Invalid(
37-
fmt::format("Field '{}' not found or duplicate in file schema", field->name()));
37+
fmt::format("Field '{}' not found or duplicate in src schema", field->name()));
3838
}
3939
target_to_src_mapping.push_back(src_field_idx);
4040
}

src/paimon/common/utils/arrow/arrow_utils.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class PAIMON_EXPORT ArrowUtils {
3333
const std::shared_ptr<arrow::DataType>& data_type);
3434

3535
static Result<std::vector<int32_t>> CreateProjection(
36-
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields);
36+
const std::shared_ptr<arrow::Schema>& src_schema, const arrow::FieldVector& target_fields);
3737

3838
static Status CheckNullabilityMatch(const std::shared_ptr<arrow::Schema>& schema,
3939
const std::shared_ptr<arrow::Array>& data);

src/paimon/common/utils/arrow/arrow_utils_test.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,17 @@ TEST(ArrowUtilsTest, TestCreateProjection) {
7070
ASSERT_EQ(projection, expected_projection);
7171
}
7272
{
73-
// read field not found in file schema
73+
// read field not found in src schema
7474
arrow::FieldVector read_fields = {
7575
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
7676
arrow::field("s1", arrow::utf8()), arrow::field("v2", arrow::float64()),
7777
arrow::field("v1", arrow::boolean())};
7878
auto read_schema = arrow::schema(read_fields);
7979
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema, read_schema->fields()),
80-
"Field 'v2' not found or duplicate in file schema");
80+
"Field 'v2' not found or duplicate in src schema");
8181
}
8282
{
83-
// duplicate field in file schema
83+
// duplicate field in src schema
8484
arrow::FieldVector file_fields_dup = {
8585
arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()),
8686
arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()),
@@ -93,7 +93,7 @@ TEST(ArrowUtilsTest, TestCreateProjection) {
9393
arrow::field("v1", arrow::boolean())};
9494
auto read_schema = arrow::schema(read_fields);
9595
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema_dup, read_schema->fields()),
96-
"Field 'v1' not found or duplicate in file schema");
96+
"Field 'v1' not found or duplicate in src schema");
9797
}
9898
{
9999
arrow::FieldVector read_fields = {

0 commit comments

Comments
 (0)