Skip to content

Commit 3443a71

Browse files
committed
Add performance counters gated by metrics feature
1 parent ca61d55 commit 3443a71

15 files changed

+210
-15
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pem = "1.0.1"
2828
percent-encoding = "2.1.0"
2929
pin-project = "1.0.2"
3030
priority-queue = "1"
31-
serde = "1"
31+
serde = { version = "1", features = ["derive", "rc"] }
3232
serde_json = "1"
3333
socket2 = "0.4.2"
3434
thiserror = "1.0.4"
@@ -92,6 +92,7 @@ default-rustls = [
9292
"mysql_common/frunk",
9393
"rustls-tls",
9494
]
95+
metrics = []
9596
minimal = ["flate2/zlib"]
9697
native-tls-tls = ["native-tls", "tokio-native-tls"]
9798
rustls-tls = [

src/buffer_pool.rs

+22-5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9+
use crate::metrics::BufferPoolMetrics;
910
use crossbeam::queue::ArrayQueue;
1011
use std::{mem::replace, ops::Deref, sync::Arc};
1112

@@ -14,6 +15,7 @@ pub struct BufferPool {
1415
buffer_size_cap: usize,
1516
buffer_init_cap: usize,
1617
pool: ArrayQueue<Vec<u8>>,
18+
metrics: BufferPoolMetrics,
1719
}
1820

1921
impl BufferPool {
@@ -37,14 +39,21 @@ impl BufferPool {
3739
pool: ArrayQueue::new(pool_cap),
3840
buffer_size_cap,
3941
buffer_init_cap,
42+
metrics: Default::default(),
4043
}
4144
}
4245

4346
pub fn get(self: &Arc<Self>) -> PooledBuf {
44-
let buf = self
45-
.pool
46-
.pop()
47-
.unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap));
47+
let buf = match self.pool.pop() {
48+
Some(buf) => {
49+
self.metrics.reuses.inc();
50+
buf
51+
}
52+
None => {
53+
self.metrics.creations.inc();
54+
Vec::with_capacity(self.buffer_init_cap)
55+
}
56+
};
4857
debug_assert_eq!(buf.len(), 0);
4958
PooledBuf(buf, self.clone())
5059
}
@@ -64,7 +73,15 @@ impl BufferPool {
6473
buf.shrink_to(self.buffer_size_cap);
6574

6675
// ArrayQueue will make sure to drop the buffer if capacity is exceeded
67-
let _ = self.pool.push(buf);
76+
match self.pool.push(buf) {
77+
Ok(()) => self.metrics.returns.inc(),
78+
Err(_buf) => self.metrics.discards.inc(),
79+
};
80+
}
81+
82+
#[cfg(feature = "metrics")]
83+
pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics {
84+
self.metrics.clone()
6885
}
6986
}
7087

src/conn/mod.rs

+29-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::{
3838
consts::{CapabilityFlags, Command, StatusFlags},
3939
error::*,
4040
io::Stream,
41+
metrics::ConnMetrics,
4142
opts::Opts,
4243
queryable::{
4344
query_result::{QueryResult, ResultSetMeta},
@@ -56,6 +57,8 @@ pub mod stmt_cache;
5657

5758
/// Helper that asynchronously disconnects the givent connection on the default tokio executor.
5859
fn disconnect(mut conn: Conn) {
60+
conn.metrics().disconnects.inc();
61+
5962
let disconnected = conn.inner.disconnected;
6063

6164
// Mark conn as disconnected.
@@ -114,6 +117,7 @@ struct ConnInner {
114117
/// One-time connection-level infile handler.
115118
infile_handler:
116119
Option<Pin<Box<dyn Future<Output = crate::Result<InfileData>> + Send + Sync + 'static>>>,
120+
conn_metrics: Arc<ConnMetrics>,
117121
}
118122

119123
impl fmt::Debug for ConnInner {
@@ -133,6 +137,7 @@ impl fmt::Debug for ConnInner {
133137
impl ConnInner {
134138
/// Constructs an empty connection.
135139
fn empty(opts: Opts) -> ConnInner {
140+
let conn_metrics: Arc<ConnMetrics> = Default::default();
136141
ConnInner {
137142
capabilities: opts.get_capabilities(),
138143
status: StatusFlags::empty(),
@@ -147,14 +152,15 @@ impl ConnInner {
147152
tx_status: TxStatus::None,
148153
last_io: Instant::now(),
149154
wait_timeout: Duration::from_secs(0),
150-
stmt_cache: StmtCache::new(opts.stmt_cache_size()),
155+
stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()),
151156
socket: opts.socket().map(Into::into),
152157
opts,
153158
nonce: Vec::default(),
154159
auth_plugin: AuthPlugin::MysqlNativePassword,
155160
auth_switched: false,
156161
disconnected: false,
157162
infile_handler: None,
163+
conn_metrics,
158164
}
159165
}
160166

@@ -166,6 +172,18 @@ impl ConnInner {
166172
.as_mut()
167173
.ok_or_else(|| DriverError::ConnectionClosed.into())
168174
}
175+
176+
fn set_pool(&mut self, pool: Option<Pool>) {
177+
let conn_metrics = if let Some(ref pool) = pool {
178+
Arc::clone(&pool.inner.metrics.conn)
179+
} else {
180+
Default::default()
181+
};
182+
self.conn_metrics = Arc::clone(&conn_metrics);
183+
self.stmt_cache.conn_metrics = conn_metrics;
184+
185+
self.pool = pool;
186+
}
169187
}
170188

171189
/// MySql server connection.
@@ -854,6 +872,8 @@ impl Conn {
854872
conn.read_wait_timeout().await?;
855873
conn.run_init_commands().await?;
856874

875+
conn.metrics().connects.inc();
876+
857877
Ok(conn)
858878
}
859879
.boxed()
@@ -957,7 +977,10 @@ impl Conn {
957977

958978
self.inner.stmt_cache.clear();
959979
self.inner.infile_handler = None;
960-
self.inner.pool = pool;
980+
self.inner.set_pool(pool);
981+
982+
// TODO: clear some metrics?
983+
961984
Ok(())
962985
}
963986

@@ -1062,6 +1085,10 @@ impl Conn {
10621085

10631086
Ok(BinlogStream::new(self))
10641087
}
1088+
1089+
pub(crate) fn metrics(&self) -> &ConnMetrics {
1090+
&self.inner.conn_metrics
1091+
}
10651092
}
10661093

10671094
#[cfg(test)]

src/conn/pool/futures/get_conn.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ impl Future for GetConn {
140140

141141
return match result {
142142
Ok(mut c) => {
143-
c.inner.pool = Some(pool);
143+
c.inner.set_pool(Some(pool));
144+
c.metrics().connects.inc();
144145
Poll::Ready(Ok(c))
145146
}
146147
Err(e) => {
@@ -156,7 +157,7 @@ impl Future for GetConn {
156157
self.inner = GetConnInner::Done;
157158

158159
let pool = self.pool_take();
159-
checked_conn.inner.pool = Some(pool);
160+
checked_conn.inner.set_pool(Some(pool));
160161
return Poll::Ready(Ok(checked_conn));
161162
}
162163
Err(_) => {

src/conn/pool/mod.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::{
2525
use crate::{
2626
conn::{pool::futures::*, Conn},
2727
error::*,
28+
metrics::PoolMetrics,
2829
opts::{Opts, PoolOpts},
2930
queryable::transaction::{Transaction, TxOpts, TxStatus},
3031
};
@@ -177,6 +178,7 @@ pub struct Inner {
177178
close: atomic::AtomicBool,
178179
closed: atomic::AtomicBool,
179180
exchange: Mutex<Exchange>,
181+
pub(crate) metrics: PoolMetrics,
180182
}
181183

182184
/// Asynchronous pool of MySql connections.
@@ -190,7 +192,7 @@ pub struct Inner {
190192
#[derive(Debug, Clone)]
191193
pub struct Pool {
192194
opts: Opts,
193-
inner: Arc<Inner>,
195+
pub(super) inner: Arc<Inner>,
194196
drop: mpsc::UnboundedSender<Option<Conn>>,
195197
}
196198

@@ -219,6 +221,7 @@ impl Pool {
219221
exist: 0,
220222
recycler: Some((rx, pool_opts)),
221223
}),
224+
metrics: Default::default(),
222225
}),
223226
drop: tx,
224227
}
@@ -232,6 +235,7 @@ impl Pool {
232235

233236
/// Async function that resolves to `Conn`.
234237
pub fn get_conn(&self) -> GetConn {
238+
self.inner.metrics.gets.inc();
235239
GetConn::new(self)
236240
}
237241

@@ -249,6 +253,11 @@ impl Pool {
249253
DisconnectPool::new(self)
250254
}
251255

256+
#[cfg(feature = "metrics")]
257+
pub fn snapshot_metrics(&self) -> PoolMetrics {
258+
self.inner.metrics.clone()
259+
}
260+
252261
/// A way to return connection taken from a pool.
253262
fn return_conn(&mut self, conn: Conn) {
254263
// NOTE: we're not in async context here, so we can't block or return NotReady
@@ -264,18 +273,23 @@ impl Pool {
264273
{
265274
let mut exchange = self.inner.exchange.lock().unwrap();
266275
if exchange.available.len() < self.opts.pool_opts().active_bound() {
276+
self.inner.metrics.returns.inc();
267277
exchange.available.push_back(conn.into());
268278
if let Some(w) = exchange.waiting.pop() {
269279
w.wake();
270280
}
271281
return;
282+
} else {
283+
self.inner.metrics.discards.inc();
272284
}
273285
}
274286

275287
self.send_to_recycler(conn);
276288
}
277289

278290
fn send_to_recycler(&self, conn: Conn) {
291+
self.inner.metrics.recycler.recycles.inc();
292+
279293
if let Err(conn) = self.drop.send(Some(conn)) {
280294
let conn = conn.0.unwrap();
281295

src/conn/pool/recycler.rs

+5
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,23 @@ impl Future for Recycler {
6363
macro_rules! conn_decision {
6464
($self:ident, $conn:ident) => {
6565
if $conn.inner.stream.is_none() || $conn.inner.disconnected {
66+
self.inner.metrics.recycler.discards.inc();
6667
// drop unestablished connection
6768
$self.discard.push(futures_util::future::ok(()).boxed());
6869
} else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
70+
self.inner.metrics.recycler.cleans.inc();
6971
$self.cleaning.push($conn.cleanup_for_pool().boxed());
7072
} else if $conn.expired() || close {
73+
self.inner.metrics.recycler.discards.inc();
7174
$self.discard.push($conn.close_conn().boxed());
7275
} else {
7376
let mut exchange = $self.inner.exchange.lock().unwrap();
7477
if exchange.available.len() >= $self.pool_opts.active_bound() {
7578
drop(exchange);
79+
self.inner.metrics.recycler.discards.inc();
7680
$self.discard.push($conn.close_conn().boxed());
7781
} else {
82+
self.inner.metrics.recycler.returns.inc();
7883
exchange.available.push_back($conn.into());
7984
if let Some(w) = exchange.waiting.pop() {
8085
w.wake();

src/conn/routines/exec.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> {
2525

2626
impl Routine<()> for ExecRoutine<'_> {
2727
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
28+
conn.metrics().routines.execs.inc();
29+
2830
#[cfg(feature = "tracing")]
2931
let span = info_span!(
3032
"mysql_async::exec",

src/conn/routines/next_set.rs

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ where
2424
P: Protocol,
2525
{
2626
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
27+
conn.metrics().routines.next_sets.inc();
28+
2729
#[cfg(feature = "tracing")]
2830
let span = debug_span!(
2931
"mysql_async::next_set",

src/conn/routines/ping.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub struct PingRoutine;
1414

1515
impl Routine<()> for PingRoutine {
1616
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
17+
conn.metrics().routines.pings.inc();
18+
1719
#[cfg(feature = "tracing")]
1820
let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id());
1921

src/conn/routines/prepare.rs

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ impl PrepareRoutine {
2626

2727
impl Routine<Arc<StmtInner>> for PrepareRoutine {
2828
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<Arc<StmtInner>>> {
29+
conn.metrics().routines.prepares.inc();
30+
2931
#[cfg(feature = "tracing")]
3032
let span = info_span!(
3133
"mysql_async::prepare",

src/conn/routines/query.rs

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ impl<'a> QueryRoutine<'a> {
2222

2323
impl Routine<()> for QueryRoutine<'_> {
2424
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
25+
conn.metrics().routines.queries.inc();
26+
2527
#[cfg(feature = "tracing")]
2628
let span = info_span!(
2729
"mysql_async::query",

src/conn/routines/reset.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub struct ResetRoutine;
1414

1515
impl Routine<()> for ResetRoutine {
1616
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
17+
conn.metrics().routines.resets.inc();
18+
1719
#[cfg(feature = "tracing")]
1820
let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id());
1921

0 commit comments

Comments
 (0)