diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 17731af2..5916ea3f 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -221,6 +221,7 @@ set(PAIMON_CORE_SRCS core/mergetree/compact/aggregate/field_sum_agg.cpp core/mergetree/compact/interval_partition.cpp core/mergetree/compact/loser_tree.cpp + core/mergetree/compact/merge_tree_compact_rewriter.cpp core/mergetree/compact/partial_update_merge_function.cpp core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp core/mergetree/compact/sort_merge_reader_with_min_heap.cpp @@ -540,6 +541,7 @@ if(PAIMON_BUILD_TESTS) core/mergetree/compact/universal_compaction_test.cpp core/mergetree/compact/force_up_level0_compaction_test.cpp core/mergetree/compact/compact_strategy_test.cpp + core/mergetree/compact/merge_tree_compact_rewriter_test.cpp core/mergetree/drop_delete_reader_test.cpp core/mergetree/merge_tree_writer_test.cpp core/mergetree/sorted_run_test.cpp diff --git a/src/paimon/common/data/columnar/columnar_array.cpp b/src/paimon/common/data/columnar/columnar_array.cpp index a845a919..4b79817a 100644 --- a/src/paimon/common/data/columnar/columnar_array.cpp +++ b/src/paimon/common/data/columnar/columnar_array.cpp @@ -85,7 +85,7 @@ std::shared_ptr ColumnarArray::GetMap(int32_t pos) const { std::shared_ptr ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const { auto struct_array = arrow::internal::checked_cast(array_); assert(struct_array); - auto row_ctx = std::make_shared(nullptr, struct_array->fields(), pool_); + auto row_ctx = std::make_shared(struct_array->fields(), pool_); return std::make_shared(std::move(row_ctx), offset_ + pos); } diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h b/src/paimon/common/data/columnar/columnar_batch_context.h index 6da8cd30..1bea6d1e 100644 --- a/src/paimon/common/data/columnar/columnar_batch_context.h +++ b/src/paimon/common/data/columnar/columnar_batch_context.h @@ -29,21 +29,11 @@ namespace paimon { class MemoryPool; struct ColumnarBatchContext { - ColumnarBatchContext(const std::shared_ptr& struct_array_in, - const arrow::ArrayVector& array_vec_in, + ColumnarBatchContext(const arrow::ArrayVector& array_vec_in, const std::shared_ptr& pool_in) - : struct_array(struct_array_in), pool(pool_in) { - array_ptrs.reserve(array_vec_in.size()); - for (const auto& array : array_vec_in) { - array_ptrs.push_back(array.get()); - } - } + : pool(pool_in), array_vec(array_vec_in) {} - /// @note `struct_array` is the data holder for columnar row, ensure that the data life - /// cycle is consistent with the columnar row, `array_ptrs` maybe a subset of - /// `struct_array`, so `struct_array` cannot be used for `GetXXX()` - std::shared_ptr struct_array; std::shared_ptr pool; - std::vector array_ptrs; + arrow::ArrayVector array_vec; }; } // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp b/src/paimon/common/data/columnar/columnar_row_ref.cpp index 2530cac0..a3708a92 100644 --- a/src/paimon/common/data/columnar/columnar_row_ref.cpp +++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp @@ -31,7 +31,7 @@ namespace paimon { Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { using ArrayType = typename arrow::TypeTraits::ArrayType; - auto array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + auto array = arrow::internal::checked_cast(ctx_->array_vec[pos].get()); assert(array); arrow::Decimal128 decimal(array->GetValue(row_id_)); return Decimal(precision, scale, @@ -40,7 +40,7 @@ Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const { using ArrayType = typename arrow::TypeTraits::ArrayType; - auto array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + auto array = arrow::internal::checked_cast(ctx_->array_vec[pos].get()); assert(array); int64_t data = array->Value(row_id_); auto timestamp_type = @@ -55,15 +55,15 @@ Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const { std::shared_ptr ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const { auto struct_array = - arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + arrow::internal::checked_cast(ctx_->array_vec[pos].get()); assert(struct_array); - auto nested_ctx = - std::make_shared(nullptr, struct_array->fields(), ctx_->pool); + auto nested_ctx = std::make_shared(struct_array->fields(), ctx_->pool); return std::make_shared(std::move(nested_ctx), row_id_); } std::shared_ptr ColumnarRowRef::GetArray(int32_t pos) const { - auto list_array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + auto list_array = + arrow::internal::checked_cast(ctx_->array_vec[pos].get()); assert(list_array); int32_t offset = list_array->value_offset(row_id_); int32_t length = list_array->value_length(row_id_); @@ -71,7 +71,8 @@ std::shared_ptr ColumnarRowRef::GetArray(int32_t pos) const { } std::shared_ptr ColumnarRowRef::GetMap(int32_t pos) const { - auto map_array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + auto map_array = + arrow::internal::checked_cast(ctx_->array_vec[pos].get()); assert(map_array); int32_t offset = map_array->value_offset(row_id_); int32_t length = map_array->value_length(row_id_); diff --git a/src/paimon/common/data/columnar/columnar_row_ref.h b/src/paimon/common/data/columnar/columnar_row_ref.h index 315ef6a2..6939a859 100644 --- a/src/paimon/common/data/columnar/columnar_row_ref.h +++ b/src/paimon/common/data/columnar/columnar_row_ref.h @@ -51,61 +51,61 @@ class ColumnarRowRef : public InternalRow { } int32_t GetFieldCount() const override { - return static_cast(ctx_->array_ptrs.size()); + return static_cast(ctx_->array_vec.size()); } bool IsNullAt(int32_t pos) const override { - return ctx_->array_ptrs[pos]->IsNull(row_id_); + return ctx_->array_vec[pos]->IsNull(row_id_); } bool GetBoolean(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } char GetByte(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } int16_t GetShort(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } int32_t GetInt(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } int32_t GetDate(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], - row_id_); + return ColumnarUtils::GetGenericValue( + ctx_->array_vec[pos].get(), row_id_); } int64_t GetLong(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } float GetFloat(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } double GetDouble(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), row_id_); } BinaryString GetString(int32_t pos) const override { - auto bytes = ColumnarUtils::GetBytes(ctx_->array_ptrs[pos], row_id_, + auto bytes = ColumnarUtils::GetBytes(ctx_->array_vec[pos].get(), row_id_, ctx_->pool.get()); return BinaryString::FromBytes(bytes); } std::string_view GetStringView(int32_t pos) const override { - return ColumnarUtils::GetView(ctx_->array_ptrs[pos], row_id_); + return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_); } Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; @@ -113,7 +113,7 @@ class ColumnarRowRef : public InternalRow { Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; std::shared_ptr GetBinary(int32_t pos) const override { - return ColumnarUtils::GetBytes(ctx_->array_ptrs[pos], row_id_, + return ColumnarUtils::GetBytes(ctx_->array_vec[pos].get(), row_id_, ctx_->pool.get()); } diff --git a/src/paimon/common/data/columnar/columnar_row_test.cpp b/src/paimon/common/data/columnar/columnar_row_test.cpp index da56e0a7..3a965632 100644 --- a/src/paimon/common/data/columnar/columnar_row_test.cpp +++ b/src/paimon/common/data/columnar/columnar_row_test.cpp @@ -82,7 +82,7 @@ TEST(ColumnarRowRefTest, TestSimple) { .ValueOrDie(); auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie(); - auto ctx = std::make_shared(data, data->fields(), pool); + auto ctx = std::make_shared(data->fields(), pool); ColumnarRowRef row(ctx, 1); ASSERT_EQ(row.GetFieldCount(), 2); ASSERT_EQ(row.GetInt(0), 2); diff --git a/src/paimon/common/table/special_fields.h b/src/paimon/common/table/special_fields.h index e1e57906..422a444f 100644 --- a/src/paimon/common/table/special_fields.h +++ b/src/paimon/common/table/special_fields.h @@ -27,6 +27,9 @@ namespace paimon { struct SpecialFields { + SpecialFields() = delete; + ~SpecialFields() = delete; + static constexpr char KEY_FIELD_PREFIX[] = "_KEY_"; static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2; @@ -62,6 +65,15 @@ struct SpecialFields { return false; } // TODO(xinyu.lxy): add a func to complete row-tracking fields + + static std::shared_ptr CompleteSequenceAndValueKindField( + const std::shared_ptr& schema) { + arrow::FieldVector target_fields; + target_fields.push_back(DataField::ConvertDataFieldToArrowField(SequenceNumber())); + target_fields.push_back(DataField::ConvertDataFieldToArrowField(ValueKind())); + target_fields.insert(target_fields.end(), schema->fields().begin(), schema->fields().end()); + return arrow::schema(target_fields); + } }; } // namespace paimon diff --git a/src/paimon/common/utils/arrow/arrow_utils.cpp b/src/paimon/common/utils/arrow/arrow_utils.cpp index ebd270f6..0db69fb9 100644 --- a/src/paimon/common/utils/arrow/arrow_utils.cpp +++ b/src/paimon/common/utils/arrow/arrow_utils.cpp @@ -27,14 +27,14 @@ Result> ArrowUtils::DataTypeToSchema( } Result> ArrowUtils::CreateProjection( - const std::shared_ptr& file_schema, const arrow::FieldVector& read_fields) { + const std::shared_ptr& src_schema, const arrow::FieldVector& target_fields) { std::vector target_to_src_mapping; - target_to_src_mapping.reserve(read_fields.size()); - for (const auto& field : read_fields) { - auto src_field_idx = file_schema->GetFieldIndex(field->name()); + target_to_src_mapping.reserve(target_fields.size()); + for (const auto& field : target_fields) { + auto src_field_idx = src_schema->GetFieldIndex(field->name()); if (src_field_idx < 0) { return Status::Invalid( - fmt::format("Field '{}' not found or duplicate in file schema", field->name())); + fmt::format("Field '{}' not found or duplicate in src schema", field->name())); } target_to_src_mapping.push_back(src_field_idx); } diff --git a/src/paimon/common/utils/arrow/arrow_utils.h b/src/paimon/common/utils/arrow/arrow_utils.h index abb26cf9..41b43e65 100644 --- a/src/paimon/common/utils/arrow/arrow_utils.h +++ b/src/paimon/common/utils/arrow/arrow_utils.h @@ -33,7 +33,7 @@ class PAIMON_EXPORT ArrowUtils { const std::shared_ptr& data_type); static Result> CreateProjection( - const std::shared_ptr& file_schema, const arrow::FieldVector& read_fields); + const std::shared_ptr& src_schema, const arrow::FieldVector& target_fields); static Status CheckNullabilityMatch(const std::shared_ptr& schema, const std::shared_ptr& data); diff --git a/src/paimon/common/utils/arrow/arrow_utils_test.cpp b/src/paimon/common/utils/arrow/arrow_utils_test.cpp index 6dd730b4..42ef9c4b 100644 --- a/src/paimon/common/utils/arrow/arrow_utils_test.cpp +++ b/src/paimon/common/utils/arrow/arrow_utils_test.cpp @@ -70,17 +70,17 @@ TEST(ArrowUtilsTest, TestCreateProjection) { ASSERT_EQ(projection, expected_projection); } { - // read field not found in file schema + // read field not found in src schema arrow::FieldVector read_fields = { arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()), arrow::field("v2", arrow::float64()), arrow::field("v1", arrow::boolean())}; auto read_schema = arrow::schema(read_fields); ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema, read_schema->fields()), - "Field 'v2' not found or duplicate in file schema"); + "Field 'v2' not found or duplicate in src schema"); } { - // duplicate field in file schema + // duplicate field in src schema arrow::FieldVector file_fields_dup = { arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()), @@ -93,7 +93,7 @@ TEST(ArrowUtilsTest, TestCreateProjection) { arrow::field("v1", arrow::boolean())}; auto read_schema = arrow::schema(read_fields); ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema_dup, read_schema->fields()), - "Field 'v1' not found or duplicate in file schema"); + "Field 'v1' not found or duplicate in src schema"); } { arrow::FieldVector read_fields = { diff --git a/src/paimon/core/compact/compact_result.h b/src/paimon/core/compact/compact_result.h new file mode 100644 index 00000000..54628527 --- /dev/null +++ b/src/paimon/core/compact/compact_result.h @@ -0,0 +1,79 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/io/data_file_meta.h" + +namespace paimon { +class CompactDeletionFile; + +/// Result of compaction. +class CompactResult { + public: + CompactResult() = default; + CompactResult(const std::vector>& before, + const std::vector>& after) + : CompactResult(before, after, {}) {} + + CompactResult(const std::vector>& before, + const std::vector>& after, + const std::vector>& changelog) + : before_(before), after_(after), changelog_(changelog) {} + + const std::vector>& Before() const { + return before_; + } + + const std::vector>& After() const { + return after_; + } + + const std::vector>& Changelog() const { + return changelog_; + } + + std::shared_ptr DeletionFile() const { + return deletion_file_; + } + + void SetDeletionFile(const std::shared_ptr& deletion_file) { + deletion_file_ = deletion_file; + } + + Status Merge(const CompactResult& other) { + before_.insert(before_.end(), other.Before().begin(), other.Before().end()); + after_.insert(after_.end(), other.After().begin(), other.After().end()); + changelog_.insert(changelog_.end(), other.Changelog().begin(), other.Changelog().end()); + + if (deletion_file_ != nullptr || other.deletion_file_ != nullptr) { + return Status::NotImplemented( + "There is a bug, deletion file can't be set before merge."); + } + return Status::OK(); + } + + private: + std::vector> before_; + std::vector> after_; + std::vector> changelog_; + std::shared_ptr deletion_file_; +}; + +} // namespace paimon diff --git a/src/paimon/core/io/async_key_value_producer_and_consumer.cpp b/src/paimon/core/io/async_key_value_producer_and_consumer.cpp index f2bce55c..fb516fde 100644 --- a/src/paimon/core/io/async_key_value_producer_and_consumer.cpp +++ b/src/paimon/core/io/async_key_value_producer_and_consumer.cpp @@ -31,14 +31,13 @@ class MemoryPool; template AsyncKeyValueProducerAndConsumer::AsyncKeyValueProducerAndConsumer( - std::unique_ptr&& sort_merge_reader, - const std::function>>()>& create_consumer, + std::unique_ptr&& sort_merge_reader, ConsumerCreator create_consumer, int32_t batch_size, int32_t consumer_thread_num, const std::shared_ptr& pool) - : batch_size_(batch_size), + : batch_size_(std::min(batch_size, MAX_PROJECTION_BATCH_SIZE)), consumer_thread_num_(consumer_thread_num), pool_(pool), sort_merge_reader_(std::move(sort_merge_reader)), - create_consumer_(create_consumer) { + create_consumer_(std::move(create_consumer)) { kv_queue_.set_capacity(batch_size); result_queue_.set_capacity(RESULT_BATCH_COUNT); } diff --git a/src/paimon/core/io/async_key_value_producer_and_consumer.h b/src/paimon/core/io/async_key_value_producer_and_consumer.h index 279c2731..6a2fee67 100644 --- a/src/paimon/core/io/async_key_value_producer_and_consumer.h +++ b/src/paimon/core/io/async_key_value_producer_and_consumer.h @@ -43,11 +43,13 @@ class Metrics; template class AsyncKeyValueProducerAndConsumer { public: - AsyncKeyValueProducerAndConsumer( - std::unique_ptr&& sort_merge_reader, - const std::function>>()>& - create_consumer, - int32_t batch_size, int32_t consumer_thread_num, const std::shared_ptr& pool); + using ConsumerCreator = + std::function>>()>; + + AsyncKeyValueProducerAndConsumer(std::unique_ptr&& sort_merge_reader, + ConsumerCreator create_consumer, int32_t batch_size, + int32_t consumer_thread_num, + const std::shared_ptr& pool); ~AsyncKeyValueProducerAndConsumer() { CleanUp(); @@ -66,6 +68,10 @@ class AsyncKeyValueProducerAndConsumer { private: static constexpr int32_t RESULT_BATCH_COUNT = 3; + + // in case write batch size is too large and overflow arrow array + static constexpr int32_t MAX_PROJECTION_BATCH_SIZE = 100000; + void CleanUpQueue(); Status ProduceLoop(); void CleanUp(); @@ -77,7 +83,7 @@ class AsyncKeyValueProducerAndConsumer { int32_t consumer_thread_num_; std::shared_ptr pool_; std::unique_ptr sort_merge_reader_; - std::function>>()> create_consumer_; + ConsumerCreator create_consumer_; // produce: merge sort KeyValue and push result KeyValue to kv_queue_, consume: project KeyValue // to arrow array and push result array to result_queue_ diff --git a/src/paimon/core/io/data_file_meta.cpp b/src/paimon/core/io/data_file_meta.cpp index 43b1daf8..319b89fd 100644 --- a/src/paimon/core/io/data_file_meta.cpp +++ b/src/paimon/core/io/data_file_meta.cpp @@ -73,6 +73,18 @@ Result> DataFileMeta::ForAppend( embedded_index, file_source, value_stats_cols, external_path, first_row_id, write_cols); } +Result> DataFileMeta::Upgrade(int32_t new_level) const { + if (new_level <= level) { + return Status::Invalid( + fmt::format("new level {} should be greater than current level {}", new_level, level)); + } + return std::make_shared( + file_name, file_size, row_count, min_key, max_key, key_stats, value_stats, + min_sequence_number, max_sequence_number, schema_id, new_level, extra_files, creation_time, + delete_row_count, embedded_index, file_source, value_stats_cols, external_path, + first_row_id, write_cols); +} + DataFileMeta::DataFileMeta( const std::string& _file_name, int64_t _file_size, int64_t _row_count, const BinaryRow& _min_key, const BinaryRow& _max_key, const SimpleStats& _key_stats, diff --git a/src/paimon/core/io/data_file_meta.h b/src/paimon/core/io/data_file_meta.h index d44f5de9..ee89df3e 100644 --- a/src/paimon/core/io/data_file_meta.h +++ b/src/paimon/core/io/data_file_meta.h @@ -75,6 +75,8 @@ struct DataFileMeta { const std::optional& external_path, const std::optional& first_row_id, const std::optional>& write_cols); + Result> Upgrade(int32_t new_level) const; + std::optional AddRowCount() const { return delete_row_count == std::nullopt ? std::optional() : row_count - delete_row_count.value(); diff --git a/src/paimon/core/io/data_file_meta_test.cpp b/src/paimon/core/io/data_file_meta_test.cpp index 11f75f6e..80756442 100644 --- a/src/paimon/core/io/data_file_meta_test.cpp +++ b/src/paimon/core/io/data_file_meta_test.cpp @@ -174,4 +174,26 @@ TEST(DataFileMetaTest, TestToFileSelection) { } } +TEST(DataFileMetaTest, TestUpgrade) { + auto file_meta = std::make_shared( + "data-0.orc", /*file_size=*/645, + /*row_count=*/10, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), SimpleStats::EmptyStats(), + SimpleStats::EmptyStats(), + /*min_sequence_number=*/100, /*max_sequence_number=*/109, /*schema_id=*/0, + /*level=*/5, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1737111915429ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/100, /*write_cols=*/std::nullopt); + // test normal upgrade + ASSERT_OK_AND_ASSIGN(auto new_file_meta, file_meta->Upgrade(10)); + ASSERT_EQ(new_file_meta->level, 10); + // check other members + file_meta->level = 10; + ASSERT_EQ(*new_file_meta, *file_meta); + + // test invalid upgrade + ASSERT_NOK_WITH_MSG(file_meta->Upgrade(1), + "new level 1 should be greater than current level 10"); +} } // namespace paimon::test diff --git a/src/paimon/core/io/key_value_data_file_record_reader.cpp b/src/paimon/core/io/key_value_data_file_record_reader.cpp index ecff5095..35a7aaef 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.cpp +++ b/src/paimon/core/io/key_value_data_file_record_reader.cpp @@ -77,7 +77,6 @@ Result KeyValueDataFileRecordReader::Iterator::Next() { RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_))); int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_); cursor_++; - // TODO(xinyu.lxy): reuse KeyValue and ColumnarRow to avoid construct and destruction return KeyValue(row_kind, sequence_number, reader_->level_, std::move(key), std::move(value)); } @@ -96,7 +95,7 @@ Result> KeyValueDataFileRecordRe } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, arrow::ImportArray(c_array.get(), c_schema.get())); - auto* data_batch = arrow::internal::checked_cast(arrow_array.get()); + auto data_batch = arrow::internal::checked_pointer_cast(arrow_array); assert(data_batch); // do not use arrow::checked_pointer_cast as in release compile, checked_pointer_cast is // static_cast without check @@ -110,34 +109,32 @@ Result> KeyValueDataFileRecordRe if (!row_kind_array_) { return Status::Invalid("cannot cast VALUE_KIND column to int8 arrow array"); } - - key_fields_.reserve(key_arity_); + arrow::ArrayVector key_fields; + key_fields.reserve(key_arity_); for (int32_t i = 0; i < key_arity_; i++) { // skip special fields - key_fields_.emplace_back( + key_fields.emplace_back( data_batch->field(i + SpecialFields::KEY_VALUE_SPECIAL_FIELD_COUNT)); } // e.g., file schema: seq, kind, key1, key2, s1, s2, v1, v2 // user raw read schema: key1, v1, s1 // format reader read schema: seq, kind, key1, key2, v1, s1, s2 // in KeyValue object: key: key1, key2 / value: key1, v1, s1, s2 - value_fields_.reserve(value_schema_->num_fields()); + arrow::ArrayVector value_fields; + value_fields.reserve(value_schema_->num_fields()); for (const auto& value_field : value_schema_->fields()) { auto field_array = data_batch->GetFieldByName(value_field->name()); if (!field_array) { return Status::Invalid( fmt::format("cannot find field {} in data batch", value_field->name())); } - value_fields_.emplace_back(field_array); + value_fields.emplace_back(field_array); } - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(value_struct_array_, - arrow::StructArray::Make(value_fields_, value_names_)); selection_bitmap_ = std::move(bitmap); - value_fields_ = value_struct_array_->fields(); - key_ctx_ = std::make_shared(nullptr, key_fields_, pool_); - value_ctx_ = std::make_shared(value_struct_array_, value_fields_, pool_); - ArrowUtils::TraverseArray(value_struct_array_); + key_ctx_ = std::make_shared(key_fields, pool_); + value_ctx_ = std::make_shared(value_fields, pool_); + ArrowUtils::TraverseArray(data_batch); return std::make_unique(this); } @@ -145,9 +142,6 @@ void KeyValueDataFileRecordReader::Reset() { selection_bitmap_ = RoaringBitmap32(); key_ctx_.reset(); value_ctx_.reset(); - key_fields_.clear(); - value_fields_.clear(); - value_struct_array_.reset(); sequence_number_array_.reset(); row_kind_array_.reset(); } diff --git a/src/paimon/core/io/key_value_data_file_record_reader.h b/src/paimon/core/io/key_value_data_file_record_reader.h index 3a1a298b..c08ea177 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.h +++ b/src/paimon/core/io/key_value_data_file_record_reader.h @@ -84,11 +84,10 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader { std::shared_ptr value_schema_; std::vector value_names_; RoaringBitmap32 selection_bitmap_; - std::shared_ptr value_struct_array_; - arrow::ArrayVector key_fields_; - arrow::ArrayVector value_fields_; std::shared_ptr> sequence_number_array_; std::shared_ptr> row_kind_array_; + + protected: std::shared_ptr key_ctx_; std::shared_ptr value_ctx_; }; diff --git a/src/paimon/core/io/key_value_data_file_writer.cpp b/src/paimon/core/io/key_value_data_file_writer.cpp index 6b6fa7ab..c3d17395 100644 --- a/src/paimon/core/io/key_value_data_file_writer.cpp +++ b/src/paimon/core/io/key_value_data_file_writer.cpp @@ -46,13 +46,15 @@ class MemoryPool; KeyValueDataFileWriter::KeyValueDataFileWriter( const std::string& compression, std::function converter, - int64_t schema_id, FileSource file_source, const std::vector& primary_keys, + int64_t schema_id, int32_t level, FileSource file_source, + const std::vector& primary_keys, const std::shared_ptr& stats_extractor, const std::shared_ptr& write_schema, bool is_external_path, const std::shared_ptr& pool) : SingleFileWriter(compression, converter), pool_(pool), schema_id_(schema_id), + level_(level), file_source_(file_source), primary_keys_(primary_keys), stats_extractor_(stats_extractor), @@ -102,7 +104,7 @@ Result> KeyValueDataFileWriter::GetResult() { PAIMON_ASSIGN_OR_RAISE(int64_t local_micro, DateTimeUtils::GetCurrentLocalTimeUs()); return std::make_shared( PathUtil::GetName(path_), output_bytes_, RecordCount(), min_key, max_key, key_stats, - value_stats, min_sequence_number_, max_sequence_number_, schema_id_, /*level=*/0, + value_stats, min_sequence_number_, max_sequence_number_, schema_id_, level_, /*extra_files=*/std::vector>(), Timestamp(/*millisecond=*/local_micro / 1000, /*nano_of_millisecond=*/0), delete_row_count_, /*embedded_index=*/nullptr, file_source_, diff --git a/src/paimon/core/io/key_value_data_file_writer.h b/src/paimon/core/io/key_value_data_file_writer.h index 8ab15cb5..bb55565d 100644 --- a/src/paimon/core/io/key_value_data_file_writer.h +++ b/src/paimon/core/io/key_value_data_file_writer.h @@ -47,7 +47,7 @@ class KeyValueDataFileWriter public: KeyValueDataFileWriter(const std::string& compression, std::function converter, - int64_t schema_id, FileSource file_source, + int64_t schema_id, int32_t level, FileSource file_source, const std::vector& primary_keys, const std::shared_ptr& stats_extractor, const std::shared_ptr& write_schema, @@ -69,6 +69,7 @@ class KeyValueDataFileWriter private: std::shared_ptr pool_; int64_t schema_id_; + int32_t level_; FileSource file_source_; std::vector primary_keys_; std::shared_ptr stats_extractor_; diff --git a/src/paimon/core/io/key_value_meta_projection_consumer.cpp b/src/paimon/core/io/key_value_meta_projection_consumer.cpp index b46aac42..15c5c8d5 100644 --- a/src/paimon/core/io/key_value_meta_projection_consumer.cpp +++ b/src/paimon/core/io/key_value_meta_projection_consumer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "arrow/api.h" #include "arrow/array/builder_dict.h" @@ -37,9 +38,26 @@ namespace paimon { class MemoryPool; - Result> KeyValueMetaProjectionConsumer::Create( const std::shared_ptr& target_schema, const std::shared_ptr& pool) { + std::vector target_to_src_mapping(target_schema->num_fields() - + SpecialFields::KEY_VALUE_SPECIAL_FIELD_COUNT); + std::iota(target_to_src_mapping.begin(), target_to_src_mapping.end(), 0); + return Create(target_schema, target_to_src_mapping, pool); +} +Result> KeyValueMetaProjectionConsumer::Create( + const std::shared_ptr& target_schema, + const std::vector& target_to_src_mapping, const std::shared_ptr& pool) { + if (static_cast(target_schema->num_fields() - + SpecialFields::KEY_VALUE_SPECIAL_FIELD_COUNT) != + target_to_src_mapping.size()) { + return Status::Invalid( + fmt::format("target_schema field count without special fields {} and " + "target_to_src_mapping size {} mismatch in KeyValueMetaProjectionConsumer", + target_schema->num_fields() - SpecialFields::KEY_VALUE_SPECIAL_FIELD_COUNT, + target_to_src_mapping.size())); + } + auto arrow_pool = GetArrowPool(pool); // target fields of output array: special fields + value fields std::unique_ptr array_builder; @@ -74,7 +92,7 @@ Result> KeyValueMetaProjectionCo } return std::unique_ptr(new KeyValueMetaProjectionConsumer( reserve_count, std::move(appenders), std::move(struct_builder), std::move(arrow_pool), - sequence_appender, value_kind_appender)); + target_to_src_mapping, sequence_appender, value_kind_appender)); } Result KeyValueMetaProjectionConsumer::NextBatch( @@ -108,7 +126,7 @@ Result KeyValueMetaProjectionConsumer::NextBatch( // append value fields for (size_t i = 0; i < appenders_.size(); i++) { for (const auto& row : key_value_vec) { - PAIMON_RETURN_NOT_OK_FROM_ARROW(appenders_[i](*(row.value), i)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(appenders_[i](*(row.value), target_to_src_mapping_[i])); } } PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch result_batch, FinishAndAccumulate()); diff --git a/src/paimon/core/io/key_value_meta_projection_consumer.h b/src/paimon/core/io/key_value_meta_projection_consumer.h index ea5a8991..a8ab7860 100644 --- a/src/paimon/core/io/key_value_meta_projection_consumer.h +++ b/src/paimon/core/io/key_value_meta_projection_consumer.h @@ -43,20 +43,28 @@ class KeyValueMetaProjectionConsumer : public RowToArrowArrayConverter& target_schema, const std::shared_ptr& pool); + // target_to_src_mapping is the mapping excluding special fields. + static Result> Create( + const std::shared_ptr& target_schema, + const std::vector& target_to_src_mapping, const std::shared_ptr& pool); + Result NextBatch(const std::vector& key_value_vec) override; private: KeyValueMetaProjectionConsumer(int32_t reserve_count, std::vector&& appenders, std::unique_ptr&& array_builder, std::unique_ptr&& arrow_pool, + const std::vector& target_to_src_mapping, arrow::Int64Builder* sequence_appender, arrow::Int8Builder* value_kind_appender) : RowToArrowArrayConverter(reserve_count, std::move(appenders), std::move(array_builder), std::move(arrow_pool)), + target_to_src_mapping_(target_to_src_mapping), sequence_appender_(sequence_appender), value_kind_appender_(value_kind_appender) {} private: + std::vector target_to_src_mapping_; arrow::Int64Builder* sequence_appender_; arrow::Int8Builder* value_kind_appender_; }; diff --git a/src/paimon/core/mergetree/compact/compact_rewriter.h b/src/paimon/core/mergetree/compact/compact_rewriter.h new file mode 100644 index 00000000..393b98f1 --- /dev/null +++ b/src/paimon/core/mergetree/compact/compact_rewriter.h @@ -0,0 +1,53 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/compact/compact_result.h" +#include "paimon/core/mergetree/sorted_run.h" +#include "paimon/result.h" + +namespace paimon { + +/// Rewrite sections to new level. +class CompactRewriter { + public: + virtual ~CompactRewriter() = default; + /// Rewrite sections to new level + /// + /// @param output_level new level + /// @param drop_delete whether to drop the deletion, see + /// `MergeTreeCompactManager::TriggerCompaction` + /// @param sections list of sections (section is a list of `SortedRun`s, and key intervals + /// between sections do not overlap) + /// @return compaction result + virtual Result Rewrite(int32_t output_level, bool drop_delete, + const std::vector>& sections) = 0; + + /// Upgrade file to new level, usually file data is not rewritten, only the metadata is updated. + /// But in some certain scenarios, we must rewrite file too, e.g. `ChangelogMergeTreeRewriter` + /// + /// @param output_level new level + /// @param file file to be updated + /// @return compaction result + virtual Result Upgrade(int32_t output_level, + const std::shared_ptr& file) const = 0; + + /// Close rewriter. + virtual Status Close() = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/interval_partition.h b/src/paimon/core/mergetree/compact/interval_partition.h index a8eea2b9..af87804b 100644 --- a/src/paimon/core/mergetree/compact/interval_partition.h +++ b/src/paimon/core/mergetree/compact/interval_partition.h @@ -30,7 +30,7 @@ struct DataFileMeta; /// Algorithm to partition several data files into the minimum number of `SortedRun`s. class IntervalPartition { public: - IntervalPartition(const std::vector>& inputFiles, + IntervalPartition(const std::vector>& input_files, const std::shared_ptr& key_comparator); /// Returns a two-dimensional list of `SortedRun`s. diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp new file mode 100644 index 00000000..653e91d9 --- /dev/null +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp @@ -0,0 +1,217 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/core/mergetree/compact/merge_tree_compact_rewriter.h" + +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/core/io/key_value_data_file_writer.h" +#include "paimon/core/io/key_value_meta_projection_consumer.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/row_to_arrow_array_converter.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/format/file_format.h" +#include "paimon/format/writer_builder.h" +#include "paimon/read_context.h" + +namespace paimon { +MergeTreeCompactRewriter::MergeTreeCompactRewriter( + const BinaryRow& partition, int64_t schema_id, + const std::vector& trimmed_primary_keys, const CoreOptions& options, + const std::shared_ptr& data_schema, + const std::shared_ptr& write_schema, + const std::shared_ptr& data_file_path_factory, + std::unique_ptr&& merge_file_split_read, + const std::shared_ptr& pool) + : pool_(pool), + partition_(partition), + schema_id_(schema_id), + trimmed_primary_keys_(trimmed_primary_keys), + options_(options), + data_schema_(data_schema), + write_schema_(write_schema), + data_file_path_factory_(data_file_path_factory), + merge_file_split_read_(std::move(merge_file_split_read)) {} + +Result> MergeTreeCompactRewriter::Create( + int32_t bucket, const BinaryRow& partition, const std::shared_ptr& table_schema, + const std::shared_ptr& path_factory, const CoreOptions& options, + const std::shared_ptr& pool, const std::shared_ptr& executor) { + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_primary_keys, + table_schema->TrimmedPrimaryKeys()); + auto data_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema); + + ReadContextBuilder read_context_builder(path_factory->RootPath()); + read_context_builder.SetOptions(options.ToMap()) + .EnablePrefetch(true) + .WithMemoryPool(pool) + .WithExecutor(executor); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, + read_context_builder.Finish()); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr internal_context, + InternalReadContext::Create(read_context, table_schema, options.ToMap())); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr merge_file_split_read, + MergeFileSplitRead::Create(path_factory, internal_context, pool, executor)); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + path_factory->CreateDataFilePathFactory(partition, bucket)); + + return std::unique_ptr(new MergeTreeCompactRewriter( + partition, table_schema->Id(), trimmed_primary_keys, options, data_schema, write_schema, + data_file_path_factory, std::move(merge_file_split_read), pool)); +} + +Result MergeTreeCompactRewriter::Upgrade( + int32_t output_level, const std::shared_ptr& file) const { + PAIMON_ASSIGN_OR_RAISE(auto upgraded_file, file->Upgrade(output_level)); + return CompactResult({file}, {upgraded_file}); +} + +Result MergeTreeCompactRewriter::Rewrite( + int32_t output_level, bool drop_delete, const std::vector>& sections) { + return RewriteCompaction(output_level, drop_delete, sections); +} + +std::vector> MergeTreeCompactRewriter::ExtractFilesFromSections( + const std::vector>& sections) { + std::vector> files; + for (const auto& section : sections) { + for (const auto& sorted_run : section) { + auto files_in_run = sorted_run.Files(); + files.insert(files.end(), files_in_run.begin(), files_in_run.end()); + } + } + return files; +} + +std::unique_ptr +MergeTreeCompactRewriter::CreateRollingRowWriter(int32_t level) const { + auto create_file_writer = [this, level]() + -> Result>>> { + ::ArrowSchema arrow_schema{}; + ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*write_schema_, &arrow_schema)); + auto format = options_.GetWriteFileFormat(); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer_builder, + format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); + writer_builder->WithMemoryPool(pool_); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*write_schema_, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); + auto converter = [](KeyValueBatch key_value_batch, ArrowArray* array) -> Status { + ArrowArrayMove(key_value_batch.batch.get(), array); + return Status::OK(); + }; + auto writer = std::make_unique( + options_.GetFileCompression(), converter, schema_id_, level, FileSource::Compact(), + trimmed_primary_keys_, stats_extractor, write_schema_, + data_file_path_factory_->IsExternalPath(), pool_); + PAIMON_RETURN_NOT_OK(writer->Init(options_.GetFileSystem(), + data_file_path_factory_->NewPath(), writer_builder)); + return writer; + }; + return std::make_unique( + options_.GetTargetFileSize(/*has_primary_key=*/true), create_file_writer); +} + +Result +MergeTreeCompactRewriter::GenerateKeyValueConsumer() const { + PAIMON_ASSIGN_OR_RAISE(std::vector target_to_src_mapping, + ArrowUtils::CreateProjection( + /*src_schema=*/merge_file_split_read_->GetValueSchema(), + /*target_fields=*/data_schema_->fields())); + return MergeTreeCompactRewriter::KeyValueConsumerCreator( + [target_schema = write_schema_, pool = pool_, + target_to_src_mapping = std::move(target_to_src_mapping)]() + -> Result>> { + return KeyValueMetaProjectionConsumer::Create(target_schema, target_to_src_mapping, + pool); + }); +} + +Status MergeTreeCompactRewriter::MergeReadAndWrite( + bool drop_delete, const std::vector& section, + const MergeTreeCompactRewriter::KeyValueConsumerCreator& create_consumer, + MergeTreeCompactRewriter::KeyValueRollingFileWriter* rolling_writer, + std::vector>* + reader_holders_ptr) { + auto& reader_holders = *reader_holders_ptr; + // prepare loser tree sort merge reader + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr sort_merge_reader, + merge_file_split_read_->CreateSortMergeReaderForSection( + section, partition_, /*deletion_file_map=*/{}, + /*predicate=*/nullptr, data_file_path_factory_, drop_delete)); + + // consumer batch size is WriteBatchSize + auto async_key_value_producer_consumer = + std::make_unique>( + std::move(sort_merge_reader), create_consumer, options_.GetWriteBatchSize(), + /*projection_thread_num=*/1, pool_); + + // read KeyValueBatch from SortMergeReader and write to RollingWriter + while (true) { + PAIMON_ASSIGN_OR_RAISE(KeyValueBatch key_value_batch, + async_key_value_producer_consumer->NextBatch()); + if (key_value_batch.batch == nullptr) { + break; + } + PAIMON_RETURN_NOT_OK(rolling_writer->Write(std::move(key_value_batch))); + } + reader_holders.push_back(std::move(async_key_value_producer_consumer)); + return Status::OK(); +} + +Result MergeTreeCompactRewriter::RewriteCompaction( + int32_t output_level, bool drop_delete, const std::vector>& sections) { + PAIMON_ASSIGN_OR_RAISE(MergeTreeCompactRewriter::KeyValueConsumerCreator create_consumer, + GenerateKeyValueConsumer()); + + std::vector> reader_holders; + auto rolling_writer = CreateRollingRowWriter(output_level); + + ScopeGuard write_guard([&]() -> void { + (void)rolling_writer->Close(); + rolling_writer->Abort(); + for (const auto& reader : reader_holders) { + reader->Close(); + } + }); + + for (const auto& section : sections) { + PAIMON_RETURN_NOT_OK(MergeReadAndWrite(drop_delete, section, create_consumer, + rolling_writer.get(), &reader_holders)); + } + + PAIMON_RETURN_NOT_OK(rolling_writer->Close()); + + auto before = ExtractFilesFromSections(sections); + NotifyRewriteCompactBefore(before); + PAIMON_ASSIGN_OR_RAISE(std::vector> after, + rolling_writer->GetResult()); + write_guard.Release(); + + after = NotifyRewriteCompactAfter(after); + return CompactResult(before, after); +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.h b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.h new file mode 100644 index 00000000..ea4f8529 --- /dev/null +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.h @@ -0,0 +1,104 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "arrow/api.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/async_key_value_producer_and_consumer.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/rolling_file_writer.h" +#include "paimon/core/key_value.h" +#include "paimon/core/mergetree/compact/compact_rewriter.h" +#include "paimon/core/mergetree/merge_tree_writer.h" +#include "paimon/core/operation/merge_file_split_read.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/utils/file_store_path_factory.h" +namespace paimon { +/// Default `CompactRewriter` for merge trees. +class MergeTreeCompactRewriter : public CompactRewriter { + public: + static Result> Create( + int32_t bucket, const BinaryRow& partition, + const std::shared_ptr& table_schema, + const std::shared_ptr& path_factory, const CoreOptions& options, + const std::shared_ptr& memory_pool, const std::shared_ptr& executor); + + Result Rewrite(int32_t output_level, bool drop_delete, + const std::vector>& sections) override; + + Result Upgrade(int32_t output_level, + const std::shared_ptr& file) const override; + + Status Close() override { + return Status::OK(); + } + + protected: + Result RewriteCompaction(int32_t output_level, bool drop_delete, + const std::vector>& sections); + + virtual void NotifyRewriteCompactBefore( + const std::vector>& files) {} + + virtual std::vector> NotifyRewriteCompactAfter( + const std::vector>& files) { + return files; + } + + static std::vector> ExtractFilesFromSections( + const std::vector>& sections); + + private: + using KeyValueRollingFileWriter = + RollingFileWriter>; + using KeyValueMergeReader = AsyncKeyValueProducerAndConsumer; + using KeyValueConsumerCreator = + AsyncKeyValueProducerAndConsumer::ConsumerCreator; + + MergeTreeCompactRewriter(const BinaryRow& partition, int64_t schema_id, + const std::vector& trimmed_primary_keys, + const CoreOptions& options, + const std::shared_ptr& data_schema, + const std::shared_ptr& write_schema, + const std::shared_ptr& data_file_path_factory, + std::unique_ptr&& merge_file_split_read, + const std::shared_ptr& pool); + + std::unique_ptr CreateRollingRowWriter(int32_t level) const; + + Result GenerateKeyValueConsumer() const; + + Status MergeReadAndWrite(bool drop_delete, const std::vector& section, + const KeyValueConsumerCreator& create_consumer, + KeyValueRollingFileWriter* rolling_writer, + std::vector>* reader_holders_ptr); + + private: + std::shared_ptr pool_; + BinaryRow partition_; + int64_t schema_id_; + std::vector trimmed_primary_keys_; + CoreOptions options_; + // all data fields in table schema + std::shared_ptr data_schema_; + // SequenceNumber + ValueKind + data_schema_ + std::shared_ptr write_schema_; + std::shared_ptr data_file_path_factory_; + std::unique_ptr merge_file_split_read_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp new file mode 100644 index 00000000..7333f1b8 --- /dev/null +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp @@ -0,0 +1,305 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/merge_tree_compact_rewriter.h" + +#include "arrow/api.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/factories/io_hook.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/core/mergetree/compact/interval_partition.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/scan_context.h" +#include "paimon/table/source/table_scan.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/io_exception_helper.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class MergeTreeCompactRewriterTest : public testing::Test { + public: + Result> CreateCompactRewriter( + const std::string& table_path, const std::shared_ptr& table_schema, + int32_t bucket, const BinaryRow& partition) const { + PAIMON_ASSIGN_OR_RAISE(auto options, CoreOptions::FromMap(table_schema->Options())); + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + FileStorePathFactory::Create( + table_path, arrow_schema, table_schema->PartitionKeys(), + options.GetPartitionDefaultName(), options.GetWriteFileFormat()->Identifier(), + options.DataFilePrefix(), options.LegacyPartitionNameEnabled(), external_paths, + global_index_external_path, options.IndexFileInDataFileDir(), pool_)); + + return MergeTreeCompactRewriter::Create(bucket, partition, table_schema, path_factory, + options, pool_, CreateDefaultExecutor()); + } + + Result>> GenerateSortedRuns( + const std::string& table_path, const std::shared_ptr& table_schema, + int32_t bucket, const std::map& partition) const { + ScanContextBuilder scan_context_builder(table_path); + scan_context_builder.SetBucketFilter(bucket).SetPartitionFilter({partition}); + PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context))); + PAIMON_ASSIGN_OR_RAISE(auto result_plan, table_scan->CreatePlan()); + auto splits = result_plan->Splits(); + EXPECT_EQ(1, splits.size()); + + auto data_split_impl = std::dynamic_pointer_cast(splits[0]); + EXPECT_TRUE(data_split_impl); + auto metas = data_split_impl->DataFiles(); + + PAIMON_ASSIGN_OR_RAISE(auto pk_fields, + table_schema->GetFields(table_schema->TrimmedPrimaryKeys().value())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr key_comparator, + FieldsComparator::Create(pk_fields, /*is_ascending_order=*/true, /*use_view=*/false)); + IntervalPartition interval_partition(metas, key_comparator); + return interval_partition.Partition(); + } + + void CheckResult(const std::string& compact_file_name, const std::shared_ptr& fs, + const std::shared_ptr& table_schema, + const std::shared_ptr& expected_array) const { + ASSERT_OK_AND_ASSIGN(auto file_format, + FileFormatFactory::Get("orc", table_schema->Options())); + ASSERT_OK_AND_ASSIGN(auto reader_builder, + file_format->CreateReaderBuilder(/*batch_size=*/10)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + fs->Open(compact_file_name)); + ASSERT_OK_AND_ASSIGN(auto file_batch_reader, reader_builder->Build(input_stream)); + ASSERT_OK_AND_ASSIGN(auto result_array, + ReadResultCollector::CollectResult(file_batch_reader.get())); + // handle type nullable, as result_array does not have not null flag + result_array = result_array->View(expected_array->type()).ValueOrDie(); + + ASSERT_TRUE(expected_array->type()->Equals(result_array->type())) + << "result=" << result_array->type()->ToString() + << ", expected=" << expected_array->type()->ToString() << std::endl; + ASSERT_TRUE(expected_array->Equals(*result_array)) << result_array->ToString(); + } + + private: + std::shared_ptr pool_ = GetDefaultPool(); +}; + +TEST_F(MergeTreeCompactRewriterTest, TestSimple) { + std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/"; + auto table_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(TestUtil::CopyDirectory(origin_table_path, table_dir->Str())); + std::string table_path = table_dir->Str() + "/pk_table_scan_and_read_mor"; + auto fs = table_dir->GetFileSystem(); + + // load table schema + SchemaManager schema_manager(fs, table_path); + ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0)); + ASSERT_OK_AND_ASSIGN( + auto rewriter, + CreateCompactRewriter(table_path, table_schema, /*bucket=*/1, + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool_.get()))); + + // generate sorted runs and rewrite + ASSERT_OK_AND_ASSIGN(auto runs, GenerateSortedRuns(table_path, table_schema, /*bucket=*/1, + /*partition=*/{{"f1", "10"}})) + ASSERT_OK_AND_ASSIGN(auto compact_result, rewriter->Rewrite( + /*output_level=*/5, /*drop_delete=*/true, runs)); + // check compact result + ASSERT_EQ(4, compact_result.Before().size()); + ASSERT_EQ(1, compact_result.After().size()); + const auto& compact_file_meta = compact_result.After()[0]; + auto expected_file_meta = std::make_shared( + "file.orc", 100l, /*row_count=*/7, + /*min_key=*/BinaryRowGenerator::GenerateRow({"Bob", 0}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({"Skye2", 0}, pool_.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({"Bob", 0}, {"Skye2", 0}, {0, 0}, pool_.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({"Bob", 10, 0, 12.1}, {"Skye2", 10, 0, 31.1}, + {0, 0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0l, /*max_sequence_number=*/10l, /*schema_id=*/0, /*level=*/5, + std::vector>(), Timestamp(0l, 0), /*delete_row_count=*/0, + nullptr, FileSource::Compact(), std::nullopt, std::nullopt, std::nullopt, std::nullopt); + ASSERT_TRUE(expected_file_meta->TEST_Equal(*compact_file_meta)); + + // check compact file exist + std::string compact_file_name = + table_path + "/f1=10/bucket-1/" + compact_result.After()[0]->file_name; + ASSERT_OK_AND_ASSIGN(bool exist, fs->Exists(compact_file_name)); + ASSERT_TRUE(exist); + + // check file content + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + auto type_with_special_fields = + arrow::struct_(SpecialFields::CompleteSequenceAndValueKindField(arrow_schema)->fields()); + std::shared_ptr expected_array; + auto array_status = + arrow::ipc::internal::json::ChunkedArrayFromJSON(type_with_special_fields, {R"([ +[0, 0, "Bob", 10, 0, 12.1], +[4, 0, "David", 10, 0, 17.1], +[1, 0, "Emily", 10, 0, 13.1], +[7, 0, "Marco", 10, 0, 21.1], +[10, 0, "Marco2", 10, 0, 31.1], +[6, 0, "Skye", 10, 0, 21], +[9, 0, "Skye2", 10, 0, 31] +])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()); + CheckResult(compact_file_name, fs, table_schema, expected_array); +} + +TEST_F(MergeTreeCompactRewriterTest, TestNotDropDelete) { + std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/"; + auto table_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(TestUtil::CopyDirectory(origin_table_path, table_dir->Str())); + std::string table_path = table_dir->Str() + "/pk_table_scan_and_read_mor"; + auto fs = table_dir->GetFileSystem(); + + // load table schema + SchemaManager schema_manager(fs, table_path); + ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0)); + ASSERT_OK_AND_ASSIGN( + auto rewriter, + CreateCompactRewriter(table_path, table_schema, /*bucket=*/1, + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool_.get()))); + + // generate sorted runs and rewrite + ASSERT_OK_AND_ASSIGN(auto runs, GenerateSortedRuns(table_path, table_schema, /*bucket=*/1, + /*partition=*/{{"f1", "10"}})) + ASSERT_OK_AND_ASSIGN(auto compact_result, rewriter->Rewrite( + /*output_level=*/5, /*drop_delete=*/false, runs)); + // check compact result + ASSERT_EQ(4, compact_result.Before().size()); + ASSERT_EQ(1, compact_result.After().size()); + const auto& compact_file_meta = compact_result.After()[0]; + auto expected_file_meta = std::make_shared( + "file.orc", 100l, /*row_count=*/9, + /*min_key=*/BinaryRowGenerator::GenerateRow({"Alex", 0}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({"Tony", 0}, pool_.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({"Alex", 0}, {"Tony", 0}, {0, 0}, pool_.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({"Alex", 10, 0, 12.1}, {"Tony", 10, 0, 31.2}, + {0, 0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0l, /*max_sequence_number=*/11l, /*schema_id=*/0, /*level=*/5, + std::vector>(), Timestamp(0l, 0), /*delete_row_count=*/2, + nullptr, FileSource::Compact(), std::nullopt, std::nullopt, std::nullopt, std::nullopt); + ASSERT_TRUE(expected_file_meta->TEST_Equal(*compact_file_meta)); + + std::string compact_file_name = + table_path + "/f1=10/bucket-1/" + compact_result.After()[0]->file_name; + ASSERT_OK_AND_ASSIGN(bool exist, fs->Exists(compact_file_name)); + ASSERT_TRUE(exist); + + // check file content + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + auto type_with_special_fields = + arrow::struct_(SpecialFields::CompleteSequenceAndValueKindField(arrow_schema)->fields()); + std::shared_ptr expected_array; + auto array_status = + arrow::ipc::internal::json::ChunkedArrayFromJSON(type_with_special_fields, {R"([ +[11, 3, "Alex", 10, 0, 31.2], +[0, 0, "Bob", 10, 0, 12.1], +[4, 0, "David", 10, 0, 17.1], +[1, 0, "Emily", 10, 0, 13.1], +[7, 0, "Marco", 10, 0, 21.1], +[10, 0, "Marco2", 10, 0, 31.1], +[6, 0, "Skye", 10, 0, 21], +[9, 0, "Skye2", 10, 0, 31], +[5, 3, "Tony", 10, 0, 14.1] +])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()); + CheckResult(compact_file_name, fs, table_schema, expected_array); +} + +TEST_F(MergeTreeCompactRewriterTest, TestIOException) { + std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/"; + + bool run_complete = false; + auto io_hook = IOHook::GetInstance(); + for (size_t i = 0; i < 500; i += RandomNumber(1, 17)) { + auto table_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(TestUtil::CopyDirectory(origin_table_path, table_dir->Str())); + std::string table_path = table_dir->Str() + "/pk_table_scan_and_read_mor"; + auto fs = table_dir->GetFileSystem(); + + // load table schema + SchemaManager schema_manager(fs, table_path); + ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0)); + ASSERT_OK_AND_ASSIGN(auto rewriter, + CreateCompactRewriter( + table_path, table_schema, /*bucket=*/1, + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool_.get()))); + + // generate sorted runs and rewrite + ASSERT_OK_AND_ASSIGN(auto runs, GenerateSortedRuns(table_path, table_schema, /*bucket=*/1, + /*partition=*/{{"f1", "10"}})) + // rewrite may trigger I/O exception + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + auto compact_result = rewriter->Rewrite( + /*output_level=*/5, /*drop_delete=*/true, runs); + CHECK_HOOK_STATUS(compact_result.status(), i); + io_hook->Clear(); + + // check compact result + ASSERT_EQ(4, compact_result.value().Before().size()); + ASSERT_EQ(1, compact_result.value().After().size()); + std::string compact_file_name = + table_path + "/f1=10/bucket-1/" + compact_result.value().After()[0]->file_name; + ASSERT_OK_AND_ASSIGN(bool exist, fs->Exists(compact_file_name)); + ASSERT_TRUE(exist); + + // check file content + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + auto type_with_special_fields = arrow::struct_( + SpecialFields::CompleteSequenceAndValueKindField(arrow_schema)->fields()); + std::shared_ptr expected_array; + auto array_status = + arrow::ipc::internal::json::ChunkedArrayFromJSON(type_with_special_fields, {R"([ +[0, 0, "Bob", 10, 0, 12.1], +[4, 0, "David", 10, 0, 17.1], +[1, 0, "Emily", 10, 0, 13.1], +[7, 0, "Marco", 10, 0, 21.1], +[10, 0, "Marco2", 10, 0, 31.1], +[6, 0, "Skye", 10, 0, 21], +[9, 0, "Skye2", 10, 0, 31] +])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()); + CheckResult(compact_file_name, fs, table_schema, expected_array); + run_complete = true; + break; + } + ASSERT_TRUE(run_complete); +} + +} // namespace paimon::test +// TODO(xinyu.lxy): e2e test +// test multiple MergeFunction +// test multiple RowKind +// test external path +// test branch diff --git a/src/paimon/core/mergetree/merge_tree_writer.cpp b/src/paimon/core/mergetree/merge_tree_writer.cpp index baba908d..b1bc3fbb 100644 --- a/src/paimon/core/mergetree/merge_tree_writer.cpp +++ b/src/paimon/core/mergetree/merge_tree_writer.cpp @@ -80,13 +80,7 @@ MergeTreeWriter::MergeTreeWriter( schema_id_(schema_id), value_type_(arrow::struct_(value_schema->fields())), metrics_(std::make_shared()) { - arrow::FieldVector target_fields; - target_fields.push_back( - DataField::ConvertDataFieldToArrowField(SpecialFields::SequenceNumber())); - target_fields.push_back(DataField::ConvertDataFieldToArrowField(SpecialFields::ValueKind())); - target_fields.insert(target_fields.end(), value_schema->fields().begin(), - value_schema->fields().end()); - write_schema_ = arrow::schema(target_fields); + write_schema_ = SpecialFields::CompleteSequenceAndValueKindField(value_schema); } Status MergeTreeWriter::Write(std::unique_ptr&& moved_batch) { @@ -148,10 +142,14 @@ Status MergeTreeWriter::Flush() { // consumer batch size is WriteBatchSize auto async_key_value_producer_consumer = std::make_unique>( - std::move(sort_merge_reader), create_consumer, - std::min(options_.GetWriteBatchSize(), MAX_PROJECTION_BATCH_SIZE), + std::move(sort_merge_reader), create_consumer, options_.GetWriteBatchSize(), /*projection_thread_num=*/1, pool_); auto rolling_writer = CreateRollingRowWriter(); + ScopeGuard write_guard([&]() -> void { + (void)rolling_writer->Close(); + rolling_writer->Abort(); + async_key_value_producer_consumer->Close(); + }); while (true) { PAIMON_ASSIGN_OR_RAISE(KeyValueBatch key_value_batch, async_key_value_producer_consumer->NextBatch()); @@ -163,6 +161,7 @@ Status MergeTreeWriter::Flush() { PAIMON_RETURN_NOT_OK(rolling_writer->Close()); PAIMON_ASSIGN_OR_RAISE(std::vector> flushed_files, rolling_writer->GetResult()); + write_guard.Release(); new_files_.insert(new_files_.end(), flushed_files.begin(), flushed_files.end()); metrics_->Merge(rolling_writer->GetMetrics()); return Status::OK(); @@ -196,7 +195,7 @@ MergeTreeWriter::CreateRollingRowWriter() const { return Status::OK(); }; auto writer = std::make_unique( - options_.GetFileCompression(), converter, schema_id_, FileSource::Append(), + options_.GetFileCompression(), converter, schema_id_, /*level=*/0, FileSource::Append(), trimmed_primary_keys_, stats_extractor, write_schema_, path_factory_->IsExternalPath(), pool_); PAIMON_RETURN_NOT_OK( diff --git a/src/paimon/core/mergetree/merge_tree_writer.h b/src/paimon/core/mergetree/merge_tree_writer.h index 53a869a9..0788c014 100644 --- a/src/paimon/core/mergetree/merge_tree_writer.h +++ b/src/paimon/core/mergetree/merge_tree_writer.h @@ -93,9 +93,6 @@ class MergeTreeWriter : public BatchWriter { CreateRollingRowWriter() const; static Result EstimateMemoryUse(const std::shared_ptr& array); - // in case write batch size is too large and overflow arrow array - static constexpr int32_t MAX_PROJECTION_BATCH_SIZE = 100000; - private: int64_t last_sequence_number_; int64_t current_memory_in_bytes_; diff --git a/src/paimon/core/operation/merge_file_split_read.cpp b/src/paimon/core/operation/merge_file_split_read.cpp index f02e1dc8..4c003c0f 100644 --- a/src/paimon/core/operation/merge_file_split_read.cpp +++ b/src/paimon/core/operation/merge_file_split_read.cpp @@ -138,20 +138,25 @@ Result> MergeFileSplitRead::CreateReader( CreateNoMergeReader(data_split, /*only_filter_key=*/data_split->IsStreaming(), data_file_path_factory)); } else { - if (!merge_function_wrapper_) { - // In deletion vector mode, streaming data split or postpone bucket mode, we don't need - // to use merge function. Even if the merge function in CoreOptions is not supported, it - // should not affect data reading. So we create merge_function_wrapper_ lazily, to avoid - // raise errors when creating MergeFileSplitRead at the beginning. - PAIMON_ASSIGN_OR_RAISE( - merge_function_wrapper_, - CreateMergeFunctionWrapper(options_, context_->GetTableSchema(), value_schema_)); - } PAIMON_ASSIGN_OR_RAISE(batch_reader, CreateMergeReader(data_split, data_file_path_factory)); } return std::make_unique(std::move(batch_reader), pool_); } +Result>> +MergeFileSplitRead::GetMergeFunctionWrapper() { + if (!merge_function_wrapper_) { + // In deletion vector mode, streaming data split or postpone bucket mode, we don't need + // to use merge function. Even if the merge function in CoreOptions is not supported, it + // should not affect data reading. So we create merge_function_wrapper_ lazily, to avoid + // raise errors when creating MergeFileSplitRead at the beginning. + PAIMON_ASSIGN_OR_RAISE( + merge_function_wrapper_, + CreateMergeFunctionWrapper(options_, context_->GetTableSchema(), value_schema_)); + } + return merge_function_wrapper_; +} + Result>> MergeFileSplitRead::CreateMergeFunctionWrapper(const CoreOptions& core_options, const std::shared_ptr& table_schema, @@ -194,7 +199,7 @@ Result> MergeFileSplitRead::ApplyIndexAndDvReaderIf Result> MergeFileSplitRead::CreateMergeReader( const std::shared_ptr& data_split, - const std::shared_ptr& data_file_path_factory) const { + const std::shared_ptr& data_file_path_factory) { auto deletion_file_map = AbstractSplitRead::CreateDeletionFileMap(*data_split); std::vector> sections = IntervalPartition(data_split->DataFiles(), interval_partition_comparator_).Partition(); @@ -202,10 +207,9 @@ Result> MergeFileSplitRead::CreateMergeReader( batch_readers.reserve(sections.size()); // no overlap through multiple sections for (const auto& section : sections) { - PAIMON_ASSIGN_OR_RAISE( - std::unique_ptr projection_reader, - CreateReaderForSection(section, data_split->BucketPath(), data_split->Partition(), - deletion_file_map, data_file_path_factory)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr projection_reader, + CreateReaderForSection(section, data_split->Partition(), + deletion_file_map, data_file_path_factory)); batch_readers.push_back(std::move(projection_reader)); } auto concat_batch_reader = std::make_unique(std::move(batch_readers), pool_); @@ -372,44 +376,57 @@ Result> MergeFileSplitRead::GenerateKeyPredicates( } Result> MergeFileSplitRead::CreateReaderForSection( - const std::vector& section, const std::string& bucket_path, - const BinaryRow& partition, + const std::vector& section, const BinaryRow& partition, const std::unordered_map& deletion_file_map, - const std::shared_ptr& data_file_path_factory) const { + const std::shared_ptr& data_file_path_factory) { // with overlap in one section - std::vector> record_readers; - record_readers.reserve(section.size()); std::shared_ptr predicate; if (section.size() > 1) { predicate = predicate_for_keys_; } else { predicate = context_->GetPredicate(); } - for (const auto& run : section) { - // no overlap in a run - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr run_reader, - CreateReaderForRun(bucket_path, partition, run, deletion_file_map, - predicate, data_file_path_factory)); - record_readers.emplace_back(std::move(run_reader)); - } - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr sort_merge_reader, - CreateSortMergeReader(std::move(record_readers))); - - auto drop_delete_reader = std::make_unique(std::move(sort_merge_reader)); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr sort_merge_reader, + CreateSortMergeReaderForSection(section, partition, deletion_file_map, predicate, + data_file_path_factory, /*drop_delete=*/true)); // KeyValueProjectionReader converts KeyValue objects to arrow array according to projection if (!context_->EnableMultiThreadRowToBatch()) { - return KeyValueProjectionReader::Create(std::move(drop_delete_reader), raw_read_schema_, + return KeyValueProjectionReader::Create(std::move(sort_merge_reader), raw_read_schema_, projection_, options_.GetReadBatchSize(), pool_); } int32_t thread_number = context_->GetRowToBatchThreadNumber(); assert(thread_number > 0); return std::make_unique( - std::move(drop_delete_reader), raw_read_schema_, projection_, options_.GetReadBatchSize(), + std::move(sort_merge_reader), raw_read_schema_, projection_, options_.GetReadBatchSize(), thread_number, pool_); } +Result> MergeFileSplitRead::CreateSortMergeReaderForSection( + const std::vector& section, const BinaryRow& partition, + const std::unordered_map& deletion_file_map, + const std::shared_ptr& predicate, + const std::shared_ptr& data_file_path_factory, bool drop_delete) { + // with overlap in one section + std::vector> record_readers; + record_readers.reserve(section.size()); + for (const auto& run : section) { + // no overlap in a run + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr run_reader, + CreateReaderForRun(partition, run, deletion_file_map, predicate, + data_file_path_factory)); + record_readers.emplace_back(std::move(run_reader)); + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr sort_merge_reader, + CreateSortMergeReader(std::move(record_readers))); + if (drop_delete) { + sort_merge_reader = std::make_unique(std::move(sort_merge_reader)); + } + return sort_merge_reader; +} + Result> MergeFileSplitRead::CreateReaderForRun( - const std::string& bucket_path, const BinaryRow& partition, const SortedRun& sorted_run, + const BinaryRow& partition, const SortedRun& sorted_run, const std::unordered_map& deletion_file_map, const std::shared_ptr& predicate, const std::shared_ptr& data_file_path_factory) const { @@ -433,16 +450,18 @@ Result> MergeFileSplitRead::CreateReaderFo } Result> MergeFileSplitRead::CreateSortMergeReader( - std::vector>&& record_readers) const { + std::vector>&& record_readers) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr> merge_function_wrapper, + GetMergeFunctionWrapper()); auto sort_engine = options_.GetSortEngine(); if (sort_engine == SortEngine::MIN_HEAP) { return std::make_unique( std::move(record_readers), key_comparator_, user_defined_seq_comparator_, - merge_function_wrapper_); + merge_function_wrapper); } else if (sort_engine == SortEngine::LOSER_TREE) { return std::make_unique( std::move(record_readers), key_comparator_, user_defined_seq_comparator_, - merge_function_wrapper_); + merge_function_wrapper); } return Status::Invalid("only support loser-tree or min-heap sort engine"); } diff --git a/src/paimon/core/operation/merge_file_split_read.h b/src/paimon/core/operation/merge_file_split_read.h index 483e4742..471a1e44 100644 --- a/src/paimon/core/operation/merge_file_split_read.h +++ b/src/paimon/core/operation/merge_file_split_read.h @@ -93,29 +93,42 @@ class MergeFileSplitRead : public AbstractSplitRead { const std::optional>& ranges, const std::shared_ptr& data_file_path_factory) const override; + Result> CreateSortMergeReaderForSection( + const std::vector& section, const BinaryRow& partition, + const std::unordered_map& deletion_file_map, + const std::shared_ptr& predicate, + const std::shared_ptr& data_file_path_factory, bool drop_delete); + + std::shared_ptr GetPathFactory() const { + return path_factory_; + } + + std::shared_ptr GetValueSchema() const { + return value_schema_; + } + private: Result> CreateMergeReader( const std::shared_ptr& data_split, - const std::shared_ptr& data_file_path_factory) const; + const std::shared_ptr& data_file_path_factory); Result> CreateNoMergeReader( const std::shared_ptr& data_split, bool only_filter_key, const std::shared_ptr& data_file_path_factory) const; Result> CreateReaderForSection( - const std::vector& section, const std::string& bucket_path, - const BinaryRow& partition, + const std::vector& section, const BinaryRow& partition, const std::unordered_map& deletion_file_map, - const std::shared_ptr& data_file_path_factory) const; + const std::shared_ptr& data_file_path_factory); Result> CreateReaderForRun( - const std::string& bucket_path, const BinaryRow& partition, const SortedRun& sorted_run, + const BinaryRow& partition, const SortedRun& sorted_run, const std::unordered_map& deletion_file_map, const std::shared_ptr& predicate, const std::shared_ptr& data_file_path_factory) const; Result> CreateSortMergeReader( - std::vector>&& record_readers) const; + std::vector>&& record_readers); MergeFileSplitRead(const std::shared_ptr& path_factory, const std::shared_ptr& context, @@ -131,6 +144,8 @@ class MergeFileSplitRead : public AbstractSplitRead { const std::shared_ptr& executor); private: + Result>> GetMergeFunctionWrapper(); + static Result>> CreateMergeFunctionWrapper( const CoreOptions& core_options, const std::shared_ptr& table_schema, const std::shared_ptr& value_schema); @@ -161,6 +176,7 @@ class MergeFileSplitRead : public AbstractSplitRead { // actual read schema, e.g., complete all key fields, user defined sequence fields std::shared_ptr read_schema_; std::vector projection_; + // merge_function_wrapper is lazy created, use through GetMergeFunctionWrapper() std::shared_ptr> merge_function_wrapper_; std::shared_ptr key_comparator_; std::shared_ptr interval_partition_comparator_; diff --git a/src/paimon/core/postpone/postpone_bucket_writer.cpp b/src/paimon/core/postpone/postpone_bucket_writer.cpp index bbb53f53..b40b27fa 100644 --- a/src/paimon/core/postpone/postpone_bucket_writer.cpp +++ b/src/paimon/core/postpone/postpone_bucket_writer.cpp @@ -253,7 +253,7 @@ PostponeBucketWriter::CreateRollingRowWriter() const { return Status::OK(); }; auto writer = std::make_unique( - options_.GetFileCompression(), converter, schema_id_, FileSource::Append(), + options_.GetFileCompression(), converter, schema_id_, /*level=*/0, FileSource::Append(), trimmed_primary_keys_, /*stats_extractor=*/nullptr, write_schema_, path_factory_->IsExternalPath(), pool_); PAIMON_RETURN_NOT_OK( diff --git a/src/paimon/core/postpone/postpone_bucket_writer.h b/src/paimon/core/postpone/postpone_bucket_writer.h index 036c638c..2c845aec 100644 --- a/src/paimon/core/postpone/postpone_bucket_writer.h +++ b/src/paimon/core/postpone/postpone_bucket_writer.h @@ -78,6 +78,10 @@ class PostponeBucketWriter : public BatchWriter { Status DoClose() { sequence_number_array_.reset(); row_kind_array_.reset(); + if (writer_) { + writer_->Abort(); + writer_.reset(); + } return Status::OK(); } diff --git a/src/paimon/testing/mock/mock_key_value_data_file_record_reader.h b/src/paimon/testing/mock/mock_key_value_data_file_record_reader.h index c58a1d8d..fa98e5af 100644 --- a/src/paimon/testing/mock/mock_key_value_data_file_record_reader.h +++ b/src/paimon/testing/mock/mock_key_value_data_file_record_reader.h @@ -19,8 +19,8 @@ #include #include +#include "paimon/common/data/columnar/columnar_batch_context.h" #include "paimon/core/io/key_value_data_file_record_reader.h" - namespace paimon::test { // mock reader hold data array class MockKeyValueDataFileRecordReader : public KeyValueDataFileRecordReader { @@ -31,11 +31,15 @@ class MockKeyValueDataFileRecordReader : public KeyValueDataFileRecordReader { : KeyValueDataFileRecordReader(std::move(reader), key_arity, value_schema, level, pool) {} void Reset() override { - for (const auto& field : key_fields_) { - data_holder_.push_back(field); + if (key_ctx_) { + for (const auto& field : key_ctx_->array_vec) { + data_holder_.push_back(field); + } } - for (const auto& field : value_fields_) { - data_holder_.push_back(field); + if (value_ctx_) { + for (const auto& field : value_ctx_->array_vec) { + data_holder_.push_back(field); + } } KeyValueDataFileRecordReader::Reset(); }