diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 758e25fc9..297d122a9 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -303,6 +303,32 @@ struct PAIMON_EXPORT Options { /// "compaction.force-rewrite-all-files" - Whether to force pick all files for a full /// compaction. Usually seen in a compaction task to external paths. Default value is "false". static const char COMPACTION_FORCE_REWRITE_ALL_FILES[]; + /// "compaction.optimization-interval" - Implying how often to perform an optimization + /// compaction, this configuration is used to ensure the query timeliness of the read-optimized + /// system table. No default value. + static const char COMPACTION_OPTIMIZATION_INTERVAL[]; + /// "compaction.total-size-threshold" - When total size is smaller than this threshold, force a + /// full compaction. No default value. + static const char COMPACTION_TOTAL_SIZE_THRESHOLD[]; + /// "compaction.incremental-size-threshold" - When incremental size is bigger than this + /// threshold, force a full compaction. No default value. + static const char COMPACTION_INCREMENTAL_SIZE_THRESHOLD[]; + /// "compaction.offpeak.start.hour" - The start of off-peak hours, expressed as an integer + /// between 0 and 23, inclusive. Set to -1 to disable off-peak. Default is -1. + static const char COMPACT_OFFPEAK_START_HOUR[]; + /// "compaction.offpeak.end.hour" - The end of off-peak hours, expressed as an integer between 0 + /// and 23, exclusive. Set to -1 to disable off-peak. Default is -1. + static const char COMPACT_OFFPEAK_END_HOUR[]; + /// "compaction.offpeak-ratio" - Allows you to set a different (by default, more aggressive) + /// percentage ratio for determining whether larger sorted runs' sizes are included in + /// compactions during off-peak hours. Works in the same way as compaction.size-ratio. Only + /// applies if offpeak.start.hour and offpeak.end.hour are also enabled. 
+ /// For instance, if your cluster experiences low pressure between 2 AM and 6 PM, you can + /// configure `compaction.offpeak.start.hour=2` and `compaction.offpeak.end.hour=18` to define + /// this period as off-peak hours. During these hours, you can increase the off-peak compaction + /// ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction. + /// Default is 0. + static const char COMPACTION_OFFPEAK_RATIO[]; }; static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits::max(); diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 9a0c958b7..f180aae39 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -210,6 +210,8 @@ set(PAIMON_CORE_SRCS core/manifest/manifest_list.cpp core/manifest/partition_entry.cpp core/manifest/index_manifest_file_handler.cpp + core/mergetree/compact/universal_compaction.cpp + core/mergetree/compact/early_full_compaction.cpp core/mergetree/compact/aggregate/aggregate_merge_function.cpp core/mergetree/compact/aggregate/field_sum_agg.cpp core/mergetree/compact/interval_partition.cpp @@ -527,6 +529,11 @@ if(PAIMON_BUILD_TESTS) core/mergetree/compact/partial_update_merge_function_test.cpp core/mergetree/compact/reducer_merge_function_wrapper_test.cpp core/mergetree/compact/sort_merge_reader_test.cpp + core/mergetree/compact/off_peak_hours_test.cpp + core/mergetree/compact/early_full_compaction_test.cpp + core/mergetree/compact/universal_compaction_test.cpp + core/mergetree/compact/force_up_level0_compaction_test.cpp + core/mergetree/compact/compact_strategy_test.cpp core/mergetree/drop_delete_reader_test.cpp core/mergetree/merge_tree_writer_test.cpp core/mergetree/sorted_run_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 00feb0b56..916970850 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -86,5 +86,11 @@ const char Options::SCAN_TAG_NAME[] = "scan.tag-name"; const char 
Options::WRITE_ONLY[] = "write-only"; const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num"; const char Options::COMPACTION_FORCE_REWRITE_ALL_FILES[] = "compaction.force-rewrite-all-files"; - +const char Options::COMPACTION_OPTIMIZATION_INTERVAL[] = "compaction.optimization-interval"; +const char Options::COMPACTION_TOTAL_SIZE_THRESHOLD[] = "compaction.total-size-threshold"; +const char Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD[] = + "compaction.incremental-size-threshold"; +const char Options::COMPACT_OFFPEAK_START_HOUR[] = "compaction.offpeak.start.hour"; +const char Options::COMPACT_OFFPEAK_END_HOUR[] = "compaction.offpeak.end.hour"; +const char Options::COMPACTION_OFFPEAK_RATIO[] = "compaction.offpeak-ratio"; } // namespace paimon diff --git a/src/paimon/common/utils/date_time_utils.h b/src/paimon/common/utils/date_time_utils.h index 6a48f4380..2b1afd949 100644 --- a/src/paimon/common/utils/date_time_utils.h +++ b/src/paimon/common/utils/date_time_utils.h @@ -116,6 +116,14 @@ class DateTimeUtils { return *(static_cast(local_ts_scalar->data())); } + static inline Result GetCurrentLocalHour() { + PAIMON_ASSIGN_OR_RAISE(uint64_t local_us, GetCurrentLocalTimeUs()); + auto local_seconds = static_cast(local_us / 1000000); + std::tm local_tm{}; + gmtime_r(&local_seconds, &local_tm); + return local_tm.tm_hour; + } + static inline int32_t GetPrecisionFromType( const std::shared_ptr& timestamp_type) { int32_t precision = Timestamp::MAX_PRECISION; diff --git a/src/paimon/common/utils/date_time_utils_test.cpp b/src/paimon/common/utils/date_time_utils_test.cpp index 534e47570..d5217717a 100644 --- a/src/paimon/common/utils/date_time_utils_test.cpp +++ b/src/paimon/common/utils/date_time_utils_test.cpp @@ -273,7 +273,7 @@ TEST(DateTimeUtilsTest, TestGetLocalTimezoneName) { ASSERT_EQ(DateTimeUtils::GetLocalTimezoneName(), timezone); } -TEST(DateTimeUtilsTest, TestGetCurrentTime) { +TEST(DateTimeUtilsTest, TestGetCurrentLocalTimeUs) { TimezoneGuard 
guard("Asia/Shanghai"); uint64_t utc_ts = DateTimeUtils::GetCurrentUTCTimeUs(); uint64_t local_ts = DateTimeUtils::GetCurrentLocalTimeUs().value(); @@ -281,6 +281,20 @@ TEST(DateTimeUtilsTest, TestGetCurrentTime) { ASSERT_GE(local_ts - utc_ts, 28800000000l); } +TEST(DateTimeUtilsTest, TestGetCurrentLocalHour) { + int32_t shanghai_hour = 0; + int32_t utc_hour = 0; + { + TimezoneGuard guard("Asia/Shanghai"); + ASSERT_OK_AND_ASSIGN(shanghai_hour, DateTimeUtils::GetCurrentLocalHour()); + } + { + TimezoneGuard guard("UTC"); + ASSERT_OK_AND_ASSIGN(utc_hour, DateTimeUtils::GetCurrentLocalHour()); + } + ASSERT_EQ((shanghai_hour - utc_hour + 24) % 24, 8); +} + TEST(DateTimeUtilsTest, TestToUTCTimestamp) { TimezoneGuard guard("Asia/Shanghai"); { diff --git a/src/paimon/core/compact/compact_unit.h b/src/paimon/core/compact/compact_unit.h new file mode 100644 index 000000000..ef2be58c6 --- /dev/null +++ b/src/paimon/core/compact/compact_unit.h @@ -0,0 +1,48 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/mergetree/level_sorted_run.h" +namespace paimon { +/// A files unit for compaction. 
+struct CompactUnit { + static CompactUnit FromLevelRuns(int32_t output_level, + const std::vector& runs) { + std::vector> files; + for (const auto& run : runs) { + const auto& files_in_run = run.run.Files(); + files.insert(files.end(), files_in_run.begin(), files_in_run.end()); + } + return FromFiles(output_level, files, /*file_rewrite=*/false); + } + + static CompactUnit FromFiles(int32_t output_level, + const std::vector>& files, + bool file_rewrite) { + return CompactUnit(output_level, files, file_rewrite); + } + + CompactUnit(int32_t _output_level, const std::vector>& _files, + bool _file_rewrite) + : output_level(_output_level), files(_files), file_rewrite(_file_rewrite) {} + + int32_t output_level; + std::vector> files; + bool file_rewrite; +}; +} // namespace paimon diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 97396ee7d..0dd5c93b5 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -324,6 +324,12 @@ struct CoreOptions::Impl { std::optional global_index_external_path; std::optional scan_tag_name; + std::optional optimized_compaction_interval; + std::optional compaction_total_size_threshold; + std::optional compaction_incremental_size_threshold; + int32_t compact_off_peak_start_hour = -1; + int32_t compact_off_peak_end_hour = -1; + int32_t compact_off_peak_ratio = 0; }; // Parse configurations from a map and return a populated CoreOptions object @@ -507,6 +513,7 @@ Result CoreOptions::FromMap( impl->scan_tag_name = scan_tag_name; } + // Parse compaction options // Parse commit.force-compact PAIMON_RETURN_NOT_OK( parser.Parse(Options::COMMIT_FORCE_COMPACT, &impl->commit_force_compact)); @@ -519,6 +526,41 @@ Result CoreOptions::FromMap( PAIMON_RETURN_NOT_OK(parser.Parse(Options::COMPACTION_FORCE_REWRITE_ALL_FILES, &impl->compaction_force_rewrite_all_files)); + // Parse compaction.optimization-interval + std::string optimized_compaction_interval_str; + 
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_OPTIMIZATION_INTERVAL, + &optimized_compaction_interval_str)); + if (!optimized_compaction_interval_str.empty()) { + PAIMON_ASSIGN_OR_RAISE(impl->optimized_compaction_interval, + TimeDuration::Parse(optimized_compaction_interval_str)); + } + // Parse compaction.total-size-threshold + std::string compaction_total_size_threshold_str; + PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_TOTAL_SIZE_THRESHOLD, + &compaction_total_size_threshold_str)); + if (!compaction_total_size_threshold_str.empty()) { + PAIMON_ASSIGN_OR_RAISE(impl->compaction_total_size_threshold, + MemorySize::ParseBytes(compaction_total_size_threshold_str)); + } + // Parse compaction.incremental-size-threshold + std::string compaction_incremental_size_threshold_str; + PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, + &compaction_incremental_size_threshold_str)); + if (!compaction_incremental_size_threshold_str.empty()) { + PAIMON_ASSIGN_OR_RAISE(impl->compaction_incremental_size_threshold, + MemorySize::ParseBytes(compaction_incremental_size_threshold_str)); + } + + // Parse compaction.offpeak.start.hour + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACT_OFFPEAK_START_HOUR, &impl->compact_off_peak_start_hour)); + // Parse compaction.offpeak.end.hour + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACT_OFFPEAK_END_HOUR, &impl->compact_off_peak_end_hour)); + // Parse compaction.offpeak-ratio + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACTION_OFFPEAK_RATIO, &impl->compact_off_peak_ratio)); + return options; } @@ -847,4 +889,24 @@ std::optional CoreOptions::GetScanTagName() const { return impl_->scan_tag_name; } +std::optional CoreOptions::GetOptimizedCompactionInterval() const { + return impl_->optimized_compaction_interval; +} +std::optional CoreOptions::GetCompactionTotalSizeThreshold() const { + return impl_->compaction_total_size_threshold; +} +std::optional 
CoreOptions::GetCompactionIncrementalSizeThreshold() const { + return impl_->compaction_incremental_size_threshold; +} + +int32_t CoreOptions::GetCompactOffPeakStartHour() const { + return impl_->compact_off_peak_start_hour; +} +int32_t CoreOptions::GetCompactOffPeakEndHour() const { + return impl_->compact_off_peak_end_hour; +} +int32_t CoreOptions::GetCompactOffPeakRatio() const { + return impl_->compact_off_peak_ratio; +} + } // namespace paimon diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 4c8ee9ce7..112c1d7eb 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -125,6 +125,14 @@ class PAIMON_EXPORT CoreOptions { std::optional GetScanTagName() const; + std::optional GetOptimizedCompactionInterval() const; + std::optional GetCompactionTotalSizeThreshold() const; + std::optional GetCompactionIncrementalSizeThreshold() const; + + int32_t GetCompactOffPeakStartHour() const; + int32_t GetCompactOffPeakEndHour() const; + int32_t GetCompactOffPeakRatio() const; + const std::map& ToMap() const; private: diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 1f78c5eb2..3acc538f6 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -97,6 +97,12 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_TRUE(core_options.GlobalIndexEnabled()); ASSERT_FALSE(core_options.GetGlobalIndexExternalPath()); ASSERT_EQ(std::nullopt, core_options.GetScanTagName()); + ASSERT_EQ(std::nullopt, core_options.GetOptimizedCompactionInterval()); + ASSERT_EQ(std::nullopt, core_options.GetCompactionTotalSizeThreshold()); + ASSERT_EQ(std::nullopt, core_options.GetCompactionIncrementalSizeThreshold()); + ASSERT_EQ(-1, core_options.GetCompactOffPeakStartHour()); + ASSERT_EQ(-1, core_options.GetCompactOffPeakEndHour()); + ASSERT_EQ(0, core_options.GetCompactOffPeakRatio()); } TEST(CoreOptionsTest, TestFromMap) { @@ -159,7 +165,13 @@ 
TEST(CoreOptionsTest, TestFromMap) { {Options::SCAN_TAG_NAME, "test-tag"}, {Options::WRITE_ONLY, "true"}, {Options::COMPACTION_MIN_FILE_NUM, "10"}, - {Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"}}; + {Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"}, + {Options::COMPACTION_OPTIMIZATION_INTERVAL, "2s"}, + {Options::COMPACTION_TOTAL_SIZE_THRESHOLD, "5 GB"}, + {Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, "12 kB"}, + {Options::COMPACT_OFFPEAK_START_HOUR, "3"}, + {Options::COMPACT_OFFPEAK_END_HOUR, "16"}, + {Options::COMPACTION_OFFPEAK_RATIO, "8"}}; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); auto fs = core_options.GetFileSystem(); @@ -238,7 +250,13 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_TRUE(core_options.WriteOnly()); ASSERT_EQ(10, core_options.GetCompactionMinFileNum()); ASSERT_TRUE(core_options.CompactionForceRewriteAllFiles()); -} // namespace paimon::test + ASSERT_EQ(2000, core_options.GetOptimizedCompactionInterval().value()); + ASSERT_EQ(5l * 1024 * 1024 * 1024, core_options.GetCompactionTotalSizeThreshold().value()); + ASSERT_EQ(12l * 1024, core_options.GetCompactionIncrementalSizeThreshold().value()); + ASSERT_EQ(3, core_options.GetCompactOffPeakStartHour()); + ASSERT_EQ(16, core_options.GetCompactOffPeakEndHour()); + ASSERT_EQ(8, core_options.GetCompactOffPeakRatio()); +} TEST(CoreOptionsTest, TestInvalidCase) { ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::BUCKET, "3.5"}}), diff --git a/src/paimon/core/mergetree/compact/compact_strategy.h b/src/paimon/core/mergetree/compact/compact_strategy.h new file mode 100644 index 000000000..e5633388a --- /dev/null +++ b/src/paimon/core/mergetree/compact/compact_strategy.h @@ -0,0 +1,64 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/compact/compact_unit.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/mergetree/level_sorted_run.h" +namespace paimon { +/// Compact strategy to decide which files to select for compaction. +class CompactStrategy { + public: + virtual ~CompactStrategy() = default; + + /// Pick compaction unit from runs. + /// @note Compaction is runs-based, not file-based. + /// Level 0 is special, one run per file; all other levels are one run per level. + /// Compaction is sequential from small level to large level. + virtual Result> Pick(int32_t num_levels, + const std::vector& runs) = 0; + /// Pick a compaction unit consisting of all existing files. 
+ // TODO(xinyu.lxy): support RecordLevelExpire and BucketedDvMaintainer + static std::optional PickFullCompaction(int32_t num_levels, + const std::vector& runs, + bool force_rewrite_all_files) { + int32_t max_level = num_levels - 1; + if (runs.empty()) { + // no sorted run, no need to compact + return std::nullopt; + } + // only max level files + if (runs.size() == 1 && runs[0].level == max_level) { + std::vector> files_to_be_compacted; + const auto& run = runs[0]; + for (const auto& file : run.run.Files()) { + if (force_rewrite_all_files) { + // add all files when force compacted + files_to_be_compacted.push_back(file); + } + // TODO(xinyu.lxy): support RecordLevelExpire and BucketedDvMaintainer + } + if (files_to_be_compacted.empty()) { + return std::nullopt; + } + return CompactUnit::FromFiles(max_level, files_to_be_compacted, /*file_rewrite=*/true); + } + // full compaction + return CompactUnit::FromLevelRuns(max_level, runs); + } +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/compact_strategy_test.cpp b/src/paimon/core/mergetree/compact/compact_strategy_test.cpp new file mode 100644 index 000000000..817341470 --- /dev/null +++ b/src/paimon/core/mergetree/compact/compact_strategy_test.cpp @@ -0,0 +1,90 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/compact_strategy.h" + +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class CompactStrategyTest : public testing::Test { + public: + LevelSortedRun CreateLevelSortedRun(int32_t level, int64_t total_size) const { + auto file_meta = std::make_shared( + "fake.data", /*file_size=*/total_size, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/ + BinaryRow::EmptyRow(), + /*key_stats=*/ + SimpleStats::EmptyStats(), + /*value_stats=*/ + SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + return {level, SortedRun::FromSingle(file_meta)}; + } + + std::vector CreateRunsWithLevelAndSize( + const std::vector& levels, const std::vector& sizes) const { + EXPECT_EQ(levels.size(), sizes.size()); + std::vector runs; + for (size_t i = 0; i < levels.size(); i++) { + runs.push_back(CreateLevelSortedRun(levels[i], sizes[i])); + } + return runs; + } +}; + +TEST_F(CompactStrategyTest, TestPickFullCompaction) { + { + // no sorted run, no need to compact + auto runs = CreateRunsWithLevelAndSize({}, {}); + auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/3, runs, + /*force_rewrite_all_files=*/false); + ASSERT_FALSE(unit); + } + { + // only max level files, not rewrite + auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); + auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, + /*force_rewrite_all_files=*/false); + ASSERT_FALSE(unit); + } + { + // only max level files, force rewrite + auto runs = CreateRunsWithLevelAndSize(/*levels=*/{3}, /*sizes*/ {10}); + auto unit = 
CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, + /*force_rewrite_all_files=*/true); + ASSERT_TRUE(unit); + ASSERT_EQ(unit.value().output_level, 3); + ASSERT_EQ(unit.value().files.size(), 1); + ASSERT_TRUE(unit.value().file_rewrite); + } + { + // full compaction + auto runs = CreateRunsWithLevelAndSize(/*levels=*/{0, 3}, /*sizes*/ {1, 10}); + auto unit = CompactStrategy::PickFullCompaction(/*num_levels=*/4, runs, + /*force_rewrite_all_files=*/false); + ASSERT_TRUE(unit); + ASSERT_EQ(unit.value().output_level, 3); + ASSERT_EQ(unit.value().files.size(), 2); + ASSERT_FALSE(unit.value().file_rewrite); + } +} +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/early_full_compaction.cpp b/src/paimon/core/mergetree/compact/early_full_compaction.cpp new file mode 100644 index 000000000..3e07d7230 --- /dev/null +++ b/src/paimon/core/mergetree/compact/early_full_compaction.cpp @@ -0,0 +1,85 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/early_full_compaction.h" + +#include "paimon/common/utils/date_time_utils.h" +namespace paimon { +std::shared_ptr EarlyFullCompaction::Create(const CoreOptions& options) { + std::optional interval = options.GetOptimizedCompactionInterval(); + std::optional total_size_threshold = options.GetCompactionTotalSizeThreshold(); + std::optional incremental_size_threshold = + options.GetCompactionIncrementalSizeThreshold(); + if (!interval && !total_size_threshold && !incremental_size_threshold) { + return nullptr; + } + return std::shared_ptr( + new EarlyFullCompaction(interval, total_size_threshold, incremental_size_threshold)); +} + +void EarlyFullCompaction::UpdateLastFullCompaction() { + last_full_compaction_ = CurrentTimeMillis(); +} + +int64_t EarlyFullCompaction::CurrentTimeMillis() const { + return DateTimeUtils::GetCurrentUTCTimeUs() / + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::TimeType::MILLISECOND]; +} +EarlyFullCompaction::EarlyFullCompaction(const std::optional& full_compaction_interval, + const std::optional& total_size_threshold, + const std::optional& incremental_size_threshold) + : full_compaction_interval_(full_compaction_interval), + total_size_threshold_(total_size_threshold), + incremental_size_threshold_(incremental_size_threshold) {} + +std::optional EarlyFullCompaction::TryFullCompact( + int32_t num_levels, const std::vector& runs) { + if (runs.empty() || runs.size() == 1) { + return std::nullopt; + } + int32_t max_level = num_levels - 1; + if (full_compaction_interval_) { + if (!last_full_compaction_ || CurrentTimeMillis() - last_full_compaction_.value() > + full_compaction_interval_.value()) { + UpdateLastFullCompaction(); + return CompactUnit::FromLevelRuns(max_level, runs); + } + } + if (total_size_threshold_) { + int64_t total_size = 0; + for (const auto& run : runs) { + total_size += run.run.TotalSize(); + } + if (total_size < total_size_threshold_.value()) { + UpdateLastFullCompaction(); 
+ return CompactUnit::FromLevelRuns(max_level, runs); + } + } + if (incremental_size_threshold_) { + int64_t incremental_size = 0; + for (const auto& run : runs) { + if (run.level != max_level) { + incremental_size += run.run.TotalSize(); + } + } + if (incremental_size > incremental_size_threshold_.value()) { + UpdateLastFullCompaction(); + return CompactUnit::FromLevelRuns(max_level, runs); + } + } + return std::nullopt; +} +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/early_full_compaction.h b/src/paimon/core/mergetree/compact/early_full_compaction.h new file mode 100644 index 000000000..b912b08d3 --- /dev/null +++ b/src/paimon/core/mergetree/compact/early_full_compaction.h @@ -0,0 +1,48 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/core/compact/compact_unit.h" +#include "paimon/core/core_options.h" +namespace paimon { +/// Early trigger full compaction. +class EarlyFullCompaction { + public: + virtual ~EarlyFullCompaction() = default; + /// @return Pointer to `EarlyFullCompaction` if the options contain EarlyFullCompaction + /// settings; otherwise, nullptr. 
+ static std::shared_ptr Create(const CoreOptions& options); + + void UpdateLastFullCompaction(); + + std::optional TryFullCompact(int32_t num_levels, + const std::vector& runs); + + protected: + // virtual only for test + virtual int64_t CurrentTimeMillis() const; + + EarlyFullCompaction(const std::optional& full_compaction_interval, + const std::optional& total_size_threshold, + const std::optional& incremental_size_threshold); + + private: + std::optional full_compaction_interval_; + std::optional total_size_threshold_; + std::optional incremental_size_threshold_; + std::optional last_full_compaction_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/early_full_compaction_test.cpp b/src/paimon/core/mergetree/compact/early_full_compaction_test.cpp new file mode 100644 index 000000000..55bd397ef --- /dev/null +++ b/src/paimon/core/mergetree/compact/early_full_compaction_test.cpp @@ -0,0 +1,265 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/early_full_compaction.h" + +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class EarlyFullCompactionTest : public testing::Test { + public: + class TestableEarlyFullCompaction : public EarlyFullCompaction { + TestableEarlyFullCompaction(const std::optional& full_compaction_interval, + const std::optional& total_size_threshold, + const std::optional& incremental_size_threshold, + const int64_t* current_time) + : EarlyFullCompaction(full_compaction_interval, total_size_threshold, + incremental_size_threshold), + current_time_(current_time) {} + + int64_t CurrentTimeMillis() const override { + return *current_time_; + } + + private: + const int64_t* current_time_; + }; + + LevelSortedRun CreateLevelSortedRun(int32_t level, int64_t total_size) const { + auto file_meta = std::make_shared( + "fake.data", /*file_size=*/total_size, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/ + BinaryRow::EmptyRow(), + /*key_stats=*/ + SimpleStats::EmptyStats(), + /*value_stats=*/ + SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + return {level, SortedRun::FromSingle(file_meta)}; + } + + std::vector CreateRuns(const std::vector& sizes) const { + std::vector runs; + for (const auto& total_size : sizes) { + runs.push_back(CreateLevelSortedRun(/*level=*/0, total_size)); + } + return runs; + } +}; + +TEST_F(EarlyFullCompactionTest, TestCreateNoOptions) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_FALSE(EarlyFullCompaction::Create(core_options)); +} + +TEST_F(EarlyFullCompactionTest, 
TestCreateWithInterval) { + std::map options = { + {Options::COMPACTION_OPTIMIZATION_INTERVAL, "1h"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_TRUE(EarlyFullCompaction::Create(core_options)); +} + +TEST_F(EarlyFullCompactionTest, TestCreateWithThreshold) { + std::map options = { + {Options::COMPACTION_TOTAL_SIZE_THRESHOLD, "100MB"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_TRUE(EarlyFullCompaction::Create(core_options)); +} + +TEST_F(EarlyFullCompactionTest, TestCreateWithBoth) { + std::map options = { + {Options::COMPACTION_OPTIMIZATION_INTERVAL, "1h"}, + {Options::COMPACTION_TOTAL_SIZE_THRESHOLD, "100MB"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_TRUE(EarlyFullCompaction::Create(core_options)); +} + +TEST_F(EarlyFullCompactionTest, TestInterval) { + int64_t current_time = 10000l; + auto runs = CreateRuns({100l, 200l}); + TestableEarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/1000l, + /*total_size_threshold=*/std::nullopt, + /*incremental_size_threshold=*/std::nullopt, + ¤t_time); + // First time, should trigger + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + ASSERT_EQ(compact_unit->files.size(), 2); + + // Last compaction time is now 10000. + // Advance time, but not enough for interval to trigger. + current_time += 500; + ASSERT_FALSE(early_full_compaction.TryFullCompact(/*num_levels=*/5, runs)); + + // Advance time to be greater than interval. 
+ current_time += 501; + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + ASSERT_EQ(compact_unit->files.size(), 2); +} + +TEST_F(EarlyFullCompactionTest, TestTotalSizeThreshold) { + EarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/std::nullopt, + /*total_size_threshold=*/1000l, + /*incremental_size_threshold=*/std::nullopt); + + // total size 300 < 1000, should trigger + auto runs = CreateRuns({100l, 200l}); + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + ASSERT_EQ(compact_unit->files.size(), 2); + // total size 1000 == 1000, should not trigger + runs = CreateRuns({500l, 500l}); + ASSERT_FALSE(early_full_compaction.TryFullCompact(/*num_levels=*/5, runs)); + // total size 1500 > 1000, should not trigger + runs = CreateRuns({500l, 1000l}); + ASSERT_FALSE(early_full_compaction.TryFullCompact(/*num_levels=*/5, runs)); +} + +TEST_F(EarlyFullCompactionTest, TestIncrementalSizeThreshold) { + EarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/std::nullopt, + /*total_size_threshold=*/std::nullopt, + /*incremental_size_threshold=*/500l); + + // trigger, no max level + auto runs = CreateRuns({400l, 200l}); + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + ASSERT_EQ(compact_unit->files.size(), 2); + // no trigger, no max level + runs = CreateRuns({100l, 200l}); + ASSERT_FALSE(early_full_compaction.TryFullCompact(/*num_levels=*/5, runs)); + // no trigger, with max level + runs = {CreateLevelSortedRun(0, 100), CreateLevelSortedRun(0, 300), + CreateLevelSortedRun(4, 500)}; + ASSERT_FALSE(early_full_compaction.TryFullCompact(/*num_levels=*/5, runs)); + // trigger, with max level + runs = {CreateLevelSortedRun(0, 100), 
CreateLevelSortedRun(0, 300), + CreateLevelSortedRun(0, 300), CreateLevelSortedRun(4, 500)}; + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + ASSERT_EQ(compact_unit->files.size(), 4); +} + +TEST_F(EarlyFullCompactionTest, TestIntervalTriggersFirst) { + int64_t current_time = 10000l; + + // Interval will trigger, but size is > threshold + TestableEarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/1000l, + /*total_size_threshold=*/500l, + /*incremental_size_threshold=*/std::nullopt, + ¤t_time); + // First time, interval should trigger even if size (600) > threshold (500) + auto runs = CreateRuns({300l, 300l}); + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); +} + +TEST_F(EarlyFullCompactionTest, TestThresholdTriggersWhenIntervalFails) { + int64_t current_time = 10000l; + TestableEarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/1000l, + /*total_size_threshold=*/500l, + /*incremental_size_threshold=*/std::nullopt, + ¤t_time); + // Trigger once to set last compaction time + auto runs = CreateRuns({10l, 20l}); + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + // Advance time, but not enough for interval to trigger + current_time += 500; + + // Size (60) < threshold (500), should trigger + runs = CreateRuns({30l, 30l}); + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + + // Size (600) > threshold (500), should not trigger + runs = CreateRuns({300l, 300l}); + ASSERT_FALSE(early_full_compaction.TryFullCompact(/*num_levels=*/5, runs)); +} + +TEST_F(EarlyFullCompactionTest, TestUpdateLastWhenFullCompactIsTriggeredByTotalSize) { + int64_t current_time = 10000l; 
+ + TestableEarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/1000l, + /*total_size_threshold=*/500l, + /*incremental_size_threshold=*/std::nullopt, + ¤t_time); + // First time, interval should trigger even if size (600) > threshold (500) + auto runs = CreateRuns({300l, 300l}); + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + + current_time = 10100l; + // Second time, compaction triggered by total_size_threshold + runs = CreateRuns({300l, 100l}); + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + + current_time = 11001l; + // Third time, compaction cannot be triggered as 11001 - 10100 < 1000 full_compaction_interval + runs = CreateRuns({300l, 300l}); + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_FALSE(compact_unit); +} + +TEST_F(EarlyFullCompactionTest, TestUpdateLastWhenFullCompactIsTriggeredByIncSize) { + int64_t current_time = 10000l; + + TestableEarlyFullCompaction early_full_compaction(/*full_compaction_interval=*/1000l, + /*total_size_threshold=*/std::nullopt, + /*incremental_size_threshold=*/500, + ¤t_time); + // First time, interval should trigger even if size (400) < threshold (500) + std::vector runs = {CreateLevelSortedRun(0, 300), CreateLevelSortedRun(0, 100)}; + auto compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + + current_time = 10100l; + // Second time, compaction triggered by total_size_threshold + runs = {CreateLevelSortedRun(0, 300), CreateLevelSortedRun(0, 300)}; + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_TRUE(compact_unit); + ASSERT_EQ(compact_unit->output_level, 4); + + current_time = 11001l; + // Third time, compaction cannot 
be triggered as 11001 - 10100 < 1000 full_compaction_interval + runs = {CreateLevelSortedRun(0, 300), CreateLevelSortedRun(0, 100)}; + compact_unit = early_full_compaction.TryFullCompact(/*num_levels=*/5, runs); + ASSERT_FALSE(compact_unit); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/force_up_level0_compaction.h b/src/paimon/core/mergetree/compact/force_up_level0_compaction.h new file mode 100644 index 000000000..a7805398e --- /dev/null +++ b/src/paimon/core/mergetree/compact/force_up_level0_compaction.h @@ -0,0 +1,70 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/core/mergetree/compact/compact_strategy.h" +#include "paimon/core/mergetree/compact/universal_compaction.h" + +namespace paimon { +/// A `CompactStrategy` to force compacting level 0 files. 
+class ForceUpLevel0Compaction : public CompactStrategy { + public: + ForceUpLevel0Compaction(const std::shared_ptr& universal, + const std::optional& max_compact_interval) + : universal_(universal), max_compact_interval_(max_compact_interval) { + assert(universal_); + if (max_compact_interval_) { + compact_trigger_count_ = std::make_unique>(0); + } + } + + std::optional MaxCompactInterval() const { + return max_compact_interval_; + } + + Result> Pick(int32_t num_levels, + const std::vector& runs) override { + PAIMON_ASSIGN_OR_RAISE(std::optional unit, universal_->Pick(num_levels, runs)); + if (unit) { + return unit; + } + if (!max_compact_interval_ || !compact_trigger_count_) { + return universal_->ForcePickL0(num_levels, runs); + } + + compact_trigger_count_->fetch_add(1); + // We must copy max_compact_interval because compare_exchange_strong(T& expected, T desired) + // modifies 'expected' to the current actual value of the atomic if the comparison fails. + int32_t expected_compact_interval = max_compact_interval_.value(); + if (compact_trigger_count_->compare_exchange_strong(expected_compact_interval, 0)) { + // Universal compaction due to max lookup compaction interval + return universal_->ForcePickL0(num_levels, runs); + } else { + // Skip universal compaction due to lookup compaction trigger count is less than the max + // interval + return std::optional(); + } + } + + private: + std::shared_ptr universal_; + std::optional max_compact_interval_; + std::unique_ptr> compact_trigger_count_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/force_up_level0_compaction_test.cpp b/src/paimon/core/mergetree/compact/force_up_level0_compaction_test.cpp new file mode 100644 index 000000000..8109d386f --- /dev/null +++ b/src/paimon/core/mergetree/compact/force_up_level0_compaction_test.cpp @@ -0,0 +1,86 @@ +/* + * Copyright 2026-present Alibaba Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/force_up_level0_compaction.h" + +#include "paimon/core/mergetree/compact/universal_compaction.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class ForceUpLevel0CompactionTest : public testing::Test { + public: + LevelSortedRun CreateLevelSortedRun(int32_t level, int64_t size) const { + auto file_meta = std::make_shared( + "fake.data", /*file_size=*/size, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/ + BinaryRow::EmptyRow(), + /*key_stats=*/ + SimpleStats::EmptyStats(), + /*value_stats=*/ + SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + return {level, SortedRun::FromSingle(file_meta)}; + } + + std::vector CreateRunsWithLevelAndSize( + const std::vector& levels, const std::vector& sizes) const { + EXPECT_EQ(levels.size(), sizes.size()); + std::vector runs; + for (size_t i = 0; i < levels.size(); i++) { + runs.push_back(CreateLevelSortedRun(levels[i], sizes[i])); + } + return runs; + } +}; + +TEST_F(ForceUpLevel0CompactionTest, TestForceCompaction0) { + auto universal = 
+ std::make_shared(/*max_size_amp=*/200, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/5, nullptr, nullptr); + ForceUpLevel0Compaction compaction(universal, /*max_compact_interval=*/std::nullopt); + + ASSERT_OK_AND_ASSIGN( + auto unit, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize({0, 0}, {1, 1}))); + ASSERT_TRUE(unit); + ASSERT_EQ(unit.value().output_level, 2); + + ASSERT_OK_AND_ASSIGN( + unit, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize({0, 1}, {1, 10}))); + ASSERT_TRUE(unit); + ASSERT_EQ(unit.value().output_level, 2); + + ASSERT_OK_AND_ASSIGN( + unit, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize({0, 0, 2}, {1, 5, 10}))); + ASSERT_TRUE(unit); + ASSERT_EQ(unit.value().output_level, 1); + + ASSERT_OK_AND_ASSIGN(unit, + compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize({2}, {10}))); + ASSERT_FALSE(unit); + + ASSERT_OK_AND_ASSIGN(unit, + compaction.Pick(/*num_levels=*/3, + CreateRunsWithLevelAndSize({0, 0, 0, 0}, {1, 5, 10, 20}))); + ASSERT_TRUE(unit); + ASSERT_EQ(unit.value().output_level, 2); +} +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/off_peak_hours.h b/src/paimon/core/mergetree/compact/off_peak_hours.h new file mode 100644 index 000000000..ff2c464f0 --- /dev/null +++ b/src/paimon/core/mergetree/compact/off_peak_hours.h @@ -0,0 +1,63 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once +#include "paimon/core/core_options.h" +namespace paimon { +/// OffPeakHours to control compaction ratio by hours. +class OffPeakHours { + public: + /// @return Pointer to `OffPeakHours` if the options contain OffPeakHours settings; otherwise, + /// nullptr. + static std::shared_ptr Create(const CoreOptions& options) { + return Create(options.GetCompactOffPeakStartHour(), options.GetCompactOffPeakEndHour(), + options.GetCompactOffPeakRatio()); + } + + static std::shared_ptr Create(int32_t start_hour, int32_t end_hour, + int32_t compact_off_peak_ratio) { + if (start_hour == -1 || end_hour == -1) { + return nullptr; + } + if (start_hour == end_hour) { + return nullptr; + } + return std::shared_ptr( + new OffPeakHours(start_hour, end_hour, compact_off_peak_ratio)); + } + + int32_t CurrentRatio(int32_t target_hour) const { + bool is_off_peak; + if (start_hour_ <= end_hour_) { + is_off_peak = start_hour_ <= target_hour && target_hour < end_hour_; + } else { + is_off_peak = target_hour < end_hour_ || start_hour_ <= target_hour; + } + return is_off_peak ? compact_off_peak_ratio_ : 0; + } + + private: + OffPeakHours(int32_t start_hour, int32_t end_hour, int32_t compact_off_peak_ratio) + : start_hour_(start_hour), + end_hour_(end_hour), + compact_off_peak_ratio_(compact_off_peak_ratio) {} + + private: + int32_t start_hour_; + int32_t end_hour_; + int32_t compact_off_peak_ratio_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/off_peak_hours_test.cpp b/src/paimon/core/mergetree/compact/off_peak_hours_test.cpp new file mode 100644 index 000000000..515485472 --- /dev/null +++ b/src/paimon/core/mergetree/compact/off_peak_hours_test.cpp @@ -0,0 +1,98 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/off_peak_hours.h" + +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(OffPeakHoursTest, TestCreateFromOptions) { + std::map options = {{Options::COMPACT_OFFPEAK_START_HOUR, "22"}, + {Options::COMPACT_OFFPEAK_END_HOUR, "6"}, + {Options::COMPACTION_OFFPEAK_RATIO, "10"}}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + auto off_peak_hours = OffPeakHours::Create(core_options); + ASSERT_TRUE(off_peak_hours); + ASSERT_EQ(off_peak_hours->CurrentRatio(23), 10); + ASSERT_EQ(off_peak_hours->CurrentRatio(7), 0); +} + +TEST(OffPeakHoursTest, TestCreateFromOptionsWithDefault) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + auto off_peak_hours = OffPeakHours::Create(core_options); + ASSERT_FALSE(off_peak_hours); +} + +TEST(OffPeakHoursTest, TestCreateFromOptionsWithSameHour) { + std::map options = {{Options::COMPACT_OFFPEAK_START_HOUR, "5"}, + {Options::COMPACT_OFFPEAK_END_HOUR, "5"}, + {Options::COMPACTION_OFFPEAK_RATIO, "10"}}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + auto off_peak_hours = OffPeakHours::Create(core_options); + ASSERT_FALSE(off_peak_hours); +} + +TEST(OffPeakHoursTest, TestCreateWithInvalidHours) { + ASSERT_FALSE( + OffPeakHours::Create(/*start_hour=*/-1, /*end_hour=*/-1, /*compact_off_peak_ratio=*/10)); + ASSERT_FALSE( + OffPeakHours::Create(/*start_hour=*/5, /*end_hour=*/5, /*compact_off_peak_ratio=*/10)); + ASSERT_FALSE( + 
OffPeakHours::Create(/*start_hour=*/2, /*end_hour=*/-1, /*compact_off_peak_ratio=*/10)); + ASSERT_FALSE( + OffPeakHours::Create(/*start_hour=*/-1, /*end_hour=*/2, /*compact_off_peak_ratio=*/10)); +} + +TEST(OffPeakHoursTest, TestCurrentRatioNormalHours) { + auto off_peak_hours = + OffPeakHours::Create(/*start_hour=*/2, /*end_hour=*/8, /*compact_off_peak_ratio=*/10); + ASSERT_TRUE(off_peak_hours); + // Before start + ASSERT_EQ(0, off_peak_hours->CurrentRatio(1)); + // At start + ASSERT_EQ(10, off_peak_hours->CurrentRatio(2)); + // In between + ASSERT_EQ(10, off_peak_hours->CurrentRatio(5)); + // Before end + ASSERT_EQ(10, off_peak_hours->CurrentRatio(7)); + // At end (exclusive) + ASSERT_EQ(0, off_peak_hours->CurrentRatio(8)); + // After end + ASSERT_EQ(0, off_peak_hours->CurrentRatio(9)); +} + +TEST(OffPeakHoursTest, TestCurrentRatioOvernightHours) { + auto off_peak_hours = + OffPeakHours::Create(/*start_hour=*/22, /*end_hour=*/6, /*compact_off_peak_ratio=*/10); + ASSERT_TRUE(off_peak_hours); + // Before start + ASSERT_EQ(0, off_peak_hours->CurrentRatio(21)); + // At start + ASSERT_EQ(10, off_peak_hours->CurrentRatio(22)); + // After start + ASSERT_EQ(10, off_peak_hours->CurrentRatio(23)); + // After midnight, before end + ASSERT_EQ(10, off_peak_hours->CurrentRatio(0)); + // Before end + ASSERT_EQ(10, off_peak_hours->CurrentRatio(5)); + // At end (exclusive) + ASSERT_EQ(0, off_peak_hours->CurrentRatio(6)); + // After end, before next start" + ASSERT_EQ(0, off_peak_hours->CurrentRatio(10)); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/universal_compaction.cpp b/src/paimon/core/mergetree/compact/universal_compaction.cpp new file mode 100644 index 000000000..ea7df08ee --- /dev/null +++ b/src/paimon/core/mergetree/compact/universal_compaction.cpp @@ -0,0 +1,183 @@ +/* + * Copyright 2026-present Alibaba Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/universal_compaction.h" + +#include "paimon/common/utils/date_time_utils.h" +namespace paimon { +UniversalCompaction::UniversalCompaction( + int32_t max_size_amp, int32_t size_ratio, int32_t num_run_compaction_trigger, + const std::shared_ptr& early_full_compaction, + const std::shared_ptr& off_peak_hours) + : max_size_amp_(max_size_amp), + size_ratio_(size_ratio), + num_run_compaction_trigger_(num_run_compaction_trigger), + early_full_compaction_(early_full_compaction), + off_peak_hours_(off_peak_hours) { + assert(num_run_compaction_trigger_ >= 1); +} + +Result> UniversalCompaction::Pick( + int32_t num_levels, const std::vector& runs) { + int32_t max_level = num_levels - 1; + // 0 try full compaction by trigger + if (early_full_compaction_) { + std::optional compact_unit = + early_full_compaction_->TryFullCompact(num_levels, runs); + if (compact_unit) { + return compact_unit; + } + } + // 1 checking for reducing size amplification + std::optional compact_unit = PickForSizeAmp(max_level, runs); + if (compact_unit) { + return compact_unit; + } + // 2 checking for size ratio + PAIMON_ASSIGN_OR_RAISE(compact_unit, PickForSizeRatio(max_level, runs)); + if (compact_unit) { + return compact_unit; + } + // 3 checking for file num + if (runs.size() > static_cast(num_run_compaction_trigger_)) { + // compacting for file num + int32_t candidate_count = runs.size() - 
num_run_compaction_trigger_ + 1; + PAIMON_ASSIGN_OR_RAISE(std::optional compact_unit, + PickForSizeRatio(max_level, runs, candidate_count)); + return compact_unit; + } + return std::optional(); +} + +Result> UniversalCompaction::ForcePickL0( + int32_t num_levels, const std::vector& runs) { + // collect all level 0 files + int32_t candidate_count = 0; + for (; static_cast(candidate_count) < runs.size(); ++candidate_count) { + if (runs[candidate_count].level > 0) { + break; + } + } + if (candidate_count == 0) { + return std::optional(); + } + return PickForSizeRatio(num_levels - 1, runs, candidate_count, /*force_pick=*/true); +} + +std::optional UniversalCompaction::PickForSizeAmp( + int32_t max_level, const std::vector& runs) { + if (runs.size() < static_cast(num_run_compaction_trigger_)) { + return std::nullopt; + } + int64_t candidate_size = 0; + for (size_t i = 0; i < runs.size() - 1; ++i) { + candidate_size += runs[i].run.TotalSize(); + } + int64_t earliest_size = runs[runs.size() - 1].run.TotalSize(); + + // size amplification = percentage of additional size + if (candidate_size * 100 > max_size_amp_ * earliest_size) { + if (early_full_compaction_) { + early_full_compaction_->UpdateLastFullCompaction(); + } + return CompactUnit::FromLevelRuns(max_level, runs); + } + return std::nullopt; +} + +Result> UniversalCompaction::PickForSizeRatio( + int32_t max_level, const std::vector& runs) { + if (runs.size() < static_cast(num_run_compaction_trigger_)) { + return std::optional(); + } + return PickForSizeRatio(max_level, runs, /*candidate_count=*/1); +} + +Result> UniversalCompaction::PickForSizeRatio( + int32_t max_level, const std::vector& runs, int32_t candidate_count) { + return PickForSizeRatio(max_level, runs, candidate_count, /*force_pick=*/false); +} + +Result> UniversalCompaction::PickForSizeRatio( + int32_t max_level, const std::vector& runs, int32_t candidate_count, + bool force_pick) { + int64_t candidate_size = CandidateSize(runs, candidate_count); + for 
(size_t i = candidate_count; i < runs.size(); ++i) { + LevelSortedRun next = runs[i]; + PAIMON_ASSIGN_OR_RAISE(int32_t current_hour_ratio, RatioForOffPeak()); + if (static_cast(candidate_size) * (100.0 + size_ratio_ + current_hour_ratio) / + 100.0 < + next.run.TotalSize()) { + break; + } + candidate_size += next.run.TotalSize(); + candidate_count++; + } + if (force_pick || candidate_count > 1) { + return std::optional(CreateUnit(runs, max_level, candidate_count)); + } + return std::optional(); +} + +int64_t UniversalCompaction::CandidateSize(const std::vector& runs, + int32_t candidate_count) { + int64_t size = 0; + for (int32_t i = 0; i < candidate_count; ++i) { + size += runs[i].run.TotalSize(); + } + return size; +} + +Result UniversalCompaction::RatioForOffPeak() const { + PAIMON_ASSIGN_OR_RAISE(int32_t local_hour, DateTimeUtils::GetCurrentLocalHour()); + return !off_peak_hours_ ? 0 : off_peak_hours_->CurrentRatio(local_hour); +} + +CompactUnit UniversalCompaction::CreateUnit(const std::vector& runs, + int32_t max_level, int32_t run_count) { + int32_t output_level; + if (static_cast(run_count) == runs.size()) { + output_level = max_level; + } else { + // level of next run - 1 + output_level = std::max(0, runs[run_count].level - 1); + } + + if (output_level == 0) { + // do not output level 0 + for (size_t i = run_count; i < runs.size(); ++i) { + LevelSortedRun next = runs[i]; + run_count++; + if (next.level != 0) { + output_level = next.level; + break; + } + } + } + if (static_cast(run_count) == runs.size()) { + if (early_full_compaction_) { + early_full_compaction_->UpdateLastFullCompaction(); + } + output_level = max_level; + } + std::vector result_runs; + result_runs.reserve(run_count); + for (int32_t i = 0; i < run_count; ++i) { + result_runs.push_back(runs[i]); + } + return CompactUnit::FromLevelRuns(output_level, result_runs); +} +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/universal_compaction.h 
b/src/paimon/core/mergetree/compact/universal_compaction.h new file mode 100644 index 000000000..13ec5d8f3 --- /dev/null +++ b/src/paimon/core/mergetree/compact/universal_compaction.h @@ -0,0 +1,64 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/mergetree/compact/compact_strategy.h" +#include "paimon/core/mergetree/compact/early_full_compaction.h" +#include "paimon/core/mergetree/compact/off_peak_hours.h" + +namespace paimon { +/// Universal Compaction Style is a compaction style, targeting the use cases requiring lower write +/// amplification, trading off read amplification and space amplification. +/// +/// See RocksDb Universal-Compaction: +/// https://github.com/facebook/rocksdb/wiki/Universal-Compaction. 
+class UniversalCompaction : public CompactStrategy { + public: + UniversalCompaction(int32_t max_size_amp, int32_t size_ratio, + int32_t num_run_compaction_trigger, + const std::shared_ptr& early_full_compaction, + const std::shared_ptr& off_peak_hours); + Result> Pick(int32_t num_levels, + const std::vector& runs) override; + + Result> ForcePickL0(int32_t num_levels, + const std::vector& runs); + + private: + std::optional PickForSizeAmp(int32_t max_level, + const std::vector& runs); + Result> PickForSizeRatio(int32_t max_level, + const std::vector& runs); + Result> PickForSizeRatio(int32_t max_level, + const std::vector& runs, + int32_t candidate_count); + Result> PickForSizeRatio(int32_t max_level, + const std::vector& runs, + int32_t candidate_count, bool force_pick); + Result RatioForOffPeak() const; + CompactUnit CreateUnit(const std::vector& runs, int32_t max_level, + int32_t run_count); + static int64_t CandidateSize(const std::vector& runs, int32_t candidate_count); + + private: + int32_t max_size_amp_; + int32_t size_ratio_; + int32_t num_run_compaction_trigger_; + std::shared_ptr early_full_compaction_; + std::shared_ptr off_peak_hours_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/universal_compaction_test.cpp b/src/paimon/core/mergetree/compact/universal_compaction_test.cpp new file mode 100644 index 000000000..ffe83c65b --- /dev/null +++ b/src/paimon/core/mergetree/compact/universal_compaction_test.cpp @@ -0,0 +1,447 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/universal_compaction.h" + +#include "paimon/core/mergetree/compact/force_up_level0_compaction.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class UniversalCompactionTest : public testing::Test { + public: + class TestableEarlyFullCompaction : public EarlyFullCompaction { + TestableEarlyFullCompaction(const std::optional& full_compaction_interval, + const std::optional& total_size_threshold, + const std::optional& incremental_size_threshold, + const int64_t* current_time) + : EarlyFullCompaction(full_compaction_interval, total_size_threshold, + incremental_size_threshold), + current_time_(current_time) {} + + int64_t CurrentTimeMillis() const override { + return *current_time_; + } + + private: + const int64_t* current_time_; + }; + + LevelSortedRun CreateLevelSortedRun(int32_t level, int64_t size) const { + auto file_meta = std::make_shared( + "fake.data", /*file_size=*/size, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/ + BinaryRow::EmptyRow(), + /*key_stats=*/ + SimpleStats::EmptyStats(), + /*value_stats=*/ + SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + return {level, SortedRun::FromSingle(file_meta)}; + } + + std::vector CreateRunsWithLevel(const std::vector& levels) const { + std::vector runs; + for (const auto& level : levels) { + runs.push_back(CreateLevelSortedRun(level, /*size=*/1)); + } + return runs; + } + + std::vector CreateRunsWithSize(const std::vector& sizes) const { + std::vector runs; + for (const 
auto& size : sizes) { + runs.push_back(CreateLevelSortedRun(/*level=*/0, size)); + } + return runs; + } + + std::vector CreateRunsWithLevelAndSize( + const std::vector& levels, const std::vector& sizes) const { + EXPECT_EQ(levels.size(), sizes.size()); + std::vector runs; + for (size_t i = 0; i < levels.size(); i++) { + runs.push_back(CreateLevelSortedRun(levels[i], sizes[i])); + } + return runs; + } + + std::vector GetFileSizeVecFromCompactUnit(const CompactUnit& unit) const { + std::vector sizes; + for (const auto& file : unit.files) { + sizes.push_back(file->file_size); + } + return sizes; + } +}; + +TEST_F(UniversalCompactionTest, TestOutputLevel) { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, nullptr, nullptr); + ASSERT_EQ( + 1, compaction + .CreateUnit(CreateRunsWithLevel({0, 0, 1, 3, 4}), /*max_level=*/5, /*run_count=*/1) + .output_level); + ASSERT_EQ( + 1, compaction + .CreateUnit(CreateRunsWithLevel({0, 0, 1, 3, 4}), /*max_level=*/5, /*run_count=*/2) + .output_level); + ASSERT_EQ( + 2, compaction + .CreateUnit(CreateRunsWithLevel({0, 0, 1, 3, 4}), /*max_level=*/5, /*run_count=*/3) + .output_level); + ASSERT_EQ( + 3, compaction + .CreateUnit(CreateRunsWithLevel({0, 0, 1, 3, 4}), /*max_level=*/5, /*run_count=*/4) + .output_level); + ASSERT_EQ( + 5, compaction + .CreateUnit(CreateRunsWithLevel({0, 0, 1, 3, 4}), /*max_level=*/5, /*run_count=*/5) + .output_level); +} + +TEST_F(UniversalCompactionTest, TestPick) { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, nullptr, nullptr); + // by size amplification + ASSERT_OK_AND_ASSIGN(auto pick, + compaction.Pick(/*num_levels=*/3, CreateRunsWithSize({1, 2, 3, 3}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 2, 3, 3})); + + // by size ratio + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/4, CreateRunsWithLevelAndSize( + /*levels=*/{0, 
1, 2, 3}, + /*sizes=*/{1, 1, 1, 50}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 1, 1})); + + // by file num + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/4, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2, 3}, + /*sizes=*/{1, 50, 3, 500}))); + ASSERT_TRUE(pick); + // 3 should be in the candidate, by size ratio after picking by file num + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 50, 3})); +} + +TEST_F(UniversalCompactionTest, TestOptimizedCompactionInterval) { + int64_t current_time = 0; + auto full_compact_trigger = std::make_shared( + /*full_compaction_interval=*/1000L, + /*total_size_threshold=*/std::nullopt, + /*incremental_size_threshold=*/std::nullopt, ¤t_time); + UniversalCompaction compaction(/*max_size_amp=*/100, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, full_compact_trigger, nullptr); + // first time, force optimized compaction + ASSERT_OK_AND_ASSIGN(auto pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{1, 3, 5}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 3, 5})); + + // modify time, optimized compaction + current_time = 1001L; + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{1, 3, 5}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 3, 5})); + + // third time, no compaction + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{1, 3, 5}))); + ASSERT_FALSE(pick); + + // 4 time, pickForSizeAmp + current_time = 1500L; + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{3, 3, 5}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), 
std::vector({3, 3, 5})); + + // 5th time, no compaction because pickForSizeAmp already done + current_time = 2001L; + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{1, 3, 5}))); + ASSERT_FALSE(pick); +} + +TEST_F(UniversalCompactionTest, TestTotalSizeThreshold) { + auto full_compact_trigger = std::make_shared( + /*full_compaction_interval=*/std::nullopt, + /*total_size_threshold=*/10L, + /*incremental_size_threshold=*/std::nullopt); + + UniversalCompaction compaction(/*max_size_amp=*/100, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, full_compact_trigger, nullptr); + // total size less than threshold + ASSERT_OK_AND_ASSIGN(auto pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{1, 3, 5}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 3, 5})); + + // total size bigger than threshold + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 1, 2}, + /*sizes=*/{2, 6, 10}))); + ASSERT_FALSE(pick); + // a single sorted run does not trigger compaction + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{3}, + /*sizes=*/{5}))); + ASSERT_FALSE(pick); +} + +TEST_F(UniversalCompactionTest, TestNoOutputLevel0) { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, nullptr, nullptr); + ASSERT_OK_AND_ASSIGN(auto pick, + compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 0, 1, 2}, + /*sizes=*/{1, 1, 1, 50}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 1, 1})); + + ASSERT_OK_AND_ASSIGN(pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 0, 1, 2}, + /*sizes=*/{1, 2, 3, 50}))); + ASSERT_TRUE(pick); + // 3 should be in the candidate, by size ratio after 
picking by file num + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 2, 3})); +} + +TEST_F(UniversalCompactionTest, TestExtremeCaseNoOutputLevel0) { + UniversalCompaction compaction(/*max_size_amp=*/200, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/5, nullptr, nullptr); + ASSERT_OK_AND_ASSIGN( + auto pick, compaction.Pick(/*num_levels=*/6, CreateRunsWithLevelAndSize( + /*levels=*/{0, 0, 0, 0, 0}, + /*sizes=*/{1, 1, 1, 1024, 1024 * 1024}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), + std::vector({1, 1, 1, 1024, 1024 * 1024})); +} + +TEST_F(UniversalCompactionTest, TestSizeAmplification) { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/0, + /*num_run_compaction_trigger=*/1, nullptr, nullptr); + + std::vector sizes = {1}; + auto append_and_pick = [&](const std::vector& expected_sizes) { + sizes.insert(sizes.begin(), 1); + auto unit = compaction.PickForSizeAmp(3, CreateRunsWithSize(sizes)); + if (unit) { + auto files = GetFileSizeVecFromCompactUnit(unit.value()); + int64_t total_size = std::accumulate(files.begin(), files.end(), 0l); + sizes = {total_size}; + } + ASSERT_EQ(sizes, expected_sizes); + }; + + append_and_pick({2}); + append_and_pick({3}); + append_and_pick({4}); + append_and_pick({1, 4}); + append_and_pick({6}); + append_and_pick({1, 6}); + append_and_pick({8}); + append_and_pick({1, 8}); + append_and_pick({1, 1, 8}); + append_and_pick({11}); + append_and_pick({1, 11}); + append_and_pick({1, 1, 11}); + append_and_pick({14}); + append_and_pick({1, 14}); + append_and_pick({1, 1, 14}); + append_and_pick({1, 1, 1, 14}); + append_and_pick({18}); +} + +TEST_F(UniversalCompactionTest, TestSizeRatio) { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/5, nullptr, nullptr); + + std::vector sizes = {1, 1, 1, 1}; + auto append_and_pick = [&](const std::vector& expected_sizes) { + sizes.insert(sizes.begin(), 1); + std::vector 
levels; + for (size_t i = 0; i < sizes.size(); i++) { + levels.push_back(static_cast(i)); + } + ASSERT_OK_AND_ASSIGN( + auto unit, compaction.PickForSizeRatio(/*max_level=*/sizes.size(), + CreateRunsWithLevelAndSize(levels, sizes))); + if (unit) { + std::vector compact; + compact.reserve(unit->files.size()); + for (const auto& file : unit->files) { + compact.push_back(file->file_size); + } + std::vector result = sizes; + for (int64_t size_val : compact) { + auto it = std::find(result.begin(), result.end(), size_val); + if (it != result.end()) { + result.erase(it); + } + } + int64_t sum = std::accumulate(compact.begin(), compact.end(), 0l); + result.insert(result.begin(), sum); + sizes = result; + } + ASSERT_EQ(sizes, expected_sizes); + }; + + append_and_pick({5}); + append_and_pick({1, 5}); + append_and_pick({1, 1, 5}); + append_and_pick({1, 1, 1, 5}); + append_and_pick({4, 5}); + append_and_pick({1, 4, 5}); + append_and_pick({1, 1, 4, 5}); + append_and_pick({3, 4, 5}); + append_and_pick({1, 3, 4, 5}); + append_and_pick({2, 3, 4, 5}); + append_and_pick({1, 2, 3, 4, 5}); + append_and_pick({16}); + append_and_pick({1, 16}); + append_and_pick({1, 1, 16}); + append_and_pick({1, 1, 1, 16}); + append_and_pick({4, 16}); + append_and_pick({1, 4, 16}); + append_and_pick({1, 1, 4, 16}); + append_and_pick({3, 4, 16}); + append_and_pick({1, 3, 4, 16}); + append_and_pick({2, 3, 4, 16}); + append_and_pick({1, 2, 3, 4, 16}); + append_and_pick({11, 16}); +} +TEST_F(UniversalCompactionTest, TestSizeRatioThreshold) { + { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/10, + /*num_run_compaction_trigger=*/2, nullptr, nullptr); + ASSERT_OK_AND_ASSIGN( + auto unit, compaction.PickForSizeRatio( + /*max_level=*/3, + CreateRunsWithLevelAndSize(/*levels=*/{0, 1, 2}, /*sizes=*/{8, 9, 10}))); + ASSERT_FALSE(unit); + } + { + UniversalCompaction compaction(/*max_size_amp=*/25, /*size_ratio=*/20, + /*num_run_compaction_trigger=*/2, nullptr, nullptr); + ASSERT_OK_AND_ASSIGN( 
+ auto unit, compaction.PickForSizeRatio( + /*max_level=*/3, + CreateRunsWithLevelAndSize(/*levels=*/{0, 1, 2}, /*sizes=*/{8, 9, 10}))); + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), std::vector({8, 9, 10})); + } +} + +TEST_F(UniversalCompactionTest, TestLookup) { + auto universal = + std::make_shared(/*max_size_amp=*/25, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, nullptr, nullptr); + ForceUpLevel0Compaction compaction(universal, /*max_compact_interval=*/std::nullopt); + + // level 0 to max level + ASSERT_OK_AND_ASSIGN(auto unit, + compaction.Pick(/*num_levels=*/3, CreateRunsWithSize({1, 2, 2, 2}))); + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), std::vector({1, 2, 2, 2})); + ASSERT_EQ(unit.value().output_level, 2); + + // level 0 force pick + ASSERT_OK_AND_ASSIGN( + unit, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize(/*levels=*/{0, 1, 2}, + /*sizes=*/{1, 2, 2}))); + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), std::vector({1, 2, 2})); + ASSERT_EQ(unit.value().output_level, 2); + + // level 0 to empty level + ASSERT_OK_AND_ASSIGN( + unit, compaction.Pick(/*num_levels=*/3, + CreateRunsWithLevelAndSize(/*levels=*/{0, 2}, /*sizes=*/{1, 2}))); + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), std::vector({1})); + ASSERT_EQ(unit.value().output_level, 1); +} + +TEST_F(UniversalCompactionTest, TestForcePickL0) { + int32_t max_compact_interval = 5; + auto universal = + std::make_shared(/*max_size_amp=*/25, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/5, nullptr, nullptr); + ForceUpLevel0Compaction compaction(universal, max_compact_interval); + + // level 0 to max level + auto level0_to_max = CreateRunsWithSize({1, 2, 2, 2}); + std::optional unit; + for (int32_t i = 1; i <= max_compact_interval; i++) { + // level 0 to max level triggered + ASSERT_OK_AND_ASSIGN(unit, compaction.Pick(/*num_levels=*/3, level0_to_max)); + if 
(i == max_compact_interval) { + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), + std::vector({1, 2, 2, 2})); + ASSERT_EQ(unit.value().output_level, 2); + } else { + // compact skipped + ASSERT_FALSE(unit); + } + } + + // level 0 force pick + auto level0_force_pick = CreateRunsWithLevelAndSize(/*levels=*/{0, 1, 2}, /*sizes=*/{2, 2, 2}); + for (int32_t i = 1; i <= max_compact_interval; i++) { + ASSERT_OK_AND_ASSIGN(unit, compaction.Pick(/*num_levels=*/3, level0_force_pick)); + if (i == max_compact_interval) { + // level 0 force pick triggered + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), std::vector({2, 2, 2})); + ASSERT_EQ(unit.value().output_level, 2); + } else { + // compact skipped + ASSERT_FALSE(unit); + } + } + + // level 0 to empty level + auto level0_to_empty = CreateRunsWithLevelAndSize(/*levels=*/{0, 2}, /*sizes=*/{1, 2}); + for (int32_t i = 1; i <= max_compact_interval; i++) { + ASSERT_OK_AND_ASSIGN(unit, compaction.Pick(/*num_levels=*/3, level0_to_empty)); + if (i == max_compact_interval) { + // level 0 force pick triggered + ASSERT_TRUE(unit); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(unit.value()), std::vector({1})); + ASSERT_EQ(unit.value().output_level, 1); + } else { + // compact skipped + ASSERT_FALSE(unit); + } + } +} +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/level_sorted_run.h b/src/paimon/core/mergetree/level_sorted_run.h new file mode 100644 index 000000000..485fae3d5 --- /dev/null +++ b/src/paimon/core/mergetree/level_sorted_run.h @@ -0,0 +1,34 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fmt/format.h" +#include "paimon/core/mergetree/sorted_run.h" +#include "paimon/core/utils/fields_comparator.h" +namespace paimon { +/// A `SortedRun` with level. +struct LevelSortedRun { + LevelSortedRun(int32_t _level, const SortedRun& _run) : level(_level), run(_run) {} + + std::string ToString() const { + return fmt::format("LevelSortedRun{{ level={}, run={} }}", level, run.ToString()); + } + + int32_t level; + SortedRun run; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/sorted_run.h b/src/paimon/core/mergetree/sorted_run.h index 8a50d389b..455839500 100644 --- a/src/paimon/core/mergetree/sorted_run.h +++ b/src/paimon/core/mergetree/sorted_run.h @@ -22,10 +22,11 @@ #include #include +#include "fmt/format.h" +#include "fmt/ranges.h" #include "paimon/common/data/binary_row.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/utils/fields_comparator.h" - namespace paimon { /// A `SortedRun` is a list of files sorted by their keys. The key intervals [minKey, maxKey] /// of these files do not overlap. 
@@ -58,6 +59,15 @@ class SortedRun { return true; } + std::string ToString() const { + std::vector files_str; + files_str.reserve(files_.size()); + for (const auto& file : files_) { + files_str.push_back(file->ToString()); + } + return fmt::format("{}", fmt::join(files_str, ", ")); + } + private: explicit SortedRun(const std::vector>& files) : files_(files) { for (const auto& file : files) { diff --git a/src/paimon/core/mergetree/sorted_run_test.cpp b/src/paimon/core/mergetree/sorted_run_test.cpp index 58c150713..9beabf232 100644 --- a/src/paimon/core/mergetree/sorted_run_test.cpp +++ b/src/paimon/core/mergetree/sorted_run_test.cpp @@ -24,6 +24,7 @@ #include "gtest/gtest.h" #include "paimon/common/types/data_field.h" #include "paimon/core/manifest/file_source.h" +#include "paimon/core/mergetree/level_sorted_run.h" #include "paimon/core/stats/simple_stats.h" #include "paimon/data/timestamp.h" #include "paimon/memory/memory_pool.h" @@ -31,7 +32,6 @@ #include "paimon/status.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/testharness.h" - namespace paimon::test { class SortedRunTest : public testing::Test { public: @@ -87,4 +87,15 @@ TEST_F(SortedRunTest, TestSortedRunIsValid) { } } +TEST_F(SortedRunTest, TestSortedRunToString) { + auto m1 = CreateDataFileMeta(10, 20); + auto m2 = CreateDataFileMeta(30, 40); + auto sorted_run = SortedRun::FromSorted({m1, m2}); + auto sorted_run_str = sorted_run.ToString(); + LevelSortedRun level_sorted_run(/*level=*/10, sorted_run); + auto level_sorted_run_str = level_sorted_run.ToString(); + ASSERT_TRUE(level_sorted_run_str.find("LevelSortedRun{ level=10, run={fileName:") != + std::string::npos); + ASSERT_TRUE(level_sorted_run_str.find(sorted_run_str) != std::string::npos); +} } // namespace paimon::test