Skip to content

Commit 2e81415

Browse files
MattiasBuelensaduh95
authored andcommitted
stream: implement min option for ReadableStreamBYOBReader.read
PR-URL: #50888 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Debadree Chatterjee <[email protected]>
1 parent 515b007 commit 2e81415

15 files changed

+968
-114
lines changed

doc/api/webstreams.md

+12-3
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ added: v16.5.0
492492
-->
493493

494494
* Returns: A promise fulfilled with an object:
495-
* `value` {ArrayBuffer}
495+
* `value` {any}
496496
* `done` {boolean}
497497

498498
Requests the next chunk of data from the underlying {ReadableStream}
@@ -617,15 +617,24 @@ added: v16.5.0
617617
{ReadableStream} is closed or rejected if the stream errors or the reader's
618618
lock is released before the stream finishes closing.
619619
620-
#### `readableStreamBYOBReader.read(view)`
620+
#### `readableStreamBYOBReader.read(view[, options])`
621621
622622
<!-- YAML
623623
added: v16.5.0
624+
changes:
625+
- version: REPLACEME
626+
pr-url: https://github.com/nodejs/node/pull/50888
627+
description: Added `min` option.
624628
-->
625629
626630
* `view` {Buffer|TypedArray|DataView}
631+
* `options` {Object}
632+
* `min` {number} When set, the returned promise will only be
633+
fulfilled as soon as `min` number of elements are available.
634+
When not set, the promise fulfills when at least one element
635+
is available.
627636
* Returns: A promise fulfilled with an object:
628-
* `value` {ArrayBuffer}
637+
* `value` {TypedArray|DataView}
629638
* `done` {boolean}
630639
631640
Requests the next chunk of data from the underlying {ReadableStream}

lib/internal/encoding.js

+1-7
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ const {
4747
const {
4848
validateString,
4949
validateObject,
50-
kValidateObjectAllowNullable,
51-
kValidateObjectAllowArray,
52-
kValidateObjectAllowFunction,
50+
kValidateObjectAllowObjectsAndNull,
5351
} = require('internal/validators');
5452
const binding = internalBinding('encoding_binding');
5553
const {
@@ -393,10 +391,6 @@ const TextDecoder =
393391
makeTextDecoderICU() :
394392
makeTextDecoderJS();
395393

396-
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
397-
kValidateObjectAllowArray |
398-
kValidateObjectAllowFunction;
399-
400394
function makeTextDecoderICU() {
401395
const {
402396
decode: _decode,

lib/internal/validators.js

+7
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
222222
const kValidateObjectAllowNullable = 1 << 0;
223223
const kValidateObjectAllowArray = 1 << 1;
224224
const kValidateObjectAllowFunction = 1 << 2;
225+
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
226+
kValidateObjectAllowFunction;
227+
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
228+
kValidateObjectAllowArray |
229+
kValidateObjectAllowFunction;
225230

226231
/**
227232
* @callback validateObject
@@ -583,6 +588,8 @@ module.exports = {
583588
kValidateObjectAllowNullable,
584589
kValidateObjectAllowArray,
585590
kValidateObjectAllowFunction,
591+
kValidateObjectAllowObjects,
592+
kValidateObjectAllowObjectsAndNull,
586593
validateOneOf,
587594
validatePlainFunction,
588595
validatePort,

lib/internal/webstreams/readablestream.js

+67-40
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
SymbolAsyncIterator,
2323
SymbolDispose,
2424
SymbolToStringTag,
25+
TypedArrayPrototypeGetLength,
2526
Uint8Array,
2627
} = primordials;
2728

@@ -33,6 +34,7 @@ const {
3334
ERR_INVALID_ARG_TYPE,
3435
ERR_INVALID_STATE,
3536
ERR_INVALID_THIS,
37+
ERR_OUT_OF_RANGE,
3638
},
3739
} = require('internal/errors');
3840

@@ -58,8 +60,8 @@ const {
5860
validateAbortSignal,
5961
validateBuffer,
6062
validateObject,
61-
kValidateObjectAllowNullable,
62-
kValidateObjectAllowFunction,
63+
kValidateObjectAllowObjects,
64+
kValidateObjectAllowObjectsAndNull,
6365
} = require('internal/validators');
6466

6567
const {
@@ -246,10 +248,10 @@ class ReadableStream {
246248
* @param {UnderlyingSource} [source]
247249
* @param {QueuingStrategy} [strategy]
248250
*/
249-
constructor(source = {}, strategy = kEmptyObject) {
251+
constructor(source = kEmptyObject, strategy = kEmptyObject) {
250252
markTransferMode(this, false, true);
251-
if (source === null)
252-
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
253+
validateObject(source, 'source', kValidateObjectAllowObjects);
254+
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
253255
this[kState] = createReadableStreamState();
254256

255257
this[kIsClosedPromise] = createDeferredPromise();
@@ -332,7 +334,7 @@ class ReadableStream {
332334
getReader(options = kEmptyObject) {
333335
if (!isReadableStream(this))
334336
throw new ERR_INVALID_THIS('ReadableStream');
335-
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
337+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
336338
const mode = options?.mode;
337339

338340
if (mode === undefined)
@@ -370,6 +372,7 @@ class ReadableStream {
370372

371373
// The web platform tests require that these be handled one at a
372374
// time and in a specific order. options can be null or undefined.
375+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
373376
const preventAbort = options?.preventAbort;
374377
const preventCancel = options?.preventCancel;
375378
const preventClose = options?.preventClose;
@@ -412,6 +415,7 @@ class ReadableStream {
412415
destination);
413416
}
414417

418+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
415419
const preventAbort = options?.preventAbort;
416420
const preventCancel = options?.preventCancel;
417421
const preventClose = options?.preventClose;
@@ -456,10 +460,8 @@ class ReadableStream {
456460
values(options = kEmptyObject) {
457461
if (!isReadableStream(this))
458462
throw new ERR_INVALID_THIS('ReadableStream');
459-
validateObject(options, 'options');
460-
const {
461-
preventCancel = false,
462-
} = options;
463+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
464+
const preventCancel = !!(options?.preventCancel);
463465

464466
// eslint-disable-next-line no-use-before-define
465467
const reader = new ReadableStreamDefaultReader(this);
@@ -931,47 +933,62 @@ class ReadableStreamBYOBReader {
931933

932934
/**
933935
* @param {ArrayBufferView} view
936+
* @param {{
937+
* min? : number
938+
* }} [options]
934939
* @returns {Promise<{
935-
* view : ArrayBufferView,
940+
* value : ArrayBufferView,
936941
* done : boolean,
937942
* }>}
938943
*/
939-
read(view) {
944+
async read(view, options = kEmptyObject) {
940945
if (!isReadableStreamBYOBReader(this))
941-
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
946+
throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
942947
if (!isArrayBufferView(view)) {
943-
return PromiseReject(
944-
new ERR_INVALID_ARG_TYPE(
945-
'view',
946-
[
947-
'Buffer',
948-
'TypedArray',
949-
'DataView',
950-
],
951-
view));
948+
throw new ERR_INVALID_ARG_TYPE(
949+
'view',
950+
[
951+
'Buffer',
952+
'TypedArray',
953+
'DataView',
954+
],
955+
view,
956+
);
952957
}
958+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
953959

954960
const viewByteLength = ArrayBufferViewGetByteLength(view);
955961
const viewBuffer = ArrayBufferViewGetBuffer(view);
956962
const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);
957963

958964
if (viewByteLength === 0 || viewBufferByteLength === 0) {
959-
return PromiseReject(
960-
new ERR_INVALID_STATE.TypeError(
961-
'View or Viewed ArrayBuffer is zero-length or detached',
962-
),
963-
);
965+
throw new ERR_INVALID_STATE.TypeError(
966+
'View or Viewed ArrayBuffer is zero-length or detached');
964967
}
965968

966969
// Supposed to assert here that the view's buffer is not
967970
// detached, but there's no API available to use to check that.
971+
972+
const min = options?.min ?? 1;
973+
if (typeof min !== 'number')
974+
throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
975+
if (!NumberIsInteger(min))
976+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
977+
if (min <= 0)
978+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
979+
if (!isDataView(view)) {
980+
if (min > TypedArrayPrototypeGetLength(view)) {
981+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
982+
}
983+
} else if (min > viewByteLength) {
984+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
985+
}
986+
968987
if (this[kState].stream === undefined) {
969-
return PromiseReject(
970-
new ERR_INVALID_STATE.TypeError(
971-
'The reader is not attached to a stream'));
988+
throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
972989
}
973990
const readIntoRequest = new ReadIntoRequest();
974-
readableStreamBYOBReaderRead(this, view, readIntoRequest);
991+
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
975992
return readIntoRequest.promise;
976993
}
977994

@@ -1885,7 +1902,7 @@ function readableByteStreamTee(stream) {
18851902
reading = false;
18861903
},
18871904
};
1888-
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1905+
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
18891906
}
18901907

18911908
function pull1Algorithm() {
@@ -2212,7 +2229,7 @@ function readableStreamReaderGenericRelease(reader) {
22122229
reader[kState].stream = undefined;
22132230
}
22142231

2215-
function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
2232+
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
22162233
const {
22172234
stream,
22182235
} = reader[kState];
@@ -2225,6 +2242,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
22252242
readableByteStreamControllerPullInto(
22262243
stream[kState].controller,
22272244
view,
2245+
min,
22282246
readIntoRequest);
22292247
}
22302248

@@ -2497,7 +2515,7 @@ function readableByteStreamControllerClose(controller) {
24972515

24982516
if (pendingPullIntos.length) {
24992517
const firstPendingPullInto = pendingPullIntos[0];
2500-
if (firstPendingPullInto.bytesFilled > 0) {
2518+
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
25012519
const error = new ERR_INVALID_STATE.TypeError('Partial read');
25022520
readableByteStreamControllerError(controller, error);
25032521
throw error;
@@ -2514,7 +2532,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
25142532

25152533
let done = false;
25162534
if (stream[kState].state === 'closed') {
2517-
desc.bytesFilled = 0;
2535+
assert(desc.bytesFilled % desc.elementSize === 0);
25182536
done = true;
25192537
}
25202538

@@ -2603,6 +2621,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
26032621
function readableByteStreamControllerPullInto(
26042622
controller,
26052623
view,
2624+
min,
26062625
readIntoRequest) {
26072626
const {
26082627
closeRequested,
@@ -2615,6 +2634,11 @@ function readableByteStreamControllerPullInto(
26152634
elementSize = view.constructor.BYTES_PER_ELEMENT;
26162635
ctor = view.constructor;
26172636
}
2637+
2638+
const minimumFill = min * elementSize;
2639+
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
2640+
assert(minimumFill % elementSize === 0);
2641+
26182642
const buffer = ArrayBufferViewGetBuffer(view);
26192643
const byteOffset = ArrayBufferViewGetByteOffset(view);
26202644
const byteLength = ArrayBufferViewGetByteLength(view);
@@ -2633,6 +2657,7 @@ function readableByteStreamControllerPullInto(
26332657
byteOffset,
26342658
byteLength,
26352659
bytesFilled: 0,
2660+
minimumFill,
26362661
elementSize,
26372662
ctor,
26382663
type: 'byob',
@@ -2720,7 +2745,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
27202745
}
27212746

27222747
function readableByteStreamControllerRespondInClosedState(controller, desc) {
2723-
assert(!desc.bytesFilled);
2748+
assert(desc.bytesFilled % desc.elementSize === 0);
27242749
if (desc.type === 'none') {
27252750
readableByteStreamControllerShiftPendingPullInto(controller);
27262751
}
@@ -2897,17 +2922,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
28972922
byteLength,
28982923
byteOffset,
28992924
bytesFilled,
2925+
minimumFill,
29002926
elementSize,
29012927
} = desc;
2902-
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
29032928
const maxBytesToCopy = MathMin(
29042929
controller[kState].queueTotalSize,
29052930
byteLength - bytesFilled);
29062931
const maxBytesFilled = bytesFilled + maxBytesToCopy;
29072932
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
29082933
let totalBytesToCopyRemaining = maxBytesToCopy;
29092934
let ready = false;
2910-
if (maxAlignedBytes > currentAlignedBytes) {
2935+
assert(bytesFilled < minimumFill);
2936+
if (maxAlignedBytes >= minimumFill) {
29112937
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
29122938
ready = true;
29132939
}
@@ -2950,7 +2976,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29502976
if (!ready) {
29512977
assert(!controller[kState].queueTotalSize);
29522978
assert(desc.bytesFilled > 0);
2953-
assert(desc.bytesFilled < elementSize);
2979+
assert(desc.bytesFilled < minimumFill);
29542980
}
29552981
return ready;
29562982
}
@@ -3006,7 +3032,7 @@ function readableByteStreamControllerRespondInReadableState(
30063032
return;
30073033
}
30083034

3009-
if (desc.bytesFilled < desc.elementSize)
3035+
if (desc.bytesFilled < desc.minimumFill)
30103036
return;
30113037

30123038
readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3191,6 +3217,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
31913217
byteOffset: 0,
31923218
byteLength: autoAllocateChunkSize,
31933219
bytesFilled: 0,
3220+
minimumFill: 1,
31943221
elementSize: 1,
31953222
ctor: Uint8Array,
31963223
type: 'default',

lib/internal/webstreams/transformstream.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ const {
2929
kEnumerableProperty,
3030
} = require('internal/util');
3131

32+
const {
33+
validateObject,
34+
kValidateObjectAllowObjects,
35+
kValidateObjectAllowObjectsAndNull,
36+
} = require('internal/validators');
37+
3238
const {
3339
kDeserialize,
3440
kTransfer,
@@ -119,10 +125,13 @@ class TransformStream {
119125
* @param {QueuingStrategy} [readableStrategy]
120126
*/
121127
constructor(
122-
transformer = null,
128+
transformer = kEmptyObject,
123129
writableStrategy = kEmptyObject,
124130
readableStrategy = kEmptyObject) {
125131
markTransferMode(this, false, true);
132+
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
133+
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
134+
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
126135
const readableType = transformer?.readableType;
127136
const writableType = transformer?.writableType;
128137
const start = transformer?.start;

0 commit comments

Comments
 (0)