Skip to content

Commit c9bfade

Browse files
feat: add typed cursor
This will convert the Document to the Collection type and return the oid as a tuple.
1 parent 7d92fdb commit c9bfade

File tree

9 files changed

+63
-56
lines changed

9 files changed

+63
-56
lines changed

mongod/src/async/client.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use mongodb::options::{
1414
};
1515
use url::Url;
1616

17+
use super::TypedCursor;
1718
use crate::collection::Collection;
1819
use crate::filter::{AsFilter, Filter};
1920
use crate::query;
@@ -533,7 +534,7 @@ impl Client {
533534
/// # Errors
534535
///
535536
/// This method fails if the mongodb encountered an error.
536-
pub async fn find<C, F>(&self, filter: Option<F>) -> crate::Result<mongodb::Cursor<Document>>
537+
pub async fn find<C, F>(&self, filter: Option<F>) -> crate::Result<TypedCursor<C>>
537538
where
538539
C: AsFilter<F> + Collection,
539540
F: Filter,
@@ -553,7 +554,7 @@ impl Client {
553554
/// # Errors
554555
///
555556
/// This method fails if the mongodb encountered an error, or if the found document is invalid.
556-
pub async fn find_one<C, F>(&self, filter: F) -> crate::Result<Option<C>>
557+
pub async fn find_one<C, F>(&self, filter: F) -> crate::Result<Option<(ObjectId, C)>>
557558
where
558559
C: AsFilter<F> + Collection,
559560
F: Filter,
@@ -562,9 +563,7 @@ impl Client {
562563
let find: query::Find<C> = query::Find::new();
563564
let mut cursor = find.filter(filter)?.query(&self).await?;
564565
if let Some(res) = cursor.next().await {
565-
let doc = res.map_err(crate::error::mongodb)?;
566-
let doc = C::from_document(doc)?;
567-
return Ok(Some(doc));
566+
return Ok(Some(res?));
568567
}
569568
Ok(None)
570569
}

mongod/src/async/cursor.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,37 @@ use std::marker::PhantomData;
22
use std::pin::Pin;
33
use std::task::{Context, Poll};
44

5-
use bson::Document;
5+
use bson::{oid::ObjectId, Document};
66
use futures::Stream;
7-
use mongodb::Cursor;
87

98
use crate::collection::Collection;
109

11-
/// A typed blocking cursor.
10+
/// A typed cursor.
1211
///
13-
/// This wraps the blocking `Cursor` so that is can be automatically return typed documents.
12+
/// This wraps the `Cursor` so that it can be automatically return typed documents.
1413
pub struct TypedCursor<T>
1514
where
1615
T: Collection,
1716
{
18-
cursor: Cursor<Document>,
17+
cursor: mongodb::Cursor<Document>,
1918
document_type: PhantomData<T>,
2019
}
2120

22-
impl<T> From<Cursor<Document>> for TypedCursor<T>
21+
impl<T> TypedCursor<T>
2322
where
2423
T: Collection,
2524
{
26-
fn from(cursor: Cursor<Document>) -> Self {
25+
/// Allow access to the wrapped [`mongodb::Cursor`](https://docs.rs/mongodb/2.0.0/mongodb/struct.Cursor.html).
26+
pub fn into_inner(self) -> mongodb::Cursor<Document> {
27+
self.cursor
28+
}
29+
}
30+
31+
impl<T> From<mongodb::Cursor<Document>> for TypedCursor<T>
32+
where
33+
T: Collection,
34+
{
35+
fn from(cursor: mongodb::Cursor<Document>) -> Self {
2736
TypedCursor {
2837
cursor: cursor,
2938
document_type: PhantomData,
@@ -35,15 +44,15 @@ impl<T> Stream for TypedCursor<T>
3544
where
3645
T: Collection,
3746
{
38-
type Item = crate::Result<T>;
47+
type Item = crate::Result<(ObjectId, T)>;
3948

4049
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4150
let next = Pin::new(&mut self.cursor).poll_next(cx);
4251
match next {
4352
Poll::Ready(opt) => Poll::Ready(opt.map(|result| {
44-
result
45-
.map_err(crate::error::mongodb)
46-
.and_then(|doc| T::from_document(doc))
53+
let doc = result.map_err(crate::error::mongodb)?;
54+
let oid = doc.get_object_id("_id").map_err(crate::error::bson)?;
55+
Ok((oid, T::from_document(doc)?))
4756
})),
4857
Poll::Pending => Poll::Pending,
4958
}

mongod/src/blocking/client.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use mongodb::options::{
1010
};
1111
use mongodb::results::{DeleteResult, InsertManyResult, UpdateResult};
1212

13-
use super::cursor::Cursor;
13+
use super::cursor::{Cursor, TypedCursor};
1414
use crate::collection::Collection;
1515
use crate::filter::{AsFilter, Filter};
1616
use crate::query;
@@ -255,7 +255,7 @@ impl Client {
255255
/// # Errors
256256
///
257257
/// This method fails if the mongodb encountered an error.
258-
pub fn find<C, F>(&self, filter: Option<F>) -> crate::Result<Cursor>
258+
pub fn find<C, F>(&self, filter: Option<F>) -> crate::Result<TypedCursor<C>>
259259
where
260260
C: AsFilter<F> + Collection,
261261
F: Filter,
@@ -275,7 +275,7 @@ impl Client {
275275
/// # Errors
276276
///
277277
/// This method fails if the mongodb encountered an error, or if the found document is invalid.
278-
pub fn find_one<C, F>(&self, filter: F) -> crate::Result<Option<C>>
278+
pub fn find_one<C, F>(&self, filter: F) -> crate::Result<Option<(ObjectId, C)>>
279279
where
280280
C: AsFilter<F> + Collection,
281281
F: Filter,
@@ -284,9 +284,7 @@ impl Client {
284284
let find: query::Find<C> = query::Find::new();
285285
let mut cursor = find.filter(filter)?.blocking(&self)?;
286286
if let Some(res) = cursor.next() {
287-
let document = res.map_err(crate::error::mongodb)?;
288-
let document = C::from_document(document)?;
289-
return Ok(Some(document));
287+
return Ok(Some(res?));
290288
}
291289
Ok(None)
292290
}

mongod/src/blocking/cursor.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::marker::PhantomData;
22

3-
use bson::Document;
3+
use bson::{oid::ObjectId, Document};
44
use futures::stream::StreamExt;
55

66
use crate::collection::Collection;
@@ -13,7 +13,7 @@ enum Response {
1313
Next(Option<crate::Result<Document>>),
1414
}
1515

16-
/// A blocking version of the [`mongodb::Cursor`](https://docs.rs/mongodb/1.1.1/mongodb/struct.Cursor.html).
16+
/// A blocking version of the [`mongodb::Cursor`](https://docs.rs/mongodb/2.0.0/mongodb/struct.Cursor.html).
1717
///
1818
/// This wraps the async `Cursor` so that is can be called in a synchronous fashion, please see the
1919
/// asynchronous description for more information about the cursor.
@@ -66,9 +66,18 @@ pub struct TypedCursor<T>
6666
where
6767
T: Collection,
6868
{
69+
cursor: Cursor,
6970
document_type: PhantomData<T>,
71+
}
7072

71-
tx: tokio::sync::mpsc::UnboundedSender<(Request, std::sync::mpsc::Sender<Response>)>,
73+
impl<T> TypedCursor<T>
74+
where
75+
T: Collection,
76+
{
77+
/// Allow access to the wrapped blocking `Cursor`
78+
pub fn into_inner(self) -> Cursor {
79+
self.cursor
80+
}
7281
}
7382

7483
impl<T> From<Cursor> for TypedCursor<T>
@@ -77,8 +86,8 @@ where
7786
{
7887
fn from(cursor: Cursor) -> Self {
7988
TypedCursor {
89+
cursor,
8090
document_type: PhantomData,
81-
tx: cursor.tx,
8291
}
8392
}
8493
}
@@ -87,21 +96,14 @@ impl<T> Iterator for TypedCursor<T>
8796
where
8897
T: Collection,
8998
{
90-
type Item = crate::Result<T>;
99+
type Item = crate::Result<(ObjectId, T)>;
91100
fn next(&mut self) -> Option<Self::Item> {
92-
let (tx, rx) = std::sync::mpsc::channel();
93-
self.tx
94-
.send((Request::Next, tx))
95-
.expect("core thread panicked");
96-
let res = rx
97-
.recv()
98-
.expect("could not get response from mongo runtime");
99-
let Response::Next(c) = res;
100-
let resp = match c {
101-
Some(Ok(b)) => Some(T::from_document(b)),
102-
Some(Err(e)) => Some(Err(crate::error::mongodb(e))),
103-
None => None,
104-
};
105-
resp
101+
let next = self.cursor.next();
102+
103+
next.map(|res| {
104+
let doc = res?;
105+
let oid = doc.get_object_id("_id").map_err(crate::error::bson)?;
106+
Ok((oid, T::from_document(doc)?))
107+
})
106108
}
107109
}

mongod/src/blocking/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,4 @@ mod cursor;
3838

3939
pub use self::client::{Client, ClientBuilder};
4040
pub(crate) use self::client::{Request, Response};
41-
pub use self::cursor::Cursor;
41+
pub use self::cursor::{Cursor, TypedCursor};

mongod/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@
8989
//!
9090
//! let mut cursor = client.find::<User, _>(None).await.unwrap();
9191
//! while let Some(res) = cursor.next().await {
92-
//! if let Ok(user) = res {
93-
//! println!("{:?}", user);
92+
//! if let Ok((id, user)) = res {
93+
//! println!("{} - {:?}", id, user);
9494
//! }
9595
//! }
9696
//! # Ok(())

mongod/src/query/find.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ use std::time::Duration;
33

44
use mongodb::bson::Document;
55
use mongodb::options::{Collation, CursorType, FindOptions, Hint, ReadConcern, SelectionCriteria};
6-
use mongodb::Cursor;
76

87
use crate::collection::Collection;
98
use crate::field::{AsField, Field};
109
use crate::filter::{AsFilter, Filter};
11-
use crate::r#async::Client;
10+
use crate::r#async::{Client, TypedCursor};
1211
use crate::sort::Sort;
1312

1413
/// A querier to find documents in a MongoDB collection.
@@ -38,8 +37,8 @@ use crate::sort::Sort;
3837
/// .await
3938
/// .unwrap();
4039
/// while let Some(res) = cursor.next().await {
41-
/// if let Ok(user) = res {
42-
/// println!("{:?}", user);
40+
/// if let Ok((id, user)) = res {
41+
/// println!("{} - {:?}", id, user);
4342
/// }
4443
/// }
4544
/// # Ok(())
@@ -253,12 +252,13 @@ impl<C: Collection> Find<C> {
253252
/// # Errors
254253
///
255254
/// This method fails if the mongodb encountered an error.
256-
pub async fn query(self, client: &Client) -> crate::Result<Cursor<Document>> {
255+
pub async fn query(self, client: &Client) -> crate::Result<TypedCursor<C>> {
257256
client
258257
.database()
259258
.collection::<Document>(C::COLLECTION)
260259
.find(self.filter, self.options)
261260
.await
261+
.map(|cur| TypedCursor::from(cur))
262262
.map_err(crate::error::mongodb)
263263
}
264264

@@ -275,14 +275,14 @@ impl<C: Collection> Find<C> {
275275
pub fn blocking(
276276
self,
277277
client: &crate::blocking::Client,
278-
) -> crate::Result<crate::blocking::Cursor> {
278+
) -> crate::Result<crate::blocking::TypedCursor<C>> {
279279
let resp = client.execute(crate::blocking::Request::Find(
280280
C::COLLECTION,
281281
self.filter,
282282
self.options,
283283
))?;
284284
if let crate::blocking::Response::Find(r) = resp {
285-
return Ok(r);
285+
return Ok(crate::blocking::TypedCursor::from(r));
286286
}
287287
Err(crate::error::runtime(
288288
"incorrect response from blocking client",

mongod/tests/async.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use futures::stream::StreamExt;
2-
use mongod::{AsFilter, AsUpdate, Comparator, TypedCursor, Updates};
2+
use mongod::{AsFilter, AsUpdate, Comparator, Updates};
33

44
use user::User;
55

@@ -25,10 +25,9 @@ async fn async_client() {
2525

2626
// Fetch
2727
let mut count: u32 = 0;
28-
let cursor = client.find::<User, _>(None).await.unwrap();
29-
let mut cursor: TypedCursor<User> = TypedCursor::from(cursor);
28+
let mut cursor = client.find::<User, _>(None).await.unwrap();
3029
while let Some(res) = cursor.next().await {
31-
let _user = res.unwrap();
30+
let (_id, _user) = res.unwrap();
3231
count += 1;
3332
}
3433
assert_eq!(count, 2);

mongod/tests/blocking.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ mod wrapper {
3030
let mut count: u32 = 0;
3131
let mut cursor = client.find::<User, _>(None).unwrap();
3232
while let Some(res) = cursor.next() {
33-
let _user = res.unwrap();
33+
let (_id, _user) = res.unwrap();
3434
count += 1;
3535
}
3636
assert_eq!(count, 2);

0 commit comments

Comments
 (0)