Skip to content

Commit 4cb1ff9

Browse files
authored
SWIFT-1395 Update driver docs for async/await (#728)
1 parent dbc6ef6 commit 4cb1ff9

File tree

6 files changed

+235
-154
lines changed

6 files changed

+235
-154
lines changed

Guides/Change-Streams-Guide.md

Lines changed: 87 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,164 +1,152 @@
11
# Using Change Streams
22

3-
MongoSwift 0.2.0 added support for [change streams](https://docs.mongodb.com/manual/changeStreams/), which allow applications to access real-time data changes. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
3+
The driver supports [change streams](https://docs.mongodb.com/manual/changeStreams/), which allow applications to access real-time data changes. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
44

55
**Note**: Change streams only work with MongoDB replica sets and sharded clusters.
66

77
## Examples
8+
These examples use the driver's async/await APIs; for examples using `EventLoopFuture`s please see the [previous version of this guide](https://github.com/mongodb/mongo-swift-driver/blob/79c9683d56f92540f4065f40b9f55e1911a1ff5b/Guides/Change-Streams-Guide.md).
89

9-
### Open a Change Stream on a `MongoCollection<Document>` (MongoDB 3.6+)
10-
```swift
11-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
12-
let client = try MongoClient(using: elg)
13-
let inventory = client.db("example").collection("inventory")
10+
### Open a Change Stream on a `MongoCollection` (MongoDB 3.6+)
1411

15-
inventory.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
16-
stream.forEach { event in
17-
// process `ChangeStreamEvent<BSONDocument>` here
18-
}
19-
}.whenFailure { error in
20-
// handle error
21-
}
12+
We recommend to open and interact with change streams in their own `Task`s, and to terminate change streams by canceling their corresponding `Task`s.
13+
In the following example, change stream events will be processed asynchronously as they arrive on `changeStreamTask` until the `Task` is canceled.
14+
`ChangeStream` conforms to Swift's [`AsyncSequence` protocol](https://developer.apple.com/documentation/swift/asyncsequence) and so can be iterated
15+
over using a for-in loop.
2216

23-
// perform some operations using `inventory`...
24-
```
25-
26-
### Open a Change Stream on a `MongoCollection<MyCodableType>` (MongoDB 3.6+)
2717
```swift
28-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
29-
let client = try MongoClient(using: elg)
30-
let inventory = client.db("example").collection("inventory", withType: MyCodableType.self)
18+
struct Item: Codable {
19+
let _id: BSONObjectID
20+
let name: String
21+
let cost: Int
22+
let count: Int
23+
}
3124

32-
inventory.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<MyCodableType>>`
33-
stream.forEach { event in
34-
// process `ChangeStreamEvent<MyCodableType>` here
25+
let inventory = client.db("example").collection("inventory", withType: Item.self)
26+
let changeStreamTask = Task {
27+
for try await event in try await inventory.watch() {
28+
// process `ChangeStream<ChangeStreamEvent<Item>>`
3529
}
36-
}.whenFailure { error in
37-
// handle error
3830
}
3931

40-
// perform some operations using `inventory`...
32+
// later...
33+
changeStreamTask.cancel()
4134
```
4235

43-
### Use a Custom `Codable` Type for the `fullDocument` Property of Returned `ChangeStreamEvent`s
36+
If you provide a pipeline to `watch` which transforms the shape of the returned documents, you will need to specify a type to use for the
37+
`ChangeStreamEvent.fullDocument` property. You can do this as follows when calling `watch`:
38+
4439
```swift
45-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
46-
let client = try MongoClient(using: elg)
47-
let inventory = client.db("example").collection("inventory")
40+
struct ItemCount: Codable {
41+
let _id: BSONObjectID
42+
let count: Int
43+
}
4844

49-
inventory.watch(withFullDocumentType: MyCodableType.self).flatMap { stream in // a `ChangeStream<ChangeStreamEvent<MyCodableType>>`
50-
stream.forEach { event in
51-
// process `ChangeStreamEvent<MyCodableType>` here
45+
let changeStreamTask = Task {
46+
let pipeline: [BSONDocument] = [["$unset": ["fullDocument.name", "fullDocument.cost"]]]
47+
for try await event in try await inventory.watch(pipeline, withFullDocumentType: ItemCount.self) {
48+
// process `ChangeStream<ChangeStreamEvent<ItemCount>>`
5249
}
53-
}.whenFailure { error in
54-
// handle error
5550
}
5651

57-
// perform some operations using `inventory`...
52+
// later...
53+
changeStreamTask.cancel()
5854
```
5955

60-
### Use a Custom `Codable` Type for the Return type of `ChangeStream.next()`
61-
```swift
62-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
63-
let client = try MongoClient(using: elg)
64-
let inventory = client.db("example").collection("inventory")
56+
You can also provide a type to use in place of `ChangeStreamEvent` altogether:
6557

66-
inventory.watch(withEventType: MyCodableType.self).flatMap { stream in // a `ChangeStream<MyCodableType>`
67-
stream.forEach { event in
68-
// process `MyCodableType` here
58+
```swift
59+
let changeStreamTask = Task {
60+
for try await event in try await inventory.watch(withEventType: InventoryEvent.self) {
61+
// process `ChangeStream<ChangeStreamEvent<InventoryEvent>>`
6962
}
70-
}.whenFailure { error in
71-
// handle error
7263
}
7364

74-
// perform some operations using `inventory`...
65+
// later...
66+
changeStreamTask.cancel()
7567
```
7668

7769
### Open a Change Stream on a `MongoDatabase` (MongoDB 4.0+)
70+
You can also open a change stream on an entire database, which will observe events on all collections in the database:
71+
7872
```swift
79-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
80-
let client = try MongoClient(using: elg)
8173
let db = client.db("example")
8274

83-
db.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
84-
stream.forEach { event in
85-
// process `ChangeStreamEvent<BSONDocument>` here
75+
let changeStreamTask = Task {
76+
for try await event in try await db.watch() {
77+
// process `ChangeStream<ChangeStreamEvent<BSONDocument>>`
8678
}
87-
}.whenFailure { error in
88-
// handle error
8979
}
9080

91-
// perform some operations using `db`...
81+
// later...
82+
changeStreamTask.cancel()
9283
```
9384

94-
Note: the types of the `fullDocument` property, as well as the return type of `ChangeStream.next()`, may be customized in the same fashion as the examples using `MongoCollection` above.
85+
Note: the type of the `ChangeStreamEvent.fullDocument` property, as well as the return type of `ChangeStream.next()`, may be customized in the same fashion as the examples using `MongoCollection` above by passing in `fullDocumentType` or `eventType` to `watch()`.
9586

9687
### Open a Change Stream on a `MongoClient` (MongoDB 4.0+)
97-
```swift
98-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
99-
let client = try MongoClient(using: elg)
88+
You can also open a change stream on an entire cluster, which will observe events on all databases and collections:
10089

101-
client.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
102-
stream.forEach { event in
103-
// process `ChangeStreamEvent<BSONDocument>` here
90+
```swift
91+
let changeStreamTask = Task {
92+
for try await event in try await client.watch() {
93+
// process `ChangeStream<ChangeStreamEvent<BSONDocument>>`
10494
}
105-
}.whenFailure { error in
106-
// handle error
10795
}
10896

109-
// perform some operations using `client`...
97+
// later...
98+
changeStreamTask.cancel()
11099
```
111100

112-
Note: the types of the `fullDocument` property, as well as the return type of `ChangeStream.next()`, may be customized in the same fashion as the examples using `MongoCollection` above.
101+
Note: the type of the `ChangeStreamEvent.fullDocument` property, as well as the return type of `ChangeStream.next()`, may be customized in the same fashion as the examples using `MongoCollection` above by passing in `fullDocumentType` or `eventType` to `watch()`.
113102

114103
### Resume a Change Stream
104+
Change streams can be resumed from particular points in time using resume tokens. For example:
105+
115106
```swift
116-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
117-
let client = try MongoClient(using: elg)
118107
let inventory = client.db("example").collection("inventory")
119108

120-
inventory.watch().flatMap { stream -> EventLoopFuture<ChangeStream<ChangeStreamEvent<BSONDocument>>> in
109+
let changeStreamTask1 = Task { () -> ResumeToken? in
110+
let changeStream = try await inventory.watch()
121111
// read the first change event
122-
stream.next().flatMap { _ in
123-
// simulate an error by killing the stream
124-
stream.kill()
125-
}.flatMap { _ in
126-
// create a new change stream that starts after the first change event
127-
let resumeToken = stream.resumeToken
128-
return inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
129-
}
130-
}.flatMap { resumedStream in
131-
resumedStream.forEach { event in
132-
// process `ChangeStreamEvent<BSONDocument>` here
112+
_ = try await changeStream.next()
113+
// resume token to resume stream after the first event
114+
return changeStream.resumeToken
115+
}
116+
117+
// Get resume token from the first task and change stream.
118+
guard let resumeToken = try await changeStreamTask1.value else {
119+
fatalError("Unexpectedly missing resume token after processing event")
120+
}
121+
122+
let changeStreamTask2 = Task {
123+
let changeStream = try await inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
124+
for try await event in changeStream {
125+
// process ChangeStreamEvent
133126
}
134-
}.whenFailure { error in
135-
// handle error
136127
}
137128

138-
// perform some operations using `inventory`...
129+
// later...
130+
changeStreamTask2.cancel()
139131
```
140132

141133
### Modify Change Stream Output
142134
```swift
143-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
144-
let client = try MongoClient(using: elg)
145-
let inventory = client.db("example").collection("inventory")
146-
147-
// Only include events where the changed document's username = "alice"
148-
let pipeline: [BSONDocument] = [
149-
["$match": ["fullDocument.username": "alice"]]
150-
]
151-
152-
inventory.watch(pipeline).flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
153-
stream.forEach { event in
154-
// process `ChangeStreamEvent<BSONDocument>` here
135+
let inventory = client.db("example").collection("inventory", withType: Item.self)
136+
137+
let changeStreamTask = Task {
138+
// Only include events where the changed document's count = 0
139+
let pipeline: [BSONDocument] = [
140+
["$match": ["fullDocument.count": 0]]
141+
]
142+
for try await event in try await inventory.watch(pipeline) {
143+
// process `ChangeStream<ChangeStreamEvent<Item>>`
155144
}
156-
}.whenFailure { error in
157-
// handle error
158145
}
159146

160-
// perform some operations using `inventory`...
147+
// later...
148+
changeStreamTask.cancel()
161149
```
162150

163151
## See Also
164-
- [MongoDB Change Streams documentation](https://docs.mongodb.com/manual/changeStreams/)
152+
- [MongoDB Change Streams documentation](https://docs.mongodb.com/manual/changeStreams/)

Guides/Codable-Guide.md

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
11
# `Codable` Usage in MongoSwift and MongoSwiftSync
22
There are a number of ways for users to leverage `Codable` via the driver's API. One such example is through `MongoCollection<T>`. By default, `MongoDatabase.collection` returns a `MongoCollection<BSONDocument>`. Any `find` or `aggregate` method invocation on that returned collection would then return a `MongoCursor<BSONDocument>`, which when iterated returns a `BSONDocument?`:
3+
4+
**Async/Await (recommended)**:
5+
```swift
6+
let collection = db.collection("person")
7+
8+
for try await person in try await collection.find(["occupation": "Software Engineer"]) {
9+
print(person["name"] ?? "nil")
10+
}
11+
12+
try await collection.insertOne(["name": "New Hire", "occupation": "Doctor", "projects": []])
13+
```
14+
15+
**Async (`EventLoopFuture`s)**:
316
```swift
417
let collection = db.collection("person")
518

6-
// asynchronous API
719
collection.find(["occupation": "Software Engineer"]).flatMap { cursor in
820
cursor.toArray()
921
}.map { docs in
@@ -12,14 +24,21 @@ collection.find(["occupation": "Software Engineer"]).flatMap { cursor in
1224
}
1325
}
1426
collection.insertOne(["name": "New Hire", "occupation": "Doctor", "projects": []]).whenSuccess { _ in /* ... */ }
27+
```
28+
29+
**Sync**
30+
```swift
31+
let collection = db.collection("person")
1532

16-
// synchronous API
1733
for person in try collection.find(["occupation": "Software Engineer"]) {
1834
print(try person.get()["name"] ?? "nil")
1935
}
2036
try collection.insertOne(["name": "New Hire", "occupation": "Doctor", "projects": []])
2137
```
38+
2239
However, if the schema of the collection is known, `Codable` structs can be used to work with the data in a more type safe way. To facilitate this, the alternate `collection(name:asType)` method on `MongoDatabase`, which accepts a `Codable` generic type, can be used. The provided type defines the model for all the documents in that collection, and any cursor returned from `find` or `aggregate` on that collection will be generic over that type instead of `BSONDocument`. Iterating such cursors will automatically decode the result documents to the generic type specified. Similarly, `insert` on that collection will accept an instance of that type.
40+
41+
First, define custom types matching your collection schema:
2342
```swift
2443
struct Project: Codable {
2544
let id: BSON
@@ -33,8 +52,21 @@ struct Person: Codable {
3352
}
3453

3554
let collection = db.collection("person", withType: Person.self)
55+
```
56+
57+
Then, use your custom types along with the driver APIs:
58+
59+
**Async/Await (recommended)**:
60+
```swift
61+
for try await person in try await collection.find(["occupation": "Software Engineer"]) {
62+
print(person.name)
63+
}
64+
65+
try await collection.insertOne(Person(name: "New Hire", occupation: "Doctor", projects: []))
66+
```
3667

37-
// asynchronous API
68+
**Async (`EventLoopFuture`s)**:
69+
```swift
3870
collection.find(["occupation": "Software Engineer"]).flatMap { cursor in
3971
cursor.toArray()
4072
}.map { docs in
@@ -43,8 +75,10 @@ collection.find(["occupation": "Software Engineer"]).flatMap { cursor in
4375
}
4476
}
4577
collection.insertOne(Person(name: "New Hire", occupation: "Doctor", projects: [])).whenSuccess { _ in /* ... */ }
78+
```
4679

47-
// synchronous API
80+
**Sync**
81+
```swift
4882
for person in try collection.find(["occupation": "Software Engineer"]) {
4983
print(try person.get().name)
5084
}

Guides/Error-Handling-Guide.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ do {
113113
### Handling a CommandError
114114
```swift
115115
do {
116-
try db.runCommand(["asdfasdf": "sadfsadfasdf"])
116+
try await db.runCommand(["asdfasdf": "sadfsadfasdf"])
117117
} catch let commandError as MongoError.CommandError {
118118
print("Command failed: code: \(commandError.code) message: \(commandError.message)")
119119
} catch { ... }
@@ -127,8 +127,8 @@ Command failed: code: 59 message: no such command: 'asdfasdf'
127127
```swift
128128
// if you want to ignore duplicate key errors
129129
do {
130-
try coll.insertOne(["_id": 1])
131-
try coll.insertOne(["_id": 1])
130+
try await coll.insertOne(["_id": 1])
131+
try await coll.insertOne(["_id": 1])
132132
} catch let writeError as MongoError.WriteError where writeError.writeFailure?.code == 11000 {
133133
print("duplicate key error: \(1) \(writeError.writeFailure?.message ?? "")")
134134
}
@@ -140,10 +140,10 @@ duplicate key error: 1 E11000 duplicate key error collection: mydb.mycoll1 index
140140

141141
### Handling a BulkWriteError
142142
```swift
143-
let docs: [Document] = [["_id": 2], ["_id": 1]]
143+
let docs: [BSONDocument] = [["_id": 2], ["_id": 1]]
144144
do {
145-
try coll.insertOne(["_id": 1])
146-
try coll.insertMany(docs)
145+
try await coll.insertOne(["_id": 1])
146+
try await coll.insertMany(docs)
147147
} catch let bwe as MongoError.BulkWriteError {
148148
if let writeErrors = bwe.writeFailures {
149149
writeErrors.forEach { err in print("Write Error inserting \(docs[err.index]), code: \(err.code), message: \(err.message)") }

0 commit comments

Comments
 (0)