
Commit

This PR fixes a problem where getBufferFrom keeps spinning and consumes lots of CPU, which can also cause the product to hang.

This happens when the stream has partial data ready to read: read(size) returns null (unless the stream is closed), but because there is unread data in the stream, the code does not block waiting on anything and immediately tries to read again, which consumes lots of CPU. On top of that, the length of the pipeline can be limited: when the reader wants a block larger than the pipeline buffer, it never gets the data, because the writer cannot write any more until the reader takes some bytes out of the stream.
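The failure mode described above can be reproduced with Node's built-in streams (a standalone sketch, not the library code; the PassThrough stream and byte values are illustrative):

```typescript
// Sketch of the spinning scenario: partial data is buffered, but an
// oversized read(size) yields null, so a naive retry loop never blocks.
import { PassThrough } from "stream";

const stream = new PassThrough();
stream.write(Buffer.from([1, 2, 3])); // only 3 bytes are buffered

const buffered = stream.readableLength; // 3: partial data is ready to read
const tooBig = stream.read(10);         // null: fewer than 10 bytes available,
                                        // yet data is pending, so a loop that
                                        // retries read(10) spins on the CPU
const fits = stream.read(3) as Buffer;  // a read sized to the buffered data succeeds

console.log(buffered, tooBig, fits);
```

Note that read(10) returns null even though 3 bytes are waiting; a loop that does not await the "readable" event before retrying will busy-spin exactly as the commit message describes.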

This PR is intended to use readableLength to read partial data out and join the pieces on the reader side when necessary. However, the change turned out to be more complicated because this state is not defined on the ReadableStream interface, only on the implementation (Readable); it is not clear why such an important property is not part of the contract. So the code keeps the old behavior unless readableLength is available. I also kept running into memory issues in unit tests. There were some event handlers that could leak memory, and those were fixed, but the real cause turned out to be that readableEnded can be false when streamEnded fires. Because the changed code depends on that state, it ended up spinning in the function, which, interestingly, led to running out of memory.
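Since readableLength lives on the concrete Readable class rather than the ReadableStream interface, the fallback described above amounts to feature detection. A minimal sketch of that idea (availableBytes is a hypothetical helper, not part of the library):

```typescript
import { PassThrough, Readable } from "stream";

// Hypothetical helper: prefer readableLength when the implementation
// exposes it; otherwise fall back to the requested size, which matches
// the pre-fix behavior of calling read(size) directly.
function availableBytes(readable: NodeJS.ReadableStream, requested: number): number {
    const length = (readable as Readable).readableLength;
    return typeof length === "number" && length > 0
        ? Math.min(length, requested)
        : requested;
}

const partial = new PassThrough();
partial.write(Buffer.from([1, 2]));        // 2 bytes buffered
const small = availableBytes(partial, 10); // 2: read only what is ready

const empty = new PassThrough();
const fallback = availableBytes(empty, 5); // 5: no data buffered, keep old behavior

console.log(small, fallback);
```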
Lifeng Lu authored and AArnott committed Dec 13, 2022
1 parent d5ffbe7 commit 6e8c2db
Showing 1 changed file with 63 additions and 15 deletions.
78 changes: 63 additions & 15 deletions src/nerdbank-streams/src/Utilities.ts
@@ -72,32 +72,80 @@ export async function getBufferFrom(
     cancellationToken?: CancellationToken): Promise<Buffer | null> {
 
     const streamEnded = new Deferred<void>();
-    while (size > 0) {
-        cancellationToken?.throwIfCancelled();
-
-        const readBuffer = readable.read(size) as Buffer;
-        if (readBuffer === null) {
-            const bytesAvailable = new Deferred<void>();
-            readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable));
-            readable.once("end", streamEnded.resolve.bind(streamEnded));
-            const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]);
-            await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise);
+    if (size === 0) {
+        return Buffer.from([]);
+    }
 
-            if (bytesAvailable.isCompleted) {
-                continue;
+    let readBuffer: Buffer | null = null;
+    let index: number = 0;
+    while (size > 0) {
+        cancellationToken?.throwIfCancelled();
+        let availableSize = (readable as Readable).readableLength;
+        if (!availableSize) {
+            // Check for the end of the stream.
+            if ((readable as Readable).readableEnded || streamEnded.isCompleted) {
+                // The stream is closed.
+                if (!allowEndOfStream) {
+                    throw new Error("Stream terminated before required bytes were read.");
+                }
+
+                // Return what has been read so far.
+                if (readBuffer === null) {
+                    return null;
+                }
+
+                // Trim the unused tail of the buffer.
+                return readBuffer.subarray(0, index);
+            }
+
+            // Retain the old behavior when availableSize is falsy to keep the
+            // existing unit tests happy (they assume we try to read the stream
+            // even when no data is ready).
+            availableSize = size;
+        } else if (availableSize > size) {
+            availableSize = size;
         }
 
-        if (!allowEndOfStream) {
-            if (!readBuffer || readBuffer.length < size) {
+        const newBuffer = readable.read(availableSize) as Buffer;
+        if (newBuffer) {
+            if (newBuffer.length < availableSize && !allowEndOfStream) {
                 throw new Error("Stream terminated before required bytes were read.");
             }
 
+            if (readBuffer === null) {
+                if (availableSize === size || newBuffer.length < availableSize) {
+                    // Fast path: the entire payload was read at once, so no
+                    // extra array needs to be allocated.
+                    return newBuffer;
+                }
+
+                // Partial data was read; allocate a buffer to join the pieces.
+                readBuffer = Buffer.alloc(size);
+            }
+
+            // Append the new data to the buffer.
+            newBuffer.copy(readBuffer, index);
+
+            size -= newBuffer.length;
+            index += newBuffer.length;
+        }
 
-        return readBuffer;
+        if (size > 0) {
+            const bytesAvailable = new Deferred<void>();
+            const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable);
+            const streamEndedCallback = streamEnded.resolve.bind(streamEnded);
+            readable.once("readable", bytesAvailableCallback);
+            readable.once("end", streamEndedCallback);
+            try {
+                const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]);
+                await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise);
+            } finally {
+                readable.removeListener("readable", bytesAvailableCallback);
+                readable.removeListener("end", streamEndedCallback);
+            }
+        }
     }
 
-    return Buffer.from([]);
+    return readBuffer;
 }
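The chunk-joining strategy the new code relies on (preallocate a full-size buffer, copy each partial read at a moving index, then trim with subarray if the stream ends early) can be shown in isolation (a sketch with made-up chunk data, not the library code):

```typescript
// Sketch of the join-and-trim strategy; the chunk contents are illustrative.
const size = 8; // bytes the caller asked for
const chunks = [Buffer.from([1, 2, 3]), Buffer.from([4, 5])]; // stream ended after 5 bytes

const joined = Buffer.alloc(size); // allocated once, as in the non-fast path
let index = 0;
for (const chunk of chunks) {
    chunk.copy(joined, index); // append each partial read at the write cursor
    index += chunk.length;
}

// subarray returns a view of the filled prefix without copying,
// mirroring the early end-of-stream path in the commit.
const result = joined.subarray(0, index);
console.log(result.length); // 5
```

The preallocation keeps the common case to a single extra buffer regardless of how many partial reads arrive, and the subarray trim avoids a final copy when the stream terminates before all requested bytes are read.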

export function throwIfDisposed(value: IDisposableObservable) {
