Skip to content

Commit e614691

Browse files
authored
Merge pull request #66 from jonhoo/no-block-reactor
Reimplement Pool to be fully asynchronous
2 parents 301cb80 + 3cabd50 commit e614691

File tree

6 files changed

+563
-310
lines changed

6 files changed

+563
-310
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@ edition = "2018"
1515
bit-vec = "0.5"
1616
byteorder = "1"
1717
bytes = "0.4"
18+
crossbeam = "0.7"
1819
failure = "0.1"
1920
failure_derive = "0.1"
2021
fnv = "1"
21-
futures = "^0.1.18"
22+
futures = "0.1.18"
2223
mysql_common = "0.17"
2324
native-tls = { version = "0.2", optional = true }
2425
regex = "1"
2526
serde = "1"
2627
serde_json = "1"
27-
tokio = "^0.1.9"
28+
tokio = "0.1.9"
2829
tokio-codec = "0.1"
2930
tokio-io = "0.1"
3031
tokio-named-pipes = "0.1"
3132
tokio-uds = "0.2"
33+
tokio-sync = "0.1"
3234
twox-hash = "1"
3335
url = "1"
3436

src/conn/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ impl Conn {
144144
self.get_affected_rows()
145145
}
146146

147+
fn close(mut self) -> impl MyFuture<()> {
148+
self.inner.disconnected = true;
149+
self.cleanup().and_then(Conn::disconnect)
150+
}
151+
147152
fn is_secure(&self) -> bool {
148153
if let Some(ref stream) = self.inner.stream {
149154
stream.is_secure()

src/conn/pool/futures/disconnect_pool.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,40 +6,40 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9-
use futures::{
10-
Async::{NotReady, Ready},
11-
Future, Poll,
9+
use futures::{task, Async, Future, Poll};
10+
11+
use crate::{
12+
conn::pool::{Inner, Pool},
13+
error::Error,
1214
};
1315

14-
use crate::{conn::pool::Pool, error::*};
16+
use std::sync::{atomic, Arc};
1517

1618
/// Future that disconnects this pool from server and resolves to `()`.
1719
///
1820
/// Active connections taken from this pool should be disconnected manually.
1921
/// Also all pending and new `GetConn`'s will resolve to error.
2022
pub struct DisconnectPool {
21-
pool: Pool,
23+
pool_inner: Arc<Inner>,
2224
}
2325

2426
pub fn new(pool: Pool) -> DisconnectPool {
25-
DisconnectPool { pool }
27+
DisconnectPool {
28+
pool_inner: pool.inner,
29+
}
2630
}
2731

2832
impl Future for DisconnectPool {
2933
type Item = ();
3034
type Error = Error;
3135

3236
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
33-
self.pool.handle_futures()?;
34-
35-
let (new_len, queue_len) = self
36-
.pool
37-
.with_inner(|inner| (inner.new.len(), inner.queue.len()));
37+
self.pool_inner.wake.push(task::current());
3838

39-
if (new_len, queue_len) == (0, 0) {
40-
Ok(Ready(()))
39+
if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
40+
Ok(Async::Ready(()))
4141
} else {
42-
Ok(NotReady)
42+
Ok(Async::NotReady)
4343
}
4444
}
4545
}

src/conn/pool/futures/get_conn.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,84 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9-
use futures::{Future, Poll};
9+
use futures::{try_ready, Async, Future, Poll};
1010

1111
use crate::{
1212
conn::{pool::Pool, Conn},
1313
error::*,
14+
MyFuture,
1415
};
1516

17+
pub(crate) enum GetConnInner {
18+
New,
19+
Done(Option<Conn>),
20+
// TODO: one day this should be an existential
21+
// TODO: impl Drop?
22+
Connecting(Box<dyn MyFuture<Conn>>),
23+
}
24+
1625
/// This future will take connection from a pool and resolve to `Conn`.
1726
pub struct GetConn {
18-
pool: Pool,
27+
pub(crate) pool: Option<Pool>,
28+
pub(crate) inner: GetConnInner,
1929
}
2030

2131
pub fn new(pool: &Pool) -> GetConn {
22-
GetConn { pool: pool.clone() }
32+
GetConn {
33+
pool: Some(pool.clone()),
34+
inner: GetConnInner::New,
35+
}
2336
}
2437

2538
impl Future for GetConn {
2639
type Item = Conn;
2740
type Error = Error;
2841

2942
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
30-
self.pool.poll()
43+
loop {
44+
match self.inner {
45+
GetConnInner::New => match try_ready!(self
46+
.pool
47+
.as_mut()
48+
.expect("GetConn::poll polled after returning Async::Ready")
49+
.poll_new_conn())
50+
.inner
51+
{
52+
GetConnInner::Done(Some(conn)) => {
53+
self.inner = GetConnInner::Done(Some(conn));
54+
}
55+
GetConnInner::Connecting(conn_fut) => {
56+
self.inner = GetConnInner::Connecting(conn_fut);
57+
}
58+
GetConnInner::Done(None) => unreachable!(
59+
"Pool::poll_new_conn never gives out already-consumed GetConns"
60+
),
61+
GetConnInner::New => {
62+
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
63+
}
64+
},
65+
GetConnInner::Done(ref mut c @ Some(_)) => {
66+
let mut c = c.take().unwrap();
67+
c.inner.pool = Some(
68+
self.pool
69+
.take()
70+
.expect("GetConn::poll polled after returning Async::Ready"),
71+
);
72+
return Ok(Async::Ready(c));
73+
}
74+
GetConnInner::Done(None) => {
75+
unreachable!("GetConn::poll polled after returning Async::Ready");
76+
}
77+
GetConnInner::Connecting(ref mut f) => {
78+
let mut c = try_ready!(f.poll());
79+
c.inner.pool = Some(
80+
self.pool
81+
.take()
82+
.expect("GetConn::poll polled after returning Async::Ready"),
83+
);
84+
return Ok(Async::Ready(c));
85+
}
86+
}
87+
}
3188
}
3289
}

src/conn/pool/futures/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9+
pub(super) use self::get_conn::GetConnInner;
910
pub use self::{
1011
disconnect_pool::{new as new_disconnect_pool, DisconnectPool},
1112
get_conn::{new as new_get_conn, GetConn},

0 commit comments

Comments
 (0)