From 026821f181183e256239b9d0fda03c2ae5b819f9 Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Tue, 20 Aug 2024 11:38:53 -0600 Subject: [PATCH 1/2] Touch-up code comments --- src/nerdbank-streams/src/Channel.ts | 2 +- src/nerdbank-streams/src/Utilities.ts | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/nerdbank-streams/src/Channel.ts b/src/nerdbank-streams/src/Channel.ts index b06d6da2..ef58f8ce 100644 --- a/src/nerdbank-streams/src/Channel.ts +++ b/src/nerdbank-streams/src/Channel.ts @@ -220,7 +220,7 @@ export class ChannelClass extends Channel { // We should find a way to detect when we *actually* share the received buffer with the Channel's user // and only report consumption when they receive the buffer from us so that we effectively apply - // backpressure to the remote party based on our user's actual consumption rather than keep allocating memory. + // backpressure to the remote party based on our user's actual consumption rather than continually allocating memory. if (this._multiplexingStream.backpressureSupportEnabled && buffer) { this._multiplexingStream.localContentExamined(this, buffer.length) } diff --git a/src/nerdbank-streams/src/Utilities.ts b/src/nerdbank-streams/src/Utilities.ts index 13c58d4e..f4c9c766 100644 --- a/src/nerdbank-streams/src/Utilities.ts +++ b/src/nerdbank-streams/src/Utilities.ts @@ -92,12 +92,12 @@ export async function getBufferFrom( throw new Error('Stream terminated before required bytes were read.') } - // Returns what has been read so far + // Returns what has been read so far. if (readBuffer === null) { return null } - // we need trim extra spaces + // We need to trim the trailing space. return readBuffer.subarray(0, index) } @@ -116,11 +116,11 @@ export async function getBufferFrom( if (readBuffer === null) { if (availableSize === size || newBuffer.length < availableSize) { - // in the fast pass, we read the entire data once, and donot allocate an extra array. + // In the fast pass, we read the entire data once, and do not allocate an extra array. return newBuffer } - // if we read partial data, we need allocate a buffer to join all data together. + // If we read partial data, we need to allocate a buffer to join all data together. readBuffer = Buffer.alloc(size) } From 82b8983356cc52cdb6f3fb69e4ffd30bd4e236d3 Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Fri, 23 Aug 2024 17:03:44 -0600 Subject: [PATCH 2/2] Fix node.js stream blockage in mxstream channels When the node.js implementation of a `MultiplexingStream` channel receives more data than the highWatermark (16KB) limit, a flowing stream stops flowing permanently, blocking all communication in that direction. This fixes the problem by calling `resume()` on the stream when flowing has been stopped by that particular data buffer. --- src/nerdbank-streams/src/Channel.ts | 9 ++++ .../src/tests/MultiplexingStream.spec.ts | 53 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/src/nerdbank-streams/src/Channel.ts b/src/nerdbank-streams/src/Channel.ts index ef58f8ce..d9b17e16 100644 --- a/src/nerdbank-streams/src/Channel.ts +++ b/src/nerdbank-streams/src/Channel.ts @@ -216,8 +216,17 @@ export class ChannelClass extends Channel { } public onContent(buffer: Buffer | null) { + const priorReadableFlowing = this._duplex.readableFlowing + this._duplex.push(buffer) + // Large buffer pushes can switch a stream from flowing to non-flowing + // when it meets or exceeds the highWaterMark. We need to resume the stream + // in this case so that the user can continue to receive data. + if (priorReadableFlowing && this._duplex.readableFlowing === false) { + this._duplex.resume() + } + // We should find a way to detect when we *actually* share the received buffer with the Channel's user // and only report consumption when they receive the buffer from us so that we effectively apply // backpressure to the remote party based on our user's actual consumption rather than continually allocating memory. diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts index 04dbfe19..d28e4e84 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts @@ -8,6 +8,59 @@ import { Channel } from '../Channel' import CancellationToken from 'cancellationtoken' import * as assert from 'assert' import { nextTick } from 'process' +import { Duplex } from 'stream' + +it('highWatermark threshold does not clog', async () => { + // Brokered service + let bytesToReceive = 0 + let receivedAllBytes = new Deferred() + function receiver(pipe: Duplex) { + let lengths: number[] = [] + pipe.on('data', (data: Buffer) => { + lengths.push(data.length) + + bytesToReceive -= data.length + // console.log(`recv ${data.length}. ${bytesToReceive} remaining`) + if (bytesToReceive <= 0) { + receivedAllBytes.resolve(undefined) + } + }) + } + + // IServiceBroker + const { first: localServicePipe, second: servicePipe } = FullDuplexStream.CreatePair() + receiver(localServicePipe) + + // MultiplexingStreamServiceBroker + const simulatedMxStream = FullDuplexStream.CreatePair() + const [mx1, mx2] = await Promise.all([MultiplexingStream.CreateAsync(simulatedMxStream.first), MultiplexingStream.CreateAsync(simulatedMxStream.second)]) + const [local, remote] = await Promise.all([mx1.offerChannelAsync(''), mx2.acceptChannelAsync('')]) + servicePipe.pipe(local.stream) + local.stream.pipe(servicePipe) + + global.test_servicePipe = servicePipe + global.test_d = local.stream + global.test_localServicePipe = localServicePipe + + // brokered service client + function writeHelper(buffer: Buffer): boolean { + bytesToReceive += buffer.length + const result = remote.stream.write(buffer) + // console.log('written', buffer.length, result) + return result + } + for (let i = 15; i < 20; i++) { + const buffer = Buffer.alloc(i * 1024) + writeHelper(buffer) + await nextTickAsync() + writeHelper(Buffer.alloc(10)) + await nextTickAsync() + } + + if (bytesToReceive > 0) { + await receivedAllBytes.promise + } +}) ;[1, 2, 3].forEach(protocolMajorVersion => { describe(`MultiplexingStream v${protocolMajorVersion}`, () => { let mx1: MultiplexingStream