From 67a38ac3c3777a52104b2eab4846a1adbc7d55dd Mon Sep 17 00:00:00 2001 From: Micaiah Reid Date: Thu, 8 Feb 2024 14:51:05 -0500 Subject: [PATCH] feat: optionally serve Prometheus metrics (#473) ### Description To enable improved alerts on downtime for Hiro's hosted Chainhook service, we need Chainhook to provide metrics that can be ingested by Prometheus. This PR changes some how we track our metrics (that are served over the `/ping` endpoint of the observer) to enable Prometheus compatibility, and adds a flag to optionally start a server to supply metrics to a Prometheus client. ### Example Starting chainhook with the `--prometheus-port XXXX` flag now enables a service that can supply Prometheus metrics at `localhost:XXXX/metrics`. If using a config file, this option can be specified via: ```yaml [monitoring] prometheus_monitoring_port = XXXX ``` Chainhook will behave as usual with this flag ommitted - metrics can still be retrieved via the observer's `/ping` endpoint, but they will not be formatted for ingestion by a Prometheus client. --- ### Checklist - [X] All tests pass - [X] Tests added in this PR (if applicable) Fixes #474, addresses #466 --- Cargo.lock | 22 + components/chainhook-cli/src/cli/mod.rs | 9 +- components/chainhook-cli/src/config/file.rs | 6 + .../chainhook-cli/src/config/generator.rs | 5 + components/chainhook-cli/src/config/mod.rs | 26 +- .../chainhook-cli/src/config/tests/mod.rs | 34 +- .../src/service/tests/helpers/mock_service.rs | 21 +- .../chainhook-cli/src/service/tests/mod.rs | 26 +- .../src/service/tests/observer_tests.rs | 46 +- components/chainhook-sdk/Cargo.toml | 1 + components/chainhook-sdk/src/lib.rs | 1 + components/chainhook-sdk/src/monitoring.rs | 419 ++++++++++++++++++ components/chainhook-sdk/src/observer/http.rs | 8 +- components/chainhook-sdk/src/observer/mod.rs | 249 ++++------- .../chainhook-sdk/src/observer/tests/mod.rs | 111 ++--- 15 files changed, 727 insertions(+), 257 deletions(-) create mode 100644 components/chainhook-sdk/src/monitoring.rs diff --git a/Cargo.lock b/Cargo.lock index cdff9471d..5f28d809e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,6 +462,7 @@ dependencies = [ "hyper", "lazy_static", "miniscript", + "prometheus", "rand 0.8.5", "regex", "reqwest", @@ -2371,6 +2372,27 @@ dependencies = [ "yansi 1.0.0-rc.1", ] +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "psm" version = "0.1.21" diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 8ee706a45..ecb6dc96f 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -193,6 +193,9 @@ struct StartCommand { /// Start REST API for managing predicates #[clap(long = "start-http-api")] pub start_http_api: bool, + /// If provided, serves Prometheus metrics at localhost:{port}/metrics. If not specified, does not start Prometheus server. + #[clap(long = "prometheus-port")] + pub prometheus_monitoring_port: Option, } #[derive(Subcommand, PartialEq, Clone, Debug)] @@ -291,9 +294,13 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { match opts.command { Command::Service(subcmd) => match subcmd { ServiceCommand::Start(cmd) => { - let config = + let mut config = Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?; + if cmd.prometheus_monitoring_port.is_some() { + config.monitoring.prometheus_monitoring_port = cmd.prometheus_monitoring_port; + } + let predicates = cmd .predicates_paths .iter() diff --git a/components/chainhook-cli/src/config/file.rs b/components/chainhook-cli/src/config/file.rs index 9f802348b..fcf8b6e18 100644 --- a/components/chainhook-cli/src/config/file.rs +++ b/components/chainhook-cli/src/config/file.rs @@ -7,6 +7,7 @@ pub struct ConfigFile { pub event_source: Option>, pub limits: LimitsConfigFile, pub network: NetworkConfigFile, + pub monitoring: Option, } #[derive(Deserialize, Debug, Clone)] @@ -81,3 +82,8 @@ impl NetworkConfigMode { } } } + +#[derive(Deserialize, Debug, Clone)] +pub struct MonitoringConfigFile { + pub prometheus_monitoring_port: Option, +} diff --git a/components/chainhook-cli/src/config/generator.rs b/components/chainhook-cli/src/config/generator.rs index 68b308be4..7093c403d 100644 --- a/components/chainhook-cli/src/config/generator.rs +++ b/components/chainhook-cli/src/config/generator.rs @@ -45,6 +45,11 @@ max_caching_memory_size_mb = 32000 # If this is not a requirement, you can comment out the `tsv_file_url` line. [[event_source]] tsv_file_url = "https://archive.hiro.so/{network}/stacks-blockchain-api/{network}-stacks-blockchain-api-latest" + +# Enables a server that provides metrics that can be scraped by Prometheus. +# This is disabled by default. +# [monitoring] +# prometheus_monitoring_port = 20457 "#, mode = mode.as_str(), network = network.to_lowercase(), diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index b690e623f..fc291659f 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -31,6 +31,7 @@ pub struct Config { pub event_sources: Vec, pub limits: LimitsConfig, pub network: IndexerConfig, + pub monitoring: MonitoringConfig, } #[derive(Clone, Debug, PartialEq)] @@ -80,6 +81,10 @@ pub struct LimitsConfig { pub max_caching_memory_size_mb: usize, } +#[derive(Clone, Debug, PartialEq)] +pub struct MonitoringConfig { + pub prometheus_monitoring_port: Option, +} impl Config { pub fn from_file_path(file_path: &str) -> Result { let file = File::open(file_path) @@ -120,6 +125,7 @@ impl Config { bitcoin_network: self.network.bitcoin_network.clone(), stacks_network: self.network.stacks_network.clone(), data_handler_tx: None, + prometheus_monitoring_port: self.monitoring.prometheus_monitoring_port, } } @@ -144,7 +150,11 @@ impl Config { continue; } } - + let prometheus_monitoring_port = if let Some(monitoring) = config_file.monitoring { + monitoring.prometheus_monitoring_port + } else { + None + }; let config = Config { storage: StorageConfig { working_dir: config_file.storage.working_dir.unwrap_or("cache".into()), @@ -152,7 +162,7 @@ impl Config { http_api: match config_file.http_api { None => PredicatesApi::Off, Some(http_api) => match http_api.disabled { - Some(false) => PredicatesApi::Off, + Some(true) => PredicatesApi::Off, _ => PredicatesApi::On(PredicatesApiConfig { http_port: http_api.http_port.unwrap_or(DEFAULT_CONTROL_PORT), display_logs: http_api.display_logs.unwrap_or(true), @@ -209,6 +219,9 @@ impl Config { stacks_network, bitcoin_network, }, + monitoring: MonitoringConfig { + prometheus_monitoring_port, + }, }; Ok(config) } @@ -340,6 +353,9 @@ impl Config { stacks_network: StacksNetwork::Devnet, bitcoin_network: BitcoinNetwork::Regtest, }, + monitoring: MonitoringConfig { + prometheus_monitoring_port: None, + }, } } @@ -371,6 +387,9 @@ impl Config { stacks_network: StacksNetwork::Testnet, bitcoin_network: BitcoinNetwork::Testnet, }, + monitoring: MonitoringConfig { + prometheus_monitoring_port: None, + }, } } @@ -402,6 +421,9 @@ impl Config { stacks_network: StacksNetwork::Mainnet, bitcoin_network: BitcoinNetwork::Mainnet, }, + monitoring: MonitoringConfig { + prometheus_monitoring_port: None, + }, } } } diff --git a/components/chainhook-cli/src/config/tests/mod.rs b/components/chainhook-cli/src/config/tests/mod.rs index ec0701e4b..4adf04a8a 100644 --- a/components/chainhook-cli/src/config/tests/mod.rs +++ b/components/chainhook-cli/src/config/tests/mod.rs @@ -1,8 +1,14 @@ use std::path::PathBuf; -use crate::config::{file::NetworkConfigMode, PredicatesApi, PredicatesApiConfig}; - -use super::{generator::generate_config, Config, ConfigFile, EventSourceConfig, PathConfig}; +use crate::config::{ + file::{NetworkConfigMode, PredicatesApiConfigFile}, + PredicatesApi, PredicatesApiConfig, +}; + +use super::{ + file::MonitoringConfigFile, generator::generate_config, Config, ConfigFile, EventSourceConfig, + PathConfig, +}; use chainhook_sdk::types::{BitcoinNetwork, StacksNetwork}; use test_case::test_case; @@ -24,6 +30,28 @@ fn config_from_file_matches_generator_for_all_networks(network: BitcoinNetwork) assert_eq!(generated_config, from_path_config); } +#[test] +fn config_from_file_allows_setting_disabled_fields() { + let generated_config_str = generate_config(&BitcoinNetwork::Regtest); + let mut generated_config_file: ConfigFile = toml::from_str(&generated_config_str).unwrap(); + // http_api and monitoring are optional, so they are disabled in generated config file + generated_config_file.http_api = Some(PredicatesApiConfigFile { + http_port: Some(0), + database_uri: Some(format!("")), + display_logs: Some(false), + disabled: Some(false), + }); + generated_config_file.monitoring = Some(MonitoringConfigFile { + prometheus_monitoring_port: Some(20457), + }); + let generated_config = Config::from_config_file(generated_config_file).unwrap(); + assert!(generated_config.is_http_api_enabled()); + assert_eq!( + generated_config.monitoring.prometheus_monitoring_port, + Some(20457) + ); +} + #[test] fn config_from_file_allows_local_tsv_file() { let path = format!( diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs index 20518105a..890f2ec63 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs @@ -1,6 +1,7 @@ use crate::config::Config; use crate::config::EventSourceConfig; use crate::config::LimitsConfig; +use crate::config::MonitoringConfig; use crate::config::PathConfig; use crate::config::PredicatesApi; use crate::config::PredicatesApiConfig; @@ -12,7 +13,6 @@ use crate::service::Service; use chainhook_sdk::chainhooks::types::ChainhookFullSpecification; use chainhook_sdk::indexer::IndexerConfig; use chainhook_sdk::observer::ObserverCommand; -use chainhook_sdk::observer::ObserverMetrics; use chainhook_sdk::types::BitcoinBlockSignaling; use chainhook_sdk::types::BitcoinNetwork; use chainhook_sdk::types::Chain; @@ -172,7 +172,7 @@ pub async fn call_observer_svc( .map_err(|e| format!("Failed to deserialize response of {method} request to {url}: {e}",)) } -pub async fn call_ping(port: u16) -> Result { +pub async fn call_ping(port: u16) -> Result { let url = format!("http://localhost:{port}/ping"); let res = call_observer_svc(&url, Method::GET, None).await?; match res.get("result") { @@ -182,6 +182,19 @@ pub async fn call_ping(port: u16) -> Result { } } +pub async fn call_prometheus(port: u16) -> Result { + let url = format!("http://localhost:{port}/metrics"); + let client = reqwest::Client::new(); + client + .get(&url) + .send() + .await + .map_err(|e| format!("Failed to make GET request to {url}: {e}",))? + .text() + .await + .map_err(|e| format!("Failed to deserialize response of GET request to {url}: {e}",)) +} + pub async fn build_predicate_api_server(port: u16) -> (Receiver, Shutdown) { let ctx = Context { logger: None, @@ -272,6 +285,7 @@ pub fn get_chainhook_config( bitcoin_rpc_port: u16, working_dir: &str, tsv_dir: &str, + prometheus_port: Option, ) -> Config { let api_config = PredicatesApiConfig { http_port: chainhook_port, @@ -306,6 +320,9 @@ pub fn get_chainhook_config( ingestion_port: stacks_ingestion_port, }), }, + monitoring: MonitoringConfig { + prometheus_monitoring_port: prometheus_port, + }, } } diff --git a/components/chainhook-cli/src/service/tests/mod.rs b/components/chainhook-cli/src/service/tests/mod.rs index 2c7ab47fb..5a9853774 100644 --- a/components/chainhook-cli/src/service/tests/mod.rs +++ b/components/chainhook-cli/src/service/tests/mod.rs @@ -346,18 +346,20 @@ fn _assert_interrupted_status((status, _, _): (PredicateStatus, Option, Opt } } -fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16), String> { +fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> { let redis_port = get_free_port()?; let chainhook_service_port = get_free_port()?; let stacks_rpc_port = get_free_port()?; let stacks_ingestion_port = get_free_port()?; let bitcoin_rpc_port = get_free_port()?; + let prometheus_port = get_free_port()?; Ok(( redis_port, chainhook_service_port, stacks_rpc_port, stacks_ingestion_port, bitcoin_rpc_port, + prometheus_port, )) } @@ -384,13 +386,14 @@ async fn setup_stacks_chainhook_test( starting_chain_tip: u64, redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>, startup_predicates: Option>, -) -> (Child, String, u16, u16, u16, u16) { +) -> (Child, String, u16, u16, u16, u16, u16) { let ( redis_port, chainhook_service_port, stacks_rpc_port, stacks_ingestion_port, bitcoin_rpc_port, + prometheus_port, ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); let mut redis_process = start_redis(redis_port) @@ -451,6 +454,7 @@ async fn setup_stacks_chainhook_test( bitcoin_rpc_port, &working_dir, &tsv_dir, + Some(prometheus_port), ); consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) @@ -477,6 +481,7 @@ async fn setup_stacks_chainhook_test( redis_port, stacks_ingestion_port, bitcoin_rpc_port, + prometheus_port, ) } @@ -504,6 +509,7 @@ async fn test_stacks_predicate_status_is_updated( redis_port, stacks_ingestion_port, _, + _, ) = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; let uuid = &get_random_uuid(); @@ -575,13 +581,14 @@ async fn test_stacks_predicate_status_is_updated( async fn setup_bitcoin_chainhook_test( starting_chain_tip: u64, -) -> (Child, String, u16, u16, u16, u16) { +) -> (Child, String, u16, u16, u16, u16, u16) { let ( redis_port, chainhook_service_port, stacks_rpc_port, stacks_ingestion_port, bitcoin_rpc_port, + prometheus_port, ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); let mut redis_process = start_redis(redis_port) @@ -617,6 +624,7 @@ async fn setup_bitcoin_chainhook_test( bitcoin_rpc_port, &working_dir, &tsv_dir, + Some(prometheus_port), ); start_chainhook_service(config, chainhook_service_port, None, &ctx) @@ -634,6 +642,7 @@ async fn setup_bitcoin_chainhook_test( redis_port, stacks_ingestion_port, bitcoin_rpc_port, + prometheus_port, ) } @@ -659,6 +668,7 @@ async fn test_bitcoin_predicate_status_is_updated( redis_port, stacks_ingestion_port, bitcoin_rpc_port, + _, ) = setup_bitcoin_chainhook_test(starting_chain_tip).await; let uuid = &get_random_uuid(); @@ -757,6 +767,7 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( redis_port, stacks_ingestion_port, bitcoin_rpc_port, + _, ) = setup_bitcoin_chainhook_test(starting_chain_tip).await; let uuid = &get_random_uuid(); @@ -881,7 +892,8 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn test_deregister_predicate(chain: Chain) { - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _) = match &chain { + let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = match &chain + { Chain::Stacks => setup_stacks_chainhook_test(3, None, None).await, Chain::Bitcoin => setup_bitcoin_chainhook_test(3).await, }; @@ -1001,7 +1013,7 @@ async fn test_restarting_with_saved_predicates( let predicate = serde_json::from_value(predicate).expect("failed to set up stacks chanhook spec for test"); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _) = + let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = setup_stacks_chainhook_test(starting_chain_tip, Some((predicate, starting_status)), None) .await; @@ -1044,7 +1056,7 @@ async fn it_allows_specifying_startup_predicate() { let predicate = serde_json::from_value(predicate).expect("failed to set up stacks chanhook spec for test"); let startup_predicate = ChainhookFullSpecification::Stacks(predicate); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _) = + let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; await_new_scanning_status_complete(uuid, chainhook_service_port) @@ -1087,7 +1099,7 @@ async fn register_predicate_responds_409_if_uuid_in_use() { .expect("failed to set up stacks chanhook spec for test"); let startup_predicate = ChainhookFullSpecification::Stacks(stacks_spec); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _) = + let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; let result = call_register_predicate(&predicate, chainhook_service_port) diff --git a/components/chainhook-cli/src/service/tests/observer_tests.rs b/components/chainhook-cli/src/service/tests/observer_tests.rs index 40df05173..f094febd1 100644 --- a/components/chainhook-cli/src/service/tests/observer_tests.rs +++ b/components/chainhook-cli/src/service/tests/observer_tests.rs @@ -12,7 +12,9 @@ use test_case::test_case; use crate::service::tests::{ helpers::{ build_predicates::build_stacks_payload, - mock_service::{call_observer_svc, call_ping, call_register_predicate, flush_redis}, + mock_service::{ + call_observer_svc, call_ping, call_prometheus, call_register_predicate, flush_redis, + }, }, setup_bitcoin_chainhook_test, setup_stacks_chainhook_test, }; @@ -31,6 +33,7 @@ async fn ping_endpoint_returns_metrics() { redis_port, stacks_ingestion_port, _, + _, ) = setup_stacks_chainhook_test(1, None, None).await; let uuid = &get_random_uuid(); @@ -50,8 +53,44 @@ async fn ping_endpoint_returns_metrics() { redis_process.kill().unwrap(); panic!("test failed with error: {e}"); }); + let result = metrics + .get("stacks") + .unwrap() + .get("registered_predicates") + .unwrap(); + assert_eq!(result, 1); + + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); +} + +#[tokio::test] +#[cfg_attr(not(feature = "redis_tests"), ignore)] +async fn prometheus_endpoint_returns_encoded_metrics() { + let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, prometheus_port) = + setup_stacks_chainhook_test(1, None, None).await; + + let uuid = &get_random_uuid(); + let predicate = build_stacks_payload(Some("devnet"), None, None, None, Some(uuid)); + let _ = call_register_predicate(&predicate, chainhook_service_port) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let metrics = call_prometheus(prometheus_port).await.unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + const EXPECTED: &'static str = "# HELP chainhook_stx_registered_predicates The number of Stacks predicates that have been registered by the Chainhook node.\n# TYPE chainhook_stx_registered_predicates gauge\nchainhook_stx_registered_predicates 1\n"; + assert!(metrics.contains(EXPECTED)); - assert_eq!(metrics.stacks.registered_predicates, 1); std::fs::remove_dir_all(&working_dir).unwrap(); flush_redis(redis_port); redis_process.kill().unwrap(); @@ -89,7 +128,7 @@ async fn await_observer_started(port: u16) { #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn bitcoin_rpc_requests_are_forwarded(endpoint: &str, body: Value) { - let (mut redis_process, working_dir, _, redis_port, stacks_ingestion_port, _) = + let (mut redis_process, working_dir, _, redis_port, stacks_ingestion_port, _, _) = setup_bitcoin_chainhook_test(1).await; await_observer_started(stacks_ingestion_port).await; @@ -156,6 +195,7 @@ async fn it_responds_200_for_unimplemented_endpoints( bitcoin_network: BitcoinNetwork::Regtest, stacks_network: chainhook_sdk::types::StacksNetwork::Devnet, data_handler_tx: None, + prometheus_monitoring_port: None, }; start_and_ping_event_observer(config, ingestion_port).await; let url = format!("http://localhost:{ingestion_port}{endpoint}"); diff --git a/components/chainhook-sdk/Cargo.toml b/components/chainhook-sdk/Cargo.toml index c13ef839c..8b0d0beb1 100644 --- a/components/chainhook-sdk/Cargo.toml +++ b/components/chainhook-sdk/Cargo.toml @@ -41,6 +41,7 @@ fxhash = "0.2.1" lazy_static = "1.4.0" regex = "1.9.3" miniscript = "11.0.0" +prometheus = "0.13.3" [dev-dependencies] test-case = "3.1.0" diff --git a/components/chainhook-sdk/src/lib.rs b/components/chainhook-sdk/src/lib.rs index 58188d6a3..cb9f25c1c 100644 --- a/components/chainhook-sdk/src/lib.rs +++ b/components/chainhook-sdk/src/lib.rs @@ -24,6 +24,7 @@ pub use chainhook_types as types; pub mod chainhooks; pub mod indexer; +pub mod monitoring; pub mod observer; pub mod utils; diff --git a/components/chainhook-sdk/src/monitoring.rs b/components/chainhook-sdk/src/monitoring.rs new file mode 100644 index 000000000..ff9ab2b96 --- /dev/null +++ b/components/chainhook-sdk/src/monitoring.rs @@ -0,0 +1,419 @@ +use crate::utils::Context; + +use hiro_system_kit::slog; +use hyper::{ + header::CONTENT_TYPE, + service::{make_service_fn, service_fn}, + Body, Method, Request, Response, Server, +}; +use prometheus::{ + self, + core::{AtomicU64, GenericGauge}, + Encoder, IntGauge, Registry, TextEncoder, +}; +use rocket::serde::json::{json, Value as JsonValue}; +use std::time::{SystemTime, UNIX_EPOCH}; + +type UInt64Gauge = GenericGauge; + +#[derive(Debug, Clone)] +pub struct PrometheusMonitoring { + pub stx_highest_block_ingested: UInt64Gauge, + pub stx_last_reorg_timestamp: IntGauge, + pub stx_last_reorg_applied_blocks: UInt64Gauge, + pub stx_last_reorg_rolled_back_blocks: UInt64Gauge, + pub stx_last_block_ingestion_time: UInt64Gauge, + pub stx_registered_predicates: UInt64Gauge, + pub stx_deregistered_predicates: UInt64Gauge, + pub btc_highest_block_ingested: UInt64Gauge, + pub btc_last_reorg_timestamp: IntGauge, + pub btc_last_reorg_applied_blocks: UInt64Gauge, + pub btc_last_reorg_rolled_back_blocks: UInt64Gauge, + pub btc_last_block_ingestion_time: UInt64Gauge, + pub btc_registered_predicates: UInt64Gauge, + pub btc_deregistered_predicates: UInt64Gauge, + pub registry: Registry, +} + +impl PrometheusMonitoring { + pub fn new() -> PrometheusMonitoring { + let registry = Registry::new(); + let stx_highest_block_ingested = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_stx_highest_block_ingested", + "The highest Stacks block ingested by the Chainhook node.", + ); + let stx_last_reorg_timestamp = PrometheusMonitoring::create_and_register_int_gauge( + ®istry, + "chainhook_stx_last_reorg_timestamp", + "The timestamp of the latest Stacks reorg ingested by the Chainhook node.", + ); + let stx_last_reorg_applied_blocks = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_stx_last_reorg_applied_blocks", + "The number of blocks applied to the Stacks chain as part of the latest Stacks reorg.", + ); + let stx_last_reorg_rolled_back_blocks = + PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_stx_last_reorg_rolled_back_blocks", + "The number of blocks rolled back from the Stacks chain as part of the latest Stacks reorg.", + ); + let stx_last_block_ingestion_time = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_stx_last_block_ingestion_time", + "The time that the Chainhook node last ingested a Stacks block.", + ); + let stx_registered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_stx_registered_predicates", + "The number of Stacks predicates that have been registered by the Chainhook node.", + ); + let stx_deregistered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_stx_deregistered_predicates", + "The number of Stacks predicates that have been deregistered by the Chainhook node.", + ); + let btc_highest_block_ingested = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_btc_highest_block_ingested", + "The highest Bitcoin block ingested by the Chainhook node.", + ); + let btc_last_reorg_timestamp = PrometheusMonitoring::create_and_register_int_gauge( + ®istry, + "chainhook_btc_last_reorg_timestamp", + "The timestamp of the latest Bitcoin reorg ingested by the Chainhook node.", + ); + let btc_last_reorg_applied_blocks = + PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_btc_last_reorg_applied_blocks", + "The number of blocks applied to the Bitcoin chain as part of the latest Bitcoin reorg.", + ); + let btc_last_reorg_rolled_back_blocks = + PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_btc_last_reorg_rolled_back_blocks", + "The number of blocks rolled back from the Bitcoin chain as part of the latest Bitcoin reorg.", + ); + let btc_last_block_ingestion_time = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_btc_last_block_ingestion_time", + "The time that the Chainhook node last ingested a Bitcoin block.", + ); + let btc_registered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_btc_registered_predicates", + "The number of Bitcoin predicates that have been registered by the Chainhook node.", + ); + let btc_deregistered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( + ®istry, + "chainhook_btc_deregistered_predicates", + "The number of Bitcoin predicates that have been deregistered by the Chainhook node.", + ); + + PrometheusMonitoring { + stx_highest_block_ingested, + stx_last_reorg_timestamp, + stx_last_reorg_applied_blocks, + stx_last_reorg_rolled_back_blocks, + stx_last_block_ingestion_time, + stx_registered_predicates, + stx_deregistered_predicates, + btc_highest_block_ingested, + btc_last_reorg_timestamp, + btc_last_reorg_applied_blocks, + btc_last_reorg_rolled_back_blocks, + btc_last_block_ingestion_time, + btc_registered_predicates, + btc_deregistered_predicates, + registry, + } + } + + pub fn create_and_register_uint64_gauge( + registry: &Registry, + name: &str, + help: &str, + ) -> UInt64Gauge { + let g = UInt64Gauge::new(name, help).unwrap(); + registry.register(Box::new(g.clone())).unwrap(); + g + } + + pub fn create_and_register_int_gauge(registry: &Registry, name: &str, help: &str) -> IntGauge { + let g = IntGauge::new(name, help).unwrap(); + registry.register(Box::new(g.clone())).unwrap(); + g + } + + pub fn stx_metrics_deregister_predicate(&self) { + self.stx_registered_predicates.dec(); + self.stx_deregistered_predicates.inc(); + } + + pub fn stx_metrics_register_predicate(&self) { + self.stx_registered_predicates.inc(); + } + pub fn stx_metrics_set_registered_predicates(&self, registered_predicates: u64) { + self.stx_registered_predicates.set(registered_predicates); + } + + pub fn stx_metrics_set_reorg( + &self, + timestamp: i64, + applied_blocks: u64, + rolled_back_blocks: u64, + ) { + self.stx_last_reorg_timestamp.set(timestamp); + self.stx_last_reorg_applied_blocks.set(applied_blocks); + self.stx_last_reorg_rolled_back_blocks + .set(rolled_back_blocks); + } + + pub fn stx_metrics_ingest_block(&self, new_block_height: u64) { + let highest_ingested = self.stx_highest_block_ingested.get(); + if new_block_height > highest_ingested { + self.stx_highest_block_ingested.set(new_block_height); + } + let time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Could not get current time in ms") + .as_secs() as u64; + self.stx_last_block_ingestion_time.set(time); + } + + pub fn btc_metrics_deregister_predicate(&self) { + self.btc_registered_predicates.dec(); + self.btc_deregistered_predicates.inc(); + } + + pub fn btc_metrics_register_predicate(&self) { + self.btc_registered_predicates.inc(); + } + + pub fn btc_metrics_set_registered_predicates(&self, registered_predicates: u64) { + self.btc_registered_predicates.set(registered_predicates); + } + + pub fn btc_metrics_set_reorg( + &self, + timestamp: i64, + applied_blocks: u64, + rolled_back_blocks: u64, + ) { + self.btc_last_reorg_timestamp.set(timestamp); + self.btc_last_reorg_applied_blocks.set(applied_blocks); + self.btc_last_reorg_rolled_back_blocks + .set(rolled_back_blocks); + } + + pub fn btc_metrics_ingest_block(&self, new_block_height: u64) { + let highest_ingested = self.btc_highest_block_ingested.get(); + if new_block_height > highest_ingested { + self.btc_highest_block_ingested.set(new_block_height); + } + let time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Could not get current time in ms") + .as_secs() as u64; + self.btc_last_block_ingestion_time.set(time); + } + + pub fn get_metrics(&self) -> JsonValue { + json!({ + "bitcoin": { + "tip_height": self.btc_highest_block_ingested.get(), + "last_block_ingestion_at": self.btc_last_block_ingestion_time.get(), + "last_reorg": { + "timestamp": self.btc_last_reorg_timestamp.get(), + "applied_blocks": self.btc_last_reorg_applied_blocks.get(), + "rolled_back_blocks": self.btc_last_reorg_rolled_back_blocks.get(), + }, + "registered_predicates": self.btc_registered_predicates.get(), + "deregistered_predicates": self.btc_deregistered_predicates.get(), + }, + "stacks": { + "tip_height": self.stx_highest_block_ingested.get(), + "last_block_ingestion_at": self.stx_last_block_ingestion_time.get(), + "last_reorg": { + "timestamp": self.stx_last_reorg_timestamp.get(), + "applied_blocks": self.stx_last_reorg_applied_blocks.get(), + "rolled_back_blocks": self.stx_last_reorg_rolled_back_blocks.get(), + }, + "registered_predicates": self.stx_registered_predicates.get(), + "deregistered_predicates": self.stx_deregistered_predicates.get(), + } + }) + } +} + +async fn serve_req( + req: Request, + registry: Registry, + ctx: Context, +) -> Result, hyper::Error> { + match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + ctx.try_log(|logger| { + slog::debug!( + logger, + "Prometheus monitoring: respsonding to metrics request" + ) + }); + + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + let mut buffer = vec![]; + let response = match encoder.encode(&metric_families, &mut buffer) { + Ok(_) => Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap(), + Err(e) => { + ctx.try_log(|logger| { + slog::debug!( + logger, + "Prometheus monitoring: failed to encode metrics: {}", + e.to_string() + ) + }); + Response::builder().status(500).body(Body::empty()).unwrap() + } + }; + Ok(response) + } + (_, _) => { + ctx.try_log(|logger| { + slog::debug!( + logger, + "Prometheus monitoring: received request with invalid method/route: {}/{}", + req.method(), + req.uri().path() + ) + }); + let response = Response::builder().status(404).body(Body::empty()).unwrap(); + + Ok(response) + } + } +} + +pub async fn start_serving_prometheus_metrics(port: u16, registry: Registry, ctx: Context) { + let addr = ([0, 0, 0, 0], port).into(); + let ctx_clone = ctx.clone(); + let make_svc = make_service_fn(|_| { + let registry = registry.clone(); + let ctx_clone = ctx_clone.clone(); + async move { + Ok::<_, hyper::Error>(service_fn(move |r| { + serve_req(r, registry.clone(), ctx_clone.clone()) + })) + } + }); + let serve_future = Server::bind(&addr).serve(make_svc); + + ctx.try_log(|logger| slog::info!(logger, "Prometheus monitoring: listening on port {}", port)); + + if let Err(err) = serve_future.await { + ctx.try_log(|logger| slog::warn!(logger, "Prometheus monitoring: server error: {}", err)); + } +} + +#[cfg(test)] +mod test { + use std::{thread::sleep, time::Duration}; + + use super::PrometheusMonitoring; + + #[test] + fn it_tracks_stx_predicate_registration_deregistration_with_defaults() { + let prometheus = PrometheusMonitoring::new(); + assert_eq!(prometheus.stx_registered_predicates.get(), 0); + assert_eq!(prometheus.stx_deregistered_predicates.get(), 0); + prometheus.stx_metrics_set_registered_predicates(10); + assert_eq!(prometheus.stx_registered_predicates.get(), 10); + assert_eq!(prometheus.stx_deregistered_predicates.get(), 0); + prometheus.stx_metrics_register_predicate(); + assert_eq!(prometheus.stx_registered_predicates.get(), 11); + assert_eq!(prometheus.stx_deregistered_predicates.get(), 0); + prometheus.stx_metrics_deregister_predicate(); + assert_eq!(prometheus.stx_registered_predicates.get(), 10); + assert_eq!(prometheus.stx_deregistered_predicates.get(), 1); + } + + #[test] + fn it_tracks_stx_reorgs() { + let prometheus = PrometheusMonitoring::new(); + assert_eq!(prometheus.stx_last_reorg_timestamp.get(), 0); + assert_eq!(prometheus.stx_last_reorg_applied_blocks.get(), 0); + assert_eq!(prometheus.stx_last_reorg_rolled_back_blocks.get(), 0); + prometheus.stx_metrics_set_reorg(10000, 1, 1); + assert_eq!(prometheus.stx_last_reorg_timestamp.get(), 10000); + assert_eq!(prometheus.stx_last_reorg_applied_blocks.get(), 1); + assert_eq!(prometheus.stx_last_reorg_rolled_back_blocks.get(), 1); + } + + #[test] + fn it_tracks_stx_block_ingestion() { + let prometheus = PrometheusMonitoring::new(); + assert_eq!(prometheus.stx_highest_block_ingested.get(), 0); + assert_eq!(prometheus.stx_last_block_ingestion_time.get(), 0); + prometheus.stx_metrics_ingest_block(100); + assert_eq!(prometheus.stx_highest_block_ingested.get(), 100); + let time = prometheus.stx_last_block_ingestion_time.get(); + assert!(time > 0); + // ingesting a block lower than previous tip will + // update ingestion time but not highest block ingested + sleep(Duration::new(1, 0)); + prometheus.stx_metrics_ingest_block(99); + assert_eq!(prometheus.stx_highest_block_ingested.get(), 100); + assert!(prometheus.stx_last_block_ingestion_time.get() > time); + } + + #[test] + fn it_tracks_btc_predicate_registration_deregistration_with_defaults() { + let prometheus = PrometheusMonitoring::new(); + assert_eq!(prometheus.btc_registered_predicates.get(), 0); + assert_eq!(prometheus.btc_deregistered_predicates.get(), 0); + prometheus.btc_metrics_set_registered_predicates(10); + assert_eq!(prometheus.btc_registered_predicates.get(), 10); + assert_eq!(prometheus.btc_deregistered_predicates.get(), 0); + prometheus.btc_metrics_register_predicate(); + assert_eq!(prometheus.btc_registered_predicates.get(), 11); + assert_eq!(prometheus.btc_deregistered_predicates.get(), 0); + prometheus.btc_metrics_deregister_predicate(); + assert_eq!(prometheus.btc_registered_predicates.get(), 10); + assert_eq!(prometheus.btc_deregistered_predicates.get(), 1); + } + + #[test] + fn it_tracks_btc_reorgs() { + let prometheus = PrometheusMonitoring::new(); + assert_eq!(prometheus.btc_last_reorg_timestamp.get(), 0); + assert_eq!(prometheus.btc_last_reorg_applied_blocks.get(), 0); + assert_eq!(prometheus.btc_last_reorg_rolled_back_blocks.get(), 0); + prometheus.btc_metrics_set_reorg(10000, 1, 1); + assert_eq!(prometheus.btc_last_reorg_timestamp.get(), 10000); + assert_eq!(prometheus.btc_last_reorg_applied_blocks.get(), 1); + assert_eq!(prometheus.btc_last_reorg_rolled_back_blocks.get(), 1); + } + + #[test] + fn it_tracks_btc_block_ingestion() { + let prometheus = PrometheusMonitoring::new(); + assert_eq!(prometheus.btc_highest_block_ingested.get(), 0); + assert_eq!(prometheus.btc_last_block_ingestion_time.get(), 0); + prometheus.btc_metrics_ingest_block(100); + assert_eq!(prometheus.btc_highest_block_ingested.get(), 100); + let time = prometheus.btc_last_block_ingestion_time.get(); + assert!(time > 0); + // ingesting a block lower than previous tip will + // update ingestion time but not highest block ingested + sleep(Duration::new(1, 0)); + prometheus.btc_metrics_ingest_block(99); + assert_eq!(prometheus.btc_highest_block_ingested.get(), 100); + assert!(prometheus.btc_last_block_ingestion_time.get() > time); + } +} diff --git a/components/chainhook-sdk/src/observer/http.rs b/components/chainhook-sdk/src/observer/http.rs index 5e0a1f158..25da770d9 100644 --- a/components/chainhook-sdk/src/observer/http.rs +++ b/components/chainhook-sdk/src/observer/http.rs @@ -2,6 +2,7 @@ use crate::indexer::bitcoin::{ build_http_client, download_and_parse_block_with_retry, NewBitcoinBlock, }; use crate::indexer::{self, Indexer}; +use crate::monitoring::PrometheusMonitoring; use crate::utils::Context; use hiro_system_kit::slog; use rocket::serde::json::{json, Json, Value as JsonValue}; @@ -10,19 +11,20 @@ use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex, RwLock}; use super::{ - BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand, ObserverMetrics, + BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand, StacksChainMempoolEvent, }; #[rocket::get("/ping", format = "application/json")] pub fn handle_ping( ctx: &State, - metrics_rw_lock: &State>>, + prometheus_monitoring: &State, ) -> Json { ctx.try_log(|logger| slog::info!(logger, "GET /ping")); + Json(json!({ "status": 200, - "result": metrics_rw_lock.inner(), + "result": prometheus_monitoring.get_metrics(), })) } diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 170b47c2b..9bfa00c91 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -19,6 +19,7 @@ use crate::indexer::bitcoin::{ BitcoinBlockFullBreakdown, }; use crate::indexer::{Indexer, IndexerConfig}; +use crate::monitoring::{start_serving_prometheus_metrics, PrometheusMonitoring}; use crate::utils::{send_request, Context}; use bitcoincore_rpc::bitcoin::{BlockHash, Txid}; @@ -42,7 +43,6 @@ use std::str; use std::str::FromStr; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::{SystemTime, UNIX_EPOCH}; pub const DEFAULT_INGESTION_PORT: u16 = 20445; @@ -79,6 +79,7 @@ pub struct EventObserverConfig { pub bitcoin_network: BitcoinNetwork, pub stacks_network: StacksNetwork, pub data_handler_tx: Option>, + pub prometheus_monitoring_port: Option, } #[derive(Deserialize, Debug, Clone)] @@ -137,6 +138,9 @@ impl EventObserverConfig { } } + /// Helper to allow overriding some default fields in creating a new EventObserverConfig. + /// + /// *Note: This is used by external crates, so it should not be removed, even if not used internally by Chainhook.* pub fn new_using_overrides( overrides: Option<&EventObserverConfigOverrides>, ) -> Result { @@ -186,6 +190,7 @@ impl EventObserverConfig { bitcoin_network, stacks_network, data_handler_tx: None, + prometheus_monitoring_port: None, }; Ok(config) } @@ -346,35 +351,6 @@ impl ChainhookStore { } } -#[derive(Debug, Default, Serialize, Deserialize, Clone)] -pub struct ReorgMetrics { - timestamp: i64, - applied_blocks: usize, - rolled_back_blocks: usize, -} - -#[derive(Debug, Default, Serialize, Deserialize, Clone)] -pub struct ChainMetrics { - pub tip_height: u64, - pub last_reorg: Option, - pub last_block_ingestion_at: u128, - pub registered_predicates: usize, - pub deregistered_predicates: usize, -} - -impl ChainMetrics { - pub fn deregister_prediate(&mut self) { - self.registered_predicates -= 1; - self.deregistered_predicates += 1; - } -} - -#[derive(Debug, Default, Serialize, Deserialize, Clone)] -pub struct ObserverMetrics { - pub bitcoin: ChainMetrics, - pub stacks: ChainMetrics, -} - #[derive(Debug, Clone)] pub struct BitcoinBlockDataCached { pub block: BitcoinBlockData, @@ -515,18 +491,6 @@ pub async fn start_bitcoin_event_observer( ) -> Result<(), Box> { let chainhook_store = config.get_chainhook_store(); - let observer_metrics = ObserverMetrics { - bitcoin: ChainMetrics { - registered_predicates: 0, - ..Default::default() - }, - stacks: ChainMetrics { - registered_predicates: 0, - ..Default::default() - }, - }; - let observer_metrics_rw_lock = Arc::new(RwLock::new(observer_metrics)); - #[cfg(feature = "zeromq")] { let ctx_moved = ctx.clone(); @@ -538,6 +502,26 @@ pub async fn start_bitcoin_event_observer( }); } + let prometheus_monitoring = PrometheusMonitoring::new(); + prometheus_monitoring.stx_metrics_set_registered_predicates( + chainhook_store.predicates.stacks_chainhooks.len() as u64, + ); + prometheus_monitoring.btc_metrics_set_registered_predicates( + chainhook_store.predicates.bitcoin_chainhooks.len() as u64, + ); + + if let Some(port) = config.prometheus_monitoring_port { + let registry_moved = prometheus_monitoring.registry.clone(); + let ctx_cloned = ctx.clone(); + let _ = std::thread::spawn(move || { + let _ = hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics( + port, + registry_moved, + ctx_cloned, + )); + }); + } + // This loop is used for handling background jobs, emitted by HTTP calls. start_observer_commands_handler( config, @@ -545,7 +529,7 @@ pub async fn start_bitcoin_event_observer( observer_commands_rx, observer_events_tx, None, - observer_metrics_rw_lock.clone(), + prometheus_monitoring, observer_sidecar, ctx, ) @@ -591,17 +575,25 @@ pub async fn start_stacks_event_observer( let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone())); - let observer_metrics = ObserverMetrics { - bitcoin: ChainMetrics { - registered_predicates: chainhook_store.predicates.bitcoin_chainhooks.len(), - ..Default::default() - }, - stacks: ChainMetrics { - registered_predicates: chainhook_store.predicates.stacks_chainhooks.len(), - ..Default::default() - }, - }; - let observer_metrics_rw_lock = Arc::new(RwLock::new(observer_metrics)); + let prometheus_monitoring = PrometheusMonitoring::new(); + prometheus_monitoring.stx_metrics_set_registered_predicates( + chainhook_store.predicates.stacks_chainhooks.len() as u64, + ); + prometheus_monitoring.btc_metrics_set_registered_predicates( + chainhook_store.predicates.bitcoin_chainhooks.len() as u64, + ); + + if let Some(port) = config.prometheus_monitoring_port { + let registry_moved = prometheus_monitoring.registry.clone(); + let ctx_cloned = ctx.clone(); + let _ = std::thread::spawn(move || { + let _ = hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics( + port, + registry_moved, + ctx_cloned, + )); + }); + } let limits = Limits::default().limit("json", 20.megabytes()); let mut shutdown_config = config::Shutdown::default(); @@ -645,7 +637,7 @@ pub async fn start_stacks_event_observer( .manage(background_job_tx_mutex) .manage(bitcoin_config) .manage(ctx_cloned) - .manage(observer_metrics_rw_lock.clone()) + .manage(prometheus_monitoring.clone()) .mount("/", routes) .ignite() .await?; @@ -662,7 +654,7 @@ pub async fn start_stacks_event_observer( observer_commands_rx, observer_events_tx, ingestion_shutdown, - observer_metrics_rw_lock.clone(), + prometheus_monitoring, observer_sidecar, ctx, ) @@ -743,7 +735,7 @@ pub async fn start_observer_commands_handler( observer_commands_rx: Receiver, observer_events_tx: Option>, ingestion_shutdown: Option, - observer_metrics: Arc>, + prometheus_monitoring: PrometheusMonitoring, observer_sidecar: Option, ctx: Context, ) -> Result<(), Box> { @@ -816,21 +808,9 @@ pub async fn start_observer_commands_handler( } }; }; - match observer_metrics.write() { - Ok(mut metrics) => { - if block.block_identifier.index > metrics.bitcoin.tip_height { - metrics.bitcoin.tip_height = block.block_identifier.index; - } - metrics.bitcoin.last_block_ingestion_at = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Could not get current time in ms") - .as_millis() - .into(); - } - Err(e) => ctx.try_log(|logger| { - slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) - }), - }; + + prometheus_monitoring.btc_metrics_ingest_block(block.block_identifier.index); + bitcoin_block_store.insert( block.block_identifier.clone(), BitcoinBlockDataCached { @@ -991,22 +971,11 @@ pub async fn start_observer_commands_handler( .iter() .max_by_key(|b| b.block_identifier.index) { - Some(highest_tip_block) => match observer_metrics.write() { - Ok(mut metrics) => { - metrics.bitcoin.last_reorg = Some(ReorgMetrics { - timestamp: highest_tip_block.timestamp.into(), - applied_blocks: blocks_to_apply.len(), - rolled_back_blocks: blocks_to_rollback.len(), - }); - } - Err(e) => ctx.try_log(|logger| { - slog::warn!( - logger, - "unable to acquire observer_metrics_rw_lock:{}", - e - ) - }), - }, + Some(highest_tip_block) => prometheus_monitoring.btc_metrics_set_reorg( + highest_tip_block.timestamp.into(), + blocks_to_apply.len() as u64, + blocks_to_rollback.len() as u64, + ), None => {} } @@ -1149,16 +1118,7 @@ pub async fn start_observer_commands_handler( .predicates .deregister_bitcoin_hook(hook_uuid.clone()) { - match observer_metrics.write() { - Ok(mut metrics) => metrics.bitcoin.deregister_prediate(), - Err(e) => ctx.try_log(|logger| { - slog::warn!( - logger, - "unable to acquire observer_metrics_rw_lock:{}", - e - ) - }), - } + prometheus_monitoring.btc_metrics_deregister_predicate(); if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateDeregistered( @@ -1210,28 +1170,10 @@ pub async fn start_observer_commands_handler( .iter() .max_by_key(|b| b.block.block_identifier.index) { - Some(highest_tip_update) => match observer_metrics.write() { - Ok(mut metrics) => { - if highest_tip_update.block.block_identifier.index - > metrics.stacks.tip_height - { - metrics.stacks.tip_height = - highest_tip_update.block.block_identifier.index; - } - metrics.stacks.last_block_ingestion_at = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Could not get current time in ms") - .as_millis() - .into(); - } - Err(e) => ctx.try_log(|logger| { - slog::warn!( - logger, - "unable to acquire observer_metrics_rw_lock:{}", - e - ) - }), - }, + Some(highest_tip_update) => prometheus_monitoring + .stx_metrics_ingest_block( + highest_tip_update.block.block_identifier.index, + ), None => {} } } @@ -1241,22 +1183,12 @@ pub async fn start_observer_commands_handler( .iter() .max_by_key(|b| b.block.block_identifier.index) { - Some(highest_tip_update) => match observer_metrics.write() { - Ok(mut metrics) => { - metrics.stacks.last_reorg = Some(ReorgMetrics { - timestamp: highest_tip_update.block.timestamp.into(), - applied_blocks: update.blocks_to_apply.len(), - rolled_back_blocks: update.blocks_to_rollback.len(), - }); - } - Err(e) => ctx.try_log(|logger| { - slog::warn!( - logger, - "unable to acquire observer_metrics_rw_lock:{}", - e - ) - }), - }, + Some(highest_tip_update) => prometheus_monitoring + .stx_metrics_set_reorg( + highest_tip_update.block.timestamp, + update.blocks_to_apply.len() as u64, + update.blocks_to_rollback.len() as u64, + ), None => {} } } @@ -1344,17 +1276,7 @@ pub async fn start_observer_commands_handler( .predicates .deregister_stacks_hook(hook_uuid.clone()) { - match observer_metrics.write() { - Ok(mut metrics) => metrics.stacks.deregister_prediate(), - - Err(e) => ctx.try_log(|logger| { - slog::warn!( - logger, - "unable to acquire observer_metrics_rw_lock:{}", - e - ) - }), - } + prometheus_monitoring.stx_metrics_deregister_predicate(); if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateDeregistered( @@ -1417,18 +1339,13 @@ pub async fn start_observer_commands_handler( } }; - match observer_metrics.write() { - Ok(mut metrics) => match spec { - ChainhookSpecification::Bitcoin(_) => { - metrics.bitcoin.registered_predicates += 1 - } - ChainhookSpecification::Stacks(_) => { - metrics.stacks.registered_predicates += 1 - } - }, - Err(e) => ctx.try_log(|logger| { - slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) - }), + match spec { + ChainhookSpecification::Bitcoin(_) => { + prometheus_monitoring.btc_metrics_register_predicate() + } + ChainhookSpecification::Stacks(_) => { + prometheus_monitoring.stx_metrics_register_predicate() + } }; ctx.try_log(|logger| slog::info!(logger, "Registering chainhook {}", spec.uuid(),)); @@ -1452,12 +1369,7 @@ pub async fn start_observer_commands_handler( }); let hook = chainhook_store.predicates.deregister_stacks_hook(hook_uuid); - match observer_metrics.write() { - Ok(mut metrics) => metrics.stacks.deregister_prediate(), - Err(e) => ctx.try_log(|logger| { - slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) - }), - } + prometheus_monitoring.stx_metrics_deregister_predicate(); if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) { let _ = tx.send(ObserverEvent::PredicateDeregistered( @@ -1473,12 +1385,7 @@ pub async fn start_observer_commands_handler( .predicates .deregister_bitcoin_hook(hook_uuid); - match observer_metrics.write() { - Ok(mut metrics) => metrics.bitcoin.deregister_prediate(), - Err(e) => ctx.try_log(|logger| { - slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) - }), - } + prometheus_monitoring.btc_metrics_deregister_predicate(); if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) { let _ = tx.send(ObserverEvent::PredicateDeregistered( diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index 26a3adcda..965dc4ca4 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -11,9 +11,10 @@ use crate::indexer::tests::helpers::transactions::generate_test_tx_bitcoin_p2pkh use crate::indexer::tests::helpers::{ accounts, bitcoin_blocks, stacks_blocks, transactions::generate_test_tx_stacks_contract_call, }; +use crate::monitoring::PrometheusMonitoring; use crate::observer::{ start_observer_commands_handler, ChainhookStore, EventObserverConfig, ObserverCommand, - ObserverMetrics, ObserverSidecar, + ObserverSidecar, }; use crate::utils::{AbstractBlock, Context}; use chainhook_types::{ @@ -25,7 +26,6 @@ use chainhook_types::{ use hiro_system_kit; use std::collections::BTreeMap; use std::sync::mpsc::{channel, Sender}; -use std::sync::{Arc, RwLock}; use super::{ObserverEvent, DEFAULT_INGESTION_PORT}; @@ -45,6 +45,7 @@ fn generate_test_config() -> (EventObserverConfig, ChainhookStore) { bitcoin_network: BitcoinNetwork::Regtest, stacks_network: StacksNetwork::Devnet, data_handler_tx: None, + prometheus_monitoring_port: None, }; let predicates = ChainhookConfig::new(); let chainhook_store = ChainhookStore { predicates }; @@ -267,64 +268,48 @@ fn assert_stacks_chain_event(observer_events_rx: &crossbeam_channel::Receiver>, - expected_count: usize, + prometheus_monitoring: &PrometheusMonitoring, + expected_count: u64, ) { assert_eq!( expected_count, - observer_metrics_rw_lock - .read() - .unwrap() - .stacks - .registered_predicates, + prometheus_monitoring.stx_registered_predicates.get(), "expected {} registered stacks hooks", expected_count ); } fn assert_observer_metrics_stacks_deregistered_predicates( - observer_metrics_rw_lock: &Arc>, - expected_count: usize, + prometheus_monitoring: &PrometheusMonitoring, + expected_count: u64, ) { assert_eq!( expected_count, - observer_metrics_rw_lock - .read() - .unwrap() - .stacks - .deregistered_predicates, + prometheus_monitoring.stx_deregistered_predicates.get(), "expected {} deregistered stacks hooks", expected_count ); } fn assert_observer_metrics_bitcoin_registered_predicates( - observer_metrics_rw_lock: &Arc>, - expected_count: usize, + prometheus_monitoring: &PrometheusMonitoring, + expected_count: u64, ) { assert_eq!( expected_count, - observer_metrics_rw_lock - .read() - .unwrap() - .bitcoin - .registered_predicates, + prometheus_monitoring.btc_registered_predicates.get(), "expected {} registered bitcoin hooks", expected_count ); } fn assert_observer_metrics_bitcoin_deregistered_predicates( - observer_metrics_rw_lock: &Arc>, - expected_count: usize, + prometheus_monitoring: &PrometheusMonitoring, + expected_count: u64, ) { assert_eq!( expected_count, - observer_metrics_rw_lock - .read() - .unwrap() - .bitcoin - .deregistered_predicates, + prometheus_monitoring.btc_deregistered_predicates.get(), "expected {} deregistered bitcoin hooks", expected_count ); @@ -365,8 +350,8 @@ fn generate_and_register_new_ordinals_chainhook( fn test_stacks_chainhook_register_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); - let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); - let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); + let prometheus_monitoring = PrometheusMonitoring::new(); + let prometheus_monitoring_moved = prometheus_monitoring.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -376,7 +361,7 @@ fn test_stacks_chainhook_register_deregister() { observer_commands_rx, Some(observer_events_tx), None, - observer_metrics_rw_lock_moved, + prometheus_monitoring_moved, None, Context::empty(), )); @@ -392,7 +377,7 @@ fn test_stacks_chainhook_register_deregister() { ); // registering stacks chainhook should increment the observer_metric's registered stacks hooks - assert_observer_metrics_stacks_registered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_stacks_registered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_stacks_contract_call( @@ -523,9 +508,9 @@ fn test_stacks_chainhook_register_deregister() { }); // deregistering stacks chainhook should decrement the observer_metric's registered stacks hooks - assert_observer_metrics_stacks_registered_predicates(&observer_metrics_rw_lock, 0); + assert_observer_metrics_stacks_registered_predicates(&prometheus_monitoring, 0); // and increment the deregistered hooks - assert_observer_metrics_stacks_deregistered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_stacks_deregistered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_stacks_contract_call( @@ -567,6 +552,8 @@ fn test_stacks_chainhook_register_deregister() { // Should propagate block assert_stacks_chain_event(&observer_events_rx); + let thing = &prometheus_monitoring.registry.gather(); + println!("gathered, {:?}", thing); let _ = observer_commands_tx.send(ObserverCommand::Terminate); handle.join().expect("unable to terminate thread"); } @@ -575,8 +562,8 @@ fn test_stacks_chainhook_register_deregister() { fn test_stacks_chainhook_auto_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); - let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); - let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); + let prometheus_monitoring = PrometheusMonitoring::new(); + let prometheus_monitoring_moved = prometheus_monitoring.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -586,7 +573,7 @@ fn test_stacks_chainhook_auto_deregister() { observer_commands_rx, Some(observer_events_tx), None, - observer_metrics_rw_lock_moved, + prometheus_monitoring_moved, None, Context::empty(), )); @@ -616,7 +603,7 @@ fn test_stacks_chainhook_auto_deregister() { _ => false, }); // registering stacks chainhook should increment the observer_metric's registered stacks hooks - assert_observer_metrics_stacks_registered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_stacks_registered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_stacks_contract_call( @@ -712,9 +699,9 @@ fn test_stacks_chainhook_auto_deregister() { }); // deregistering stacks chainhook should decrement the observer_metric's registered stacks hooks - assert_observer_metrics_stacks_registered_predicates(&observer_metrics_rw_lock, 0); + assert_observer_metrics_stacks_registered_predicates(&prometheus_monitoring, 0); // and increment the deregistered hooks - assert_observer_metrics_stacks_deregistered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_stacks_deregistered_predicates(&prometheus_monitoring, 1); // Should propagate block assert_stacks_chain_event(&observer_events_rx); @@ -727,8 +714,8 @@ fn test_stacks_chainhook_auto_deregister() { fn test_bitcoin_chainhook_register_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); - let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); - let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); + let prometheus_monitoring = PrometheusMonitoring::new(); + let prometheus_monitoring_moved = prometheus_monitoring.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -738,7 +725,7 @@ fn test_bitcoin_chainhook_register_deregister() { observer_commands_rx, Some(observer_events_tx), None, - observer_metrics_rw_lock_moved, + prometheus_monitoring_moved, None, Context::empty(), )); @@ -754,7 +741,7 @@ fn test_bitcoin_chainhook_register_deregister() { ); // registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks - assert_observer_metrics_bitcoin_registered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_bitcoin_registered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger (wallet_1 to wallet_3) let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer( @@ -881,9 +868,9 @@ fn test_bitcoin_chainhook_register_deregister() { }); // deregistering bitcoin chainhook should decrement the observer_metric's registered bitcoin hooks - assert_observer_metrics_bitcoin_registered_predicates(&observer_metrics_rw_lock, 0); + assert_observer_metrics_bitcoin_registered_predicates(&prometheus_monitoring, 0); // and increment the deregistered hooks - assert_observer_metrics_bitcoin_deregistered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_bitcoin_deregistered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer( @@ -944,8 +931,8 @@ fn test_bitcoin_chainhook_register_deregister() { fn test_bitcoin_chainhook_auto_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); - let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); - let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); + let prometheus_monitoring = PrometheusMonitoring::new(); + let prometheus_monitoring_moved = prometheus_monitoring.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -955,7 +942,7 @@ fn test_bitcoin_chainhook_auto_deregister() { observer_commands_rx, Some(observer_events_tx), None, - observer_metrics_rw_lock_moved, + prometheus_monitoring_moved, None, Context::empty(), )); @@ -971,7 +958,7 @@ fn test_bitcoin_chainhook_auto_deregister() { ); // registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks - assert_observer_metrics_bitcoin_registered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_bitcoin_registered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger (wallet_1 to wallet_3) let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer( @@ -1089,9 +1076,9 @@ fn test_bitcoin_chainhook_auto_deregister() { }); // deregistering bitcoin chainhook should decrement the observer_metric's registered bitcoin hooks - assert_observer_metrics_bitcoin_registered_predicates(&observer_metrics_rw_lock, 0); + assert_observer_metrics_bitcoin_registered_predicates(&prometheus_monitoring, 0); // and increment the deregistered hooks - assert_observer_metrics_bitcoin_deregistered_predicates(&observer_metrics_rw_lock, 1); + assert_observer_metrics_bitcoin_deregistered_predicates(&prometheus_monitoring, 1); // Should propagate block assert!(match observer_events_rx.recv() { @@ -1112,8 +1099,6 @@ fn test_bitcoin_chainhook_through_reorg() { let (block_pre_processor_out_tx, block_pre_processor_out_rx) = crossbeam_channel::unbounded(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); - let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); - let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); let empty_ctx = Context::empty(); @@ -1121,6 +1106,8 @@ fn test_bitcoin_chainhook_through_reorg() { bitcoin_blocks_mutator: Some((block_pre_processor_in_tx, block_pre_processor_out_rx)), bitcoin_chain_event_notifier: None, }; + let prometheus_monitoring = PrometheusMonitoring::new(); + let prometheus_monitoring_moved = prometheus_monitoring.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -1130,7 +1117,7 @@ fn test_bitcoin_chainhook_through_reorg() { observer_commands_rx, Some(observer_events_tx), None, - observer_metrics_rw_lock_moved, + prometheus_monitoring_moved, Some(observer_sidecar), Context::empty(), )); @@ -1185,14 +1172,8 @@ fn test_bitcoin_chainhook_through_reorg() { generate_and_register_new_ordinals_chainhook(&observer_commands_tx, &observer_events_rx, 1); // registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks - assert_eq!( - 1, - observer_metrics_rw_lock - .read() - .unwrap() - .bitcoin - .registered_predicates - ); + + assert_observer_metrics_bitcoin_registered_predicates(&prometheus_monitoring, 1); // Simulate a block that does not include a trigger (wallet_1 to wallet_3) let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer(