Skip to content

Commit

Permalink
Merge pull request #569 from lifengl/originSpinFix
Browse files Browse the repository at this point in the history
I took liberties to squash/rewrite commits, making minimal changes to the original proposed change.
  • Loading branch information
AArnott committed Dec 13, 2022
2 parents 6885575 + 6e8c2db commit 4bb8a9f
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 23 deletions.
10 changes: 8 additions & 2 deletions src/nerdbank-streams/src/MultiplexingStreamFormatters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,17 @@ export class MultiplexingStreamV2Formatter extends MultiplexingStreamFormatter {
const readObject = this.reader.read();
if (readObject === null) {
const bytesAvailable = new Deferred<void>();
this.reader.once("readable", bytesAvailable.resolve.bind(bytesAvailable));
this.reader.once("end", streamEnded.resolve.bind(streamEnded));
const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable);
const streamEndedCallback = streamEnded.resolve.bind(streamEnded);

this.reader.once("readable", bytesAvailableCallback);
this.reader.once("end", streamEndedCallback);
const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]);
await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise);

this.reader.removeListener("readable", bytesAvailableCallback);
this.reader.removeListener("end", streamEndedCallback);

if (bytesAvailable.isCompleted) {
continue;
}
Expand Down
78 changes: 63 additions & 15 deletions src/nerdbank-streams/src/Utilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,80 @@ export async function getBufferFrom(
cancellationToken?: CancellationToken): Promise<Buffer | null> {

const streamEnded = new Deferred<void>();
while (size > 0) {
cancellationToken?.throwIfCancelled();

const readBuffer = readable.read(size) as Buffer;
if (readBuffer === null) {
const bytesAvailable = new Deferred<void>();
readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable));
readable.once("end", streamEnded.resolve.bind(streamEnded));
const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]);
await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise);
if (size === 0) {
return Buffer.from([]);
}

if (bytesAvailable.isCompleted) {
continue;
let readBuffer: Buffer | null = null;
let index: number = 0;
while (size > 0) {
cancellationToken?.throwIfCancelled();
let availableSize = (readable as Readable).readableLength;
if (!availableSize) {
// Check the end of stream
if ((readable as Readable).readableEnded || streamEnded.isCompleted) {
// stream is closed
if (!allowEndOfStream) {
throw new Error("Stream terminated before required bytes were read.");
}

// Returns what has been read so far
if (readBuffer === null) {
return null;
}

// we need trim extra spaces
return readBuffer.subarray(0, index)
}

// we retain this behavior when availableSize === false
// to make existing unit tests happy (which assumes we will try to read stream when no data is ready.)
availableSize = size;
} else if (availableSize > size) {
availableSize = size;
}

if (!allowEndOfStream) {
if (!readBuffer || readBuffer.length < size) {
const newBuffer = readable.read(availableSize) as Buffer;
if (newBuffer) {
if (newBuffer.length < availableSize && !allowEndOfStream) {
throw new Error("Stream terminated before required bytes were read.");
}

if (readBuffer === null) {
if (availableSize === size || newBuffer.length < availableSize) {
// in the fast pass, we read the entire data once, and donot allocate an extra array.
return newBuffer;
}

// if we read partial data, we need allocate a buffer to join all data together.
readBuffer = Buffer.alloc(size);
}

// now append new data to the buffer
newBuffer.copy(readBuffer, index);

size -= newBuffer.length;
index += newBuffer.length;
}

return readBuffer;
if (size > 0) {
const bytesAvailable = new Deferred<void>();
const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable);
const streamEndedCallback = streamEnded.resolve.bind(streamEnded);
readable.once("readable", bytesAvailableCallback);
readable.once("end", streamEndedCallback);
try {
const endPromise = Promise.race([bytesAvailable.promise, streamEnded.promise]);
await (cancellationToken ? cancellationToken.racePromise(endPromise) : endPromise);
} finally {
readable.removeListener("readable", bytesAvailableCallback);
readable.removeListener("end", streamEndedCallback);
}
}
}

return Buffer.from([]);
return readBuffer;
}

export function throwIfDisposed(value: IDisposableObservable) {
Expand Down
19 changes: 17 additions & 2 deletions src/nerdbank-streams/src/tests/FullDuplexStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PassThrough, Readable, Writable } from "stream";
import { Deferred } from "../Deferred";
import { FullDuplexStream } from "../FullDuplexStream";
import { getBufferFrom } from "../Utilities";
import { delay } from "./Timeout";

describe("FullDuplexStream.CreatePair", () => {

Expand Down Expand Up @@ -62,8 +63,8 @@ describe("FullDuplexStream.Splice", () => {
let duplex: NodeJS.ReadWriteStream;

beforeEach(() => {
readable = new PassThrough();
writable = new PassThrough();
readable = new PassThrough({ writableHighWaterMark : 8 });
writable = new PassThrough({ writableHighWaterMark : 8 });
duplex = FullDuplexStream.Splice(readable, writable);
});

Expand All @@ -86,4 +87,18 @@ describe("FullDuplexStream.Splice", () => {
buffer = await getBufferFrom(writable, 1, true);
expect(buffer).toBeNull();
});

it("Read should yield when data is not ready", async () => {
const task = writeToStream(duplex, "abcdefgh", 4);
const buffer = await getBufferFrom(writable, 32);
await task;
expect(buffer.length).toEqual(32);
});

async function writeToStream(stream: NodeJS.ReadWriteStream, message: string, repeat: number) {
while (repeat--) {
stream.write(message);
await delay(2);
}
}
});
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,13 @@ import { ChannelOptions } from "../ChannelOptions";
if (readBuffer === null) {
const bytesAvailable = new Deferred<void>();
const streamEnded = new Deferred<void>();
readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable));
readable.once("end", streamEnded.resolve.bind(streamEnded));
const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable);
const streamEndedCallback = streamEnded.resolve.bind(streamEnded);
readable.once("readable", bytesAvailableCallback);
readable.once("end", streamEndedCallback);
await Promise.race([bytesAvailable.promise, streamEnded.promise]);
readable.removeListener("readable", bytesAvailableCallback);
readable.removeListener("end", streamEndedCallback);
if (bytesAvailable.isCompleted) {
readBuffer = readable.read() as Buffer;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,13 @@ async function readAsync(readable: NodeJS.ReadableStream): Promise<Buffer | null
if (readBuffer === null) {
const bytesAvailable = new Deferred<void>();
const streamEnded = new Deferred<void>();
readable.once("readable", bytesAvailable.resolve.bind(bytesAvailable));
readable.once("end", streamEnded.resolve.bind(streamEnded));
const bytesAvailableCallback = bytesAvailable.resolve.bind(bytesAvailable);
const streamEndedCallback = streamEnded.resolve.bind(streamEnded);
readable.once("readable", bytesAvailableCallback);
readable.once("end", streamEndedCallback);
await Promise.race([bytesAvailable.promise, streamEnded.promise]);
readable.removeListener("readable", bytesAvailableCallback);
readable.removeListener("end", streamEndedCallback);
if (bytesAvailable.isCompleted) {
readBuffer = readable.read() as Buffer;
} else {
Expand Down

0 comments on commit 4bb8a9f

Please sign in to comment.