Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,32 @@ struct PAIMON_EXPORT Options {
/// "compaction.force-rewrite-all-files" - Whether to force pick all files for a full
/// compaction. Usually seen in a compaction task to external paths. Default value is "false".
static const char COMPACTION_FORCE_REWRITE_ALL_FILES[];
/// "compaction.optimization-interval" - Implying how often to perform an optimization
/// compaction, this configuration is used to ensure the query timeliness of the read-optimized
/// system table. No default value.
static const char COMPACTION_OPTIMIZATION_INTERVAL[];
/// "compaction.total-size-threshold" - When total size is smaller than this threshold, force a
/// full compaction. No default value.
static const char COMPACTION_TOTAL_SIZE_THRESHOLD[];
/// "compaction.incremental-size-threshold" - When incremental size is bigger than this
/// threshold, force a full compaction. No default value.
static const char COMPACTION_INCREMENTAL_SIZE_THRESHOLD[];
/// "compaction.offpeak.start.hour" - The start of off-peak hours, expressed as an integer
/// between 0 and 23, inclusive. Set to -1 to disable off-peak. Default is -1.
static const char COMPACT_OFFPEAK_START_HOUR[];
/// "compaction.offpeak.end.hour" - The end of off-peak hours, expressed as an integer between 0
/// and 23, exclusive. Set to -1 to disable off-peak. Default is -1.
static const char COMPACT_OFFPEAK_END_HOUR[];
/// "compaction.offpeak-ratio" - Allows you to set a different (by default, more aggressive)
/// percentage ratio for determining whether larger sorted run's size are included in
/// compactions during off-peak hours. Works in the same way as compaction.size-ratio. Only
/// applies if offpeak.start.hour and offpeak.end.hour are also enabled.
/// For instance, if your cluster experiences low pressure between 2 AM and 6 PM , you can
/// configure `compaction.offpeak.start.hour=2` and `compaction.offpeak.end.hour=18` to define
/// this period as off-peak hours. During these hours, you can increase the off-peak compaction
/// ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction.
/// Default is 0.
static const char COMPACTION_OFFPEAK_RATIO[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
7 changes: 7 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ set(PAIMON_CORE_SRCS
core/manifest/manifest_list.cpp
core/manifest/partition_entry.cpp
core/manifest/index_manifest_file_handler.cpp
core/mergetree/compact/universal_compaction.cpp
core/mergetree/compact/early_full_compaction.cpp
core/mergetree/compact/aggregate/aggregate_merge_function.cpp
core/mergetree/compact/aggregate/field_sum_agg.cpp
core/mergetree/compact/interval_partition.cpp
Expand Down Expand Up @@ -527,6 +529,11 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/compact/partial_update_merge_function_test.cpp
core/mergetree/compact/reducer_merge_function_wrapper_test.cpp
core/mergetree/compact/sort_merge_reader_test.cpp
core/mergetree/compact/off_peak_hours_test.cpp
core/mergetree/compact/early_full_compaction_test.cpp
core/mergetree/compact/universal_compaction_test.cpp
core/mergetree/compact/force_up_level0_compaction_test.cpp
core/mergetree/compact/compact_strategy_test.cpp
core/mergetree/drop_delete_reader_test.cpp
core/mergetree/merge_tree_writer_test.cpp
core/mergetree/sorted_run_test.cpp
Expand Down
8 changes: 7 additions & 1 deletion src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,11 @@ const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
const char Options::WRITE_ONLY[] = "write-only";
const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num";
const char Options::COMPACTION_FORCE_REWRITE_ALL_FILES[] = "compaction.force-rewrite-all-files";

const char Options::COMPACTION_OPTIMIZATION_INTERVAL[] = "compaction.optimization-interval";
const char Options::COMPACTION_TOTAL_SIZE_THRESHOLD[] = "compaction.total-size-threshold";
const char Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD[] =
"compaction.incremental-size-threshold";
const char Options::COMPACT_OFFPEAK_START_HOUR[] = "compaction.offpeak.start.hour";
const char Options::COMPACT_OFFPEAK_END_HOUR[] = "compaction.offpeak.end.hour";
const char Options::COMPACTION_OFFPEAK_RATIO[] = "compaction.offpeak-ratio";
} // namespace paimon
8 changes: 8 additions & 0 deletions src/paimon/common/utils/date_time_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ class DateTimeUtils {
return *(static_cast<const int64_t*>(local_ts_scalar->data()));
}

static inline Result<int32_t> GetCurrentLocalHour() {
PAIMON_ASSIGN_OR_RAISE(uint64_t local_us, GetCurrentLocalTimeUs());
auto local_seconds = static_cast<time_t>(local_us / 1000000);
std::tm local_tm{};
gmtime_r(&local_seconds, &local_tm);
return local_tm.tm_hour;
}

static inline int32_t GetPrecisionFromType(
const std::shared_ptr<arrow::TimestampType>& timestamp_type) {
int32_t precision = Timestamp::MAX_PRECISION;
Expand Down
16 changes: 15 additions & 1 deletion src/paimon/common/utils/date_time_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,28 @@ TEST(DateTimeUtilsTest, TestGetLocalTimezoneName) {
ASSERT_EQ(DateTimeUtils::GetLocalTimezoneName(), timezone);
}

TEST(DateTimeUtilsTest, TestGetCurrentTime) {
TEST(DateTimeUtilsTest, TestGetCurrentLocalTimeUs) {
TimezoneGuard guard("Asia/Shanghai");
uint64_t utc_ts = DateTimeUtils::GetCurrentUTCTimeUs();
uint64_t local_ts = DateTimeUtils::GetCurrentLocalTimeUs().value();
ASSERT_GT(local_ts, utc_ts);
ASSERT_GE(local_ts - utc_ts, 28800000000l);
}

TEST(DateTimeUtilsTest, TestGetCurrentLocalHour) {
int32_t shanghai_hour = 0;
int32_t utc_hour = 0;
{
TimezoneGuard guard("Asia/Shanghai");
ASSERT_OK_AND_ASSIGN(shanghai_hour, DateTimeUtils::GetCurrentLocalHour());
}
{
TimezoneGuard guard("UTC");
ASSERT_OK_AND_ASSIGN(utc_hour, DateTimeUtils::GetCurrentLocalHour());
}
ASSERT_EQ((shanghai_hour - utc_hour + 24) % 24, 8);
}

TEST(DateTimeUtilsTest, TestToUTCTimestamp) {
TimezoneGuard guard("Asia/Shanghai");
{
Expand Down
48 changes: 48 additions & 0 deletions src/paimon/core/compact/compact_unit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "paimon/core/io/data_file_meta.h"
#include "paimon/core/mergetree/level_sorted_run.h"
namespace paimon {
/// A files unit for compaction.
struct CompactUnit {
static CompactUnit FromLevelRuns(int32_t output_level,
const std::vector<LevelSortedRun>& runs) {
std::vector<std::shared_ptr<DataFileMeta>> files;
for (const auto& run : runs) {
const auto& files_in_run = run.run.Files();
files.insert(files.end(), files_in_run.begin(), files_in_run.end());
}
return FromFiles(output_level, files, /*file_rewrite=*/false);
}
Comment thread
lxy-9602 marked this conversation as resolved.

static CompactUnit FromFiles(int32_t output_level,
const std::vector<std::shared_ptr<DataFileMeta>>& files,
bool file_rewrite) {
return CompactUnit(output_level, files, file_rewrite);
}

CompactUnit(int32_t _output_level, const std::vector<std::shared_ptr<DataFileMeta>>& _files,
bool _file_rewrite)
: output_level(_output_level), files(_files), file_rewrite(_file_rewrite) {}

int32_t output_level;
std::vector<std::shared_ptr<DataFileMeta>> files;
bool file_rewrite;
};
} // namespace paimon
62 changes: 62 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ struct CoreOptions::Impl {
std::optional<std::string> global_index_external_path;

std::optional<std::string> scan_tag_name;
std::optional<int64_t> optimized_compaction_interval;
std::optional<int64_t> compaction_total_size_threshold;
std::optional<int64_t> compaction_incremental_size_threshold;
int32_t compact_off_peak_start_hour = -1;
int32_t compact_off_peak_end_hour = -1;
int32_t compact_off_peak_ratio = 0;
};

// Parse configurations from a map and return a populated CoreOptions object
Expand Down Expand Up @@ -507,6 +513,7 @@ Result<CoreOptions> CoreOptions::FromMap(
impl->scan_tag_name = scan_tag_name;
}

// Parse compaction options
// Parse commit.force-compact
PAIMON_RETURN_NOT_OK(
parser.Parse<bool>(Options::COMMIT_FORCE_COMPACT, &impl->commit_force_compact));
Expand All @@ -519,6 +526,41 @@ Result<CoreOptions> CoreOptions::FromMap(
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::COMPACTION_FORCE_REWRITE_ALL_FILES,
&impl->compaction_force_rewrite_all_files));

// Parse compaction.optimization-interval
std::string optimized_compaction_interval_str;
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_OPTIMIZATION_INTERVAL,
&optimized_compaction_interval_str));
if (!optimized_compaction_interval_str.empty()) {
PAIMON_ASSIGN_OR_RAISE(impl->optimized_compaction_interval,
TimeDuration::Parse(optimized_compaction_interval_str));
}
// Parse compaction.total-size-threshold
std::string compaction_total_size_threshold_str;
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_TOTAL_SIZE_THRESHOLD,
&compaction_total_size_threshold_str));
if (!compaction_total_size_threshold_str.empty()) {
PAIMON_ASSIGN_OR_RAISE(impl->compaction_total_size_threshold,
MemorySize::ParseBytes(compaction_total_size_threshold_str));
}
// Parse compaction.incremental-size-threshold
std::string compaction_incremental_size_threshold_str;
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD,
&compaction_incremental_size_threshold_str));
if (!compaction_incremental_size_threshold_str.empty()) {
PAIMON_ASSIGN_OR_RAISE(impl->compaction_incremental_size_threshold,
MemorySize::ParseBytes(compaction_incremental_size_threshold_str));
}

// Parse compaction.offpeak.start.hour
PAIMON_RETURN_NOT_OK(
parser.Parse(Options::COMPACT_OFFPEAK_START_HOUR, &impl->compact_off_peak_start_hour));
// Parse compaction.offpeak.end.hour
PAIMON_RETURN_NOT_OK(
parser.Parse(Options::COMPACT_OFFPEAK_END_HOUR, &impl->compact_off_peak_end_hour));
// Parse compaction.offpeak-ratio
PAIMON_RETURN_NOT_OK(
parser.Parse(Options::COMPACTION_OFFPEAK_RATIO, &impl->compact_off_peak_ratio));

return options;
}

Expand Down Expand Up @@ -847,4 +889,24 @@ std::optional<std::string> CoreOptions::GetScanTagName() const {
return impl_->scan_tag_name;
}

std::optional<int64_t> CoreOptions::GetOptimizedCompactionInterval() const {
return impl_->optimized_compaction_interval;
}
std::optional<int64_t> CoreOptions::GetCompactionTotalSizeThreshold() const {
return impl_->compaction_total_size_threshold;
}
std::optional<int64_t> CoreOptions::GetCompactionIncrementalSizeThreshold() const {
return impl_->compaction_incremental_size_threshold;
}

int32_t CoreOptions::GetCompactOffPeakStartHour() const {
return impl_->compact_off_peak_start_hour;
}
int32_t CoreOptions::GetCompactOffPeakEndHour() const {
return impl_->compact_off_peak_end_hour;
}
int32_t CoreOptions::GetCompactOffPeakRatio() const {
return impl_->compact_off_peak_ratio;
}

} // namespace paimon
8 changes: 8 additions & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ class PAIMON_EXPORT CoreOptions {

std::optional<std::string> GetScanTagName() const;

std::optional<int64_t> GetOptimizedCompactionInterval() const;
std::optional<int64_t> GetCompactionTotalSizeThreshold() const;
std::optional<int64_t> GetCompactionIncrementalSizeThreshold() const;

int32_t GetCompactOffPeakStartHour() const;
int32_t GetCompactOffPeakEndHour() const;
int32_t GetCompactOffPeakRatio() const;

const std::map<std::string, std::string>& ToMap() const;

private:
Expand Down
22 changes: 20 additions & 2 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_TRUE(core_options.GlobalIndexEnabled());
ASSERT_FALSE(core_options.GetGlobalIndexExternalPath());
ASSERT_EQ(std::nullopt, core_options.GetScanTagName());
ASSERT_EQ(std::nullopt, core_options.GetOptimizedCompactionInterval());
ASSERT_EQ(std::nullopt, core_options.GetCompactionTotalSizeThreshold());
ASSERT_EQ(std::nullopt, core_options.GetCompactionIncrementalSizeThreshold());
ASSERT_EQ(-1, core_options.GetCompactOffPeakStartHour());
ASSERT_EQ(-1, core_options.GetCompactOffPeakEndHour());
ASSERT_EQ(0, core_options.GetCompactOffPeakRatio());
}

TEST(CoreOptionsTest, TestFromMap) {
Expand Down Expand Up @@ -159,7 +165,13 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::SCAN_TAG_NAME, "test-tag"},
{Options::WRITE_ONLY, "true"},
{Options::COMPACTION_MIN_FILE_NUM, "10"},
{Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"}};
{Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"},
{Options::COMPACTION_OPTIMIZATION_INTERVAL, "2s"},
{Options::COMPACTION_TOTAL_SIZE_THRESHOLD, "5 GB"},
{Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, "12 kB"},
{Options::COMPACT_OFFPEAK_START_HOUR, "3"},
{Options::COMPACT_OFFPEAK_END_HOUR, "16"},
{Options::COMPACTION_OFFPEAK_RATIO, "8"}};

ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
auto fs = core_options.GetFileSystem();
Expand Down Expand Up @@ -238,7 +250,13 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_TRUE(core_options.WriteOnly());
ASSERT_EQ(10, core_options.GetCompactionMinFileNum());
ASSERT_TRUE(core_options.CompactionForceRewriteAllFiles());
} // namespace paimon::test
ASSERT_EQ(2000, core_options.GetOptimizedCompactionInterval().value());
ASSERT_EQ(5l * 1024 * 1024 * 1024, core_options.GetCompactionTotalSizeThreshold().value());
ASSERT_EQ(12l * 1024, core_options.GetCompactionIncrementalSizeThreshold().value());
ASSERT_EQ(3, core_options.GetCompactOffPeakStartHour());
ASSERT_EQ(16, core_options.GetCompactOffPeakEndHour());
ASSERT_EQ(8, core_options.GetCompactOffPeakRatio());
}

TEST(CoreOptionsTest, TestInvalidCase) {
ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::BUCKET, "3.5"}}),
Expand Down
64 changes: 64 additions & 0 deletions src/paimon/core/mergetree/compact/compact_strategy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "paimon/core/compact/compact_unit.h"
#include "paimon/core/io/data_file_meta.h"
#include "paimon/core/mergetree/level_sorted_run.h"
namespace paimon {
/// Compact strategy to decide which files to select for compaction.
class CompactStrategy {
public:
virtual ~CompactStrategy() = default;

/// Pick compaction unit from runs.
/// @note Compaction is runs-based, not file-based.
/// Level 0 is special, one run per file; all other levels are one run per level.
/// Compaction is sequential from small level to large level.
virtual Result<std::optional<CompactUnit>> Pick(int32_t num_levels,
const std::vector<LevelSortedRun>& runs) = 0;
/// Pick a compaction unit consisting of all existing files.
// TODO(xinyu.lxy): support RecordLevelExpire and BucketedDvMaintainer
static std::optional<CompactUnit> PickFullCompaction(int32_t num_levels,
const std::vector<LevelSortedRun>& runs,
bool force_rewrite_all_files) {
int32_t max_level = num_levels - 1;
Comment thread
lxy-9602 marked this conversation as resolved.
if (runs.empty()) {
// no sorted run, no need to compact
return std::nullopt;
}
// only max level files
if (runs.size() == 1 && runs[0].level == max_level) {
std::vector<std::shared_ptr<DataFileMeta>> files_to_be_compacted;
const auto& run = runs[0];
for (const auto& file : run.run.Files()) {
if (force_rewrite_all_files) {
// add all files when force compacted
files_to_be_compacted.push_back(file);
}
// TODO(xinyu.lxy): support RecordLevelExpire and BucketedDvMaintainer
}
if (files_to_be_compacted.empty()) {
return std::nullopt;
}
return CompactUnit::FromFiles(max_level, files_to_be_compacted, /*file_rewrite=*/true);
}
// full compaction
return CompactUnit::FromLevelRuns(max_level, runs);
}
};
} // namespace paimon
Loading
Loading