diff --git a/bin/src/cli.rs b/bin/src/cli.rs
index c6e429a29..cb0aaa3c1 100644
--- a/bin/src/cli.rs
+++ b/bin/src/cli.rs
@@ -228,6 +228,12 @@ pub enum MetricsCmd {
             help = "get only the metrics of main process and workers (no cluster metrics)"
         )]
         no_clusters: bool,
+        #[clap(
+            short = 'w',
+            long = "workers",
+            help = "display metrics of each worker, without flattening by cluster id (takes more space)"
+        )]
+        workers: bool,
     },
 }
 
diff --git a/bin/src/command/requests.rs b/bin/src/command/requests.rs
index c9fbe3939..9938a686f 100644
--- a/bin/src/command/requests.rs
+++ b/bin/src/command/requests.rs
@@ -532,7 +532,7 @@ impl GatheringTask for QueryMetricsTask {
 
     fn on_finish(
         self: Box<Self>,
-        _server: &mut Server,
+        server: &mut Server,
         client: &mut OptionalClient,
         _timed_out: bool,
     ) {
@@ -579,12 +579,18 @@ impl GatheringTask for QueryMetricsTask {
             )
             .collect();
 
+        let mut aggregated_metrics = AggregatedMetrics {
+            main: main_metrics,
+            clusters: BTreeMap::new(),
+            workers: workers_metrics,
+        };
+
+        if !self.options.workers && server.workers.len() > 1 {
+            aggregated_metrics.merge_cluster_metrics();
+        }
+
         client.finish_ok_with_content(
-            ContentType::Metrics(AggregatedMetrics {
-                main: main_metrics,
-                workers: workers_metrics,
-            })
-            .into(),
+            ContentType::Metrics(aggregated_metrics).into(),
             "Successfully aggregated all metrics",
         );
     }
diff --git a/bin/src/ctl/command.rs b/bin/src/ctl/command.rs
index c47f4b20d..7e28e6825 100644
--- a/bin/src/ctl/command.rs
+++ b/bin/src/ctl/command.rs
@@ -85,6 +85,7 @@ impl CommandManager {
         cluster_ids: Vec<String>,
         backend_ids: Vec<String>,
         no_clusters: bool,
+        workers: bool,
     ) -> Result<(), CtlError> {
         let request: Request = RequestType::QueryMetrics(QueryMetricsOptions {
             list,
@@ -92,6 +93,7 @@ impl CommandManager {
             backend_ids,
             metric_names,
             no_clusters,
+            workers,
         })
         .into();
 
diff --git a/bin/src/ctl/mod.rs b/bin/src/ctl/mod.rs
index d89808266..a7e34674e 100644
--- a/bin/src/ctl/mod.rs
+++ b/bin/src/ctl/mod.rs
@@ -123,7 +123,16 @@ impl CommandManager {
                     clusters,
                     backends,
                     no_clusters,
-                } => self.get_metrics(list, refresh, names, clusters, backends, no_clusters),
+                    workers,
+                } => self.get_metrics(
+                    list,
+                    refresh,
+                    names,
+                    clusters,
+                    backends,
+                    no_clusters,
+                    workers,
+                ),
                 _ => self.configure_metrics(cmd),
             },
             SubCmd::Logging { filter } => self.logging_filter(filter),
diff --git a/command/src/command.proto b/command/src/command.proto
index caa9a1fe2..5aef97b9b 100644
--- a/command/src/command.proto
+++ b/command/src/command.proto
@@ -441,6 +441,8 @@ message QueryMetricsOptions {
   repeated string metric_names = 4;
   // query only worker and main process metrics (no cluster metrics)
   required bool no_clusters = 5;
+  // display metrics of each worker, without flattening (takes more space)
+  required bool workers = 6;
 }
 
 // options to configure metrics collection
@@ -576,8 +578,12 @@ message AvailableMetrics {
 
 // Aggregated metrics of main process & workers
 message AggregatedMetrics {
+  // metric_name -> metric_value
   map<string, FilteredMetrics> main = 1;
+  // worker_id -> worker_metrics
   map<string, WorkerMetrics> workers = 2;
+  // cluster_id -> cluster_metrics
+  map<string, ClusterMetrics> clusters = 3;
 }
 
 // All metrics of a worker: proxy and clusters
@@ -593,7 +599,7 @@ message WorkerMetrics {
 message ClusterMetrics {
   // metric name -> metric value
   map<string, FilteredMetrics> cluster = 1;
-  // backend_id -> (metric name-> metric value)
+  // list of backends with their metrics
   repeated BackendMetrics backends = 2;
 }
 
@@ -604,8 +610,11 @@ message BackendMetrics {
 
 message FilteredMetrics {
   oneof inner {
+    // increases or decreases depending on the state
     uint64 gauge = 1;
+    // increases only
    int64 count = 2;
+    // milliseconds
     uint64 time = 3;
     Percentiles percentiles = 4;
     FilteredTimeSerie time_serie = 5;
diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs
index 3302f7512..327a8c782 100644
--- a/command/src/proto/display.rs
+++ b/command/src/proto/display.rs
@@ -245,6 +245,13 @@ pub fn print_metrics(aggregated_metrics: &AggregatedMetrics) -> Result<(), Displ
         println!("\nWorker {worker_id}\n=========");
         print_worker_metrics(worker_metrics)?;
     }
+
+    // clusters
+    if !aggregated_metrics.clusters.is_empty() {
+        println!("\nClusters\n=======");
+        print_cluster_metrics(&aggregated_metrics.clusters);
+    }
+
     Ok(())
 }
 
diff --git a/command/src/proto/mod.rs b/command/src/proto/mod.rs
index c8567afd5..5e838e5f7 100644
--- a/command/src/proto/mod.rs
+++ b/command/src/proto/mod.rs
@@ -1,3 +1,6 @@
+use std::collections::BTreeMap;
+
+use command::{filtered_metrics::Inner, AggregatedMetrics, ClusterMetrics, FilteredMetrics};
 use prost::DecodeError;
 
 /// Contains all types received by and sent from Sōzu
@@ -37,3 +40,101 @@ impl From<command::request::RequestType> for command::Request {
         }
     }
 }
+
+impl AggregatedMetrics {
+    /// Merge cluster metrics that were received from several workers
+    ///
+    /// Each worker serves the same clusters and gathers metrics on them,
+    /// which means we have to reduce each metric from N instances to 1.
+    pub fn merge_cluster_metrics(&mut self) {
+        for (_worker_id, worker_metrics) in &self.workers {
+            for (cluster_id, cluster_metrics) in &worker_metrics.clusters {
+                for (metric_name, new_value) in &cluster_metrics.cluster {
+                    if !new_value.is_mergeable() {
+                        continue;
+                    }
+                    self.clusters
+                        .entry(cluster_id.to_owned())
+                        .and_modify(|cluster| {
+                            cluster
+                                .cluster
+                                .entry(metric_name.to_owned())
+                                .and_modify(|old_value| old_value.merge(new_value))
+                                .or_insert(new_value.to_owned());
+                        })
+                        .or_insert(ClusterMetrics {
+                            cluster: BTreeMap::from([(
+                                metric_name.to_owned(),
+                                new_value.to_owned(),
+                            )]),
+                            backends: Vec::new(),
+                        });
+                }
+
+                for backend in &cluster_metrics.backends {
+                    for (metric_name, new_value) in &backend.metrics {
+                        if !new_value.is_mergeable() {
+                            continue;
+                        }
+                        self.clusters
+                            .entry(cluster_id.to_owned())
+                            .and_modify(|cluster| {
+                                if let Some(present_backend) =
+                                    cluster.backends.iter_mut().find(|present_backend| {
+                                        present_backend.backend_id == backend.backend_id
+                                    })
+                                {
+                                    present_backend
+                                        .metrics
+                                        .entry(metric_name.to_owned())
+                                        .and_modify(|old_value| old_value.merge(new_value))
+                                        .or_insert(new_value.to_owned());
+                                } else {
+                                    cluster.backends.push(backend.clone());
+                                }
+                            })
+                            .or_insert(ClusterMetrics {
+                                cluster: BTreeMap::new(),
+                                backends: vec![backend.clone()],
+                            });
+                    }
+                }
+            }
+        }
+
+        // remove the copied cluster metrics from the worker metrics
+        for (_worker_id, worker_metrics) in self.workers.iter_mut() {
+            worker_metrics.clusters = BTreeMap::new();
+        }
+    }
+}
+
+impl FilteredMetrics {
+    pub fn merge(&mut self, right: &Self) {
+        match (&self.inner, &right.inner) {
+            (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => {
+                *self = Self {
+                    inner: Some(Inner::Gauge(a + b)),
+                };
+            }
+            (Some(Inner::Count(a)), Some(Inner::Count(b))) => {
+                *self = Self {
+                    inner: Some(Inner::Count(a + b)),
+                };
+            }
+            // TODO: add histograms
+            _ => {}
+        }
+    }
+
+    fn is_mergeable(&self) -> bool {
+        match &self.inner {
+            Some(Inner::Gauge(_)) | Some(Inner::Count(_)) => true,
+            // Inner::Time and Inner::TimeSerie are never used in Sōzu
+            Some(Inner::Time(_))
+            | Some(Inner::Percentiles(_))
+            | Some(Inner::TimeSerie(_))
+            | None => false,
+        }
+    }
+}
diff --git a/lib/src/metrics/local_drain.rs b/lib/src/metrics/local_drain.rs
index af91a7fca..0b0574ce8 100644
--- a/lib/src/metrics/local_drain.rs
+++ b/lib/src/metrics/local_drain.rs
@@ -253,6 +253,7 @@ impl LocalDrain {
             backend_ids,
             list,
             no_clusters,
+            workers: _workers,
         } = options;
 
         if *list {
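For context, below the patch: a minimal sketch (not part of the diff) of how the new FilteredMetrics::merge behaves when two workers report the same metric. The import path is an assumption about the crate layout (sozu_command_lib::proto::command); the type, field, and variant names come from the protobuf definitions and the impl blocks above.

use sozu_command_lib::proto::command::{filtered_metrics::Inner, FilteredMetrics};

fn main() {
    // the same counter metric, reported once by each of two workers
    // (import path above is assumed, adjust to where the generated
    // `command` module actually lives)
    let mut merged = FilteredMetrics {
        inner: Some(Inner::Count(3)),
    };
    let from_other_worker = FilteredMetrics {
        inner: Some(Inner::Count(4)),
    };

    // gauges and counts are summed across workers; other variants are
    // skipped by is_mergeable() and left as reported per worker
    merged.merge(&from_other_worker);
    assert_eq!(merged.inner, Some(Inner::Count(7)));
}

On the CLI side (assuming the usual sozu binary and the metrics get subcommand), metrics are now merged into a single Clusters section by default, while the new -w/--workers flag keeps the unmerged per-worker breakdown.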