From 8183af5b35423b7d9236c90cda349184038f2f94 Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Wed, 4 Aug 2021 17:41:49 -0400 Subject: [PATCH] box futures --- src/client/executor.rs | 72 +++++++++++++++++--------------- src/operation/find/mod.rs | 4 +- src/test/spec/retryable_reads.rs | 6 +-- 3 files changed, 42 insertions(+), 40 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 9ffa36a65..72ff5e735 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -69,42 +69,45 @@ impl Client { op: T, session: impl Into>, ) -> Result { - // TODO RUST-9: allow unacknowledged write concerns - if !op.is_acknowledged() { - return Err(ErrorKind::InvalidArgument { - message: "Unacknowledged write concerns are not supported".to_string(), + Box::pin(async { + // TODO RUST-9: allow unacknowledged write concerns + if !op.is_acknowledged() { + return Err(ErrorKind::InvalidArgument { + message: "Unacknowledged write concerns are not supported".to_string(), + } + .into()); } - .into()); - } - match session.into() { - Some(session) => { - if !Arc::ptr_eq(&self.inner, &session.client().inner) { - return Err(ErrorKind::InvalidArgument { - message: "the session provided to an operation must be created from the \ - same client as the collection/database" - .into(), + match session.into() { + Some(session) => { + if !Arc::ptr_eq(&self.inner, &session.client().inner) { + return Err(ErrorKind::InvalidArgument { + message: "the session provided to an operation must be created from \ + the same client as the collection/database" + .into(), + } + .into()); } - .into()); - } - if let Some(SelectionCriteria::ReadPreference(read_preference)) = - op.selection_criteria() - { - if session.in_transaction() && read_preference != &ReadPreference::Primary { - return Err(ErrorKind::Transaction { - message: "read preference in a transaction must be primary".into(), + if let Some(SelectionCriteria::ReadPreference(read_preference)) = + op.selection_criteria() + { + if session.in_transaction() && read_preference != &ReadPreference::Primary { + return Err(ErrorKind::Transaction { + message: "read preference in a transaction must be primary".into(), + } + .into()); } - .into()); } + self.execute_operation_with_retry(op, Some(session)).await + } + None => { + let mut implicit_session = self.start_implicit_session(&op).await?; + self.execute_operation_with_retry(op, implicit_session.as_mut()) + .await } - self.execute_operation_with_retry(op, Some(session)).await - } - None => { - let mut implicit_session = self.start_implicit_session(&op).await?; - self.execute_operation_with_retry(op, implicit_session.as_mut()) - .await } - } + }) + .await } /// Execute the given operation, returning the implicit session created for it if one was. @@ -114,10 +117,13 @@ impl Client { &self, op: T, ) -> Result<(T::O, Option)> { - let mut implicit_session = self.start_implicit_session(&op).await?; - self.execute_operation_with_retry(op, implicit_session.as_mut()) - .await - .map(|result| (result, implicit_session)) + Box::pin(async { + let mut implicit_session = self.start_implicit_session(&op).await?; + self.execute_operation_with_retry(op, implicit_session.as_mut()) + .await + .map(|result| (result, implicit_session)) + }) + .await } /// Selects a server and executes the given operation on it, optionally using a provided diff --git a/src/operation/find/mod.rs b/src/operation/find/mod.rs index e4cc64782..727f431a0 100644 --- a/src/operation/find/mod.rs +++ b/src/operation/find/mod.rs @@ -21,7 +21,7 @@ use super::CursorResponse; pub(crate) struct Find { ns: Namespace, filter: Option, - options: Option, + options: Option>, _phantom: PhantomData, } @@ -46,7 +46,7 @@ impl Find { Self { ns, filter, - options, + options: options.map(Box::new), _phantom: Default::default(), } } diff --git a/src/test/spec/retryable_reads.rs b/src/test/spec/retryable_reads.rs index 422bff840..9fa808382 100644 --- a/src/test/spec/retryable_reads.rs +++ b/src/test/spec/retryable_reads.rs @@ -1,7 +1,6 @@ use std::{sync::Arc, time::Duration}; use bson::doc; -use futures::FutureExt; use tokio::sync::RwLockWriteGuard; use crate::{ @@ -63,10 +62,7 @@ async fn retry_releases_connection() { let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap(); RUNTIME - .timeout( - Duration::from_secs(1), - collection.find_one(doc! {}, None).boxed(), - ) + .timeout(Duration::from_secs(1), collection.find_one(doc! {}, None)) .await .expect("operation should not time out") .expect("find should succeed");