diff --git a/Cargo.lock b/Cargo.lock index e778bf359c6d9..acac6a971436a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5121,6 +5121,7 @@ dependencies = [ "pin-project-lite", "poem", "pretty_assertions", + "prometheus-client", "prost", "rand", "recursive", diff --git a/src/common/base/src/base/progress.rs b/src/common/base/src/base/progress.rs index 0b444164fe690..d616e4972569b 100644 --- a/src/common/base/src/base/progress.rs +++ b/src/common/base/src/base/progress.rs @@ -24,6 +24,15 @@ pub struct ProgressValues { pub bytes: usize, } +impl ProgressValues { + pub fn add(&self, other: &ProgressValues) -> ProgressValues { + ProgressValues { + rows: self.rows + other.rows, + bytes: self.bytes + other.bytes, + } + } +} + #[derive(Debug)] pub struct Progress { rows: AtomicUsize, diff --git a/src/common/base/src/runtime/metrics/registry.rs b/src/common/base/src/runtime/metrics/registry.rs index 485ab071e11ad..deba672ec3751 100644 --- a/src/common/base/src/runtime/metrics/registry.rs +++ b/src/common/base/src/runtime/metrics/registry.rs @@ -110,6 +110,10 @@ impl GlobalRegistry { metric } + pub fn register_collector(&self, collector: Box) { + self.inner.lock().registry.register_collector(collector); + } + pub(crate) fn new_scoped_metric(&self, index: usize) -> impl Iterator { let global_registry = self.inner.lock(); let mut scoped_metrics = Vec::with_capacity(global_registry.metrics.len() - index); diff --git a/src/common/metrics/src/metrics/interpreter.rs b/src/common/metrics/src/metrics/interpreter.rs index e9c1c2e303205..0b168bf75c37e 100644 --- a/src/common/metrics/src/metrics/interpreter.rs +++ b/src/common/metrics/src/metrics/interpreter.rs @@ -39,6 +39,13 @@ const METRIC_QUERY_TOTAL_PARTITIONS: &str = "query_total_partitions"; const METRIC_QUERY_RESULT_ROWS: &str = "query_result_rows"; const METRIC_QUERY_RESULT_BYTES: &str = "query_result_bytes"; +pub const METRIC_QUERY_SCAN_PROGRESS_ROWS: &str = "query_scan_progress_rows"; +pub const METRIC_QUERY_SCAN_PROGRESS_BYTES: &str = "query_scan_progress_bytes"; +pub const METRIC_QUERY_WRITE_PROGRESS_ROWS: &str = "query_write_progress_rows"; +pub const METRIC_QUERY_WRITE_PROGRESS_BYTES: &str = "query_write_progress_bytes"; +pub const METRIC_QUERY_SPILL_PROGRESS_ROWS: &str = "query_spill_progress_rows"; +pub const METRIC_QUERY_SPILL_PROGRESS_BYTES: &str = "query_spill_progress_bytes"; + pub static QUERY_START: LazyLock> = LazyLock::new(|| register_counter_family(METRIC_QUERY_START)); pub static QUERY_SUCCESS: LazyLock> = diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 824d03fec4945..2942164ff4a63 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -92,6 +92,8 @@ pub struct ProcessInfo { /// storage metrics for persisted data reading. pub data_metrics: Option, pub scan_progress_value: Option, + pub write_progress_value: Option, + pub spill_progress_value: Option, pub mysql_connection_id: Option, pub created_time: SystemTime, pub status_info: Option, diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 045ca39167c6e..43d0205bb7e38 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -150,6 +150,7 @@ paste = { workspace = true } petgraph = { workspace = true } pin-project-lite = { workspace = true } poem = { workspace = true } +prometheus-client = { workspace = true } prost = { workspace = true } rand = { workspace = true } recursive = { workspace = true } diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 91c0081c5296d..7cdf2e4867511 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -187,6 +187,16 @@ fn log_query_finished(ctx: &QueryContext, error: Option, has_profiles let typ = session.get_type(); if typ.is_user_session() { SessionManager::instance().status.write().query_finish(now); + SessionManager::instance() + .metrics_collector + .track_finished_query( + ctx.get_scan_progress_value(), + ctx.get_write_progress_value(), + ctx.get_join_spill_progress_value(), + ctx.get_aggregate_spill_progress_value(), + ctx.get_group_by_spill_progress_value(), + ctx.get_window_partition_spill_progress_value(), + ); } if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) { diff --git a/src/query/service/src/sessions/mod.rs b/src/query/service/src/sessions/mod.rs index d6d48a9ece1ae..3700e5680510b 100644 --- a/src/query/service/src/sessions/mod.rs +++ b/src/query/service/src/sessions/mod.rs @@ -20,6 +20,7 @@ mod session; mod session_ctx; mod session_info; mod session_mgr; +mod session_mgr_metrics; mod session_mgr_status; mod session_privilege_mgr; mod session_status; diff --git a/src/query/service/src/sessions/session_info.rs b/src/query/service/src/sessions/session_info.rs index b62a46ef9320a..dde95863e5b27 100644 --- a/src/query/service/src/sessions/session_info.rs +++ b/src/query/service/src/sessions/session_info.rs @@ -54,6 +54,8 @@ impl Session { memory_usage, data_metrics: Self::query_data_metrics(session_ctx), scan_progress_value: Self::query_scan_progress_value(session_ctx), + write_progress_value: Self::query_write_progress_value(session_ctx), + spill_progress_value: Self::query_spill_progress_value(session_ctx), mysql_connection_id: self.mysql_connection_id, created_time: Self::query_created_time(session_ctx), status_info: shared_query_context @@ -105,6 +107,27 @@ impl Session { .map(|context_shared| context_shared.scan_progress.get_values()) } + fn query_write_progress_value(status: &SessionContext) -> Option { + status + .get_query_context_shared() + .as_ref() + .map(|context_shared| context_shared.write_progress.get_values()) + } + + fn query_spill_progress_value(status: &SessionContext) -> Option { + status + .get_query_context_shared() + .as_ref() + .map(|context_shared| { + context_shared + .agg_spill_progress + .get_values() + .add(&context_shared.join_spill_progress.get_values()) + .add(&context_shared.window_partition_spill_progress.get_values()) + .add(&context_shared.group_by_spill_progress.get_values()) + }) + } + fn query_created_time(status: &SessionContext) -> SystemTime { match status.get_query_context_shared() { None => SystemTime::now(), diff --git a/src/query/service/src/sessions/session_mgr.rs b/src/query/service/src/sessions/session_mgr.rs index 6ea5cb4de9f63..d871336f432df 100644 --- a/src/query/service/src/sessions/session_mgr.rs +++ b/src/query/service/src/sessions/session_mgr.rs @@ -24,6 +24,7 @@ use std::time::Duration; use databend_common_base::base::tokio; use databend_common_base::base::GlobalInstance; use databend_common_base::base::SignalStream; +use databend_common_base::runtime::metrics::GLOBAL_METRICS_REGISTRY; use databend_common_catalog::table_context::ProcessInfoState; use databend_common_config::GlobalConfig; use databend_common_config::InnerConfig; @@ -38,6 +39,7 @@ use log::info; use parking_lot::RwLock; use crate::sessions::session::Session; +use crate::sessions::session_mgr_metrics::SessionManagerMetricsCollector; use crate::sessions::ProcessInfo; use crate::sessions::SessionContext; use crate::sessions::SessionManagerStatus; @@ -47,6 +49,7 @@ pub struct SessionManager { pub(in crate::sessions) max_sessions: usize, pub(in crate::sessions) active_sessions: Arc>>>, pub status: Arc>, + pub metrics_collector: SessionManagerMetricsCollector, // When typ is MySQL, insert into this map, key is id, val is MySQL connection id. pub(crate) mysql_conn_map: Arc, String>>>, @@ -55,20 +58,26 @@ pub struct SessionManager { impl SessionManager { pub fn init(conf: &InnerConfig) -> Result<()> { - GlobalInstance::set(Self::create(conf)); + let global_instance = Self::create(conf); + GlobalInstance::set(global_instance.clone()); + GLOBAL_METRICS_REGISTRY + .register_collector(Box::new(global_instance.metrics_collector.clone())); Ok(()) } pub fn create(conf: &InnerConfig) -> Arc { let max_sessions = conf.query.max_active_sessions as usize; - Arc::new(SessionManager { + let mgr = Arc::new(SessionManager { max_sessions, mysql_basic_conn_id: AtomicU32::new(9_u32.to_le()), status: Arc::new(RwLock::new(SessionManagerStatus::default())), mysql_conn_map: Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))), active_sessions: Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))), - }) + metrics_collector: SessionManagerMetricsCollector::new(), + }); + mgr.metrics_collector.attach_session_manager(mgr.clone()); + mgr } pub fn instance() -> Arc { diff --git a/src/query/service/src/sessions/session_mgr_metrics.rs b/src/query/service/src/sessions/session_mgr_metrics.rs new file mode 100644 index 0000000000000..90b101ce24267 --- /dev/null +++ b/src/query/service/src/sessions/session_mgr_metrics.rs @@ -0,0 +1,171 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_metrics::interpreter::METRIC_QUERY_SCAN_PROGRESS_BYTES; +use databend_common_metrics::interpreter::METRIC_QUERY_SCAN_PROGRESS_ROWS; +use databend_common_metrics::interpreter::METRIC_QUERY_SPILL_PROGRESS_BYTES; +use databend_common_metrics::interpreter::METRIC_QUERY_SPILL_PROGRESS_ROWS; +use databend_common_metrics::interpreter::METRIC_QUERY_WRITE_PROGRESS_BYTES; +use databend_common_metrics::interpreter::METRIC_QUERY_WRITE_PROGRESS_ROWS; +use parking_lot::Mutex; +use prometheus_client::collector::Collector; +use prometheus_client::encoding::EncodeMetric; +use prometheus_client::metrics::counter::ConstCounter; + +use crate::sessions::SessionManager; + +/// [`SessionManagerMetricsCollector`] dumps the progress metrics of scan/write/spills +/// from the [`SessionManager`]'s running queries to the prometheus. To avoid the progress +/// metrics being decreased, we also need to accumulate these progress values after the query +/// is finished. +#[derive(Clone)] +pub struct SessionManagerMetricsCollector { + inner: Arc>, +} + +pub(crate) struct SessionManagerMetricsCollectorInner { + session_mgr: Option>, + finished_scan_total: ProgressValues, + finished_write_total: ProgressValues, + finished_spill_total: ProgressValues, +} + +impl SessionManagerMetricsCollector { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(SessionManagerMetricsCollectorInner { + session_mgr: None, + finished_scan_total: ProgressValues::default(), + finished_write_total: ProgressValues::default(), + finished_spill_total: ProgressValues::default(), + })), + } + } + + pub fn attach_session_manager(&self, session_mgr: Arc) { + let mut guard = self.inner.lock(); + guard.session_mgr.replace(session_mgr); + } + + pub fn track_finished_query( + &self, + scan: ProgressValues, + write: ProgressValues, + join_spill: ProgressValues, + aggregate_spill: ProgressValues, + group_by_spill: ProgressValues, + window_partition_spill: ProgressValues, + ) { + let mut guard = self.inner.lock(); + guard.finished_scan_total = guard.finished_scan_total.add(&scan); + guard.finished_write_total = guard.finished_write_total.add(&write); + guard.finished_spill_total = guard + .finished_spill_total + .add(&join_spill) + .add(&aggregate_spill) + .add(&group_by_spill) + .add(&window_partition_spill); + } +} + +impl Default for SessionManagerMetricsCollector { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for SessionManagerMetricsCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SessionMetricsCollector") + } +} + +impl Collector for SessionManagerMetricsCollector { + fn encode( + &self, + mut encoder: prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let processes = { + match self.inner.lock().session_mgr.as_ref() { + Some(mgr) => mgr.processes_info(), + None => return Ok(()), + } + }; + + let (mut scan_progress, mut write_progress, mut spill_progress) = { + let guard = self.inner.lock(); + ( + guard.finished_scan_total.clone(), + guard.finished_write_total.clone(), + guard.finished_spill_total.clone(), + ) + }; + for process in processes { + if let Some(scan) = &process.scan_progress_value { + scan_progress = scan_progress.add(scan); + } + if let Some(write) = &process.write_progress_value { + write_progress = write_progress.add(write); + } + if let Some(spill) = &process.spill_progress_value { + spill_progress = spill_progress.add(spill); + } + } + + let metrics = vec![ + ( + METRIC_QUERY_SCAN_PROGRESS_ROWS, + scan_progress.rows as f64, + "Total scan rows in progress.", + ), + ( + METRIC_QUERY_SCAN_PROGRESS_BYTES, + scan_progress.bytes as f64, + "Total scan bytes in progress.", + ), + ( + METRIC_QUERY_WRITE_PROGRESS_ROWS, + write_progress.rows as f64, + "Total write rows in progress.", + ), + ( + METRIC_QUERY_WRITE_PROGRESS_BYTES, + write_progress.bytes as f64, + "Total write bytes in progress.", + ), + ( + METRIC_QUERY_SPILL_PROGRESS_ROWS, + spill_progress.rows as f64, + "Total spill rows in progress.", + ), + ( + METRIC_QUERY_SPILL_PROGRESS_BYTES, + spill_progress.bytes as f64, + "Total spill bytes in progress.", + ), + ]; + + for (name, value, help) in metrics { + let counter = ConstCounter::new(value); + let counter_encoder = + encoder.encode_descriptor(name, help, None, counter.metric_type())?; + counter.encode(counter_encoder)?; + } + + Ok(()) + } +}