Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ set(PAIMON_CORE_SRCS
core/mergetree/compact/aggregate/field_sum_agg.cpp
core/mergetree/compact/interval_partition.cpp
core/mergetree/compact/loser_tree.cpp
core/mergetree/compact/merge_tree_compact_rewriter.cpp
core/mergetree/compact/partial_update_merge_function.cpp
core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp
core/mergetree/compact/sort_merge_reader_with_min_heap.cpp
Expand Down Expand Up @@ -540,6 +541,7 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/compact/universal_compaction_test.cpp
core/mergetree/compact/force_up_level0_compaction_test.cpp
core/mergetree/compact/compact_strategy_test.cpp
core/mergetree/compact/merge_tree_compact_rewriter_test.cpp
core/mergetree/drop_delete_reader_test.cpp
core/mergetree/merge_tree_writer_test.cpp
core/mergetree/sorted_run_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/columnar/columnar_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {
std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array = arrow::internal::checked_cast<const arrow::StructArray*>(array_);
assert(struct_array);
auto row_ctx = std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), pool_);
auto row_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), pool_);
return std::make_shared<ColumnarRowRef>(std::move(row_ctx), offset_ + pos);
}

Expand Down
16 changes: 3 additions & 13 deletions src/paimon/common/data/columnar/columnar_batch_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,11 @@ namespace paimon {
class MemoryPool;

struct ColumnarBatchContext {
ColumnarBatchContext(const std::shared_ptr<arrow::StructArray>& struct_array_in,
const arrow::ArrayVector& array_vec_in,
ColumnarBatchContext(const arrow::ArrayVector& array_vec_in,
const std::shared_ptr<MemoryPool>& pool_in)
: struct_array(struct_array_in), pool(pool_in) {
array_ptrs.reserve(array_vec_in.size());
for (const auto& array : array_vec_in) {
array_ptrs.push_back(array.get());
}
}
: pool(pool_in), array_vec(array_vec_in) {}

/// @note `struct_array` is the data holder for columnar row, ensure that the data life
/// cycle is consistent with the columnar row, `array_ptrs` maybe a subset of
/// `struct_array`, so `struct_array` cannot be used for `GetXXX()`
std::shared_ptr<arrow::StructArray> struct_array;
std::shared_ptr<MemoryPool> pool;
std::vector<const arrow::Array*> array_ptrs;
arrow::ArrayVector array_vec;
};
} // namespace paimon
15 changes: 8 additions & 7 deletions src/paimon/common/data/columnar/columnar_row_ref.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
namespace paimon {
Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
using ArrayType = typename arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_vec[pos].get());
assert(array);
arrow::Decimal128 decimal(array->GetValue(row_id_));
return Decimal(precision, scale,
Expand All @@ -40,7 +40,7 @@ Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale

Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {
using ArrayType = typename arrow::TypeTraits<arrow::TimestampType>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_vec[pos].get());
assert(array);
int64_t data = array->Value(row_id_);
auto timestamp_type =
Expand All @@ -55,23 +55,24 @@ Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {

std::shared_ptr<InternalRow> ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array =
arrow::internal::checked_cast<const arrow::StructArray*>(ctx_->array_ptrs[pos]);
arrow::internal::checked_cast<const arrow::StructArray*>(ctx_->array_vec[pos].get());
assert(struct_array);
auto nested_ctx =
std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), ctx_->pool);
auto nested_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), ctx_->pool);
return std::make_shared<ColumnarRowRef>(std::move(nested_ctx), row_id_);
}

std::shared_ptr<InternalArray> ColumnarRowRef::GetArray(int32_t pos) const {
auto list_array = arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_ptrs[pos]);
auto list_array =
arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_vec[pos].get());
assert(list_array);
int32_t offset = list_array->value_offset(row_id_);
int32_t length = list_array->value_length(row_id_);
return std::make_shared<ColumnarArray>(list_array->values(), ctx_->pool, offset, length);
}

std::shared_ptr<InternalMap> ColumnarRowRef::GetMap(int32_t pos) const {
auto map_array = arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_ptrs[pos]);
auto map_array =
arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_vec[pos].get());
assert(map_array);
int32_t offset = map_array->value_offset(row_id_);
int32_t length = map_array->value_length(row_id_);
Expand Down
28 changes: 14 additions & 14 deletions src/paimon/common/data/columnar/columnar_row_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,69 +51,69 @@ class ColumnarRowRef : public InternalRow {
}

int32_t GetFieldCount() const override {
return static_cast<int32_t>(ctx_->array_ptrs.size());
return static_cast<int32_t>(ctx_->array_vec.size());
}

bool IsNullAt(int32_t pos) const override {
return ctx_->array_ptrs[pos]->IsNull(row_id_);
return ctx_->array_vec[pos]->IsNull(row_id_);
}

bool GetBoolean(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_vec[pos].get(),
row_id_);
}

char GetByte(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_vec[pos].get(),
row_id_);
}

int16_t GetShort(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_vec[pos].get(),
row_id_);
}

int32_t GetInt(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_vec[pos].get(),
row_id_);
}

int32_t GetDate(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(ctx_->array_ptrs[pos],
row_id_);
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(
ctx_->array_vec[pos].get(), row_id_);
}

int64_t GetLong(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_vec[pos].get(),
row_id_);
}

float GetFloat(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_vec[pos].get(),
row_id_);
}

double GetDouble(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_vec[pos].get(),
row_id_);
}

BinaryString GetString(int32_t pos) const override {
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_ptrs[pos], row_id_,
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_vec[pos].get(), row_id_,
ctx_->pool.get());
return BinaryString::FromBytes(bytes);
}

std::string_view GetStringView(int32_t pos) const override {
return ColumnarUtils::GetView(ctx_->array_ptrs[pos], row_id_);
return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_);
}

Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;

Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;

std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_ptrs[pos], row_id_,
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_vec[pos].get(), row_id_,
ctx_->pool.get());
}

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/columnar/columnar_row_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TEST(ColumnarRowRefTest, TestSimple) {
.ValueOrDie();
auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie();

auto ctx = std::make_shared<ColumnarBatchContext>(data, data->fields(), pool);
auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);
ColumnarRowRef row(ctx, 1);
ASSERT_EQ(row.GetFieldCount(), 2);
ASSERT_EQ(row.GetInt(0), 2);
Expand Down
12 changes: 12 additions & 0 deletions src/paimon/common/table/special_fields.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
namespace paimon {

struct SpecialFields {
SpecialFields() = delete;
~SpecialFields() = delete;

static constexpr char KEY_FIELD_PREFIX[] = "_KEY_";
static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2;

Expand Down Expand Up @@ -62,6 +65,15 @@ struct SpecialFields {
return false;
}
// TODO(xinyu.lxy): add a func to complete row-tracking fields

/// Returns a new schema made of the arrow fields converted from `SequenceNumber()` and
/// `ValueKind()` (in that order), followed by every field of `schema` in its original order.
///
/// @param schema source schema whose fields are appended after the two special fields;
///               it is dereferenced unconditionally, so it must not be null.
/// @return a freshly created schema; `schema` itself is not modified.
static std::shared_ptr<arrow::Schema> CompleteSequenceAndValueKindField(
    const std::shared_ptr<arrow::Schema>& schema) {
    // Bind the field list once so both ends of the copied range come from the same object.
    const auto& source_fields = schema->fields();

    arrow::FieldVector completed_fields;
    // Two special fields plus all fields of the source schema.
    completed_fields.reserve(source_fields.size() + 2);
    completed_fields.push_back(DataField::ConvertDataFieldToArrowField(SequenceNumber()));
    completed_fields.push_back(DataField::ConvertDataFieldToArrowField(ValueKind()));
    for (const auto& source_field : source_fields) {
        completed_fields.push_back(source_field);
    }
    return arrow::schema(completed_fields);
}
};

} // namespace paimon
10 changes: 5 additions & 5 deletions src/paimon/common/utils/arrow/arrow_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ Result<std::shared_ptr<arrow::Schema>> ArrowUtils::DataTypeToSchema(
}

Result<std::vector<int32_t>> ArrowUtils::CreateProjection(
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
const std::shared_ptr<arrow::Schema>& src_schema, const arrow::FieldVector& target_fields) {
std::vector<int32_t> target_to_src_mapping;
target_to_src_mapping.reserve(read_fields.size());
for (const auto& field : read_fields) {
auto src_field_idx = file_schema->GetFieldIndex(field->name());
target_to_src_mapping.reserve(target_fields.size());
for (const auto& field : target_fields) {
auto src_field_idx = src_schema->GetFieldIndex(field->name());
if (src_field_idx < 0) {
return Status::Invalid(
fmt::format("Field '{}' not found or duplicate in file schema", field->name()));
fmt::format("Field '{}' not found or duplicate in src schema", field->name()));
}
target_to_src_mapping.push_back(src_field_idx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/utils/arrow/arrow_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class PAIMON_EXPORT ArrowUtils {
const std::shared_ptr<arrow::DataType>& data_type);

static Result<std::vector<int32_t>> CreateProjection(
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields);
const std::shared_ptr<arrow::Schema>& src_schema, const arrow::FieldVector& target_fields);

static Status CheckNullabilityMatch(const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::Array>& data);
Expand Down
79 changes: 79 additions & 0 deletions src/paimon/core/compact/compact_result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <vector>

#include "paimon/core/io/data_file_meta.h"

namespace paimon {
class CompactDeletionFile;

/// Result of compaction.
///
/// Holds three lists of `DataFileMeta` (`before`, `after` and `changelog`) together with an
/// optional `CompactDeletionFile`. Results can be accumulated with `Merge()` as long as no
/// deletion file has been attached to either side.
class CompactResult {
 public:
    /// Creates an empty result: all file lists are empty and no deletion file is set.
    CompactResult() = default;

    /// Creates a result from the `before` and `after` file lists with an empty changelog.
    CompactResult(const std::vector<std::shared_ptr<DataFileMeta>>& before,
                  const std::vector<std::shared_ptr<DataFileMeta>>& after)
        : CompactResult(before, after, {}) {}

    /// Creates a result from the `before`, `after` and `changelog` file lists.
    /// The lists are copied; no deletion file is set.
    CompactResult(const std::vector<std::shared_ptr<DataFileMeta>>& before,
                  const std::vector<std::shared_ptr<DataFileMeta>>& after,
                  const std::vector<std::shared_ptr<DataFileMeta>>& changelog)
        : before_(before), after_(after), changelog_(changelog) {}

    /// @return the `before` file list.
    const std::vector<std::shared_ptr<DataFileMeta>>& Before() const {
        return before_;
    }

    /// @return the `after` file list.
    const std::vector<std::shared_ptr<DataFileMeta>>& After() const {
        return after_;
    }

    /// @return the `changelog` file list.
    const std::vector<std::shared_ptr<DataFileMeta>>& Changelog() const {
        return changelog_;
    }

    /// @return the attached deletion file, or nullptr if none has been set.
    std::shared_ptr<CompactDeletionFile> DeletionFile() const {
        return deletion_file_;
    }

    /// Attaches (or replaces) the deletion file of this result.
    void SetDeletionFile(const std::shared_ptr<CompactDeletionFile>& deletion_file) {
        deletion_file_ = deletion_file;
    }

    /// Appends the `before`, `after` and `changelog` lists of `other` to this result.
    ///
    /// @param other result whose file lists are appended; it is not modified.
    /// @return `Status::NotImplemented` if either side already has a deletion file, in which
    ///         case this result is left unchanged; `Status::OK()` otherwise.
    Status Merge(const CompactResult& other) {
        // Validate before mutating so that a rejected merge leaves this result untouched.
        if (deletion_file_ != nullptr || other.deletion_file_ != nullptr) {
            return Status::NotImplemented(
                "There is a bug, deletion file can't be set before merge.");
        }
        if (this == &other) {
            // std::vector::insert must not receive iterators into the vector being modified,
            // so a self-merge works on a copy of the current state.
            const CompactResult snapshot(other);
            return Merge(snapshot);
        }

        before_.insert(before_.end(), other.before_.begin(), other.before_.end());
        after_.insert(after_.end(), other.after_.begin(), other.after_.end());
        changelog_.insert(changelog_.end(), other.changelog_.begin(), other.changelog_.end());
        return Status::OK();
    }

 private:
    std::vector<std::shared_ptr<DataFileMeta>> before_;
    std::vector<std::shared_ptr<DataFileMeta>> after_;
    std::vector<std::shared_ptr<DataFileMeta>> changelog_;
    // Optional; stays nullptr until SetDeletionFile() is called.
    std::shared_ptr<CompactDeletionFile> deletion_file_;
};

} // namespace paimon
7 changes: 3 additions & 4 deletions src/paimon/core/io/async_key_value_producer_and_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ class MemoryPool;

template <typename T, typename R>
AsyncKeyValueProducerAndConsumer<T, R>::AsyncKeyValueProducerAndConsumer(
std::unique_ptr<SortMergeReader>&& sort_merge_reader,
const std::function<Result<std::unique_ptr<RowToArrowArrayConverter<T, R>>>()>& create_consumer,
std::unique_ptr<SortMergeReader>&& sort_merge_reader, ConsumerCreator create_consumer,
int32_t batch_size, int32_t consumer_thread_num, const std::shared_ptr<MemoryPool>& pool)
: batch_size_(batch_size),
: batch_size_(std::min(batch_size, MAX_PROJECTION_BATCH_SIZE)),
consumer_thread_num_(consumer_thread_num),
pool_(pool),
sort_merge_reader_(std::move(sort_merge_reader)),
create_consumer_(create_consumer) {
create_consumer_(std::move(create_consumer)) {
kv_queue_.set_capacity(batch_size);
result_queue_.set_capacity(RESULT_BATCH_COUNT);
}
Expand Down
18 changes: 12 additions & 6 deletions src/paimon/core/io/async_key_value_producer_and_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ class Metrics;
template <typename T, typename R>
class AsyncKeyValueProducerAndConsumer {
public:
AsyncKeyValueProducerAndConsumer(
std::unique_ptr<SortMergeReader>&& sort_merge_reader,
const std::function<Result<std::unique_ptr<RowToArrowArrayConverter<T, R>>>()>&
create_consumer,
int32_t batch_size, int32_t consumer_thread_num, const std::shared_ptr<MemoryPool>& pool);
using ConsumerCreator =
std::function<Result<std::unique_ptr<RowToArrowArrayConverter<T, R>>>()>;

AsyncKeyValueProducerAndConsumer(std::unique_ptr<SortMergeReader>&& sort_merge_reader,
ConsumerCreator create_consumer, int32_t batch_size,
int32_t consumer_thread_num,
const std::shared_ptr<MemoryPool>& pool);

~AsyncKeyValueProducerAndConsumer() {
CleanUp();
Expand All @@ -66,6 +68,10 @@ class AsyncKeyValueProducerAndConsumer {

private:
static constexpr int32_t RESULT_BATCH_COUNT = 3;

// Upper bound applied to the batch size, so that an overly large write batch size cannot
// overflow an arrow array.
static constexpr int32_t MAX_PROJECTION_BATCH_SIZE = 100000;

void CleanUpQueue();
Status ProduceLoop();
void CleanUp();
Expand All @@ -77,7 +83,7 @@ class AsyncKeyValueProducerAndConsumer {
int32_t consumer_thread_num_;
std::shared_ptr<MemoryPool> pool_;
std::unique_ptr<SortMergeReader> sort_merge_reader_;
std::function<Result<std::unique_ptr<RowToArrowArrayConverter<T, R>>>()> create_consumer_;
ConsumerCreator create_consumer_;

// produce: merge sort KeyValue and push result KeyValue to kv_queue_, consume: project KeyValue
// to arrow array and push result array to result_queue_
Expand Down
Loading
Loading