Commit 54238fb

RUST-2140 Support the nsType field for change streams (#1316)
1 parent fbec185 commit 54238fb

5 files changed (+511, -241 lines)


src/change_stream/event.rs

Lines changed: 18 additions & 0 deletions

```diff
@@ -71,6 +71,9 @@ pub struct ChangeStreamEvent<T> {
     /// Identifies the collection or database on which the event occurred.
     pub ns: Option<ChangeNamespace>,
 
+    /// The type of the newly created object. Only included for `OperationType::Create`.
+    pub ns_type: Option<ChangeNamespaceType>,
+
     /// The new name for the `ns` collection. Only included for `OperationType::Rename`.
     pub to: Option<ChangeNamespace>,
 
@@ -268,3 +271,18 @@ pub struct ChangeNamespace {
     /// The name of the collection in which the change occurred.
     pub coll: Option<String>,
 }
+
+/// Identifies the type of object for a `create` event.
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
+#[non_exhaustive]
+pub enum ChangeNamespaceType {
+    /// A collection with no special options set.
+    Collection,
+    /// A timeseries collection.
+    Timeseries,
+    /// A view collection.
+    View,
+    /// Forward compatibility fallthrough.
+    #[serde(untagged)]
+    Other(String),
+}
```
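For context, a minimal sketch of how the new field might be consumed, assuming a `tokio` runtime, a local deployment,
and the driver's action-style `watch` API; the `show_expanded_events` call reflects the server's requirement that
expanded events be enabled before `create` events are emitted, and the option name is an assumption here:

```rust
use futures_util::stream::StreamExt;
use mongodb::{change_stream::event::ChangeNamespaceType, Client};

#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
    let client = Client::with_uri_str("mongodb://localhost:27017").await?;

    // Watch a whole database so collection-creation events are visible.
    // `create` events are only emitted when expanded events are enabled
    // (option name assumed here).
    let mut stream = client
        .database("app")
        .watch()
        .show_expanded_events(true)
        .await?;

    while let Some(event) = stream.next().await.transpose()? {
        // `ns_type` is only populated for `OperationType::Create`.
        match event.ns_type {
            Some(ChangeNamespaceType::Collection) => println!("created a plain collection"),
            Some(ChangeNamespaceType::Timeseries) => println!("created a timeseries collection"),
            Some(ChangeNamespaceType::View) => println!("created a view"),
            // `Other` preserves namespace types added by future servers.
            Some(ChangeNamespaceType::Other(other)) => println!("created: {other}"),
            None => {}
        }
    }
    Ok(())
}
```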
Lines changed: 262 additions & 0 deletions (new file)
# Change Streams

______________________________________________________________________

## Introduction

The YAML and JSON files in this directory are platform-independent tests that drivers can use to prove their conformance
to the Change Streams Spec.

Several prose tests, which are not easily expressed in YAML, are also presented in this file. Those tests will need to
be manually implemented by each driver.

### Subdirectories for Test Formats

This document describes the legacy format for change streams tests. Tests in this legacy format are located under
`./legacy/`.

New change streams tests should be written in the
[unified test format](../../unified-test-format/unified-test-format.md) and placed under `./unified/`.
## Spec Test Format

Each YAML file has the following keys (a sketch of a matching deserialization target follows this list):

- `database_name`: The default database
- `collection_name`: The default collection
- `database2_name`: Another database
- `collection2_name`: Another collection
- `tests`: An array of tests that are to be run independently of each other. Each test will have some of the following
  fields:
  - `description`: The name of the test.
  - `minServerVersion`: The minimum server version to run this test against. If not present, assume there is no minimum
    server version.
  - `maxServerVersion`: Reserved for later use
  - `failPoint`: Optional `configureFailPoint` command document to run to configure a fail point on the primary server.
  - `target`: The entity on which to run the change stream. Valid values are:
    - `collection`: Watch changes on collection `database_name.collection_name`
    - `database`: Watch changes on database `database_name`
    - `client`: Watch changes on the entire cluster
  - `topology`: An array of server topologies against which to run the test. Valid topologies are `single`,
    `replicaset`, `sharded`, and `load-balanced`.
  - `changeStreamPipeline`: An array of additional aggregation pipeline stages to add to the change stream
  - `changeStreamOptions`: Additional options to add to the change stream
  - `operations`: Array of documents, each describing an operation. Each document has the following fields:
    - `database`: Database against which to run the operation
    - `collection`: Collection against which to run the operation
    - `name`: Name of the command to run
    - `arguments` (optional): Object of arguments for the command (ex: document to insert)
  - `expectations`: Optional list of command-started events in Extended JSON format
  - `result`: Document with ONE of the following fields:
    - `error`: Describes an error received during the test
    - `success`: An Extended JSON array of documents expected to be received from the change stream
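For illustration only, a hypothetical pair of Rust deserialization targets for these files (parsed with something like
`serde_yaml`); the field names mirror the keys above, but every type choice is an assumption rather than part of the
spec:

```rust
use mongodb::bson::Document;
use serde::Deserialize;

/// Hypothetical top-level shape of one spec test file.
#[derive(Debug, Deserialize)]
struct TestFile {
    database_name: String,
    collection_name: String,
    database2_name: String,
    collection2_name: String,
    tests: Vec<TestCase>,
}

/// Hypothetical shape of a single test; optional keys become `Option`s.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TestCase {
    description: String,
    min_server_version: Option<String>,
    max_server_version: Option<String>,
    fail_point: Option<Document>,
    target: String, // "collection", "database", or "client"
    topology: Vec<String>,
    change_stream_pipeline: Vec<Document>,
    change_stream_options: Document,
    operations: Vec<Document>,
    expectations: Option<Vec<Document>>,
    result: Document, // holds either `error` or `success`
}
```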
## Spec Test Match Function

The definition of MATCH or MATCHES in the Spec Test Runner is as follows:

- MATCH takes two values, `expected` and `actual`
- Notation is "Assert \[actual\] MATCHES \[expected\]"
- Assertion passes if `expected` is a subset of `actual`, with the value `42` acting as a placeholder for "any value"

Pseudocode implementation of `actual` MATCHES `expected`:

```text
If expected is "42" or 42:
  Assert that actual exists (is not null or undefined)
Else:
  Assert that actual is of the same JSON type as expected
  If expected is a JSON array:
    For every idx/value in expected:
      Assert that actual[idx] MATCHES value
  Else if expected is a JSON object:
    For every key/value in expected:
      Assert that actual[key] MATCHES value
  Else:
    Assert that expected equals actual
```

The expected values for `result.success` and `expectations` are written in Extended JSON. Drivers may adopt any of the
following approaches to comparisons, as long as they are consistent (a BSON-based sketch follows this list):

- Convert `actual` to Extended JSON and compare to `expected`
- Convert `expected` and `actual` to BSON, and compare them
- Convert `expected` and `actual` to native equivalents of JSON, and compare them
## Spec Test Runner

Before running the tests

- Create a MongoClient `globalClient`, and connect to the server. When executing tests against a sharded cluster,
  `globalClient` must only connect to one mongos. This is because tests that set failpoints will only work consistently
  if both the `configureFailPoint` and failing commands are sent to the same mongos.

For each YAML file, for each element in `tests`:

- If `topology` does not include the topology of the server instance(s), skip this test.
- Use `globalClient` to
  - Drop the database `database_name`
  - Drop the database `database2_name`
  - Create the database `database_name` and the collection `database_name.collection_name`
  - Create the database `database2_name` and the collection `database2_name.collection2_name`
  - If the `failPoint` field is present, configure the fail point on the primary server (a sketch of this step follows
    the procedure below). See
    [Server Fail Point](../../transactions/tests/legacy-test-format.md#server-fail-point) in the Transactions spec test
    documentation for more information.
- Create a new MongoClient `client`
- Begin monitoring all APM events for `client`. (If the driver uses global listeners, filter out all events that do not
  originate with `client`.) Filter out any "internal" commands (e.g. `hello` or legacy hello)
- Using `client`, create a change stream `changeStream` against the specified `target`. Use `changeStreamPipeline` and
  `changeStreamOptions` if they are non-empty. Capture any error.
- If there was no error, use `globalClient` and run every operation in `operations` in serial against the server until
  all operations have been executed or an error is thrown. Capture any error.
- If there was no error and `result.error` is set, iterate `changeStream` once and capture any error.
- If there was no error and `result.success` is non-empty, iterate `changeStream` until it returns as many changes as
  there are elements in the `result.success` array or an error is thrown. Capture any error.
- Close `changeStream`
- If there was an error:
  - Assert that an error was expected for the test.
  - Assert that the error MATCHES `result.error`
- Else:
  - Assert that no error was expected for the test
  - Assert that the changes received from `changeStream` MATCH the results in `result.success`
- If there are any `expectations`
  - For each (`expected`, `idx`) in `expectations`
    - If `actual[idx]` is a `killCursors` event, skip it and move to `actual[idx+1]`.
    - Else assert that `actual[idx]` MATCHES `expected`
  - Note: the change stream test command event expectations cover a prefix subset of all command events published by the
    driver. The test runner MUST verify that, if there are N expectations, the first N events published by the driver
    match the expectations, and MUST NOT inspect any subsequent events published by the driver.
- Close the MongoClient `client`

After running all tests

- Close the MongoClient `globalClient`
- Drop database `database_name`
- Drop database `database2_name`
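A minimal sketch of the fail point step, assuming the hypothetical helper name `configure_fail_point` and the driver's
current `run_command` API; the test's `failPoint` document is sent as-is:

```rust
use mongodb::{bson::Document, error::Result, Client};

/// Hypothetical helper: apply a test's `failPoint` document via `globalClient`.
async fn configure_fail_point(global_client: &Client, fail_point: Document) -> Result<()> {
    // `configureFailPoint` must run against the `admin` database, and against the
    // same mongos that will execute the failing commands (hence `globalClient`).
    global_client.database("admin").run_command(fail_point).await?;
    Ok(())
}
```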
### Iterating the Change Stream

Although synchronous drivers must provide a
[non-blocking mode of iteration](../change-streams.md#not-blocking-on-iteration), asynchronous drivers may not have such
a mechanism. Those drivers with only a blocking mode of iteration should be careful not to iterate the change stream
unnecessarily, as doing so could cause the test runner to block indefinitely. For this reason, the test runner procedure
above advises drivers to take a conservative approach to iteration.

If the test expects an error and one was not thrown by either creating the change stream or executing the test's
operations, iterating the change stream once allows for an error to be thrown by a `getMore` command. If the test does
not expect any error, the change stream should be iterated only until it returns as many result documents as are
expected by the test.
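For example, a sketch of such conservative iteration, assuming the Rust driver's non-blocking `next_if_any`; a real
runner would also enforce a deadline so a missing event fails the test rather than spinning forever:

```rust
use mongodb::{
    bson::Document,
    change_stream::{event::ChangeStreamEvent, ChangeStream},
    error::Result,
};

/// Pull at most `expected` events, without iterating past what the test needs.
async fn iterate_until(
    stream: &mut ChangeStream<ChangeStreamEvent<Document>>,
    expected: usize,
) -> Result<Vec<ChangeStreamEvent<Document>>> {
    let mut events = Vec::with_capacity(expected);
    while events.len() < expected {
        // `next_if_any` returns Ok(None) when no event is currently buffered,
        // instead of blocking until one arrives.
        if let Some(event) = stream.next_if_any().await? {
            events.push(event);
        }
    }
    Ok(events)
}
```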
### Testing on Sharded Clusters

When writing data on sharded clusters, majority-committed data does not always show up in the response of the first
`getMore` command after the data is written. This is because in sharded clusters, no data from shard A may be returned
until all other shards report an entry that sorts after the change in shard A.

To account for this, drivers MUST NOT rely on change stream documents being returned in certain batches. For example, if
expecting two documents in a change stream, these may not be part of the same `getMore` response, or may even be
produced in two subsequent `getMore` responses. Drivers MUST allow for a `getMore` to produce empty batches when testing
on a sharded cluster. By default, this can take up to 10 seconds, but it can be controlled by enabling the
`writePeriodicNoops` server parameter and configuring the `periodicNoopIntervalSecs` parameter. Choosing lower values
allows for running change stream tests with smaller timeouts.
## Prose Tests

The following tests have not yet been automated, but MUST still be tested. All tests SHOULD be run on both replica sets
and sharded clusters unless otherwise specified:

1. `ChangeStream` must continuously track the last seen `resumeToken` (a sketch of this test follows the list)

2. `ChangeStream` will throw an exception if the server response is missing the resume token (if wire version is \< 8,
   this is a driver-side error; for 8+, this is a server-side error)

3. After receiving a `resumeToken`, `ChangeStream` will automatically resume one time on a resumable error with the
   initial pipeline and options, except for the addition/update of a `resumeToken`.

4. `ChangeStream` will not attempt to resume on any error encountered while executing an `aggregate` command. Note that
   retryable reads may retry `aggregate` commands. Drivers should be careful to distinguish retries from resume
   attempts. Alternatively, drivers may specify `retryReads=false` or avoid using a
   [retryable error](../../retryable-reads/retryable-reads.md#retryable-error) for this test.

5. **Removed**

6. `ChangeStream` will perform server selection before attempting to resume, using initial `readPreference`

7. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on
   the driver side.

8. The `killCursors` command sent during the "Resume Process" must not be allowed to throw an exception.

9. `$changeStream` stage for `ChangeStream` against a server `>=4.0` and `<4.0.7` that has not received any results yet
   MUST include a `startAtOperationTime` option when resuming a change stream.

10. **Removed**

11. For a `ChangeStream` under these conditions:

    - Running against a server `>=4.0.7`.
    - The batch is empty or has been iterated to the last document.

    Expected result:

    - `getResumeToken` must return the `postBatchResumeToken` from the current command response.

12. For a `ChangeStream` under these conditions:

    - Running against a server `<4.0.7`.
    - The batch is empty or has been iterated to the last document.

    Expected result:

    - `getResumeToken` must return the `_id` of the last document returned if one exists.
    - `getResumeToken` must return `resumeAfter` from the initial aggregate if the option was specified.
    - If `resumeAfter` was not specified, the `getResumeToken` result must be empty.

13. For a `ChangeStream` under these conditions:

    - The batch is not empty.
    - The batch has been iterated up to but not including the last element.

    Expected result:

    - `getResumeToken` must return the `_id` of the previous document returned.

14. For a `ChangeStream` under these conditions:

    - The batch is not empty.
    - The batch hasn't been iterated at all.
    - Only the initial `aggregate` command has been executed.

    Expected result:

    - `getResumeToken` must return `startAfter` from the initial aggregate if the option was specified.
    - `getResumeToken` must return `resumeAfter` from the initial aggregate if the option was specified.
    - If neither the `startAfter` nor `resumeAfter` options were specified, the `getResumeToken` result must be empty.

    Note that this test cannot be run against sharded topologies because in that case the initial `aggregate` command
    only establishes cursors on the shards and always returns an empty `firstBatch`.

15. **Removed**

16. **Removed**

17. `$changeStream` stage for `ChangeStream` started with `startAfter` against a server `>=4.1.1` that has not received
    any results yet MUST include a `startAfter` option and MUST NOT include a `resumeAfter` option when resuming a
    change stream.

18. `$changeStream` stage for `ChangeStream` started with `startAfter` against a server `>=4.1.1` that has received at
    least one result MUST include a `resumeAfter` option and MUST NOT include a `startAfter` option when resuming a
    change stream.

19. Validate that large `ChangeStream` events are split when using `$changeStreamSplitLargeEvent`:

    1. Run only against servers `>=6.0.9 && <6.1` or `>=7.0`.
    2. Create a new collection *C* with `changeStreamPreAndPostImages` enabled.
    3. Insert into *C* a document at least 10MB in size, e.g. `{ "value": "q"*10*1024*1024 }`
    4. Create a change stream *S* by calling `watch` on *C* with pipeline
       `[{ "$changeStreamSplitLargeEvent": {} }]` and `fullDocumentBeforeChange=required`.
    5. Call `updateOne` on *C* with an empty `query` and an update setting the field to a new large value, e.g.
       `{ "$set": { "value": "z"*10*1024*1024 } }`.
    6. Collect two events from *S*.
    7. Assert that the events collected have `splitEvent` fields `{ "fragment": 1, "of": 2 }` and
       `{ "fragment": 2, "of": 2 }`, in that order.
