Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add a hacky exporter of tokio metrics #241

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ itertools = "0.12"
libc = "0.2"
metrics = "0.23"
metrics-exporter-prometheus = "0.15"
metrics-util = "0.17.0"
moka = { version = "0.12", features = ["sync"] }
parking_lot = { version = "0.12.1" }
parking_lot_core = "0.9.9"
Expand Down Expand Up @@ -104,6 +105,7 @@ tikv-jemallocator = { version = "0.6.0", features = [
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] }
tl-proto = "0.4"
tokio = { version = "1", default-features = false }
tokio-metrics = { version = "0.3.1" }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["codec"] }
tower = "0.5"
Expand Down Expand Up @@ -138,9 +140,10 @@ tycho-util = { path = "./util", version = "0.1.4" }
weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", rev = "59728ea0c8703dd28a4c37dee05c1321cd81b966" }

[workspace.lints.rust]
future_incompatible = "warn"
nonstandard_style = "warn"
rust_2018_idioms = "warn"
future_incompatible = { level = "warn", priority = -1 }
nonstandard_style = { level = "warn", priority = -1 }
rust_2018_idioms = { level = "warn", priority = -1 }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[workspace.lints.clippy]
all = { level = "warn", priority = -1 }
Expand Down Expand Up @@ -214,6 +217,7 @@ useless_transmute = "warn"
verbose_file_reads = "warn"
zero_sized_map_values = "warn"


[profile.release]
lto = "thin"
codegen-units = 1
Expand Down
3 changes: 3 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ serde = { workspace = true }
serde_json = { workspace = true, features = ["preserve_order"] }
serde_path_to_error = { workspace = true }
tempfile = { workspace = true }
tokio-metrics = { workspace = true, optional = true }
metrics-util = { workspace = true, optional = true }
tikv-jemallocator = { workspace = true, features = [
"unprefixed_malloc_on_supported_platforms",
"background_threads",
Expand Down Expand Up @@ -73,6 +75,7 @@ rustc_version = { workspace = true }
default = ["jemalloc"]
jemalloc = ["dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl", "dep:metrics"]
deadlock-detection = ["parking_lot/deadlock_detection"]
tokio-metrics = ["dep:tokio-metrics", "dep:metrics-util"]
debug = ["tycho-consensus/test"]
lto = ["weedb/lto"]

Expand Down
209 changes: 190 additions & 19 deletions cli/src/cmd/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;

use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use tokio::runtime::Runtime;
use tycho_core::global_config::GlobalConfig;
use tycho_util::cli::logger::{init_logger, set_abort_with_tracing};
use tycho_util::cli::{resolve_public_ip, signal};
Expand Down Expand Up @@ -75,27 +76,24 @@ impl CmdRun {
.stack_size(8 * 1024 * 1024)
.thread_name(|_| "rayon_worker".to_string())
.num_threads(node_config.threads.rayon_threads)
.build_global()
.unwrap();

tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(node_config.threads.tokio_workers)
.build()?
.block_on(async move {
let run_fut = tokio::spawn(self.run_impl(args, node_config));
let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS);
tokio::select! {
res = run_fut => res.unwrap(),
signal = stop_fut => match signal {
Ok(signal) => {
tracing::info!(?signal, "received termination signal");
Ok(())
}
Err(e) => Err(e.into()),
.build_global()?;

let rt = build_tokio_runtime(&node_config)?;

rt.block_on(async move {
let run_fut = tokio::spawn(self.run_impl(args, node_config));
let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS);
tokio::select! {
res = run_fut => res.unwrap(),
signal = stop_fut => match signal {
Ok(signal) => {
tracing::info!(?signal, "received termination signal");
Ok(())
}
Err(e) => Err(e.into()),
}
})
}
})
}

async fn run_impl(self, args: BaseArgs, node_config: NodeConfig) -> Result<()> {
Expand Down Expand Up @@ -157,6 +155,179 @@ impl CmdRun {
}
}

fn build_tokio_runtime(node_config: &NodeConfig) -> Result<Runtime> {
#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
use std::time::Duration;

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
use tokio::runtime::{HistogramConfiguration, LogHistogram};

let mut rt = tokio::runtime::Builder::new_multi_thread();

let num_workers = node_config.threads.tokio_workers;
rt.enable_all().worker_threads(num_workers);

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
let hist_params = LogHistogram::builder()
.precision_exact(2)
.min_value(Duration::from_micros(500))
.max_value(Duration::from_secs(1))
.max_buckets(NUM_BUCKETS)?;

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
const NUM_BUCKETS: usize = 46;
#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
{
rt.enable_metrics_poll_time_histogram()
.metrics_poll_time_histogram_configuration(HistogramConfiguration::log(hist_params));
}

let rt = rt.build()?;

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
rt.spawn(async move {
// extracted from tokio sources
// issue https://github.com/tokio-rs/tokio/issues/7033
fn bucket_range(bucket: usize, p: u32, bucket_offset: usize, num_buckets: usize) -> std::ops::Range<u64> {
let input_bucket = bucket;
let bucket = bucket + bucket_offset;

let range_start_0th_bucket = match input_bucket {
0 => Some(0_u64),
_ => None,
};
let range_end_last_bucket = match input_bucket {
n if n == num_buckets - 1 => Some(u64::MAX),
_ => None,
};

if bucket < 1 << p {
// The first set of buckets are all size 1
let bucket = bucket as u64;
range_start_0th_bucket.unwrap_or(bucket)..range_end_last_bucket.unwrap_or(bucket + 1)
} else {
// Determine which range of buckets we're in, then determine which bucket in the range it is
let bucket = bucket as u64;
let p = p as u64;
let w = (bucket >> p) - 1;
let base_bucket = (w + 1) * (1_u64 << p);
let offset = bucket - base_bucket;
let s = 1_u64 << (w + p);
let start = s + (offset << w);
let end = s + ((offset + 1) << w);

range_start_0th_bucket.unwrap_or(start)..range_end_last_bucket.unwrap_or(end)
}
}

fn fill_log_buckets() -> [f64; NUM_BUCKETS] {
let mut boundaries = [0.0; NUM_BUCKETS];

for i in 0..NUM_BUCKETS {
// parameters are taken from dbg!(&hist_params); above
let range = bucket_range(i, 2, 70, 46);
boundaries[i] = (range.start as f64) / 1_000_000_000.0;
}

boundaries
}
let log_buckets: [f64; NUM_BUCKETS] = fill_log_buckets();


// we can use histogram when https://github.com/metrics-rs/metrics/issues/509 is resolved
// otherwise it will burn CPU and memory
let handle = tokio::runtime::Handle::current();
let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);

const METRIC_NAME: &str = "tycho_tokio_poll_count_time_bucket";
const METRIC_SUM: &str = "tycho_tokio_poll_count_time_sum";
const METRIC_COUNT: &str = "tycho_tokio_poll_count_time_count";

for interval in runtime_monitor.intervals() {
let histogram = interval.poll_count_histogram;

let mut cumulative_count = 0;
let mut sum = 0.0;

// poll time histogram via gauages
for (idx, value) in histogram.iter().enumerate() {
let bucket = log_buckets[idx];
cumulative_count += *value;
let le = format!("{:.6}", bucket);
metrics::gauge!(METRIC_NAME, "le" => le).set(cumulative_count as f64);
sum += bucket * (*value as f64);
}
// Add sum and count
metrics::gauge!(METRIC_SUM).set(sum);
metrics::gauge!(METRIC_COUNT).set(cumulative_count as f64);
// Add +Inf bucket
metrics::gauge!(METRIC_NAME, "le" => "+Inf").set(cumulative_count as f64);

let mean_poll_time = interval.mean_poll_duration.as_secs_f64();
metrics::gauge!("tycho_tokio_mean_poll_time").set(mean_poll_time);

let max_poll_time = interval.mean_poll_duration_worker_max.as_secs_f64();
metrics::gauge!("tycho_tokio_max_poll_time").set(max_poll_time);

let metrics = handle.metrics();
metrics::gauge!("tycho_tokio_num_alive_tasks").set(metrics.num_alive_tasks() as f64);

let global_queue_depth = metrics.global_queue_depth();
metrics::gauge!("tycho_tokio_global_queue_depth").set(global_queue_depth as f64);

let num_blocking_threads = metrics.num_blocking_threads();
metrics::gauge!("tycho_tokio_num_blocking_threads").set(num_blocking_threads as f64);

let spawned_tasks = metrics.spawned_tasks_count();
metrics::gauge!("tycho_tokio_spawned_tasks_count").set(spawned_tasks as f64);


metrics::gauge!("tycho_tokio_num_idle_blocking_threads")
.set(metrics.num_idle_blocking_threads() as f64);

metrics::gauge!("tycho_tokio_injection_queue_depth")
.set(metrics.global_queue_depth() as f64);

let blocking_queue_length = metrics.blocking_queue_depth();
metrics::gauge!("tycho_tokio_blocking_queue_depth").set(blocking_queue_length as f64);

for worker_id in 0..num_workers {
let park_count = metrics.worker_park_count(worker_id);
metrics::gauge!("tycho_tokio_worker_park_count", "worker_id" => format!("{worker_id}")).set(park_count as f64);

let worker_noop_count = metrics.worker_noop_count(worker_id);
metrics::gauge!("tycho_tokio_worker_noop_count", "worker_id" => format!("{worker_id}")).set(worker_noop_count as f64);

let worker_steal_count = metrics.worker_steal_count(worker_id);
metrics::gauge!("tycho_tokio_worker_steal_count", "worker_id" => format!("{worker_id}")).set(worker_steal_count as f64);

let worker_steal_operations = metrics.worker_steal_operations(worker_id);
metrics::gauge!("tycho_tokio_worker_steal_operations", "worker_id" => format!("{worker_id}")).set(worker_steal_operations as f64);

let worker_local_queue_depth = metrics.worker_local_queue_depth(worker_id);
metrics::gauge!("tycho_tokio_worker_local_queue_depth", "worker_id" => format!("{worker_id}")).set(worker_local_queue_depth as f64);

let worker_mean_poll_time = metrics.worker_mean_poll_time(worker_id).as_secs_f64();
metrics::gauge!("tycho_tokio_worker_mean_poll_time", "worker_id" => format!("{worker_id}")).set(worker_mean_poll_time);

let worker_busy_time = metrics.worker_total_busy_duration(worker_id).as_secs_f64();
metrics::gauge!("tycho_tokio_worker_busy_time", "worker_id" => format!("{worker_id}")).set(worker_busy_time);
}
metrics::gauge!("tycho_tokio_io_driver_fd_registered_count").set(metrics.io_driver_fd_registered_count() as f64);
metrics::gauge!("tycho_tokio_io_driver_fd_deregistered_count").set(metrics.io_driver_fd_deregistered_count() as f64);

metrics::gauge!("tycho_tokio_remote_schedule_count").set(metrics.remote_schedule_count() as f64);

metrics::gauge!("tycho_tokio_budget_forced_yield_count").set(metrics.budget_forced_yield_count() as f64);


tokio::time::sleep(Duration::from_millis(5000)).await;
}
});

Ok(rt)
}

fn init_metrics(config: &MetricsConfig) -> Result<()> {
use metrics_exporter_prometheus::Matcher;
const EXPONENTIAL_SECONDS: &[f64] = &[
Expand Down
1 change: 1 addition & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ check_format: install_fmt
# Clippy go brr.
lint:
#cargo clippy --all-targets --all-features --workspace # update when clippy is fixed
export RUSTFLAGS="-cfg tokio_unstable"
cargo clippy --all-targets --all-features -p tycho-block-util -p tycho-core -p tycho-network -p tycho-rpc -p tycho-storage -p tycho-consensus -p tycho-util -p tycho-collator -p tycho-control -p tycho-light-node -p tycho-cli -- -D warnings

# Generates cargo docs.
Expand Down
4 changes: 3 additions & 1 deletion scripts/check-metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def process_metric_arg(
if constant_value:
metric_names.add(constant_value)
else:
if not any(dir in file_path for dir in blacklisted_dirs):
if not any(
dir in file_path for dir in blacklisted_dirs
) and not arg.startswith("format!"):
print(f"Warning: Unresolved metric name '{arg}' in {file_path}")


Expand Down
Loading
Loading