Skip to content

Commit 8c63447

Browse files
nbbeekendariakp
andauthored
refactor(NODE-4631): change_stream, gridfs to use maybeCallback (#3406)
Co-authored-by: Daria Pardue <[email protected]>
1 parent dc34388 commit 8c63447

File tree

8 files changed

+276
-163
lines changed

8 files changed

+276
-163
lines changed

src/bulk/common.ts

+16-22
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { executeOperation } from '../operations/execute_operation';
2222
import { InsertOperation } from '../operations/insert';
2323
import { AbstractOperation, Hint } from '../operations/operation';
2424
import { makeUpdateStatement, UpdateOperation, UpdateStatement } from '../operations/update';
25-
import { PromiseProvider } from '../promise_provider';
2625
import type { Server } from '../sdam/server';
2726
import type { Topology } from '../sdam/topology';
2827
import type { ClientSession } from '../sessions';
@@ -31,6 +30,7 @@ import {
3130
Callback,
3231
getTopology,
3332
hasAtomicOperators,
33+
maybeCallback,
3434
MongoDBNamespace,
3535
resolveOptions
3636
} from '../utils';
@@ -1270,11 +1270,19 @@ export abstract class BulkOperationBase {
12701270
options?: BulkWriteOptions | Callback<BulkWriteResult>,
12711271
callback?: Callback<BulkWriteResult>
12721272
): Promise<BulkWriteResult> | void {
1273-
if (typeof options === 'function') (callback = options), (options = {});
1274-
options = options ?? {};
1273+
callback =
1274+
typeof callback === 'function'
1275+
? callback
1276+
: typeof options === 'function'
1277+
? options
1278+
: undefined;
1279+
options = options != null && typeof options !== 'function' ? options : {};
12751280

12761281
if (this.s.executed) {
1277-
return handleEarlyError(new MongoBatchReExecutionError(), callback);
1282+
// eslint-disable-next-line @typescript-eslint/require-await
1283+
return maybeCallback(async () => {
1284+
throw new MongoBatchReExecutionError();
1285+
}, callback);
12781286
}
12791287

12801288
const writeConcern = WriteConcern.fromOptions(options);
@@ -1292,10 +1300,10 @@ export abstract class BulkOperationBase {
12921300
}
12931301
// If we have no operations in the bulk raise an error
12941302
if (this.s.batches.length === 0) {
1295-
const emptyBatchError = new MongoInvalidArgumentError(
1296-
'Invalid BulkOperation, Batch cannot be empty'
1297-
);
1298-
return handleEarlyError(emptyBatchError, callback);
1303+
// eslint-disable-next-line @typescript-eslint/require-await
1304+
return maybeCallback(async () => {
1305+
throw new MongoInvalidArgumentError('Invalid BulkOperation, Batch cannot be empty');
1306+
}, callback);
12991307
}
13001308

13011309
this.s.executed = true;
@@ -1351,20 +1359,6 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
13511359
}
13521360
});
13531361

1354-
/** helper function to assist with promiseOrCallback behavior */
1355-
function handleEarlyError(
1356-
err?: AnyError,
1357-
callback?: Callback<BulkWriteResult>
1358-
): Promise<BulkWriteResult> | void {
1359-
if (typeof callback === 'function') {
1360-
callback(err);
1361-
return;
1362-
}
1363-
1364-
const PromiseConstructor = PromiseProvider.get() ?? Promise;
1365-
return PromiseConstructor.reject(err);
1366-
}
1367-
13681362
function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
13691363
if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
13701364
return bulkOperation.s.options.forceServerObjectId;

src/change_stream.ts

+44-55
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
2020
import type { CollationOptions, OperationParent } from './operations/command';
2121
import type { ReadPreference } from './read_preference';
2222
import type { ServerSessionId } from './sessions';
23-
import { Callback, filterOptions, getTopology, maybePromise, MongoDBNamespace } from './utils';
23+
import { Callback, filterOptions, getTopology, maybeCallback, MongoDBNamespace } from './utils';
2424

2525
/** @internal */
2626
const kCursorStream = Symbol('cursorStream');
@@ -649,29 +649,25 @@ export class ChangeStream<
649649
hasNext(callback: Callback<boolean>): void;
650650
hasNext(callback?: Callback): Promise<boolean> | void {
651651
this._setIsIterator();
652-
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
653-
// Shadowing is intentional here. We want to override the `callback` variable
654-
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
655-
return maybePromise(callback, callback => {
656-
(async () => {
652+
return maybeCallback(async () => {
653+
try {
654+
const hasNext = await this.cursor.hasNext();
655+
return hasNext;
656+
} catch (error) {
657657
try {
658+
await this._processErrorIteratorMode(error);
658659
const hasNext = await this.cursor.hasNext();
659660
return hasNext;
660661
} catch (error) {
661662
try {
662-
await this._processErrorIteratorMode(error);
663-
const hasNext = await this.cursor.hasNext();
664-
return hasNext;
665-
} catch (error) {
666-
await this.close().catch(err => err);
667-
throw error;
663+
await this.close();
664+
} catch {
665+
// We are not concerned with errors from close()
668666
}
667+
throw error;
669668
}
670-
})().then(
671-
hasNext => callback(undefined, hasNext),
672-
error => callback(error)
673-
);
674-
});
669+
}
670+
}, callback);
675671
}
676672

677673
/** Get the next available document from the Change Stream. */
@@ -680,31 +676,27 @@ export class ChangeStream<
680676
next(callback: Callback<TChange>): void;
681677
next(callback?: Callback<TChange>): Promise<TChange> | void {
682678
this._setIsIterator();
683-
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
684-
// Shadowing is intentional here. We want to override the `callback` variable
685-
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
686-
return maybePromise(callback, callback => {
687-
(async () => {
679+
return maybeCallback(async () => {
680+
try {
681+
const change = await this.cursor.next();
682+
const processedChange = this._processChange(change ?? null);
683+
return processedChange;
684+
} catch (error) {
688685
try {
686+
await this._processErrorIteratorMode(error);
689687
const change = await this.cursor.next();
690688
const processedChange = this._processChange(change ?? null);
691689
return processedChange;
692690
} catch (error) {
693691
try {
694-
await this._processErrorIteratorMode(error);
695-
const change = await this.cursor.next();
696-
const processedChange = this._processChange(change ?? null);
697-
return processedChange;
698-
} catch (error) {
699-
await this.close().catch(err => err);
700-
throw error;
692+
await this.close();
693+
} catch {
694+
// We are not concerned with errors from close()
701695
}
696+
throw error;
702697
}
703-
})().then(
704-
change => callback(undefined, change),
705-
error => callback(error)
706-
);
707-
});
698+
}
699+
}, callback);
708700
}
709701

710702
/**
@@ -715,29 +707,25 @@ export class ChangeStream<
715707
tryNext(callback: Callback<Document | null>): void;
716708
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
717709
this._setIsIterator();
718-
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
719-
// Shadowing is intentional here. We want to override the `callback` variable
720-
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
721-
return maybePromise(callback, callback => {
722-
(async () => {
710+
return maybeCallback(async () => {
711+
try {
712+
const change = await this.cursor.tryNext();
713+
return change ?? null;
714+
} catch (error) {
723715
try {
716+
await this._processErrorIteratorMode(error);
724717
const change = await this.cursor.tryNext();
725718
return change ?? null;
726719
} catch (error) {
727720
try {
728-
await this._processErrorIteratorMode(error);
729-
const change = await this.cursor.tryNext();
730-
return change ?? null;
731-
} catch (error) {
732-
await this.close().catch(err => err);
733-
throw error;
721+
await this.close();
722+
} catch {
723+
// We are not concerned with errors from close()
734724
}
725+
throw error;
735726
}
736-
})().then(
737-
change => callback(undefined, change),
738-
error => callback(error)
739-
);
740-
});
727+
}
728+
}, callback);
741729
}
742730

743731
/** Is the cursor closed */
@@ -752,13 +740,14 @@ export class ChangeStream<
752740
close(callback?: Callback): Promise<void> | void {
753741
this[kClosed] = true;
754742

755-
return maybePromise(callback, cb => {
743+
return maybeCallback(async () => {
756744
const cursor = this.cursor;
757-
return cursor.close(err => {
745+
try {
746+
await cursor.close();
747+
} finally {
758748
this._endStream();
759-
return cb(err);
760-
});
761-
});
749+
}
750+
}, callback);
762751
}
763752

764753
/**

src/gridfs/index.ts

+21-48
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { Logger } from '../logger';
77
import { Filter, TypedEventEmitter } from '../mongo_types';
88
import type { ReadPreference } from '../read_preference';
99
import type { Sort } from '../sort';
10-
import { Callback, maybePromise } from '../utils';
10+
import { Callback, maybeCallback } from '../utils';
1111
import { WriteConcern, WriteConcernOptions } from '../write_concern';
1212
import type { FindOptions } from './../operations/find';
1313
import {
@@ -144,28 +144,18 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
144144
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
145145
delete(id: ObjectId, callback: Callback<void>): void;
146146
delete(id: ObjectId, callback?: Callback<void>): Promise<void> | void {
147-
return maybePromise(callback, callback => {
148-
return this.s._filesCollection.deleteOne({ _id: id }, (error, res) => {
149-
if (error) {
150-
return callback(error);
151-
}
147+
return maybeCallback(async () => {
148+
const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id });
152149

153-
return this.s._chunksCollection.deleteMany({ files_id: id }, error => {
154-
if (error) {
155-
return callback(error);
156-
}
150+
// Delete orphaned chunks before returning FileNotFound
151+
await this.s._chunksCollection.deleteMany({ files_id: id });
157152

158-
// Delete orphaned chunks before returning FileNotFound
159-
if (!res?.deletedCount) {
160-
// TODO(NODE-3483): Replace with more appropriate error
161-
// Consider creating new error MongoGridFSFileNotFoundError
162-
return callback(new MongoRuntimeError(`File not found for id ${id}`));
163-
}
164-
165-
return callback();
166-
});
167-
});
168-
});
153+
if (deletedCount === 0) {
154+
// TODO(NODE-3483): Replace with more appropriate error
155+
// Consider creating new error MongoGridFSFileNotFoundError
156+
throw new MongoRuntimeError(`File not found for id ${id}`);
157+
}
158+
}, callback);
169159
}
170160

171161
/** Convenience wrapper around find on the files collection */
@@ -215,42 +205,25 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
215205
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
216206
rename(id: ObjectId, filename: string, callback: Callback<void>): void;
217207
rename(id: ObjectId, filename: string, callback?: Callback<void>): Promise<void> | void {
218-
return maybePromise(callback, callback => {
208+
return maybeCallback(async () => {
219209
const filter = { _id: id };
220210
const update = { $set: { filename } };
221-
return this.s._filesCollection.updateOne(filter, update, (error?, res?) => {
222-
if (error) {
223-
return callback(error);
224-
}
225-
226-
if (!res?.matchedCount) {
227-
return callback(new MongoRuntimeError(`File with id ${id} not found`));
228-
}
229-
230-
return callback();
231-
});
232-
});
211+
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update);
212+
if (matchedCount === 0) {
213+
throw new MongoRuntimeError(`File with id ${id} not found`);
214+
}
215+
}, callback);
233216
}
234217

235218
/** Removes this bucket's files collection, followed by its chunks collection. */
236219
drop(): Promise<void>;
237220
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
238221
drop(callback: Callback<void>): void;
239222
drop(callback?: Callback<void>): Promise<void> | void {
240-
return maybePromise(callback, callback => {
241-
return this.s._filesCollection.drop(error => {
242-
if (error) {
243-
return callback(error);
244-
}
245-
return this.s._chunksCollection.drop(error => {
246-
if (error) {
247-
return callback(error);
248-
}
249-
250-
return callback();
251-
});
252-
});
253-
});
223+
return maybeCallback(async () => {
224+
await this.s._filesCollection.drop();
225+
await this.s._chunksCollection.drop();
226+
}, callback);
254227
}
255228

256229
/** Get the Db scoped logger. */

src/gridfs/upload.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { Document } from '../bson';
44
import { ObjectId } from '../bson';
55
import type { Collection } from '../collection';
66
import { AnyError, MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
7-
import { Callback, maybePromise } from '../utils';
7+
import { Callback, maybeCallback } from '../utils';
88
import type { WriteConcernOptions } from '../write_concern';
99
import { WriteConcern } from './../write_concern';
1010
import type { GridFSFile } from './download';
@@ -149,20 +149,20 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
149149
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
150150
abort(callback: Callback<void>): void;
151151
abort(callback?: Callback<void>): Promise<void> | void {
152-
return maybePromise(callback, callback => {
152+
return maybeCallback(async () => {
153153
if (this.state.streamEnd) {
154154
// TODO(NODE-3485): Replace with MongoGridFSStreamClosed
155-
return callback(new MongoAPIError('Cannot abort a stream that has already completed'));
155+
throw new MongoAPIError('Cannot abort a stream that has already completed');
156156
}
157157

158158
if (this.state.aborted) {
159159
// TODO(NODE-3485): Replace with MongoGridFSStreamClosed
160-
return callback(new MongoAPIError('Cannot call abort() on a stream twice'));
160+
throw new MongoAPIError('Cannot call abort() on a stream twice');
161161
}
162162

163163
this.state.aborted = true;
164-
this.chunks.deleteMany({ files_id: this.id }, error => callback(error));
165-
});
164+
await this.chunks.deleteMany({ files_id: this.id });
165+
}, callback);
166166
}
167167

168168
/**

0 commit comments

Comments
 (0)