@@ -209,74 +209,78 @@ describe('Change Streams', function () {
209
209
}
210
210
} ) ;
211
211
212
- it ( 'should support creating multiple simultaneous ChangeStreams' , {
213
- metadata : { requires : { topology : 'replicaset' } } ,
212
+ describe ( 'when creating multiple simultaneous ChangeStreams' , ( ) => {
213
+ let client ;
214
+ let changeStream1 ;
215
+ let changeStream2 ;
216
+ let changeStream3 ;
214
217
215
- test : function ( done ) {
216
- const configuration = this . configuration ;
217
- const client = configuration . newClient ( ) ;
218
+ beforeEach ( async function ( ) {
219
+ client = this . configuration . newClient ( ) ;
220
+ } ) ;
218
221
219
- client . connect ( ( err , client ) => {
220
- expect ( err ) . to . not . exist ;
221
- this . defer ( ( ) => client . close ( ) ) ;
222
+ afterEach ( async function ( ) {
223
+ await changeStream1 ?. close ( ) ;
224
+ await changeStream2 ?. close ( ) ;
225
+ await changeStream3 ?. close ( ) ;
226
+ await client ?. close ( ) ;
227
+ } ) ;
222
228
229
+ it (
230
+ 'supports simultaneous parallel ChangeStream use' ,
231
+ { requires : { topology : '!single' } } ,
232
+ async function ( ) {
223
233
const database = client . db ( 'integration_tests' ) ;
224
234
const collection1 = database . collection ( 'simultaneous1' ) ;
225
235
const collection2 = database . collection ( 'simultaneous2' ) ;
226
236
227
- const changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
228
- this . defer ( ( ) => changeStream1 . close ( ) ) ;
229
- const changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
230
- this . defer ( ( ) => changeStream2 . close ( ) ) ;
231
- const changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
232
- this . defer ( ( ) => changeStream3 . close ( ) ) ;
237
+ changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
238
+ changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
239
+ changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
233
240
234
241
setTimeout ( ( ) => {
235
- this . defer (
236
- collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) )
237
- ) ;
242
+ collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) ) ;
238
243
} , 50 ) ;
239
244
240
- Promise . resolve ( )
241
- . then ( ( ) =>
242
- Promise . all ( [ changeStream1 . hasNext ( ) , changeStream2 . hasNext ( ) , changeStream3 . hasNext ( ) ] )
243
- )
244
- . then ( function ( hasNexts ) {
245
- // Check all the Change Streams have a next item
246
- assert . ok ( hasNexts [ 0 ] ) ;
247
- assert . ok ( hasNexts [ 1 ] ) ;
248
- assert . ok ( hasNexts [ 2 ] ) ;
249
-
250
- return Promise . all ( [ changeStream1 . next ( ) , changeStream2 . next ( ) , changeStream3 . next ( ) ] ) ;
251
- } )
252
- . then ( function ( changes ) {
253
- // Check the values of the change documents are correct
254
- assert . equal ( changes [ 0 ] . operationType , 'insert' ) ;
255
- assert . equal ( changes [ 1 ] . operationType , 'insert' ) ;
256
- assert . equal ( changes [ 2 ] . operationType , 'insert' ) ;
257
-
258
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
259
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
260
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
261
-
262
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
263
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
264
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
265
-
266
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous1' ) ;
267
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
268
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
269
-
270
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
271
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber' , 2 ) ;
272
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber' , 3 ) ;
273
- } )
274
- . then (
275
- ( ) => done ( ) ,
276
- err => done ( err )
277
- ) ;
278
- } ) ;
279
- }
245
+ const hasNexts = await Promise . all ( [
246
+ changeStream1 . hasNext ( ) ,
247
+ changeStream2 . hasNext ( ) ,
248
+ changeStream3 . hasNext ( )
249
+ ] ) ;
250
+
251
+ // Check all the Change Streams have a next item
252
+ expect ( hasNexts [ 0 ] ) . to . be . true ;
253
+ expect ( hasNexts [ 1 ] ) . to . be . true ;
254
+ expect ( hasNexts [ 2 ] ) . to . be . true ;
255
+
256
+ const changes = await Promise . all ( [
257
+ changeStream1 . next ( ) ,
258
+ changeStream2 . next ( ) ,
259
+ changeStream3 . next ( )
260
+ ] ) ;
261
+
262
+ // Check the values of the change documents are correct
263
+ expect ( changes [ 0 ] . operationType ) . to . be . equal ( 'insert' ) ;
264
+ expect ( changes [ 1 ] . operationType ) . to . be . equal ( 'insert' ) ;
265
+ expect ( changes [ 2 ] . operationType ) . to . be . equal ( 'insert' ) ;
266
+
267
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
268
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
269
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
270
+
271
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
272
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
273
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
274
+
275
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous1' ) ;
276
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
277
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
278
+
279
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
280
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber' , 2 ) ;
281
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber' , 3 ) ;
282
+ }
283
+ ) ;
280
284
} ) ;
281
285
282
286
it ( 'should properly close ChangeStream cursor' , {
@@ -806,23 +810,28 @@ describe('Change Streams', function () {
806
810
} ) ;
807
811
808
812
it ( 'when invoked with promises' , {
809
- metadata : { requires : { topology : 'replicaset' } } ,
810
- test : function ( ) {
811
- const read = ( ) => {
812
- return Promise . resolve ( )
813
- . then ( ( ) => changeStream . next ( ) )
814
- . then ( ( ) => changeStream . next ( ) )
815
- . then ( ( ) => {
816
- this . defer ( lastWrite ( ) ) ;
817
- const nextP = changeStream . next ( ) ;
818
- return changeStream . close ( ) . then ( ( ) => nextP ) ;
819
- } ) ;
813
+ metadata : { requires : { topology : '!single' } } ,
814
+ test : async function ( ) {
815
+ const read = async ( ) => {
816
+ await changeStream . next ( ) ;
817
+ await changeStream . next ( ) ;
818
+
819
+ const write = lastWrite ( ) ;
820
+
821
+ const nextP = changeStream . next ( ) ;
822
+
823
+ await changeStream . close ( ) ;
824
+
825
+ await write ;
826
+ await nextP ;
820
827
} ;
821
828
822
- return Promise . all ( [ read ( ) , write ( ) ] ) . then (
823
- ( ) => Promise . reject ( new Error ( 'Expected operation to fail with error' ) ) ,
824
- err => expect ( err . message ) . to . equal ( 'ChangeStream is closed' )
829
+ const error = await Promise . all ( [ read ( ) , write ( ) ] ) . then (
830
+ ( ) => null ,
831
+ error => error
825
832
) ;
833
+
834
+ expect ( error . message ) . to . equal ( 'ChangeStream is closed' ) ;
826
835
}
827
836
} ) ;
828
837
0 commit comments