diff --git a/doc/api/stream.md b/doc/api/stream.md index 1f8dcc643789e9..2a4268900966f4 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2027,7 +2027,7 @@ changes: description: Marking the API stable. --> -* `stream` {Stream|Iterable|AsyncIterable|Function} +* `stream` {Writable|Duplex|WritableStream|TransformStream|Function} * `options` {Object} * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. @@ -2046,13 +2046,18 @@ async function* splitToWords(source) { } } -const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords); +const wordsStream = Readable.from(['text passed through', 'composed stream']).compose(splitToWords); const words = await wordsStream.toArray(); -console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator'] +console.log(words); // prints ['text', 'passed', 'through', 'composed', 'stream'] ``` -See [`stream.compose`][] for more information. +`readable.compose(s)` is equivalent to `stream.compose(readable, s)`. + +This method also allows for an {AbortSignal} to be provided, which will destroy +the composed stream when aborted. + +See [`stream.compose(...streams)`][] for more information. ##### `readable.iterator([options])` @@ -3050,7 +3055,8 @@ await finished(compose(s1, s2, s3)); console.log(res); // prints 'HELLOWORLD' ``` -See [`readable.compose(stream)`][] for `stream.compose` as operator. +For convenience, the [`readable.compose(stream)`][] method is available on +{Readable} and {Duplex} streams as a wrapper for this function. ### `stream.isErrored(stream)` @@ -4998,7 +5004,7 @@ contain multi-byte characters. [`readable.setEncoding()`]: #readablesetencodingencoding [`stream.Readable.from()`]: #streamreadablefromiterable-options [`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream -[`stream.compose`]: #streamcomposestreams +[`stream.compose(...streams)`]: #streamcomposestreams [`stream.cork()`]: #writablecork [`stream.duplexPair()`]: #streamduplexpairoptions [`stream.finished()`]: #streamfinishedstream-options-callback diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 27c22f89926021..6db2df0e3646e0 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -18,7 +18,6 @@ const { AbortController, AbortSignal } = require('internal/abort_controller'); const { AbortError, codes: { - ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE, }, @@ -31,40 +30,10 @@ const { } = require('internal/validators'); const { kWeakHandler, kResistStopPropagation } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); -const staticCompose = require('internal/streams/compose'); -const { - addAbortSignalNoValidate, -} = require('internal/streams/add-abort-signal'); -const { isWritable, isNodeStream } = require('internal/streams/utils'); const kEmpty = Symbol('kEmpty'); const kEof = Symbol('kEof'); -function compose(stream, options) { - if (options != null) { - validateObject(options, 'options'); - } - if (options?.signal != null) { - validateAbortSignal(options.signal, 'options.signal'); - } - - if (isNodeStream(stream) && !isWritable(stream)) { - throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable'); - } - - const composedStream = staticCompose(this, stream); - - if (options?.signal) { - // Not validating as we already validated before - addAbortSignalNoValidate( - options.signal, - composedStream, - ); - } - - return composedStream; -} - function map(fn, options) { validateFunction(fn, 'fn'); if (options != null) { @@ -408,7 +377,6 @@ module.exports.streamReturningOperators = { flatMap, map, take, - compose, }; module.exports.promiseReturningOperators = { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 63346e6486a022..97fe9dc6f60c2f 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -48,6 +48,7 @@ const { Buffer } = require('buffer'); const { addAbortSignal, + addAbortSignalNoValidate, } = require('internal/streams/add-abort-signal'); const eos = require('internal/streams/end-of-stream'); @@ -86,7 +87,10 @@ const { ERR_UNKNOWN_ENCODING, }, } = require('internal/errors'); -const { validateObject } = require('internal/validators'); +const { + validateAbortSignal, + validateObject, +} = require('internal/validators'); const FastBuffer = Buffer[SymbolSpecies]; @@ -1409,6 +1413,30 @@ async function* createAsyncIterator(stream, options) { } } +let composeImpl; + +Readable.prototype.compose = function compose(stream, options) { + if (options != null) { + validateObject(options, 'options'); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + + composeImpl ??= require('internal/streams/compose'); + const composedStream = composeImpl(this, stream); + + if (options?.signal) { + // Not validating as we already validated before + addAbortSignalNoValidate( + options.signal, + composedStream, + ); + } + + return composedStream; +}; + // Making it explicit these properties are not enumerable // because otherwise some prototype manipulation in // userland will fail. diff --git a/test/parallel/test-stream-compose-operator.js b/test/parallel/test-stream-readable-compose.js similarity index 78% rename from test/parallel/test-stream-compose-operator.js rename to test/parallel/test-stream-readable-compose.js index 4fefb004f5a1e5..b55bd719402028 100644 --- a/test/parallel/test-stream-compose-operator.js +++ b/test/parallel/test-stream-readable-compose.js @@ -2,7 +2,9 @@ const common = require('../common'); const { - Readable, Transform, + PassThrough, + Readable, + Transform, } = require('stream'); const assert = require('assert'); @@ -19,6 +21,8 @@ const assert = require('assert'); } } }); + assert.strictEqual(stream.readable, true); + assert.strictEqual(stream.writable, false); const result = ['ab', 'cd']; (async () => { for await (const item of stream) { @@ -35,6 +39,8 @@ const assert = require('assert'); callback(null, chunk); }, 4) })); + assert.strictEqual(stream.readable, true); + assert.strictEqual(stream.writable, false); const result = ['a', 'b', 'c', 'd']; (async () => { for await (const item of stream) { @@ -43,6 +49,26 @@ const assert = require('assert'); })().then(common.mustCall()); } +{ + // With Duplex `this`, ensuring writes to the composed stream + // are passed to the pipeline + const pt = new PassThrough({ objectMode: true }); + const composed = pt.compose(async function *(stream) { + for await (const chunk of stream) { + yield chunk * 2; + } + }); + assert.strictEqual(composed.readable, true); + assert.strictEqual(composed.writable, true); + pt.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 123); + })); + composed.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 246); + })); + pt.end(123); +} + { // Throwing an error during `compose` (before waiting for data) const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield