Skip to content

feat(metrics): add progress metrics via collector #17359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Feb 15, 2025
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions src/common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ pub struct ProgressValues {
pub bytes: usize,
}

impl ProgressValues {
pub fn add(&self, other: &ProgressValues) -> ProgressValues {
ProgressValues {
rows: self.rows + other.rows,
bytes: self.bytes + other.bytes,
}
}
}

#[derive(Debug)]
pub struct Progress {
rows: AtomicUsize,
Expand Down
4 changes: 4 additions & 0 deletions src/common/base/src/runtime/metrics/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ impl GlobalRegistry {
metric
}

pub fn register_collector(&self, collector: Box<dyn prometheus_client::collector::Collector>) {
self.inner.lock().registry.register_collector(collector);
}

pub(crate) fn new_scoped_metric(&self, index: usize) -> impl Iterator<Item = ScopedMetric> {
let global_registry = self.inner.lock();
let mut scoped_metrics = Vec::with_capacity(global_registry.metrics.len() - index);
Expand Down
7 changes: 7 additions & 0 deletions src/common/metrics/src/metrics/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ const METRIC_QUERY_TOTAL_PARTITIONS: &str = "query_total_partitions";
const METRIC_QUERY_RESULT_ROWS: &str = "query_result_rows";
const METRIC_QUERY_RESULT_BYTES: &str = "query_result_bytes";

pub const METRIC_QUERY_SCAN_PROGRESS_ROWS: &str = "query_scan_progress_rows";
pub const METRIC_QUERY_SCAN_PROGRESS_BYTES: &str = "query_scan_progress_bytes";
pub const METRIC_QUERY_WRITE_PROGRESS_ROWS: &str = "query_write_progress_rows";
pub const METRIC_QUERY_WRITE_PROGRESS_BYTES: &str = "query_write_progress_bytes";
pub const METRIC_QUERY_SPILL_PROGRESS_ROWS: &str = "query_spill_progress_rows";
pub const METRIC_QUERY_SPILL_PROGRESS_BYTES: &str = "query_spill_progress_bytes";

pub static QUERY_START: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_START));
pub static QUERY_SUCCESS: LazyLock<FamilyCounter<VecLabels>> =
Expand Down
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct ProcessInfo {
/// storage metrics for persisted data reading.
pub data_metrics: Option<StorageMetrics>,
pub scan_progress_value: Option<ProgressValues>,
pub write_progress_value: Option<ProgressValues>,
pub spill_progress_value: Option<ProgressValues>,
pub mysql_connection_id: Option<u32>,
pub created_time: SystemTime,
pub status_info: Option<String>,
Expand Down
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ paste = { workspace = true }
petgraph = { workspace = true }
pin-project-lite = { workspace = true }
poem = { workspace = true }
prometheus-client = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
recursive = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
let typ = session.get_type();
if typ.is_user_session() {
SessionManager::instance().status.write().query_finish(now);
SessionManager::instance()
.metrics_collector
.track_finished_query(
ctx.get_scan_progress_value(),
ctx.get_write_progress_value(),
ctx.get_join_spill_progress_value(),
ctx.get_aggregate_spill_progress_value(),
ctx.get_group_by_spill_progress_value(),
ctx.get_window_partition_spill_progress_value(),
);
}

if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) {
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod session;
mod session_ctx;
mod session_info;
mod session_mgr;
mod session_mgr_metrics;
mod session_mgr_status;
mod session_privilege_mgr;
mod session_status;
Expand Down
23 changes: 23 additions & 0 deletions src/query/service/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ impl Session {
memory_usage,
data_metrics: Self::query_data_metrics(session_ctx),
scan_progress_value: Self::query_scan_progress_value(session_ctx),
write_progress_value: Self::query_write_progress_value(session_ctx),
spill_progress_value: Self::query_spill_progress_value(session_ctx),
mysql_connection_id: self.mysql_connection_id,
created_time: Self::query_created_time(session_ctx),
status_info: shared_query_context
Expand Down Expand Up @@ -105,6 +107,27 @@ impl Session {
.map(|context_shared| context_shared.scan_progress.get_values())
}

fn query_write_progress_value(status: &SessionContext) -> Option<ProgressValues> {
status
.get_query_context_shared()
.as_ref()
.map(|context_shared| context_shared.write_progress.get_values())
}

fn query_spill_progress_value(status: &SessionContext) -> Option<ProgressValues> {
status
.get_query_context_shared()
.as_ref()
.map(|context_shared| {
context_shared
.agg_spill_progress
.get_values()
.add(&context_shared.join_spill_progress.get_values())
.add(&context_shared.window_partition_spill_progress.get_values())
.add(&context_shared.group_by_spill_progress.get_values())
})
}

fn query_created_time(status: &SessionContext) -> SystemTime {
match status.get_query_context_shared() {
None => SystemTime::now(),
Expand Down
15 changes: 12 additions & 3 deletions src/query/service/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::Duration;
use databend_common_base::base::tokio;
use databend_common_base::base::GlobalInstance;
use databend_common_base::base::SignalStream;
use databend_common_base::runtime::metrics::GLOBAL_METRICS_REGISTRY;
use databend_common_catalog::table_context::ProcessInfoState;
use databend_common_config::GlobalConfig;
use databend_common_config::InnerConfig;
Expand All @@ -38,6 +39,7 @@ use log::info;
use parking_lot::RwLock;

use crate::sessions::session::Session;
use crate::sessions::session_mgr_metrics::SessionManagerMetricsCollector;
use crate::sessions::ProcessInfo;
use crate::sessions::SessionContext;
use crate::sessions::SessionManagerStatus;
Expand All @@ -47,6 +49,7 @@ pub struct SessionManager {
pub(in crate::sessions) max_sessions: usize,
pub(in crate::sessions) active_sessions: Arc<RwLock<HashMap<String, Weak<Session>>>>,
pub status: Arc<RwLock<SessionManagerStatus>>,
pub metrics_collector: SessionManagerMetricsCollector,

// When typ is MySQL, insert into this map, key is id, val is MySQL connection id.
pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
Expand All @@ -55,20 +58,26 @@ pub struct SessionManager {

impl SessionManager {
pub fn init(conf: &InnerConfig) -> Result<()> {
GlobalInstance::set(Self::create(conf));
let global_instance = Self::create(conf);
GlobalInstance::set(global_instance.clone());
GLOBAL_METRICS_REGISTRY
.register_collector(Box::new(global_instance.metrics_collector.clone()));

Ok(())
}

pub fn create(conf: &InnerConfig) -> Arc<SessionManager> {
let max_sessions = conf.query.max_active_sessions as usize;
Arc::new(SessionManager {
let mgr = Arc::new(SessionManager {
max_sessions,
mysql_basic_conn_id: AtomicU32::new(9_u32.to_le()),
status: Arc::new(RwLock::new(SessionManagerStatus::default())),
mysql_conn_map: Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))),
active_sessions: Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))),
})
metrics_collector: SessionManagerMetricsCollector::new(),
});
mgr.metrics_collector.attach_session_manager(mgr.clone());
mgr
}

pub fn instance() -> Arc<SessionManager> {
Expand Down
171 changes: 171 additions & 0 deletions src/query/service/src/sessions/session_mgr_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::ProgressValues;
use databend_common_metrics::interpreter::METRIC_QUERY_SCAN_PROGRESS_BYTES;
use databend_common_metrics::interpreter::METRIC_QUERY_SCAN_PROGRESS_ROWS;
use databend_common_metrics::interpreter::METRIC_QUERY_SPILL_PROGRESS_BYTES;
use databend_common_metrics::interpreter::METRIC_QUERY_SPILL_PROGRESS_ROWS;
use databend_common_metrics::interpreter::METRIC_QUERY_WRITE_PROGRESS_BYTES;
use databend_common_metrics::interpreter::METRIC_QUERY_WRITE_PROGRESS_ROWS;
use parking_lot::Mutex;
use prometheus_client::collector::Collector;
use prometheus_client::encoding::EncodeMetric;
use prometheus_client::metrics::counter::ConstCounter;

use crate::sessions::SessionManager;

/// [`SessionManagerMetricsCollector`] dumps the progress metrics of scan/write/spills
/// from the [`SessionManager`]'s running queries to the prometheus. To avoid the progress
/// metrics being decreased, we also need to accumulate these progress values after the query
/// is finished.
#[derive(Clone)]
pub struct SessionManagerMetricsCollector {
inner: Arc<Mutex<SessionManagerMetricsCollectorInner>>,
}

pub(crate) struct SessionManagerMetricsCollectorInner {
session_mgr: Option<Arc<SessionManager>>,
finished_scan_total: ProgressValues,
finished_write_total: ProgressValues,
finished_spill_total: ProgressValues,
}

impl SessionManagerMetricsCollector {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(SessionManagerMetricsCollectorInner {
session_mgr: None,
finished_scan_total: ProgressValues::default(),
finished_write_total: ProgressValues::default(),
finished_spill_total: ProgressValues::default(),
})),
}
}

pub fn attach_session_manager(&self, session_mgr: Arc<SessionManager>) {
let mut guard = self.inner.lock();
guard.session_mgr.replace(session_mgr);
}

pub fn track_finished_query(
&self,
scan: ProgressValues,
write: ProgressValues,
join_spill: ProgressValues,
aggregate_spill: ProgressValues,
group_by_spill: ProgressValues,
window_partition_spill: ProgressValues,
) {
let mut guard = self.inner.lock();
guard.finished_scan_total = guard.finished_scan_total.add(&scan);
guard.finished_write_total = guard.finished_write_total.add(&write);
guard.finished_spill_total = guard
.finished_spill_total
.add(&join_spill)
.add(&aggregate_spill)
.add(&group_by_spill)
.add(&window_partition_spill);
}
}

impl Default for SessionManagerMetricsCollector {
fn default() -> Self {
Self::new()
}
}

impl std::fmt::Debug for SessionManagerMetricsCollector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SessionMetricsCollector")
}
}

impl Collector for SessionManagerMetricsCollector {
fn encode(
&self,
mut encoder: prometheus_client::encoding::DescriptorEncoder,
) -> Result<(), std::fmt::Error> {
let processes = {
match self.inner.lock().session_mgr.as_ref() {
Some(mgr) => mgr.processes_info(),
None => return Ok(()),
}
};

let (mut scan_progress, mut write_progress, mut spill_progress) = {
let guard = self.inner.lock();
(
guard.finished_scan_total.clone(),
guard.finished_write_total.clone(),
guard.finished_spill_total.clone(),
)
};
for process in processes {
if let Some(scan) = &process.scan_progress_value {
scan_progress = scan_progress.add(scan);
}
if let Some(write) = &process.write_progress_value {
write_progress = write_progress.add(write);
}
if let Some(spill) = &process.spill_progress_value {
spill_progress = spill_progress.add(spill);
}
}

let metrics = vec![
(
METRIC_QUERY_SCAN_PROGRESS_ROWS,
scan_progress.rows as f64,
"Total scan rows in progress.",
),
(
METRIC_QUERY_SCAN_PROGRESS_BYTES,
scan_progress.bytes as f64,
"Total scan bytes in progress.",
),
(
METRIC_QUERY_WRITE_PROGRESS_ROWS,
write_progress.rows as f64,
"Total write rows in progress.",
),
(
METRIC_QUERY_WRITE_PROGRESS_BYTES,
write_progress.bytes as f64,
"Total write bytes in progress.",
),
(
METRIC_QUERY_SPILL_PROGRESS_ROWS,
spill_progress.rows as f64,
"Total spill rows in progress.",
),
(
METRIC_QUERY_SPILL_PROGRESS_BYTES,
spill_progress.bytes as f64,
"Total spill bytes in progress.",
),
];

for (name, value, help) in metrics {
let counter = ConstCounter::new(value);
let counter_encoder =
encoder.encode_descriptor(name, help, None, counter.metric_type())?;
counter.encode(counter_encoder)?;
}

Ok(())
}
}