Skip to content

Commit

Permalink
Merge pull request #1119 from sozu-proxy/flatten-cluster-metrics
Browse files Browse the repository at this point in the history
merge metrics across workers
  • Loading branch information
Wonshtrum authored Jul 10, 2024
2 parents 4831ed2 + 6b4b96b commit f14a14d
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 11 deletions.
6 changes: 6 additions & 0 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ pub enum MetricsCmd {
help = "get only the metrics of main process and workers (no cluster metrics)"
)]
no_clusters: bool,
#[clap(
short = 'w',
long = "workers",
help = "display metrics of each worker, without merging by metric name or cluster id (takes more space)"
)]
workers: bool,
},
}

Expand Down
19 changes: 13 additions & 6 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl GatheringTask for QueryMetricsTask {

fn on_finish(
self: Box<Self>,
_server: &mut Server,
server: &mut Server,
client: &mut OptionalClient,
_timed_out: bool,
) {
Expand Down Expand Up @@ -579,12 +579,19 @@ impl GatheringTask for QueryMetricsTask {
)
.collect();

let mut aggregated_metrics = AggregatedMetrics {
main: main_metrics,
clusters: BTreeMap::new(),
workers: workers_metrics,
proxying: BTreeMap::new(),
};

if !self.options.workers && server.workers.len() > 1 {
aggregated_metrics.merge_metrics();
}

client.finish_ok_with_content(
ContentType::Metrics(AggregatedMetrics {
main: main_metrics,
workers: workers_metrics,
})
.into(),
ContentType::Metrics(aggregated_metrics).into(),
"Successfully aggregated all metrics",
);
}
Expand Down
2 changes: 2 additions & 0 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ impl CommandManager {
cluster_ids: Vec<String>,
backend_ids: Vec<String>,
no_clusters: bool,
workers: bool,
) -> Result<(), CtlError> {
let request: Request = RequestType::QueryMetrics(QueryMetricsOptions {
list,
cluster_ids,
backend_ids,
metric_names,
no_clusters,
workers,
})
.into();

Expand Down
11 changes: 10 additions & 1 deletion bin/src/ctl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ impl CommandManager {
clusters,
backends,
no_clusters,
} => self.get_metrics(list, refresh, names, clusters, backends, no_clusters),
workers,
} => self.get_metrics(
list,
refresh,
names,
clusters,
backends,
no_clusters,
workers,
),
_ => self.configure_metrics(cmd),
},
SubCmd::Logging { filter } => self.logging_filter(filter),
Expand Down
17 changes: 16 additions & 1 deletion command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ message QueryMetricsOptions {
repeated string metric_names = 4;
// query only worker and main process metrics (no cluster metrics)
required bool no_clusters = 5;
// display metrics of each worker, without flattening (takes more space)
required bool workers = 6;
}

// options to configure metrics collection
Expand Down Expand Up @@ -576,8 +578,18 @@ message AvailableMetrics {

// Aggregated metrics of main process & workers
message AggregatedMetrics {
// metrics about the main process.
// metric_name -> metric_value
map<string, FilteredMetrics> main = 1;
// details of worker metrics, with clusters and backends.
// worker_id -> worker_metrics
map<string, WorkerMetrics> workers = 2;
// if present, contains metrics of clusters and their backends, merged across all workers.
// cluster_id -> cluster_metrics
map<string, ClusterMetrics> clusters = 3;
// if present, proxying metrics, merged across all workers.
// metric_name -> metric_value
map<string, FilteredMetrics> proxying = 4;
}

// All metrics of a worker: proxy and clusters
Expand All @@ -593,7 +605,7 @@ message WorkerMetrics {
message ClusterMetrics {
// metric name -> metric value
map<string, FilteredMetrics> cluster = 1;
// backend_id -> (metric name-> metric value)
// list of backends with their metrics
repeated BackendMetrics backends = 2;
}

Expand All @@ -604,8 +616,11 @@ message BackendMetrics {

message FilteredMetrics {
oneof inner {
// increases or decrease depending on the state
uint64 gauge = 1;
// increases only
int64 count = 2;
// milliseconds
uint64 time = 3;
Percentiles percentiles = 4;
FilteredTimeSerie time_serie = 5;
Expand Down
20 changes: 17 additions & 3 deletions command/src/proto/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,25 @@ pub fn print_metrics(aggregated_metrics: &AggregatedMetrics) -> Result<(), Displ
println!("\nMAIN PROCESS\n============");
print_proxy_metrics(&aggregated_metrics.main);

if aggregated_metrics.proxying.len() != 0 {
println!("\nPROXYING\n============");
print_proxy_metrics(&aggregated_metrics.proxying);
}

// workers
for (worker_id, worker_metrics) in aggregated_metrics.workers.iter() {
println!("\nWorker {worker_id}\n=========");
print_worker_metrics(worker_metrics)?;
for (worker_id, worker) in aggregated_metrics.workers.iter() {
if worker.clusters.len() != 0 && worker.proxy.len() != 0 {
println!("\nWorker {worker_id}\n=========");
print_worker_metrics(worker)?;
}
}

// clusters
if !aggregated_metrics.clusters.is_empty() {
println!("\nClusters\n=======");
print_cluster_metrics(&aggregated_metrics.clusters);
}

Ok(())
}

Expand Down
108 changes: 108 additions & 0 deletions command/src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::collections::BTreeMap;

use command::{filtered_metrics::Inner, AggregatedMetrics, ClusterMetrics, FilteredMetrics};
use prost::DecodeError;

/// Contains all types received by and sent from Sōzu
Expand Down Expand Up @@ -37,3 +40,108 @@ impl From<command::request::RequestType> for command::Request {
}
}
}

impl AggregatedMetrics {
/// Merge metrics that were received from several workers
///
/// Each worker gather the same kind of metrics,
/// for its own proxying logic, and for the same clusters with their backends.
/// This means we have to reduce each metric from N instances to 1.
pub fn merge_metrics(&mut self) {
// avoid copying the worker metrics, by taking them
let workers = std::mem::take(&mut self.workers);

for (_worker_id, worker) in workers {
for (metric_name, new_value) in worker.proxy {
if !new_value.is_mergeable() {
continue;
}
self.proxying
.entry(metric_name)
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value);
}

for (cluster_id, mut cluster_metrics) in worker.clusters {
for (metric_name, new_value) in cluster_metrics.cluster {
if !new_value.is_mergeable() {
continue;
}
self.clusters
.entry(cluster_id.to_owned())
.and_modify(|cluster| {
cluster
.cluster
.entry(metric_name.clone())
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value.clone());
})
.or_insert(ClusterMetrics {
cluster: BTreeMap::from([(metric_name, new_value)]),
backends: Vec::new(),
});
}

for backend in cluster_metrics.backends.drain(..) {
for (metric_name, new_value) in &backend.metrics {
if !new_value.is_mergeable() {
continue;
}
self.clusters
.entry(cluster_id.to_owned())
.and_modify(|cluster| {
let found_backend = cluster
.backends
.iter_mut()
.find(|present| present.backend_id == backend.backend_id);

let Some(existing_backend) = found_backend else {
cluster.backends.push(backend.clone());
return;
};

let _ = existing_backend
.metrics
.entry(metric_name.clone())
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value.to_owned());
})
.or_insert(ClusterMetrics {
cluster: BTreeMap::new(),
backends: vec![backend.clone()],
});
}
}
}
}
}
}

impl FilteredMetrics {
    /// Sum this metric with another one of the same variant.
    ///
    /// Only gauges and counts can be summed; any other combination
    /// (including mismatched variants) leaves `self` untouched.
    pub fn merge(&mut self, right: &Self) {
        let summed = match (&self.inner, &right.inner) {
            (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => Inner::Gauge(a + b),
            (Some(Inner::Count(a)), Some(Inner::Count(b))) => Inner::Count(a + b),
            _ => return,
        };
        self.inner = Some(summed);
    }

    /// Whether this metric can be summed across workers.
    ///
    /// Inner::Time and Inner::TimeSerie are never used in Sōzu;
    /// percentiles cannot be meaningfully summed.
    fn is_mergeable(&self) -> bool {
        matches!(&self.inner, Some(Inner::Gauge(_)) | Some(Inner::Count(_)))
    }
}
1 change: 1 addition & 0 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl LocalDrain {
backend_ids,
list,
no_clusters,
workers: _workers,
} = options;

if *list {
Expand Down

0 comments on commit f14a14d

Please sign in to comment.