Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ set(PAIMON_CORE_SRCS
core/mergetree/compact/aggregate/field_sum_agg.cpp
core/mergetree/compact/interval_partition.cpp
core/mergetree/compact/loser_tree.cpp
core/mergetree/compact/merge_tree_compact_rewriter.cpp
core/mergetree/compact/partial_update_merge_function.cpp
core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp
core/mergetree/compact/sort_merge_reader_with_min_heap.cpp
Expand Down Expand Up @@ -540,6 +541,7 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/compact/universal_compaction_test.cpp
core/mergetree/compact/force_up_level0_compaction_test.cpp
core/mergetree/compact/compact_strategy_test.cpp
core/mergetree/compact/merge_tree_compact_rewriter_test.cpp
core/mergetree/drop_delete_reader_test.cpp
core/mergetree/merge_tree_writer_test.cpp
core/mergetree/sorted_run_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/columnar/columnar_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {
std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array = arrow::internal::checked_cast<const arrow::StructArray*>(array_);
assert(struct_array);
auto row_ctx = std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), pool_);
auto row_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), pool_);
return std::make_shared<ColumnarRowRef>(std::move(row_ctx), offset_ + pos);
}

Expand Down
16 changes: 3 additions & 13 deletions src/paimon/common/data/columnar/columnar_batch_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,11 @@ namespace paimon {
class MemoryPool;

struct ColumnarBatchContext {
ColumnarBatchContext(const std::shared_ptr<arrow::StructArray>& struct_array_in,
const arrow::ArrayVector& array_vec_in,
ColumnarBatchContext(const arrow::ArrayVector& array_vec_in,
const std::shared_ptr<MemoryPool>& pool_in)
: struct_array(struct_array_in), pool(pool_in) {
array_ptrs.reserve(array_vec_in.size());
for (const auto& array : array_vec_in) {
array_ptrs.push_back(array.get());
}
}
: pool(pool_in), array_vec(array_vec_in) {}

/// @note `struct_array` is the data holder for columnar row, ensure that the data life
/// cycle is consistent with the columnar row, `array_ptrs` maybe a subset of
/// `struct_array`, so `struct_array` cannot be used for `GetXXX()`
std::shared_ptr<arrow::StructArray> struct_array;
std::shared_ptr<MemoryPool> pool;
std::vector<const arrow::Array*> array_ptrs;
arrow::ArrayVector array_vec;
};
} // namespace paimon
15 changes: 8 additions & 7 deletions src/paimon/common/data/columnar/columnar_row_ref.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
namespace paimon {
Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
using ArrayType = typename arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_vec[pos].get());
assert(array);
arrow::Decimal128 decimal(array->GetValue(row_id_));
return Decimal(precision, scale,
Expand All @@ -40,7 +40,7 @@ Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale

Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {
using ArrayType = typename arrow::TypeTraits<arrow::TimestampType>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_vec[pos].get());
assert(array);
int64_t data = array->Value(row_id_);
auto timestamp_type =
Expand All @@ -55,23 +55,24 @@ Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {

std::shared_ptr<InternalRow> ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array =
arrow::internal::checked_cast<const arrow::StructArray*>(ctx_->array_ptrs[pos]);
arrow::internal::checked_cast<const arrow::StructArray*>(ctx_->array_vec[pos].get());
assert(struct_array);
auto nested_ctx =
std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), ctx_->pool);
auto nested_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), ctx_->pool);
return std::make_shared<ColumnarRowRef>(std::move(nested_ctx), row_id_);
}

std::shared_ptr<InternalArray> ColumnarRowRef::GetArray(int32_t pos) const {
auto list_array = arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_ptrs[pos]);
auto list_array =
arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_vec[pos].get());
assert(list_array);
int32_t offset = list_array->value_offset(row_id_);
int32_t length = list_array->value_length(row_id_);
return std::make_shared<ColumnarArray>(list_array->values(), ctx_->pool, offset, length);
}

std::shared_ptr<InternalMap> ColumnarRowRef::GetMap(int32_t pos) const {
auto map_array = arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_ptrs[pos]);
auto map_array =
arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_vec[pos].get());
assert(map_array);
int32_t offset = map_array->value_offset(row_id_);
int32_t length = map_array->value_length(row_id_);
Expand Down
28 changes: 14 additions & 14 deletions src/paimon/common/data/columnar/columnar_row_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,69 +51,69 @@ class ColumnarRowRef : public InternalRow {
}

int32_t GetFieldCount() const override {
return static_cast<int32_t>(ctx_->array_ptrs.size());
return static_cast<int32_t>(ctx_->array_vec.size());
}

bool IsNullAt(int32_t pos) const override {
return ctx_->array_ptrs[pos]->IsNull(row_id_);
return ctx_->array_vec[pos]->IsNull(row_id_);
}

bool GetBoolean(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_vec[pos].get(),
row_id_);
}

char GetByte(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_vec[pos].get(),
row_id_);
}

int16_t GetShort(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_vec[pos].get(),
row_id_);
}

int32_t GetInt(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_vec[pos].get(),
row_id_);
}

int32_t GetDate(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(ctx_->array_ptrs[pos],
row_id_);
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(
ctx_->array_vec[pos].get(), row_id_);
}

int64_t GetLong(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_vec[pos].get(),
row_id_);
}

float GetFloat(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_vec[pos].get(),
row_id_);
}

double GetDouble(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_ptrs[pos],
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_vec[pos].get(),
row_id_);
}

BinaryString GetString(int32_t pos) const override {
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_ptrs[pos], row_id_,
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_vec[pos].get(), row_id_,
ctx_->pool.get());
return BinaryString::FromBytes(bytes);
}

std::string_view GetStringView(int32_t pos) const override {
return ColumnarUtils::GetView(ctx_->array_ptrs[pos], row_id_);
return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_);
}

Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;

Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;

std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_ptrs[pos], row_id_,
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_vec[pos].get(), row_id_,
ctx_->pool.get());
}

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/columnar/columnar_row_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TEST(ColumnarRowRefTest, TestSimple) {
.ValueOrDie();
auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie();

auto ctx = std::make_shared<ColumnarBatchContext>(data, data->fields(), pool);
auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);
ColumnarRowRef row(ctx, 1);
ASSERT_EQ(row.GetFieldCount(), 2);
ASSERT_EQ(row.GetInt(0), 2);
Expand Down
12 changes: 12 additions & 0 deletions src/paimon/common/table/special_fields.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
namespace paimon {

struct SpecialFields {
SpecialFields() = delete;
~SpecialFields() = delete;

static constexpr char KEY_FIELD_PREFIX[] = "_KEY_";
static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2;

Expand Down Expand Up @@ -62,6 +65,15 @@ struct SpecialFields {
return false;
}
// TODO(xinyu.lxy): add a func to complete row-tracking fields

static std::shared_ptr<arrow::Schema> CompleteSequenceAndValueKindField(
const std::shared_ptr<arrow::Schema>& schema) {
arrow::FieldVector target_fields;
target_fields.push_back(DataField::ConvertDataFieldToArrowField(SequenceNumber()));
target_fields.push_back(DataField::ConvertDataFieldToArrowField(ValueKind()));
target_fields.insert(target_fields.end(), schema->fields().begin(), schema->fields().end());
return arrow::schema(target_fields);
}
};

} // namespace paimon
10 changes: 5 additions & 5 deletions src/paimon/common/utils/arrow/arrow_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ Result<std::shared_ptr<arrow::Schema>> ArrowUtils::DataTypeToSchema(
}

Result<std::vector<int32_t>> ArrowUtils::CreateProjection(
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
const std::shared_ptr<arrow::Schema>& src_schema, const arrow::FieldVector& target_fields) {
std::vector<int32_t> target_to_src_mapping;
target_to_src_mapping.reserve(read_fields.size());
for (const auto& field : read_fields) {
auto src_field_idx = file_schema->GetFieldIndex(field->name());
target_to_src_mapping.reserve(target_fields.size());
for (const auto& field : target_fields) {
auto src_field_idx = src_schema->GetFieldIndex(field->name());
if (src_field_idx < 0) {
return Status::Invalid(
fmt::format("Field '{}' not found or duplicate in file schema", field->name()));
fmt::format("Field '{}' not found or duplicate in src schema", field->name()));
}
target_to_src_mapping.push_back(src_field_idx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/utils/arrow/arrow_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class PAIMON_EXPORT ArrowUtils {
const std::shared_ptr<arrow::DataType>& data_type);

static Result<std::vector<int32_t>> CreateProjection(
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields);
const std::shared_ptr<arrow::Schema>& src_schema, const arrow::FieldVector& target_fields);

static Status CheckNullabilityMatch(const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::Array>& data);
Expand Down
8 changes: 4 additions & 4 deletions src/paimon/common/utils/arrow/arrow_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ TEST(ArrowUtilsTest, TestCreateProjection) {
ASSERT_EQ(projection, expected_projection);
}
{
// read field not found in file schema
// read field not found in src schema
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v2", arrow::float64()),
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema, read_schema->fields()),
"Field 'v2' not found or duplicate in file schema");
"Field 'v2' not found or duplicate in src schema");
}
{
// duplicate field in file schema
// duplicate field in src schema
arrow::FieldVector file_fields_dup = {
arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()),
arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()),
Expand All @@ -93,7 +93,7 @@ TEST(ArrowUtilsTest, TestCreateProjection) {
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema_dup, read_schema->fields()),
"Field 'v1' not found or duplicate in file schema");
"Field 'v1' not found or duplicate in src schema");
}
{
arrow::FieldVector read_fields = {
Expand Down
79 changes: 79 additions & 0 deletions src/paimon/core/compact/compact_result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <vector>

#include "paimon/core/io/data_file_meta.h"

namespace paimon {
class CompactDeletionFile;

/// Result of compaction.
class CompactResult {
public:
CompactResult() = default;
CompactResult(const std::vector<std::shared_ptr<DataFileMeta>>& before,
const std::vector<std::shared_ptr<DataFileMeta>>& after)
: CompactResult(before, after, {}) {}

CompactResult(const std::vector<std::shared_ptr<DataFileMeta>>& before,
const std::vector<std::shared_ptr<DataFileMeta>>& after,
const std::vector<std::shared_ptr<DataFileMeta>>& changelog)
: before_(before), after_(after), changelog_(changelog) {}

const std::vector<std::shared_ptr<DataFileMeta>>& Before() const {
return before_;
}

const std::vector<std::shared_ptr<DataFileMeta>>& After() const {
return after_;
}

const std::vector<std::shared_ptr<DataFileMeta>>& Changelog() const {
return changelog_;
}

std::shared_ptr<CompactDeletionFile> DeletionFile() const {
return deletion_file_;
}

void SetDeletionFile(const std::shared_ptr<CompactDeletionFile>& deletion_file) {
deletion_file_ = deletion_file;
}

Status Merge(const CompactResult& other) {
before_.insert(before_.end(), other.Before().begin(), other.Before().end());
after_.insert(after_.end(), other.After().begin(), other.After().end());
changelog_.insert(changelog_.end(), other.Changelog().begin(), other.Changelog().end());

if (deletion_file_ != nullptr || other.deletion_file_ != nullptr) {
return Status::NotImplemented(
"There is a bug, deletion file can't be set before merge.");
}
return Status::OK();
}

private:
std::vector<std::shared_ptr<DataFileMeta>> before_;
std::vector<std::shared_ptr<DataFileMeta>> after_;
std::vector<std::shared_ptr<DataFileMeta>> changelog_;
std::shared_ptr<CompactDeletionFile> deletion_file_;
};

} // namespace paimon
7 changes: 3 additions & 4 deletions src/paimon/core/io/async_key_value_producer_and_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ class MemoryPool;

template <typename T, typename R>
AsyncKeyValueProducerAndConsumer<T, R>::AsyncKeyValueProducerAndConsumer(
std::unique_ptr<SortMergeReader>&& sort_merge_reader,
const std::function<Result<std::unique_ptr<RowToArrowArrayConverter<T, R>>>()>& create_consumer,
std::unique_ptr<SortMergeReader>&& sort_merge_reader, ConsumerCreator create_consumer,
int32_t batch_size, int32_t consumer_thread_num, const std::shared_ptr<MemoryPool>& pool)
: batch_size_(batch_size),
: batch_size_(std::min(batch_size, MAX_PROJECTION_BATCH_SIZE)),
consumer_thread_num_(consumer_thread_num),
pool_(pool),
sort_merge_reader_(std::move(sort_merge_reader)),
create_consumer_(create_consumer) {
create_consumer_(std::move(create_consumer)) {
kv_queue_.set_capacity(batch_size);
result_queue_.set_capacity(RESULT_BATCH_COUNT);
}
Expand Down
Loading
Loading