diff --git a/Cargo.lock b/Cargo.lock index 0715cd83..0b225d2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,9 +79,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "bytes" @@ -89,6 +89,15 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "cadence" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb681a7408f21c9d9dcb6638e340913ea260cc587518b5976510d2341f085a19" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "cc" version = "1.0.78" @@ -141,6 +150,25 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -511,9 +539,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", "windows-sys", @@ -690,9 +718,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" dependencies = [ "cfg-if", "libc", @@ -710,6 +738,7 @@ dependencies = [ "base64", "bb8", "bytes", + "cadence", "chrono", "env_logger", "exitcode", @@ -799,9 +828,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" dependencies = [ "unicode-ident", ] @@ -902,9 +931,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.7" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" dependencies = [ "log", "ring", @@ -1075,9 +1104,9 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" dependencies = [ "winapi-util", ] @@ -1419,42 +1448,42 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" [[package]] name = "windows_aarch64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" [[package]] name = "windows_i686_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" [[package]] name = "windows_i686_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" [[package]] name = "windows_x86_64_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" [[package]] name = "windows_x86_64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" diff --git a/Cargo.toml b/Cargo.toml index 06622db2..b252e50a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ rustls-pemfile = "1" hyper = { version = "0.14", features = ["full"] } phf = { version = "0.11.1", features = ["macros"] } exitcode = "1.1.2" +cadence = "0.29" futures = "0.3" [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/README.md b/README.md index 2ba7ba82..b50dc39b 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,32 @@ PGPASSWORD=postgres psql -h 127.0.0.1 -p 6432 -U postgres -c 'SELECT 1' | `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` | | `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` | +## Statsd Configuration +Statsd is optional and can be configured in both UnixSocket and Udp modes + + +UDP +```toml +[general.statsd] +type = "Udp" +[general.statsd.args] +prefix = "prefix.pgcat" +host = "statsd.host.ac" +port = 8125 +``` + +Unix Socket +```toml +[general.statsd] +type = "UnixSocket" +[general.statsd.args] +prefix = "prefix.pgcat" +path = "/var/run/statsd.socket" +``` + +**Note**: The statsd client in pgcat will require it's own thread to run so it doesn't interfere with the main server threads. Under default configs that would mean **5 threads** will be required to run pgcat. + + ## Local development 1. Install Rust (latest stable will work great). diff --git a/src/config.rs b/src/config.rs index 219f0deb..e0ce39ba 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,6 +20,20 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION"); /// Globally available configuration. static CONFIG: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Config::default())); +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] +#[serde(tag = "type", content = "args")] +pub enum StatsDMode { + UnixSocket { + prefix: String, + path: String, + }, + Udp { + prefix: String, + host: String, + port: u16, + }, +} + /// Server role: primary or replica. #[derive(Clone, PartialEq, Serialize, Deserialize, Hash, std::cmp::Eq, Debug, Copy)] pub enum Role { @@ -188,6 +202,8 @@ pub struct General { pub tls_private_key: Option, pub admin_username: String, pub admin_password: String, + + pub statsd: Option, } impl General { @@ -235,6 +251,7 @@ impl Default for General { port: Self::default_port(), enable_prometheus_exporter: Some(false), prometheus_exporter_port: 9930, + statsd: None, connect_timeout: General::default_connect_timeout(), idle_timeout: General::default_idle_timeout(), shutdown_timeout: Self::default_shutdown_timeout(), @@ -577,6 +594,11 @@ impl Config { "Healthcheck timeout: {}ms", self.general.healthcheck_timeout ); + if let Some(statsd_mode) = self.general.statsd.clone() { + info!("Statsd: {:?}", statsd_mode); + } else { + info!("Statsd: Not Enabled"); + }; info!("Connection timeout: {}ms", self.general.connect_timeout); info!("Idle timeout: {}ms", self.general.idle_timeout); info!( diff --git a/src/messages.rs b/src/messages.rs index e7c36747..29022f7e 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -380,7 +380,7 @@ pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut { let mut res = BytesMut::new(); let mut row_desc = BytesMut::new(); - // how many colums we are storing + // how many columns we are storing row_desc.put_i16(columns.len() as i16); for (name, data_type) in columns { diff --git a/src/pool.rs b/src/pool.rs index 702b6178..11565585 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -742,6 +742,7 @@ impl ManageConnection for ServerPool { self.address.name(), self.address.pool_name.clone(), self.address.username.clone(), + self.address.role, ); self.stats.server_login(server_id); diff --git a/src/stats.rs b/src/stats.rs index 3f7e9d61..9fb7ddbd 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -3,11 +3,19 @@ use arc_swap::ArcSwap; use log::{error, info, trace, warn}; use once_cell::sync::Lazy; use std::collections::HashMap; +use std::net::UdpSocket; +use std::os::unix::net::UnixDatagram; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::Instant; +use cadence::{ + prelude::*, BufferedUdpMetricSink, BufferedUnixMetricSink, MetricBuilder, NopMetricSink, + QueuingMetricSink, StatsdClient, +}; + +use crate::config::{get_config, Role, StatsDMode}; use crate::pool::{get_all_pools, get_number_of_addresses}; /// Convenience types for various stats @@ -114,6 +122,8 @@ pub struct ServerInformation { pub address_name: String, pub address_id: usize, + pub role: Role, + pub username: String, pub pool_name: String, pub application_name: String, @@ -188,6 +198,7 @@ enum EventName { address_name: String, pool_name: String, username: String, + role: Role, }, ServerLogin { server_id: i32, @@ -314,7 +325,7 @@ impl Reporter { self.send(event) } - /// Reportes the time spent by a client waiting to get a healthy connection from the pool + /// Reports the time spent by a client waiting to get a healthy connection from the pool pub fn checkout_time(&self, microseconds: u128, client_id: i32, server_id: i32) { let event = Event { name: EventName::CheckoutTime { @@ -401,7 +412,7 @@ impl Reporter { self.send(event) } - /// Reports a client is disconecting from the pooler. + /// Reports a client is disconnecting from the pooler. pub fn client_disconnecting(&self, client_id: i32) { let event = Event { name: EventName::ClientDisconnecting { client_id }, @@ -419,6 +430,7 @@ impl Reporter { address_name: String, pool_name: String, username: String, + role: Role, ) { let event = Event { name: EventName::ServerRegistered { @@ -427,6 +439,7 @@ impl Reporter { address_name, pool_name, username, + role, }, value: 1, }; @@ -475,7 +488,7 @@ impl Reporter { self.send(event) } - /// Reports a server connection is disconecting from the pooler. + /// Reports a server connection is disconnecting from the pooler. pub fn server_disconnecting(&self, server_id: i32) { let event = Event { name: EventName::ServerDisconnecting { server_id }, @@ -494,13 +507,18 @@ impl Reporter { pub struct Collector { rx: Receiver, tx: Sender, + statsd_client: StatsdClient, } impl Collector { /// Create a new collector instance. There should only be one instance /// at a time. This is ensured by mpsc which allows only one receiver. pub fn new(rx: Receiver, tx: Sender) -> Collector { - Collector { rx, tx } + Collector { + rx, + tx, + statsd_client: StatsdClient::new_client(), + } } /// The statistics collection handler. It will collect statistics @@ -565,21 +583,27 @@ impl Collector { server_id, duration_ms, } => { + let mut tags = HashMap::new(); + // Update client stats let app_name = match client_states.get_mut(&client_id) { Some(client_info) => { + add_client_tags(&mut tags, &client_info); + client_info.query_count += stat.value as u64; client_info.application_name.to_string() } None => String::from("Undefined"), }; - // Update server stats and pool aggergation stats + // Update server stats and pool aggregation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.query_count += stat.value as u64; server_info.application_name = app_name; + add_server_tags(&mut tags, &server_info); + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert_with(HashMap::default); @@ -594,28 +618,36 @@ impl Collector { *duration += duration_ms as i64; } None => (), - } + }; + + self.statsd_client.send_count("query_count", 1, tags); } EventName::Transaction { client_id, server_id, } => { + let mut tags = HashMap::new(); + // Update client stats let app_name = match client_states.get_mut(&client_id) { Some(client_info) => { + add_client_tags(&mut tags, &client_info); + client_info.transaction_count += stat.value as u64; client_info.application_name.to_string() } None => String::from("Undefined"), }; - // Update server stats and pool aggergation stats + // Update server stats and pool aggregation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.transaction_count += stat.value as u64; server_info.application_name = app_name; + add_server_tags(&mut tags, &server_info); + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert_with(HashMap::default); @@ -625,15 +657,21 @@ impl Collector { *counter += stat.value; } None => (), - } + }; + + self.statsd_client.send_count("tx_count", 1, tags); } EventName::DataSentToServer { server_id } => { - // Update server stats and address aggergation stats + let mut tags = HashMap::new(); + + // Update server stats and address aggregation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.bytes_sent += stat.value as u64; + add_server_tags(&mut tags, &server_info); + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert_with(HashMap::default); @@ -642,15 +680,22 @@ impl Collector { *counter += stat.value; } None => (), - } + }; + + self.statsd_client + .send_count("bytes_sent", stat.value, tags); } EventName::DataReceivedFromServer { server_id } => { - // Update server states and address aggergation stats + let mut tags = HashMap::new(); + + // Update server states and address aggregation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.bytes_received += stat.value as u64; + add_server_tags(&mut tags, &server_info); + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert_with(HashMap::default); @@ -660,13 +705,18 @@ impl Collector { *counter += stat.value; } None => (), - } + }; + + self.statsd_client + .send_count("bytes_recv", stat.value, tags); } EventName::CheckoutTime { client_id, server_id, } => { + let mut tags = HashMap::new(); + // Update client stats let app_name = match client_states.get_mut(&client_id) { Some(client_info) => { @@ -676,11 +726,13 @@ impl Collector { None => String::from("Undefined"), }; - // Update server stats and address aggergation stats + // Update server stats and address aggregation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.application_name = app_name; + add_server_tags(&mut tags, &server_info); + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert_with(HashMap::default); @@ -704,7 +756,10 @@ impl Collector { } } None => (), - } + }; + + self.statsd_client + .send_count("checkout_time", stat.value, tags); } EventName::ClientRegistered { @@ -713,26 +768,31 @@ impl Collector { username, application_name, } => { + let mut tags = HashMap::new(); + match client_states.get_mut(&client_id) { Some(_) => warn!("Client {:?} was double registered!", client_id), None => { - client_states.insert( + let client_info = ClientInformation { + state: ClientState::Idle, + connect_time: Instant::now(), client_id, - ClientInformation { - state: ClientState::Idle, - connect_time: Instant::now(), - client_id, - pool_name: pool_name.clone(), - username: username.clone(), - application_name: application_name.clone(), - total_wait_time: 0, - transaction_count: 0, - query_count: 0, - error_count: 0, - }, - ); + pool_name: pool_name.clone(), + username: username.clone(), + application_name: application_name.clone(), + total_wait_time: 0, + transaction_count: 0, + query_count: 0, + error_count: 0, + }; + + add_client_tags(&mut tags, &client_info); + + client_states.insert(client_id, client_info); } }; + + self.statsd_client.send_count("client_registered", 1, tags); } EventName::ClientBanError { @@ -809,6 +869,7 @@ impl Collector { address_id, pool_name, username, + role, } => { server_states.insert( server_id, @@ -818,6 +879,7 @@ impl Collector { server_id, username, pool_name, + role, state: ServerState::Idle, application_name: String::from("Undefined"), @@ -1034,3 +1096,138 @@ pub fn get_address_stats() -> AddressStatsLookup { pub fn get_reporter() -> Reporter { (*(*REPORTER.load())).clone() } + +trait StatSubmitter +where + T: cadence::Metric + From, +{ + fn submit_stat(&self, metric_builder: MetricBuilder); +} + +impl StatSubmitter for StatsdClient +where + T: cadence::Metric + From, +{ + fn submit_stat(&self, metric_builder: cadence::MetricBuilder) { + // TODO: Move tagging logic here + if metric_builder.try_send().is_err() { + warn!("Error sending query metrics to client"); + } + } +} + +trait StatCreator { + fn send_count(&self, name: &str, count: i64, tags: HashMap); + + fn send_time(&self, name: &str, time: u64, tags: HashMap); + + fn send_gauge(&self, name: &str, value: u64, tags: HashMap); + + fn new_client() -> Self; +} + +impl StatCreator for StatsdClient { + fn send_count(&self, name: &str, count: i64, tags: HashMap) { + let mut metric_builder = self.count_with_tags(name, count); + + for (k, v) in tags.iter() { + metric_builder = metric_builder.with_tag(k, v); + } + + self.submit_stat(metric_builder); + } + + fn send_time(&self, name: &str, time: u64, tags: HashMap) { + let mut metric_builder = self.time_with_tags(name, time); + + for (k, v) in tags.iter() { + metric_builder = metric_builder.with_tag(k, v); + } + + self.submit_stat(metric_builder); + } + + fn send_gauge(&self, name: &str, value: u64, tags: HashMap) { + let mut metric_builder = self.gauge_with_tags(name, value); + + for (k, v) in tags.iter() { + metric_builder = metric_builder.with_tag(k, v); + } + + self.submit_stat(metric_builder); + } + + fn new_client() -> StatsdClient { + let config = get_config(); + + // Queue with a maximum capacity of 128K elements + const QUEUE_SIZE: usize = 128 * 1024; + + if let Some(statsd_mode) = config.general.statsd { + let (prefix, sink) = match statsd_mode { + StatsDMode::UnixSocket { prefix, path } => { + let socket = UnixDatagram::unbound().unwrap(); + socket.set_nonblocking(true).unwrap(); + let buffered_sink = BufferedUnixMetricSink::from(path, socket); + ( + prefix, + QueuingMetricSink::with_capacity(buffered_sink, QUEUE_SIZE), + ) + } + StatsDMode::Udp { prefix, host, port } => { + // Try to create + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + socket.set_nonblocking(true).unwrap(); + let buffered_sink = BufferedUdpMetricSink::from((host, port), socket).unwrap(); + ( + prefix, + QueuingMetricSink::with_capacity(buffered_sink, QUEUE_SIZE), + ) + } + }; + + info!("Started Statsd Client"); + let statsd_builder = StatsdClient::builder(&prefix, sink); + // TODO: Add default tags for statsd client + statsd_builder.build() + } else { + // No-op client + StatsdClient::from_sink("prefix", NopMetricSink) + } + } +} + +fn add_client_tags(tags: &mut HashMap, client_information: &ClientInformation) { + tags.insert( + String::from("application_name"), + client_information.application_name.to_string(), + ); + tags.insert( + String::from("username"), + client_information.username.to_string(), + ); + tags.insert( + String::from("pool_name"), + client_information.pool_name.to_string(), + ); +} + +fn add_server_tags(tags: &mut HashMap, server_information: &ServerInformation) { + tags.insert( + String::from("pool_name"), + server_information.pool_name.to_string(), + ); + tags.insert( + String::from("address_name"), + server_information.address_name.to_string(), + ); + tags.insert( + String::from("username"), + server_information.username.to_string(), + ); + tags.insert(String::from("role"), server_information.role.to_string()); + tags.insert( + String::from("application_name"), + server_information.application_name.to_string(), + ); +}