From e242a99bc4f7e7f38da558e06f8207f1dbd407fc Mon Sep 17 00:00:00 2001 From: Lifeng Lu Date: Sat, 10 Dec 2022 11:55:16 -0700 Subject: [PATCH 1/3] Add test demonstrating spinning hang Reproduces #557 --- .../src/tests/FullDuplexStream.spec.ts | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts b/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts index 8e20b4f3..80b11e1f 100644 --- a/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts +++ b/src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts @@ -2,6 +2,7 @@ import { PassThrough, Readable, Writable } from "stream"; import { Deferred } from "../Deferred"; import { FullDuplexStream } from "../FullDuplexStream"; import { getBufferFrom } from "../Utilities"; +import { delay } from "./Timeout"; describe("FullDuplexStream.CreatePair", () => { @@ -62,8 +63,8 @@ describe("FullDuplexStream.Splice", () => { let duplex: NodeJS.ReadWriteStream; beforeEach(() => { - readable = new PassThrough(); - writable = new PassThrough(); + readable = new PassThrough({ writableHighWaterMark : 8 }); + writable = new PassThrough({ writableHighWaterMark : 8 }); duplex = FullDuplexStream.Splice(readable, writable); }); @@ -86,4 +87,18 @@ describe("FullDuplexStream.Splice", () => { buffer = await getBufferFrom(writable, 1, true); expect(buffer).toBeNull(); }); + + it("Read should yield when data is not ready", async () => { + const task = writeToStream(duplex, "abcdefgh", 4); + const buffer = await getBufferFrom(writable, 32); + await task; + expect(buffer.length).toEqual(32); + }); + + async function writeToStream(stream: NodeJS.ReadWriteStream, message: string, repeat: number) { + while (repeat--) { + stream.write(message); + await delay(2); + } + } }); From d5ffbe7289c54a5ebd508bb835dfd7b3eedb3b77 Mon Sep 17 00:00:00 2001 From: Lifeng Lu Date: Sat, 10 Dec 2022 12:31:55 -0700 Subject: [PATCH 2/3] Fix memory leaks in event handlers --- .../src/MultiplexingStreamFormatters.ts | 10 ++++++++-- .../src/tests/MultiplexingStream.Interop.spec.ts | 8 ++++++-- .../tests/MultiplexingStream.SeededChannels.spec.ts | 8 ++++++-- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts b/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts index 9cb5508a..0e4a3438 100644 --- a/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts +++ b/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts @@ -299,11 +299,17 @@ export class MultiplexingStreamV2Formatter extends MultiplexingStreamFormatter { const readObject = this.reader.read(); if (readObject === null) { const bytesAvailable = new Deferred(); - this.reader.once("readable", bytesAvailable.resolve.bind(bytesAvailable)); - this.reader.once("end", streamEnded.resolve.bind(streamEnded)); + const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); + const streamEndedCallback = streamEnded.resolve.bind(streamEnded); + + this.reader.once("readable", bytesAvailableCallback); + this.reader.once("end", streamEndedCallback); const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); + this.reader.removeListener("readable", bytesAvailableCallback); + this.reader.removeListener("end", streamEndedCallback); + if (bytesAvailable.isCompleted) { continue; } diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts index 15341867..09a3348b 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts @@ -107,9 +107,13 @@ import { ChannelOptions } from "../ChannelOptions"; if (readBuffer === null) { const bytesAvailable = new Deferred(); const streamEnded = new Deferred(); - readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable)); - readable.once("end", streamEnded.resolve.bind(streamEnded)); + const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); + const streamEndedCallback = streamEnded.resolve.bind(streamEnded); + readable.once("readable", bytesAvailableCallback); + readable.once("end", streamEndedCallback); await Promise.race([bytesAvailable.promise, streamEnded.promise]); + readable.removeListener("readable", bytesAvailableCallback); + readable.removeListener("end", streamEndedCallback); if (bytesAvailable.isCompleted) { readBuffer = readable.read() as Buffer; } else { diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.SeededChannels.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.SeededChannels.spec.ts index c49d1428..ca54ec82 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.SeededChannels.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.SeededChannels.spec.ts @@ -79,9 +79,13 @@ async function readAsync(readable: NodeJS.ReadableStream): Promise(); const streamEnded = new Deferred(); - readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable)); - readable.once("end", streamEnded.resolve.bind(streamEnded)); + const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); + const streamEndedCallback = streamEnded.resolve.bind(streamEnded); + readable.once("readable", bytesAvailableCallback); + readable.once("end", streamEndedCallback); await Promise.race([bytesAvailable.promise, streamEnded.promise]); + readable.removeListener("readable", bytesAvailableCallback); + readable.removeListener("end", streamEndedCallback); if (bytesAvailable.isCompleted) { readBuffer = readable.read() as Buffer; } else { From 6e8c2db724f81e1ef78fa785b0a039d9b694c439 Mon Sep 17 00:00:00 2001 From: Lifeng Lu Date: Tue, 13 Dec 2022 16:40:16 -0700 Subject: [PATCH 3/3] This PR is to fix problems that getBufferFrom keeps spinning and consumes lots of CPU, also leads product to hang. This happens when the stream has partial data ready to read, because read(size) returns null (unless the stream is closed), but because there are unread data in the stream, the code will not block waiting on anything but immediately tries to read again. This would consume lots of CPU. On the other hand, because the length of pipeline can be limited. when the reader wants a larger block over the size of the pipeline buffer, it would never get the data, because the writer cannot write any more data until reader takes some bytes out of the stream. This PR is intended to use readableLength to read partial data out, and joins them on the reader side when it is necessary. However, the PR turns out to be more complicated due to this state is not defined in the ReadableStream interface, but in the implementation (Readable). Not sure why an important property like this is not included in the contract. So the code lands to keep the old behavior unless readableLength is available. Also, i kept running into memory issues in unit tests. There were some event handler which can leak memory, and was fixed. But it turned out that the real reason is that reableEnded can be false when streamEnded event is fired. Because the earlier changed code depends on the state, it ends up spinning in the function. Interestingly, it leads out of memory. --- src/nerdbank-streams/src/Utilities.ts | 78 +++++++++++++++++++++------ 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/src/nerdbank-streams/src/Utilities.ts b/src/nerdbank-streams/src/Utilities.ts index 55487bb4..35bc881c 100644 --- a/src/nerdbank-streams/src/Utilities.ts +++ b/src/nerdbank-streams/src/Utilities.ts @@ -72,32 +72,80 @@ export async function getBufferFrom( cancellationToken?: CancellationToken): Promise { const streamEnded = new Deferred(); - while (size > 0) { - cancellationToken?.throwIfCancelled(); - const readBuffer = readable.read(size) as Buffer; - if (readBuffer === null) { - const bytesAvailable = new Deferred(); - readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable)); - readable.once("end", streamEnded.resolve.bind(streamEnded)); - const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); - await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); + if (size === 0) { + return Buffer.from([]); + } - if (bytesAvailable.isCompleted) { - continue; + let readBuffer: Buffer | null = null; + let index: number = 0; + while (size > 0) { + cancellationToken?.throwIfCancelled(); + let availableSize = (readable as Readable).readableLength; + if (!availableSize) { + // Check the end of stream + if ((readable as Readable).readableEnded || streamEnded.isCompleted) { + // stream is closed + if (!allowEndOfStream) { + throw new Error("Stream terminated before required bytes were read."); + } + + // Returns what has been read so far + if (readBuffer === null) { + return null; + } + + // we need trim extra spaces + return readBuffer.subarray(0, index) } + + // we retain this behavior when availableSize === false + // to make existing unit tests happy (which assumes we will try to read stream when no data is ready.) + availableSize = size; + } else if (availableSize > size) { + availableSize = size; } - if (!allowEndOfStream) { - if (!readBuffer || readBuffer.length < size) { + const newBuffer = readable.read(availableSize) as Buffer; + if (newBuffer) { + if (newBuffer.length < availableSize && !allowEndOfStream) { throw new Error("Stream terminated before required bytes were read."); } + + if (readBuffer === null) { + if (availableSize === size || newBuffer.length < availableSize) { + // in the fast pass, we read the entire data once, and donot allocate an extra array. + return newBuffer; + } + + // if we read partial data, we need allocate a buffer to join all data together. + readBuffer = Buffer.alloc(size); + } + + // now append new data to the buffer + newBuffer.copy(readBuffer, index); + + size -= newBuffer.length; + index += newBuffer.length; } - return readBuffer; + if (size > 0) { + const bytesAvailable = new Deferred(); + const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); + const streamEndedCallback = streamEnded.resolve.bind(streamEnded); + readable.once("readable", bytesAvailableCallback); + readable.once("end", streamEndedCallback); + try { + const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); + await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); + } finally { + readable.removeListener("readable", bytesAvailableCallback); + readable.removeListener("end", streamEndedCallback); + } + } } - return Buffer.from([]); + return readBuffer; } export function throwIfDisposed(value: IDisposableObservable) {