Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d94c2da
(query process) show more query process stats in show current_queries
yiguolei Apr 14, 2026
0499853
f
yiguolei Apr 14, 2026
9c48cd8
f
yiguolei Apr 14, 2026
ec6585c
f
yiguolei Apr 14, 2026
807aa8e
f
yiguolei Apr 14, 2026
1b21f7f
f
yiguolei Apr 14, 2026
1bc5711
f
yiguolei Apr 14, 2026
cc52698
f
yiguolei Apr 14, 2026
3ad73c8
f
yiguolei Apr 14, 2026
0784240
[improvement](executor) expose query progress in show current_queries
xuchenhao Apr 16, 2026
04cb498
Merge branch 'master' into query_progress
xuchenhao Apr 16, 2026
c593d55
Merge branch 'master' into query_progress
wenzhenghu Apr 19, 2026
02f661d
[improvement](executor) Change query progress metrics from instance-l…
wenzhenghu Apr 19, 2026
ef4fa37
Merge branch 'master' into query_progress
wenzhenghu Apr 19, 2026
8231889
Merge branch 'master' into query_progress
wenzhenghu Apr 19, 2026
a327df7
Merge branch 'master' into query_progress
xuchenhao Apr 20, 2026
5799f82
[fix](fe) Fix missing query statistics update in workload runtime status
xuchenhao Apr 20, 2026
6ca28db
Merge branch 'master' into query_progress
xuchenhao Apr 20, 2026
48222f3
Merge branch 'master' into query_progress
xuchenhao Apr 21, 2026
62f4744
Merge branch 'master' into query_progress
xuchenhao Apr 21, 2026
d3bf855
Merge branch 'master' into query_progress
xuchenhao Apr 22, 2026
3f60c1d
Merge branch 'master' into query_progress
xuchenhao Apr 22, 2026
533f6c6
[improvement](be) Refactor query task progress accounting to real-tim…
wenzhenghu Apr 22, 2026
a2b5e34
code format
wenzhenghu Apr 22, 2026
12c7835
[fix](be) Preserve query task progress counters after QueryContext te…
wenzhenghu Apr 23, 2026
482c477
delete unused function
wenzhenghu Apr 23, 2026
692fb3b
Merge branch 'master' into query_progress
wenzhenghu Apr 24, 2026
e5a5a7f
[fix](fe) Preserve current queries compatibility
xuchenhao Apr 24, 2026
324c5fd
[fix](fe) Preserve query statistics surfaces
xuchenhao Apr 24, 2026
6178e5f
Merge branch 'master' into query_progress
xuchenhao Apr 24, 2026
9a1b29c
Merge branch 'master' into query_progress
xuchenhao Apr 25, 2026
4296028
Revert "[fix](fe) Preserve query statistics surfaces"
wenzhenghu Apr 25, 2026
6370707
Revert "[fix](fe) Preserve current queries compatibility"
wenzhenghu Apr 25, 2026
cbe885e
optimize task-level query progress tracking and add comprehensive tests
wenzhenghu Apr 27, 2026
7c29737
[fix](fe) Guard query stats merge against concurrent removal
xuchenhao Apr 27, 2026
2354ca1
fix fe ut
wenzhenghu Apr 27, 2026
e24795a
Merge branch 'master' into query_progress
wenzhenghu Apr 27, 2026
3461b34
Merge branch 'master' into query_progress
wenzhenghu Apr 27, 2026
e72a72d
fix fe ut
wenzhenghu Apr 27, 2026
756bdb0
fix regression test
wenzhenghu Apr 27, 2026
e39b893
Merge branch 'master' into query_progress
wenzhenghu Apr 27, 2026
af75da9
fix traverses concurrent maps issue
wenzhenghu Apr 27, 2026
074cef4
Merge branch 'master' into query_progress
wenzhenghu Apr 27, 2026
3f327ff
fix fe lexicographical order issue
wenzhenghu Apr 27, 2026
7258953
Merge branch 'master' into query_progress
xuchenhao Apr 30, 2026
8c54edf
Merge branch 'master' into query_progress
wenzhenghu May 4, 2026
88713bc
[fix](query_progress) remove redundant task progress fields in QueryC…
wenzhenghu May 6, 2026
b06684a
Merge branch 'master' into query_progress
wenzhenghu May 6, 2026
d9331b8
Add miss deleted code:friend class declaration for QueryTaskController
wenzhenghu May 9, 2026
26ff713
Merge branch 'master' into query_progress
wenzhenghu May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/operator/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block, bool* eos) {

if (auto rows = block->rows()) {
_num_rows_returned += rows;
_state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows);
}
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,8 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
}
_pipeline_parent_map.clear();
_op_id_to_shared_state.clear();
// Record task cardinality once when this fragment context finishes task initialization.
_query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));

return Status::OK();
}
Expand Down Expand Up @@ -1971,6 +1973,8 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
{
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
// Update query-level finished task progress in real time.
_query_ctx->inc_finished_task_num();
if (_closed_tasks >= _total_tasks) {
need_remove = _close_fragment_instance();
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "runtime/runtime_state.h"
#include "runtime/task_execution_context.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"

namespace doris {
struct ReportStatusRequest;
Expand Down Expand Up @@ -88,11 +89,11 @@ class PipelineFragmentContext : public TaskExecutionContext {

[[nodiscard]] int get_fragment_id() const { return _fragment_id; }

void decrement_running_task(PipelineId pipeline_id);

uint32_t rec_cte_stage() const { return _rec_cte_stage; }
void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; }

void decrement_running_task(PipelineId pipeline_id);

Status send_report(bool);

void trigger_report_if_necessary();
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,4 +543,16 @@ Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32
return Status::OK();
}

void QueryContext::add_total_task_num(int delta) {
if (auto* qtc = dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
qtc->add_total_task_num(delta);
}
}

void QueryContext::inc_finished_task_num() {
if (auto* qtc = dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
qtc->inc_finished_task_num();
}
}

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace doris {

class PipelineFragmentContext;
class PipelineTask;
class QueryTaskController;
class Dependency;
class RecCTEScanLocalState;

Expand Down Expand Up @@ -198,6 +199,10 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

TUniqueId query_id() const { return _query_id; }

// Expose task-level query progress counters for runtime statistics reporting.
void add_total_task_num(int delta);
void inc_finished_task_num();

ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; }

ScannerScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; }
Expand Down Expand Up @@ -307,6 +312,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids);

private:
// Task-level progress counters for current query.
friend class QueryTaskController;

int _timeout_second;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/workload_management/io_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
// number rows returned by query.
// only set once by result sink when closing.
RuntimeProfile::Counter* returned_rows_counter_;
RuntimeProfile::Counter* process_rows_counter_;
RuntimeProfile::Counter* shuffle_send_bytes_counter_;
RuntimeProfile::Counter* shuffle_send_rows_counter_;

Expand All @@ -62,6 +63,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
bytes_write_into_cache_counter_ =
ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", TUnit::UNIT);
shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
shuffle_send_rows_counter_ =
ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT);
Expand Down Expand Up @@ -93,6 +95,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
return stats_.bytes_write_into_cache_counter_->value();
}
int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
int64_t process_rows() const { return stats_.process_rows_counter_->value(); }
int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }

Expand All @@ -116,6 +119,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
stats_.bytes_write_into_cache_counter_->update(delta);
}
void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
void update_process_rows(int64_t delta) const { stats_.process_rows_counter_->update(delta); }
void update_shuffle_send_bytes(int64_t delta) const {
stats_.shuffle_send_bytes_counter_->update(delta);
}
Expand Down
18 changes: 18 additions & 0 deletions be/src/runtime/workload_management/query_task_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,22 @@ std::vector<PipelineTask*> QueryTaskController::get_revocable_tasks() {
return tasks;
}

void QueryTaskController::add_total_task_num(int delta) {
_total_task_num.fetch_add(delta, std::memory_order_relaxed);
}

void QueryTaskController::inc_finished_task_num() {
_finished_task_num.fetch_add(1, std::memory_order_relaxed);
}

int QueryTaskController::get_total_task_num() const {
// Read from controller-owned counters to avoid lifecycle dependency on QueryContext.
return _total_task_num.load(std::memory_order_relaxed);
}

int QueryTaskController::get_finished_task_num() const {
// Read from controller-owned counters to avoid lifecycle dependency on QueryContext.
return _finished_task_num.load(std::memory_order_relaxed);
}

} // namespace doris
10 changes: 10 additions & 0 deletions be/src/runtime/workload_management/query_task_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <atomic>

#include "common/factory_creator.h"
#include "runtime/workload_management/task_controller.h"

Expand Down Expand Up @@ -45,11 +47,19 @@ class QueryTaskController : public TaskController {
size_t get_revocable_size() override;
Status revoke_memory() override;
std::vector<PipelineTask*> get_revocable_tasks() override;
// Expose task progress counters without leaking full QueryContext.
void add_total_task_num(int delta);
void inc_finished_task_num();
int get_total_task_num() const;
int get_finished_task_num() const;

protected:
QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx) : query_ctx_(query_ctx) {}

const std::weak_ptr<QueryContext> query_ctx_;
// Keep task progress counters in controller so they outlive QueryContext if needed.
std::atomic<int> _total_task_num {0};
std::atomic<int> _finished_task_num {0};
};

} // namespace doris
8 changes: 8 additions & 0 deletions be/src/runtime/workload_management/resource_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/data.pb.h>
#include <glog/logging.h>

#include "runtime/workload_management/query_task_controller.h"
#include "util/time.h"

namespace doris {
Expand All @@ -30,6 +31,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
statistics->__set_scan_bytes(io_context()->scan_bytes());
statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
statistics->__set_returned_rows(io_context()->returned_rows());
statistics->__set_process_rows(io_context()->process_rows());
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
Expand All @@ -49,6 +51,12 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
io_context_->spill_write_bytes_to_local_storage());
statistics->__set_spill_read_bytes_from_local_storage(
io_context_->spill_read_bytes_from_local_storage());

if (auto* query_task_controller = dynamic_cast<QueryTaskController*>(task_controller())) {
// Fill query task-level progress directly from task controller.
statistics->__set_total_tasks_num(query_task_controller->get_total_task_num());
statistics->__set_finished_tasks_num(query_task_controller->get_finished_task_num());
}
}

} // namespace doris
125 changes: 125 additions & 0 deletions be/test/exec/pipeline/pipeline_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/pipeline/pipeline.h"

#include <gen_cpp/FrontendService_types.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

Expand All @@ -40,6 +41,8 @@
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/workload_management/query_task_controller.h"
#include "runtime/workload_management/resource_context.h"

namespace doris {

Expand Down Expand Up @@ -467,6 +470,36 @@ TEST_F(PipelineTest, HAPPY_PATH) {
downstream_recvr->close();
}

TEST_F(PipelineTest, QueryTaskProgressCounters) {
// Verify task-level counters are updated via QueryContext and exposed by QueryTaskController.
_query_ctx->add_total_task_num(7);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();

auto* query_task_controller =
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(query_task_controller, nullptr);
EXPECT_EQ(query_task_controller->get_total_task_num(), 7);
EXPECT_EQ(query_task_controller->get_finished_task_num(), 3);
}

TEST_F(PipelineTest, QueryTaskProgressCountersOutliveQueryContext) {
// Verify controller-owned counters still work after QueryContext is destroyed.
auto resource_ctx = _query_ctx->resource_ctx();
auto* query_task_controller =
dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
ASSERT_NE(query_task_controller, nullptr);

_query_ctx->add_total_task_num(5);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();

_query_ctx.reset();
EXPECT_EQ(query_task_controller->get_total_task_num(), 5);
EXPECT_EQ(query_task_controller->get_finished_task_num(), 2);
}

TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
_reset();
// Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED))
Expand Down Expand Up @@ -1163,4 +1196,96 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
downstream_recvr->close();
}

TEST_F(PipelineTest, QueryTaskProgressConcurrentUpdates) {
// Verify counters are thread-safe under concurrent updates from multiple threads.
auto resource_ctx = _query_ctx->resource_ctx();
auto* ctrl = dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
ASSERT_NE(ctrl, nullptr);

_query_ctx->add_total_task_num(400);

std::vector<std::thread> threads;
for (int i = 0; i < 8; i++) {
threads.emplace_back([this]() {
for (int j = 0; j < 50; j++) {
_query_ctx->inc_finished_task_num();
}
});
}
for (auto& t : threads) {
t.join();
}

EXPECT_EQ(ctrl->get_total_task_num(), 400);
EXPECT_EQ(ctrl->get_finished_task_num(), 400);
}

TEST_F(PipelineTest, QueryTaskProgressThriftSerialization) {
// Verify progress counters are correctly serialized to Thrift struct.
_query_ctx->add_total_task_num(10);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();

TQueryStatistics tqs;
_query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);

EXPECT_TRUE(tqs.__isset.total_tasks_num);
EXPECT_EQ(tqs.total_tasks_num, 10);
EXPECT_TRUE(tqs.__isset.finished_tasks_num);
EXPECT_EQ(tqs.finished_tasks_num, 4);
}

TEST_F(PipelineTest, QueryTaskProgressBoundaryZeroTotal) {
// Verify behavior when no tasks have been registered (total = 0).
auto* ctrl = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl, nullptr);

// total = 0, finished = 0
EXPECT_EQ(ctrl->get_total_task_num(), 0);
EXPECT_EQ(ctrl->get_finished_task_num(), 0);

// inc_finished with no total should still work without crash
_query_ctx->inc_finished_task_num();
EXPECT_EQ(ctrl->get_finished_task_num(), 1);
}

TEST_F(PipelineTest, QueryTaskProgressAllFinished) {
// Verify 100% progress when all tasks finish.
_query_ctx->add_total_task_num(8);
for (int i = 0; i < 8; i++) {
_query_ctx->inc_finished_task_num();
}

auto* ctrl = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl, nullptr);
EXPECT_EQ(ctrl->get_total_task_num(), 8);
EXPECT_EQ(ctrl->get_finished_task_num(), 8);

// Verify thrift serialization
TQueryStatistics tqs;
_query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);
EXPECT_EQ(tqs.total_tasks_num, 8);
EXPECT_EQ(tqs.finished_tasks_num, 8);
}

TEST_F(PipelineTest, QueryTaskProgressCountersSurviveReset) {
// Verify that after calling _reset(), fresh counters are initialized to zero.
auto* ctrl1 = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl1, nullptr);
_query_ctx->add_total_task_num(10);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
EXPECT_EQ(ctrl1->get_total_task_num(), 10);
EXPECT_EQ(ctrl1->get_finished_task_num(), 2);

// Reset creates a new QueryContext
_reset();
auto* ctrl2 = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl2, nullptr);
EXPECT_EQ(ctrl2->get_total_task_num(), 0);
EXPECT_EQ(ctrl2->get_finished_task_num(), 0);
}

} // namespace doris
Loading
Loading