diff --git a/Cargo.lock b/Cargo.lock index f5b8173fd..fef5fcbc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,12 +1637,16 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" dependencies = [ + "aho-corasick", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.14.5", + "indexmap 2.6.0", "metrics", "num_cpus", + "ordered-float", "quanta", + "radix_trie", "sketches-ddsketch", ] @@ -1925,6 +1929,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -3126,9 +3139,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.40.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", @@ -3153,6 +3166,18 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -3471,6 +3496,7 @@ dependencies = [ "humantime", "metrics", "metrics-exporter-prometheus", + "metrics-util", "parking_lot", "rand", "rayon", @@ -3484,6 +3510,7 @@ dependencies = [ "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", + "tokio-metrics", 
"tracing", "tracing-subscriber", "tycho-block-util", diff --git a/Cargo.toml b/Cargo.toml index 1fc180778..3df965633 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ itertools = "0.12" libc = "0.2" metrics = "0.23" metrics-exporter-prometheus = "0.15" +metrics-util = "0.17.0" moka = { version = "0.12", features = ["sync"] } parking_lot = { version = "0.12.1" } parking_lot_core = "0.9.9" @@ -104,6 +105,7 @@ tikv-jemallocator = { version = "0.6.0", features = [ tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] } tl-proto = "0.4" tokio = { version = "1", default-features = false } +tokio-metrics = { version = "0.3.1" } tokio-stream = "0.1.15" tokio-util = { version = "0.7.10", features = ["codec"] } tower = "0.5" @@ -138,9 +140,10 @@ tycho-util = { path = "./util", version = "0.1.4" } weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", rev = "59728ea0c8703dd28a4c37dee05c1321cd81b966" } [workspace.lints.rust] -future_incompatible = "warn" -nonstandard_style = "warn" -rust_2018_idioms = "warn" +future_incompatible = { level = "warn", priority = -1 } +nonstandard_style = { level = "warn", priority = -1 } +rust_2018_idioms = { level = "warn", priority = -1 } +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } [workspace.lints.clippy] all = { level = "warn", priority = -1 } @@ -214,6 +217,7 @@ useless_transmute = "warn" verbose_file_reads = "warn" zero_sized_map_values = "warn" + [profile.release] lto = "thin" codegen-units = 1 diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 109733f1e..02b893ba1 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -39,6 +39,8 @@ serde = { workspace = true } serde_json = { workspace = true, features = ["preserve_order"] } serde_path_to_error = { workspace = true } tempfile = { workspace = true } +tokio-metrics = { workspace = true, optional = true } +metrics-util = { workspace = true, optional = true } tikv-jemallocator = { workspace = true, features = [ 
"unprefixed_malloc_on_supported_platforms", "background_threads", @@ -73,6 +75,7 @@ rustc_version = { workspace = true } default = ["jemalloc"] jemalloc = ["dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl", "dep:metrics"] deadlock-detection = ["parking_lot/deadlock_detection"] +tokio-metrics = ["dep:tokio-metrics", "dep:metrics-util"] debug = ["tycho-consensus/test"] lto = ["weedb/lto"] diff --git a/cli/src/cmd/node/mod.rs b/cli/src/cmd/node/mod.rs index 14f548645..7d3a69cf3 100644 --- a/cli/src/cmd/node/mod.rs +++ b/cli/src/cmd/node/mod.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; +use tokio::runtime::Runtime; use tycho_core::global_config::GlobalConfig; use tycho_util::cli::logger::{init_logger, set_abort_with_tracing}; use tycho_util::cli::{resolve_public_ip, signal}; @@ -75,27 +76,24 @@ impl CmdRun { .stack_size(8 * 1024 * 1024) .thread_name(|_| "rayon_worker".to_string()) .num_threads(node_config.threads.rayon_threads) - .build_global() - .unwrap(); - - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(node_config.threads.tokio_workers) - .build()? - .block_on(async move { - let run_fut = tokio::spawn(self.run_impl(args, node_config)); - let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS); - tokio::select! { - res = run_fut => res.unwrap(), - signal = stop_fut => match signal { - Ok(signal) => { - tracing::info!(?signal, "received termination signal"); - Ok(()) - } - Err(e) => Err(e.into()), + .build_global()?; + + let rt = build_tokio_runtime(&node_config)?; + + rt.block_on(async move { + let run_fut = tokio::spawn(self.run_impl(args, node_config)); + let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS); + tokio::select! 
{ + res = run_fut => res.unwrap(), + signal = stop_fut => match signal { + Ok(signal) => { + tracing::info!(?signal, "received termination signal"); + Ok(()) } + Err(e) => Err(e.into()), } - }) + } + }) } async fn run_impl(self, args: BaseArgs, node_config: NodeConfig) -> Result<()> { @@ -157,6 +155,188 @@ impl CmdRun { } } +/// Builds the multi-threaded Tokio runtime used to run the node. +/// +/// With the `tokio-metrics` feature and `--cfg tokio_unstable`, the runtime also records a poll +/// time histogram and spawns a task that exports runtime metrics as gauges every 5 seconds. +fn build_tokio_runtime(node_config: &NodeConfig) -> Result<Runtime> { + #[cfg(all(feature = "tokio-metrics", tokio_unstable))] + use std::time::Duration; + + #[cfg(all(feature = "tokio-metrics", tokio_unstable))] + use tokio::runtime::{HistogramConfiguration, LogHistogram}; + + let mut rt = tokio::runtime::Builder::new_multi_thread(); + + let num_workers = node_config.threads.tokio_workers; + rt.enable_all().worker_threads(num_workers); + + #[cfg(all(feature = "tokio-metrics", tokio_unstable))] + let hist_params = LogHistogram::builder() + .precision_exact(2) + .min_value(Duration::from_micros(500)) + .max_value(Duration::from_secs(1)) + .max_buckets(NUM_BUCKETS)?; + + #[cfg(all(feature = "tokio-metrics", tokio_unstable))] + const NUM_BUCKETS: usize = 46; + #[cfg(all(feature = "tokio-metrics", tokio_unstable))] + { + rt.enable_metrics_poll_time_histogram() + .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(hist_params)); + } + + let rt = rt.build()?; + + #[cfg(all(feature = "tokio-metrics", tokio_unstable))] + rt.spawn(async move { + // extracted from tokio sources + // issue https://github.com/tokio-rs/tokio/issues/7033 + fn bucket_range(bucket: usize, p: u32, bucket_offset: usize, num_buckets: usize) -> std::ops::Range<u64> { + let input_bucket = bucket; + let bucket = bucket + bucket_offset; + + let range_start_0th_bucket = match input_bucket { + 0 => Some(0_u64), + _ => None, + }; + let range_end_last_bucket = match input_bucket { + n if n == num_buckets - 1 => Some(u64::MAX), + _ => None, + }; + + if bucket < 1 << p { + // The first set of buckets are all size 1 + let bucket = bucket as u64; + 
range_start_0th_bucket.unwrap_or(bucket)..range_end_last_bucket.unwrap_or(bucket + 1) + } else { + // Determine which range of buckets we're in, then determine which bucket in the range it is + let bucket = bucket as u64; + let p = p as u64; + let w = (bucket >> p) - 1; + let base_bucket = (w + 1) * (1_u64 << p); + let offset = bucket - base_bucket; + let s = 1_u64 << (w + p); + let start = s + (offset << w); + let end = s + ((offset + 1) << w); + + range_start_0th_bucket.unwrap_or(start)..range_end_last_bucket.unwrap_or(end) + } + } + + fn fill_log_buckets() -> [f64; NUM_BUCKETS] { + let mut boundaries = [0.0; NUM_BUCKETS]; + + for i in 0..NUM_BUCKETS { + // parameters are taken from dbg!(&hist_params); above + let range = bucket_range(i, 2, 70, NUM_BUCKETS); + // lower bound of bucket `i`, converted from nanoseconds to seconds + boundaries[i] = (range.start as f64) / 1_000_000_000.0; + } + + boundaries + } + let log_buckets: [f64; NUM_BUCKETS] = fill_log_buckets(); + + + // we can use histogram when https://github.com/metrics-rs/metrics/issues/509 is resolved + // otherwise it will burn CPU and memory + let handle = tokio::runtime::Handle::current(); + let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle); + + const METRIC_NAME: &str = "tycho_tokio_poll_count_time_bucket"; + const METRIC_SUM: &str = "tycho_tokio_poll_count_time_sum"; + const METRIC_COUNT: &str = "tycho_tokio_poll_count_time_count"; + + for interval in runtime_monitor.intervals() { + let histogram = interval.poll_count_histogram; + + let mut cumulative_count = 0; + let mut sum = 0.0; + + // poll time histogram via gauges + for (idx, value) in histogram.iter().enumerate() { + let bucket = log_buckets[idx]; + cumulative_count += *value; + // `le` is an upper bound: bucket `idx` ends where bucket `idx + 1` starts; + // the open-ended last bucket is reported by the `+Inf` gauge below + if let Some(upper) = log_buckets.get(idx + 1) { + let le = format!("{:.6}", upper); + metrics::gauge!(METRIC_NAME, "le" => le).set(cumulative_count as f64); + } + sum += bucket * (*value as f64); + } + // Add sum and count + metrics::gauge!(METRIC_SUM).set(sum); + metrics::gauge!(METRIC_COUNT).set(cumulative_count as f64); + // Add +Inf bucket + metrics::gauge!(METRIC_NAME, "le" => 
"+Inf").set(cumulative_count as f64); + + let mean_poll_time = interval.mean_poll_duration.as_secs_f64(); + metrics::gauge!("tycho_tokio_mean_poll_time").set(mean_poll_time); + + let max_poll_time = interval.mean_poll_duration_worker_max.as_secs_f64(); + metrics::gauge!("tycho_tokio_max_poll_time").set(max_poll_time); + + let metrics = handle.metrics(); + metrics::gauge!("tycho_tokio_num_alive_tasks").set(metrics.num_alive_tasks() as f64); + + let global_queue_depth = metrics.global_queue_depth(); + metrics::gauge!("tycho_tokio_global_queue_depth").set(global_queue_depth as f64); + + let num_blocking_threads = metrics.num_blocking_threads(); + metrics::gauge!("tycho_tokio_num_blocking_threads").set(num_blocking_threads as f64); + + let spawned_tasks = metrics.spawned_tasks_count(); + metrics::gauge!("tycho_tokio_spawned_tasks_count").set(spawned_tasks as f64); + + + metrics::gauge!("tycho_tokio_num_idle_blocking_threads") + .set(metrics.num_idle_blocking_threads() as f64); + + metrics::gauge!("tycho_tokio_injection_queue_depth") + .set(metrics.global_queue_depth() as f64); + + let blocking_queue_length = metrics.blocking_queue_depth(); + metrics::gauge!("tycho_tokio_blocking_queue_depth").set(blocking_queue_length as f64); + + for worker_id in 0..num_workers { + let park_count = metrics.worker_park_count(worker_id); + metrics::gauge!("tycho_tokio_worker_park_count", "worker_id" => format!("{worker_id}")).set(park_count as f64); + + let worker_noop_count = metrics.worker_noop_count(worker_id); + metrics::gauge!("tycho_tokio_worker_noop_count", "worker_id" => format!("{worker_id}")).set(worker_noop_count as f64); + + let worker_steal_count = metrics.worker_steal_count(worker_id); + metrics::gauge!("tycho_tokio_worker_steal_count", "worker_id" => format!("{worker_id}")).set(worker_steal_count as f64); + + let worker_steal_operations = metrics.worker_steal_operations(worker_id); + metrics::gauge!("tycho_tokio_worker_steal_operations", "worker_id" => 
format!("{worker_id}")).set(worker_steal_operations as f64); + + let worker_local_queue_depth = metrics.worker_local_queue_depth(worker_id); + metrics::gauge!("tycho_tokio_worker_local_queue_depth", "worker_id" => format!("{worker_id}")).set(worker_local_queue_depth as f64); + + let worker_mean_poll_time = metrics.worker_mean_poll_time(worker_id).as_secs_f64(); + metrics::gauge!("tycho_tokio_worker_mean_poll_time", "worker_id" => format!("{worker_id}")).set(worker_mean_poll_time); + + let worker_busy_time = metrics.worker_total_busy_duration(worker_id).as_secs_f64(); + metrics::gauge!("tycho_tokio_worker_busy_time", "worker_id" => format!("{worker_id}")).set(worker_busy_time); + } + metrics::gauge!("tycho_tokio_io_driver_fd_registered_count").set(metrics.io_driver_fd_registered_count() as f64); + metrics::gauge!("tycho_tokio_io_driver_fd_deregistered_count").set(metrics.io_driver_fd_deregistered_count() as f64); + + metrics::gauge!("tycho_tokio_remote_schedule_count").set(metrics.remote_schedule_count() as f64); + + metrics::gauge!("tycho_tokio_budget_forced_yield_count").set(metrics.budget_forced_yield_count() as f64); + + + tokio::time::sleep(Duration::from_millis(5000)).await; + } + }); + + Ok(rt) +} + fn init_metrics(config: &MetricsConfig) -> Result<()> { use metrics_exporter_prometheus::Matcher; const EXPONENTIAL_SECONDS: &[f64] = &[ diff --git a/justfile b/justfile index b38cb5464..7b08c444b 100644 --- a/justfile +++ b/justfile @@ -43,6 +43,7 @@ check_format: install_fmt # Clippy go brr. lint: #cargo clippy --all-targets --all-features --workspace # update when clippy is fixed + export RUSTFLAGS="--cfg tokio_unstable" && \ cargo clippy --all-targets --all-features -p tycho-block-util -p tycho-core -p tycho-network -p tycho-rpc -p tycho-storage -p tycho-consensus -p tycho-util -p tycho-collator -p tycho-control -p tycho-light-node -p tycho-cli -- -D warnings # Generates cargo docs. 
diff --git a/scripts/check-metrics.py b/scripts/check-metrics.py index e2a55c5d2..517c465b9 100644 --- a/scripts/check-metrics.py +++ b/scripts/check-metrics.py @@ -87,7 +87,9 @@ def process_metric_arg( if constant_value: metric_names.add(constant_value) else: - if not any(dir in file_path for dir in blacklisted_dirs): + if not any( + dir in file_path for dir in blacklisted_dirs + ) and not arg.startswith("format!"): print(f"Warning: Unresolved metric name '{arg}' in {file_path}") diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index de8f8c57e..7afd30050 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -67,6 +67,7 @@ def create_gauge_panel( unit_format=UNITS.NUMBER_FORMAT, labels=[], legend_format: str | None = None, + description: str | None = None, ) -> Panel: if isinstance(expr, str): expr = [Expr(metric=expr, label_selectors=labels)] @@ -88,9 +89,7 @@ def create_gauge_panel( targets = [target(e, legend_format=legend_format) for e in expr] return timeseries_panel( - title=title, - targets=targets, - unit=unit_format, + title=title, targets=targets, unit=unit_format, description=description ) @@ -101,6 +100,7 @@ def create_counter_panel( labels_selectors: List[str] = [], legend_format: str | None = None, by_labels: list[str] = ["instance"], + description: str | None = None, ) -> Panel: """ Create a counter panel for visualization. 
@@ -156,6 +156,7 @@ def create_counter_panel( title=title, targets=targets, unit=unit_format, + description=description, ) @@ -2074,6 +2075,151 @@ def rayon_stats() -> RowPanel: return create_row("Rayon Stats", metrics) +def tokio_overview_panel() -> RowPanel: + """Row of runtime-wide Tokio metrics: tasks, queues, blocking pool and IO driver.""" + metrics = [ + create_gauge_panel( + "tycho_tokio_num_alive_tasks", + "Num Alive Tasks", + UNITS.NUMBER_FORMAT, + description="Current number of alive tasks in the runtime", + ), + create_gauge_panel( + "tycho_tokio_global_queue_depth", + "Global Queue Depth", + UNITS.NUMBER_FORMAT, + description="Number of tasks currently scheduled in the global queue. Should be zero in normal operation.", + ), + create_counter_panel( + "tycho_tokio_spawned_tasks_count", + "Spawned Tasks Count", + description="Number of spawned tasks", + ), + create_counter_panel( + "tycho_tokio_budget_forced_yield_count", + "Forced Yield Count", + UNITS.NUMBER_FORMAT, + description=""" + Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets. + This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + """, + ), + create_counter_panel( + "tycho_tokio_remote_schedule_count", + "Number of tasks scheduled from outside of the runtime.", + UNITS.NUMBER_FORMAT, + description=""" + Number of tasks which were spawned or notified from a non-runtime thread and must be queued using the Runtime’s injection queue, which tends to be slower. 
+ """, + ), + create_gauge_panel( + "tycho_tokio_injection_queue_depth", + "Injection Queue Depth", + description="Number of tasks in the injection queue", + ), + create_gauge_panel( + "tycho_tokio_blocking_queue_depth", + "Blocking Queue Depth", + description="Number of tasks currently scheduled in the blocking thread pool", + ), + create_gauge_panel( + "tycho_tokio_num_blocking_threads", + "Num Blocking Threads", + description="Number of additional threads spawned by the runtime for blocking operations", + ), + create_gauge_panel( + "tycho_tokio_num_idle_blocking_threads", + "Num Idle Blocking Threads", + description="Number of idle blocking threads", + ), + create_counter_panel( + "tycho_tokio_io_driver_fd_registered_count", + "Registered IO FDs", + description="Number of registered file descriptors", + ), + create_counter_panel( + "tycho_tokio_io_driver_fd_deregistered_count", + "Deregistered IO FDs", + description="Number of deregistered file descriptors", + ), + ] + return create_row("Tokio Overview", metrics) + + +def tokio_details_panel() -> RowPanel: + """Row of Tokio poll-time metrics and per-worker scheduler metrics.""" + legend = "{{instance}} - worker: {{worker_id}}" + + metrics = [ + # to suppress check-metrics.py warnings + # uses tycho_tokio_poll_count_time_bucket + # uses tycho_tokio_poll_count_time_count + # uses tycho_tokio_poll_count_time_sum + create_heatmap_panel( + "tycho_tokio_poll_count_time", + "Poll Count Time", + yaxis(UNITS.SECONDS), + ), + create_gauge_panel( + "tycho_tokio_mean_poll_time", + "Mean Poll Time", + UNITS.SECONDS, + description="Mean task poll time", + ), + create_gauge_panel( + "tycho_tokio_max_poll_time", + "Max Poll Time", + UNITS.SECONDS, + description="Highest mean task poll time among worker threads", + ), + create_counter_panel( + "tycho_tokio_worker_park_count", + "Worker Park Count", + legend_format=legend, + description="Number of times worker parked", + ), + create_counter_panel( + "tycho_tokio_worker_noop_count", + "Worker No-op Count", + legend_format=legend, + description="Number of times worker performed 
no-op", + ), + create_counter_panel( + "tycho_tokio_worker_steal_count", + "Worker Steal Count", + legend_format=legend, + description="Number of tasks stolen from other workers", + ), + create_counter_panel( + "tycho_tokio_worker_steal_operations", + "Worker Steal Operations", + legend_format=legend, + description="Number of times worker tried to steal new tasks", + ), + create_counter_panel( + "tycho_tokio_worker_busy_time", + "The amount of time the given worker thread has been busy", + legend_format=legend, + description="The worker busy duration starts at zero when the runtime is created and increases whenever the worker is spending time processing work. " + "Using this value can indicate the load of the given worker. If a lot of time is spent busy, then the worker is under load and will check for inbound events less often.", + ), + create_gauge_panel( + "tycho_tokio_worker_local_queue_depth", + "Worker Local Queue Depth", + legend_format=legend, + description="Number of tasks in local worker's queue", + ), + create_gauge_panel( + "tycho_tokio_worker_mean_poll_time", + "Mean Worker Poll Time", + legend_format=legend, + unit_format=UNITS.SECONDS, + description="Mean task poll time for worker", + ), + ] + return create_row("Tokio Details", metrics) + + def templates() -> Templating: return Templating( list=[ @@ -2164,6 +2310,8 @@ def templates() -> Templating: net_dht(), allocator_stats(), rayon_stats(), + tokio_overview_panel(), + tokio_details_panel(), jrpc(), jrpc_timings(), ], diff --git a/scripts/install.sh b/scripts/install.sh index 71a7097cc..1a5ec1c97 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -20,13 +20,16 @@ EOF exit 1 fi -features="" +features="--features tokio-metrics" if set_clang_env 19; then features="$features --features lto" echo "INFO: Building node with lto" export RUSTFLAGS="-Clinker-plugin-lto -Clinker=clang -Clink-arg=-fuse-ld=$LD" fi +# merge the features +export RUSTFLAGS="$RUSTFLAGS -Ctarget_cpu=native 
-Cforce-frame-pointers=true --cfg tokio_unstable" + cargo install $features --path ./cli --locked cat << EOF