Skip to content

Commit a9f7a35

Browse files
committed
Copy new statement cache form diesel to unblock a release
1 parent f8263a1 commit a9f7a35

File tree

10 files changed

+909
-67
lines changed

10 files changed

+909
-67
lines changed

Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ default-features = false
4343
features = [
4444
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
4545
]
46-
git = "https://github.com/diesel-rs/diesel"
47-
branch = "master"
4846

4947
[dev-dependencies]
5048
tokio = { version = "1.12.0", features = ["rt", "macros", "rt-multi-thread"] }
@@ -58,13 +56,9 @@ default-features = false
5856
features = [
5957
"chrono"
6058
]
61-
git = "https://github.com/diesel-rs/diesel"
62-
branch = "master"
6359

6460
[dev-dependencies.diesel_migrations]
6561
version = "2.2.0"
66-
git = "https://github.com/diesel-rs/diesel"
67-
branch = "master"
6862

6963
[features]
7064
default = []

src/async_connection_wrapper.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub type AsyncConnectionWrapper<C, B = self::implementation::Tokio> =
100100
pub use self::implementation::AsyncConnectionWrapper;
101101

102102
mod implementation {
103-
use diesel::connection::{CacheSize, Instrumentation, SimpleConnection};
103+
use diesel::connection::{Instrumentation, SimpleConnection};
104104
use std::ops::{Deref, DerefMut};
105105

106106
use super::*;
@@ -187,10 +187,6 @@ mod implementation {
187187
fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
188188
self.inner.set_instrumentation(instrumentation);
189189
}
190-
191-
fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
192-
self.inner.set_prepared_statement_cache_size(size)
193-
}
194190
}
195191

196192
impl<C, B> diesel::connection::LoadConnection for AsyncConnectionWrapper<C, B>

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,8 @@
7272
clippy::cast_possible_truncation,
7373
clippy::cast_sign_loss
7474
)]
75-
7675
use diesel::backend::Backend;
77-
use diesel::connection::{CacheSize, Instrumentation};
76+
use diesel::connection::Instrumentation;
7877
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
7978
use diesel::row::Row;
8079
use diesel::{ConnectionResult, QueryResult};
@@ -96,6 +95,7 @@ pub mod pg;
9695
#[cfg(feature = "pool")]
9796
pub mod pooled_connection;
9897
mod run_query_dsl;
98+
mod statement_cache;
9999
#[cfg(any(feature = "postgres", feature = "mysql"))]
100100
mod stmt_cache;
101101
#[cfg(feature = "sync-connection-wrapper")]
@@ -111,6 +111,8 @@ pub use self::pg::AsyncPgConnection;
111111
#[doc(inline)]
112112
pub use self::run_query_dsl::*;
113113

114+
#[doc(inline)]
115+
pub use self::statement_cache::CacheSize;
114116
#[doc(inline)]
115117
pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
116118

src/mysql/mod.rs

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1+
use crate::statement_cache::CacheSize;
2+
use crate::statement_cache::{MaybeCached, QueryFragmentForCachedStatement, StatementCache};
13
use crate::stmt_cache::{CallbackHelper, QueryFragmentHelper};
24
use crate::{AnsiTransactionManager, AsyncConnection, SimpleAsyncConnection};
3-
use diesel::connection::statement_cache::{
4-
MaybeCached, QueryFragmentForCachedStatement, StatementCache,
5-
};
5+
use diesel::connection::Instrumentation;
6+
use diesel::connection::InstrumentationEvent;
67
use diesel::connection::StrQueryHelper;
7-
use diesel::connection::{CacheSize, Instrumentation};
8-
use diesel::connection::{DynInstrumentation, InstrumentationEvent};
98
use diesel::mysql::{Mysql, MysqlQueryBuilder, MysqlType};
109
use diesel::query_builder::QueryBuilder;
1110
use diesel::query_builder::{bind_collector::RawBytesBindCollector, QueryFragment, QueryId};
@@ -33,7 +32,7 @@ pub struct AsyncMysqlConnection {
3332
conn: mysql_async::Conn,
3433
stmt_cache: StatementCache<Mysql, Statement>,
3534
transaction_manager: AnsiTransactionManager,
36-
instrumentation: DynInstrumentation,
35+
instrumentation: Option<Box<dyn Instrumentation>>,
3736
}
3837

3938
impl SimpleAsyncConnection for AsyncMysqlConnection {
@@ -74,7 +73,7 @@ impl AsyncConnection for AsyncMysqlConnection {
7473
type TransactionManager = AnsiTransactionManager;
7574

7675
async fn establish(database_url: &str) -> diesel::ConnectionResult<Self> {
77-
let mut instrumentation = DynInstrumentation::default_instrumentation();
76+
let mut instrumentation = diesel::connection::get_default_instrumentation();
7877
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
7978
database_url,
8079
));
@@ -99,10 +98,6 @@ impl AsyncConnection for AsyncMysqlConnection {
9998
let stmt_for_exec = match stmt {
10099
MaybeCached::Cached(ref s) => (*s).clone(),
101100
MaybeCached::CannotCache(ref s) => s.clone(),
102-
_ => unreachable!(
103-
"Diesel has only two variants here at the time of writing.\n\
104-
If you ever see this error message please open in issue in the diesel-async issue tracker"
105-
),
106101
};
107102

108103
let (tx, rx) = futures_channel::mpsc::channel(0);
@@ -179,11 +174,11 @@ impl AsyncConnection for AsyncMysqlConnection {
179174
}
180175

181176
fn instrumentation(&mut self) -> &mut dyn Instrumentation {
182-
&mut *self.instrumentation
177+
&mut self.instrumentation
183178
}
184179

185180
fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
186-
self.instrumentation = instrumentation.into();
181+
self.instrumentation = Some(Box::new(instrumentation));
187182
}
188183

189184
fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
@@ -211,7 +206,7 @@ fn update_transaction_manager_status<T>(
211206
fn prepare_statement_helper<'a>(
212207
conn: &'a mut mysql_async::Conn,
213208
sql: &str,
214-
_is_for_cache: diesel::connection::statement_cache::PrepareForCache,
209+
_is_for_cache: crate::statement_cache::PrepareForCache,
215210
_metadata: &[MysqlType],
216211
) -> CallbackHelper<impl Future<Output = QueryResult<(Statement, &'a mut mysql_async::Conn)>> + Send>
217212
{
@@ -239,7 +234,7 @@ impl AsyncMysqlConnection {
239234
conn,
240235
stmt_cache: StatementCache::new(),
241236
transaction_manager: AnsiTransactionManager::default(),
242-
instrumentation: DynInstrumentation::default_instrumentation(),
237+
instrumentation: diesel::connection::get_default_instrumentation(),
243238
};
244239

245240
for stmt in CONNECTION_SETUP_QUERIES {
@@ -296,21 +291,25 @@ impl AsyncMysqlConnection {
296291
sql,
297292
safe_to_cache: is_safe_to_cache_prepared,
298293
};
294+
299295
let inner = async {
300-
let (stmt, conn) = stmt_cache
301-
.cached_statement_non_generic(
302-
query_id,
303-
&helper,
304-
&Mysql,
305-
&metadata,
306-
conn,
307-
prepare_statement_helper,
308-
&mut **instrumentation,
309-
)
310-
.await?;
296+
let (stmt, conn) = {
297+
stmt_cache
298+
.cached_statement_non_generic(
299+
query_id,
300+
&helper,
301+
&Mysql,
302+
&metadata,
303+
conn,
304+
prepare_statement_helper,
305+
&mut *instrumentation,
306+
)
307+
.await?
308+
};
311309
callback(conn, stmt, ToSqlHelper { metadata, binds }).await
312310
};
313311
let r = update_transaction_manager_status(inner.await, transaction_manager);
312+
314313
instrumentation.on_connection_event(InstrumentationEvent::finish_query(
315314
&StrQueryHelper::new(&helper.sql),
316315
r.as_ref().err(),
@@ -371,7 +370,7 @@ impl AsyncMysqlConnection {
371370
conn,
372371
stmt_cache: StatementCache::new(),
373372
transaction_manager: AnsiTransactionManager::default(),
374-
instrumentation: DynInstrumentation::none(),
373+
instrumentation: diesel::connection::get_default_instrumentation(),
375374
})
376375
}
377376
}

src/pg/mod.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@
77
use self::error_helper::ErrorHelper;
88
use self::row::PgRow;
99
use self::serialize::ToSqlHelper;
10+
use crate::statement_cache::CacheSize;
11+
use crate::statement_cache::{PrepareForCache, QueryFragmentForCachedStatement, StatementCache};
1012
use crate::stmt_cache::{CallbackHelper, QueryFragmentHelper};
1113
use crate::{AnsiTransactionManager, AsyncConnection, SimpleAsyncConnection};
12-
use diesel::connection::statement_cache::{
13-
PrepareForCache, QueryFragmentForCachedStatement, StatementCache,
14-
};
14+
use diesel::connection::Instrumentation;
15+
use diesel::connection::InstrumentationEvent;
1516
use diesel::connection::StrQueryHelper;
16-
use diesel::connection::{CacheSize, Instrumentation};
17-
use diesel::connection::{DynInstrumentation, InstrumentationEvent};
1817
use diesel::pg::{
1918
Pg, PgMetadataCache, PgMetadataCacheKey, PgMetadataLookup, PgQueryBuilder, PgTypeMetadata,
2019
};
@@ -132,7 +131,7 @@ pub struct AsyncPgConnection {
132131
connection_future: Option<broadcast::Receiver<Arc<tokio_postgres::Error>>>,
133132
shutdown_channel: Option<oneshot::Sender<()>>,
134133
// a sync mutex is fine here as we only hold it for a really short time
135-
instrumentation: Arc<std::sync::Mutex<DynInstrumentation>>,
134+
instrumentation: Arc<std::sync::Mutex<Option<Box<dyn Instrumentation>>>>,
136135
}
137136

138137
impl SimpleAsyncConnection for AsyncPgConnection {
@@ -169,7 +168,7 @@ impl AsyncConnection for AsyncPgConnection {
169168
type TransactionManager = AnsiTransactionManager;
170169

171170
async fn establish(database_url: &str) -> ConnectionResult<Self> {
172-
let mut instrumentation = DynInstrumentation::default_instrumentation();
171+
let mut instrumentation = diesel::connection::get_default_instrumentation();
173172
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
174173
database_url,
175174
));
@@ -236,14 +235,14 @@ impl AsyncConnection for AsyncPgConnection {
236235
// that means there is only one instance of this arc and
237236
// we can simply access the inner data
238237
if let Some(instrumentation) = Arc::get_mut(&mut self.instrumentation) {
239-
&mut **(instrumentation.get_mut().unwrap_or_else(|p| p.into_inner()))
238+
&mut *(instrumentation.get_mut().unwrap_or_else(|p| p.into_inner()))
240239
} else {
241240
panic!("Cannot access shared instrumentation")
242241
}
243242
}
244243

245244
fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
246-
self.instrumentation = Arc::new(std::sync::Mutex::new(instrumentation.into()));
245+
self.instrumentation = Arc::new(std::sync::Mutex::new(Some(Box::new(instrumentation))));
247246
}
248247

249248
fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
@@ -395,7 +394,7 @@ impl AsyncPgConnection {
395394
None,
396395
None,
397396
Arc::new(std::sync::Mutex::new(
398-
DynInstrumentation::default_instrumentation(),
397+
diesel::connection::get_default_instrumentation(),
399398
)),
400399
)
401400
.await
@@ -416,7 +415,9 @@ impl AsyncPgConnection {
416415
client,
417416
Some(error_rx),
418417
Some(shutdown_tx),
419-
Arc::new(std::sync::Mutex::new(DynInstrumentation::none())),
418+
Arc::new(std::sync::Mutex::new(
419+
diesel::connection::get_default_instrumentation(),
420+
)),
420421
)
421422
.await
422423
}
@@ -425,7 +426,7 @@ impl AsyncPgConnection {
425426
conn: tokio_postgres::Client,
426427
connection_future: Option<broadcast::Receiver<Arc<tokio_postgres::Error>>>,
427428
shutdown_channel: Option<oneshot::Sender<()>>,
428-
instrumentation: Arc<std::sync::Mutex<DynInstrumentation>>,
429+
instrumentation: Arc<std::sync::Mutex<Option<Box<dyn Instrumentation>>>>,
429430
) -> ConnectionResult<Self> {
430431
let mut conn = Self {
431432
conn: Arc::new(conn),

src/pooled_connection/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
//! * [deadpool](self::deadpool)
66
//! * [bb8](self::bb8)
77
//! * [mobc](self::mobc)
8+
use crate::statement_cache::CacheSize;
89
use crate::{AsyncConnection, SimpleAsyncConnection};
910
use crate::{TransactionManager, UpdateAndFetchResults};
1011
use diesel::associations::HasTable;
11-
use diesel::connection::{CacheSize, Instrumentation};
12+
use diesel::connection::Instrumentation;
1213
use diesel::QueryResult;
1314
use futures_core::future::BoxFuture;
1415
use futures_util::FutureExt;

0 commit comments

Comments
 (0)