Skip to content
3 changes: 3 additions & 0 deletions include/paimon/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class PAIMON_EXPORT Executor {
/// @note This method should be thread-safe and can be called from multiple threads
/// simultaneously.
virtual void Add(std::function<void()> func) = 0;

/// Shutdown the executor immediately, discarding all pending tasks.
virtual void ShutdownNow() = 0;
};

} // namespace paimon
12 changes: 12 additions & 0 deletions include/paimon/file_store_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <cstdint>
#include <map>
#include <memory>
#include <vector>

Expand Down Expand Up @@ -51,6 +52,17 @@ class PAIMON_EXPORT FileStoreWrite {
/// the corresponding array in `batch` must have zero null entries.
virtual Status Write(std::unique_ptr<RecordBatch>&& batch) = 0;

/// Compact data stored in given partition and bucket. Note that compaction process is only
/// submitted and may not be completed when the method returns.
///
/// @param partition the partition to compact
/// @param bucket the bucket to compact
/// @param full_compaction whether to trigger full compaction or just normal compaction
///
/// @return status for compacting the records
virtual Status Compact(const std::map<std::string, std::string>& partition, int32_t bucket,
bool full_compaction) = 0;

/// Generate a list of commit messages with the latest generated data file meta
/// information of the current snapshot.
///
Expand Down
15 changes: 15 additions & 0 deletions include/paimon/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ class PAIMON_EXPORT Metrics {
/// Get all histogram statistics snapshots.
virtual std::map<std::string, HistogramStats> GetAllHistogramStats() const = 0;

/// Set the value of a specific gauge metric (current state metric).
/// @param metric_name The name/key of the gauge metric to set.
/// @param metric_value The value to set for this gauge metric (double, not just integer).
virtual void SetGauge(const std::string& metric_name, double metric_value) = 0;

/// Get the current value of a specific gauge metric (current state metric).
/// @param metric_name The name/key of the gauge metric to retrieve.
/// @return The current value of the gauge metric, or `Status::KeyError` if the metric doesn't
/// exist.
virtual Result<double> GetGauge(const std::string& metric_name) const = 0;

/// Get all gauge metrics as a map.
/// @return A map containing all gauge metric names and their current values.
virtual std::map<std::string, double> GetAllGauges() const = 0;

/// Merge metrics from another Metrics instance into this one.
///
/// For metrics that exist in both instances, the values are added together.
Expand Down
6 changes: 6 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ set(PAIMON_COMMON_SRCS

set(PAIMON_CORE_SRCS
core/append/append_only_writer.cpp
core/append/bucketed_append_compact_manager.cpp
core/casting/binary_to_string_cast_executor.cpp
core/casting/boolean_to_decimal_cast_executor.cpp
core/casting/boolean_to_numeric_cast_executor.cpp
Expand Down Expand Up @@ -256,6 +257,7 @@ set(PAIMON_CORE_SRCS
core/operation/read_context.cpp
core/operation/scan_context.cpp
core/operation/write_context.cpp
core/operation/write_restore.cpp
core/postpone/postpone_bucket_writer.cpp
core/schema/arrow_schema_validator.cpp
core/schema/schema_manager.cpp
Expand Down Expand Up @@ -488,6 +490,7 @@ if(PAIMON_BUILD_TESTS)
add_paimon_test(core_test
SOURCES
core/append/append_only_writer_test.cpp
core/append/bucketed_append_compact_manager_test.cpp
core/casting/cast_executor_factory_test.cpp
core/casting/cast_executor_test.cpp
core/casting/casted_row_test.cpp
Expand All @@ -500,6 +503,7 @@ if(PAIMON_BUILD_TESTS)
core/core_options_test.cpp
core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
core/deletionvectors/bitmap_deletion_vector_test.cpp
core/deletionvectors/bucketed_dv_maintainer_test.cpp
core/deletionvectors/deletion_vector_test.cpp
core/deletionvectors/deletion_vectors_index_file_test.cpp
core/index/index_in_data_file_dir_path_factory_test.cpp
Expand Down Expand Up @@ -561,6 +565,7 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/merge_tree_writer_test.cpp
core/mergetree/sorted_run_test.cpp
core/migrate/file_meta_utils_test.cpp
core/operation/metrics/compaction_metrics_test.cpp
core/operation/data_evolution_file_store_scan_test.cpp
core/operation/data_evolution_split_read_test.cpp
core/operation/key_value_file_store_write_test.cpp
Expand All @@ -581,6 +586,7 @@ if(PAIMON_BUILD_TESTS)
core/operation/raw_file_split_read_test.cpp
core/operation/read_context_test.cpp
core/operation/scan_context_test.cpp
core/operation/write_restore_test.cpp
core/operation/write_context_test.cpp
core/partition/partition_statistics_test.cpp
core/postpone/postpone_bucket_writer_test.cpp
Expand Down
42 changes: 42 additions & 0 deletions src/paimon/common/executor/default_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/

#include <atomic>
#include <chrono>
#include <cstdint>
#include <future>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -80,4 +82,44 @@ TEST(DefaultExecutorTest, TestViaWithException) {
ASSERT_THROW(future.get(), std::runtime_error);
}

TEST(DefaultExecutorTest, TestShutdownNowDropsPendingTasks) {
auto executor = CreateDefaultExecutor(/*thread_count=*/1);
std::atomic<bool> first_started = false;
std::atomic<int32_t> executed_count = 0;
std::promise<void> release_first_task;
auto release_future = release_first_task.get_future();
executor->Add([&]() {
first_started.store(true);
release_future.wait();
++executed_count;
});

for (int32_t index = 0; index < 20; ++index) {
executor->Add([&]() { ++executed_count; });
}

for (int32_t retry = 0; retry < 100 && !first_started.load(); ++retry) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ASSERT_TRUE(first_started.load());
std::thread shutdown_thread([&]() { executor->ShutdownNow(); });
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
release_first_task.set_value();
shutdown_thread.join();

// Only the running task should complete. Pending tasks should be discarded.
ASSERT_EQ(executed_count.load(), 1);
}

TEST(DefaultExecutorTest, TestAddTaskAfterShutdownNowIgnored) {
auto executor = CreateDefaultExecutor(/*thread_count=*/1);
std::atomic<int32_t> executed_count = 0;

executor->ShutdownNow();
executor->Add([&]() { ++executed_count; });

std::this_thread::sleep_for(std::chrono::milliseconds(10));
ASSERT_EQ(executed_count.load(), 0);
}

} // namespace paimon::test
28 changes: 26 additions & 2 deletions src/paimon/common/executor/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ class DefaultExecutor : public Executor {

void Add(std::function<void()> func) override;

void ShutdownNow() override;

private:
void WorkerThread();

void ShutdownInternal(bool wait_for_pending_tasks);

uint32_t thread_count_;
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
Expand All @@ -51,17 +55,37 @@ DefaultExecutor::DefaultExecutor(uint32_t thread_count) : thread_count_(thread_c
}
}

DefaultExecutor::~DefaultExecutor() {
void DefaultExecutor::ShutdownInternal(bool wait_for_pending_tasks) {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_) {
return;
}
stop_ = true;
if (!wait_for_pending_tasks) {
// Discard all pending tasks immediately.
std::queue<std::function<void()>> empty;
tasks_.swap(empty);
}
condition_.notify_all();
}
for (std::thread& worker : workers_) {
worker.join();
if (worker.joinable()) {
worker.join();
}
}
}

DefaultExecutor::~DefaultExecutor() {
// Graceful shutdown: wait for all pending tasks to complete.
ShutdownInternal(/*wait_for_pending_tasks=*/true);
}

void DefaultExecutor::ShutdownNow() {
// Immediate shutdown: discard all pending tasks.
ShutdownInternal(/*wait_for_pending_tasks=*/false);
}

void DefaultExecutor::Add(std::function<void()> func) {
if (!func) {
return;
Expand Down
52 changes: 48 additions & 4 deletions src/paimon/common/metrics/metrics_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,29 @@ std::map<std::string, HistogramStats> MetricsImpl::GetAllHistogramStats() const
return res;
}

void MetricsImpl::SetGauge(const std::string& metric_name, double value) {
std::lock_guard<std::mutex> lock(gauge_lock_);
gauges_[metric_name] = value;
}

Result<double> MetricsImpl::GetGauge(const std::string& metric_name) const {
std::lock_guard<std::mutex> lock(gauge_lock_);
auto it = gauges_.find(metric_name);
if (it != gauges_.end()) {
return it->second;
}
return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
}

std::map<std::string, double> MetricsImpl::GetAllGauges() const {
std::lock_guard<std::mutex> lock(gauge_lock_);
return gauges_;
}

void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
if (other && this != other.get()) {
std::map<std::string, uint64_t> other_counters = other->GetAllCounters();
{
std::map<std::string, uint64_t> other_counters = other->GetAllCounters();
std::lock_guard<std::mutex> guard(counter_lock_);
for (const auto& kv : other_counters) {
auto iter = counters_.find(kv.first);
Expand All @@ -106,7 +125,18 @@ void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
}
}
}

std::map<std::string, double> other_gauges = other->GetAllGauges();
{
std::lock_guard<std::mutex> guard(gauge_lock_);
for (const auto& kv : other_gauges) {
auto iter = gauges_.find(kv.first);
if (iter == gauges_.end()) {
gauges_[kv.first] = kv.second;
} else {
gauges_[kv.first] += kv.second;
}
}
}
auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
if (other_impl) {
std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> other_histograms;
Expand Down Expand Up @@ -139,8 +169,15 @@ void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
void MetricsImpl::Overwrite(const std::shared_ptr<Metrics>& other) {
if (other && this != other.get()) {
std::map<std::string, uint64_t> other_counters = other->GetAllCounters();
std::lock_guard<std::mutex> guard(counter_lock_);
counters_.swap(other_counters);
{
std::lock_guard<std::mutex> guard(counter_lock_);
counters_.swap(other_counters);
}
std::map<std::string, double> other_gauges = other->GetAllGauges();
{
std::lock_guard<std::mutex> guard(gauge_lock_);
gauges_.swap(other_gauges);
}

auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
std::map<std::string, std::shared_ptr<Histogram>> new_histograms;
Expand Down Expand Up @@ -223,6 +260,13 @@ std::string MetricsImpl::ToString() const {
stddev_val.SetDouble(s.stddev);
doc.AddMember(rapidjson::Value(name + ".stddev", allocator), stddev_val, allocator);
}

std::map<std::string, double> gauges = GetAllGauges();
for (const auto& kv : gauges) {
doc.AddMember(rapidjson::Value(kv.first, allocator), rapidjson::Value(kv.second),
allocator);
}

rapidjson::StringBuffer s;
RapidWriter writer(s);
doc.Accept(writer);
Expand Down
7 changes: 7 additions & 0 deletions src/paimon/common/metrics/metrics_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class PAIMON_EXPORT MetricsImpl : public Metrics {
Result<HistogramStats> GetHistogramStats(const std::string& metric_name) const override;
std::map<std::string, HistogramStats> GetAllHistogramStats() const override;

void SetGauge(const std::string& metric_name, double metric_value) override;
Result<double> GetGauge(const std::string& metric_name) const override;
std::map<std::string, double> GetAllGauges() const override;

void Merge(const std::shared_ptr<Metrics>& other) override;
std::string ToString() const override;
void Overwrite(const std::shared_ptr<Metrics>& metrics);
Expand All @@ -66,6 +70,9 @@ class PAIMON_EXPORT MetricsImpl : public Metrics {

mutable std::mutex histogram_lock_;
std::map<std::string, std::shared_ptr<Histogram>> histograms_;

mutable std::mutex gauge_lock_;
std::map<std::string, double> gauges_;
};

} // namespace paimon
31 changes: 30 additions & 1 deletion src/paimon/common/metrics/metrics_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,45 @@ TEST(MetricsImplTest, TestSimple) {
"Key error: metric 'some_metric' not found");
}

TEST(MetricsImplTest, TestGaugeMergeAndOverwrite) {
auto metrics = std::make_shared<MetricsImpl>();
metrics->SetGauge("g1", 1.5);
metrics->SetGauge("g2", 2.0);

auto other = std::make_shared<MetricsImpl>();
other->SetGauge("g2", 3.25);
other->SetGauge("g3", 4.75);

metrics->Merge(other);

ASSERT_OK_AND_ASSIGN(double gauge, metrics->GetGauge("g1"));
EXPECT_DOUBLE_EQ(1.5, gauge);
ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g2"));
EXPECT_DOUBLE_EQ(5.25, gauge);
ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g3"));
EXPECT_DOUBLE_EQ(4.75, gauge);

metrics->Overwrite(other);

ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g2"));
EXPECT_DOUBLE_EQ(3.25, gauge);
ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g3"));
EXPECT_DOUBLE_EQ(4.75, gauge);
ASSERT_NOK_WITH_MSG(metrics->GetGauge("g1"), "Key error: metric 'g1' not found");
}

TEST(MetricsImplTest, TestToString) {
std::shared_ptr<MetricsImpl> metrics1 = std::make_shared<MetricsImpl>();
metrics1->SetCounter("k1", 1);
metrics1->SetCounter("k2", 2);
metrics1->SetGauge("g1", 1.25);
std::shared_ptr<MetricsImpl> metrics2 = std::make_shared<MetricsImpl>();
metrics2->SetCounter("m1", 3);
metrics2->SetCounter("m2", 4);
metrics2->SetCounter("k2", 5);
metrics2->SetGauge("g2", 2.5);
metrics1->Merge(metrics2);
EXPECT_EQ(metrics1->ToString(), "{\"k1\":1,\"k2\":7,\"m1\":3,\"m2\":4}");
EXPECT_EQ(metrics1->ToString(), "{\"k1\":1,\"k2\":7,\"m1\":3,\"m2\":4,\"g1\":1.25,\"g2\":2.5}");
}

} // namespace paimon::test
2 changes: 2 additions & 0 deletions src/paimon/common/reader/reader_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "arrow/api.h"
#include "arrow/array/array_base.h"
#include "arrow/array/array_nested.h"
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
Expand Down Expand Up @@ -105,4 +106,5 @@ BatchReader::ReadBatchWithBitmap ReaderUtils::AddAllValidBitmap(BatchReader::Rea
all_valid.AddRange(0, batch.first->length);
return std::make_pair(std::move(batch), std::move(all_valid));
}

} // namespace paimon
2 changes: 2 additions & 0 deletions src/paimon/common/reader/reader_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <memory>
#include <string>

#include "arrow/api.h"
#include "paimon/reader/batch_reader.h"
Expand All @@ -25,6 +26,7 @@
namespace arrow {
class MemoryPool;
class Array;
class StructArray;
} // namespace arrow

namespace paimon {
Expand Down
Loading
Loading