From 6e8c2db724f81e1ef78fa785b0a039d9b694c439 Mon Sep 17 00:00:00 2001 From: Lifeng Lu Date: Tue, 13 Dec 2022 16:40:16 -0700 Subject: [PATCH] This PR fixes a problem where getBufferFrom keeps spinning and consumes lots of CPU, which can also hang the product. This happens when the stream has partial data ready to read: read(size) returns null (unless the stream is closed), but because there is unread data in the stream, the code does not block waiting on anything and instead immediately tries to read again, consuming lots of CPU. On the other hand, the length of the pipeline can be limited: when the reader requests a block larger than the pipeline buffer, it will never get the data, because the writer cannot write any more data until the reader takes some bytes out of the stream. This PR uses readableLength to read partial data out, joining the pieces on the reader side when necessary. However, the PR turned out to be more complicated, because this state is not defined in the ReadableStream interface, only in the implementation (Readable). It is not clear why an important property like this is not included in the contract, so the code keeps the old behavior unless readableLength is available. Also, I kept running into memory issues in unit tests. There were some event handlers that could leak memory, and those were fixed. But it turned out that the real reason is that readableEnded can be false when the stream's end event has fired. Because the earlier version of the code depended on that state, it ended up spinning in the function, which, interestingly, led to running out of memory.
--- src/nerdbank-streams/src/Utilities.ts | 78 +++++++++++++++++++++------ 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/src/nerdbank-streams/src/Utilities.ts b/src/nerdbank-streams/src/Utilities.ts index 55487bb4..35bc881c 100644 --- a/src/nerdbank-streams/src/Utilities.ts +++ b/src/nerdbank-streams/src/Utilities.ts @@ -72,32 +72,80 @@ export async function getBufferFrom( cancellationToken?: CancellationToken): Promise { const streamEnded = new Deferred(); - while (size > 0) { - cancellationToken?.throwIfCancelled(); - const readBuffer = readable.read(size) as Buffer; - if (readBuffer === null) { - const bytesAvailable = new Deferred(); - readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable)); - readable.once("end", streamEnded.resolve.bind(streamEnded)); - const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); - await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); + if (size === 0) { + return Buffer.from([]); + } - if (bytesAvailable.isCompleted) { - continue; + let readBuffer: Buffer | null = null; + let index: number = 0; + while (size > 0) { + cancellationToken?.throwIfCancelled(); + let availableSize = (readable as Readable).readableLength; + if (!availableSize) { + // Check the end of stream + if ((readable as Readable).readableEnded || streamEnded.isCompleted) { + // stream is closed + if (!allowEndOfStream) { + throw new Error("Stream terminated before required bytes were read."); + } + + // Returns what has been read so far + if (readBuffer === null) { + return null; + } + + // we need trim extra spaces + return readBuffer.subarray(0, index) } + + // we retain this behavior when availableSize === false + // to make existing unit tests happy (which assumes we will try to read stream when no data is ready.) 
+ availableSize = size; + } else if (availableSize > size) { + availableSize = size; } - if (!allowEndOfStream) { - if (!readBuffer || readBuffer.length < size) { + const newBuffer = readable.read(availableSize) as Buffer; + if (newBuffer) { + if (newBuffer.length < availableSize && !allowEndOfStream) { throw new Error("Stream terminated before required bytes were read."); } + + if (readBuffer === null) { + if (availableSize === size || newBuffer.length < availableSize) { + // in the fast pass, we read the entire data once, and donot allocate an extra array. + return newBuffer; + } + + // if we read partial data, we need allocate a buffer to join all data together. + readBuffer = Buffer.alloc(size); + } + + // now append new data to the buffer + newBuffer.copy(readBuffer, index); + + size -= newBuffer.length; + index += newBuffer.length; } - return readBuffer; + if (size > 0) { + const bytesAvailable = new Deferred(); + const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable); + const streamEndedCallback = streamEnded.resolve.bind(streamEnded); + readable.once("readable", bytesAvailableCallback); + readable.once("end", streamEndedCallback); + try { + const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]); + await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise); + } finally { + readable.removeListener("readable", bytesAvailableCallback); + readable.removeListener("end", streamEndedCallback); + } + } } - return Buffer.from([]); + return readBuffer; } export function throwIfDisposed(value: IDisposableObservable) {