Skip to content

Commit

Permalink
Merge pull request #538 from AArnott/avoidRejection
Browse files Browse the repository at this point in the history
Avoid rejection of unobserved promises
  • Loading branch information
AArnott authored Sep 28, 2022
2 parents 8ced31a + ea05e8f commit c25c827
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
16 changes: 12 additions & 4 deletions src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export abstract class Channel implements IDisposableObservable {
/**
* Closes this channel.
*/
public dispose() {
public dispose(): void {
// The interesting stuff is in the derived class.
this._isDisposed = true;
}
Expand Down Expand Up @@ -203,6 +203,9 @@ export class ChannelClass extends Channel {
// or even expect it to be recognized by anyone else.
// The acceptance promise rejection is observed by the offer channel method.
caught(this._completion.promise);

// Inform the remote side that the offer is rescinded.
this.dispose();
}

public onAccepted(acceptanceParameter: AcceptanceParameters): boolean {
Expand Down Expand Up @@ -244,19 +247,24 @@ export class ChannelClass extends Channel {
}
}

public async dispose() {
public dispose(): void {
if (!this.isDisposed) {
super.dispose();

this._acceptance.reject(new CancellationToken.CancellationError("disposed"));
if (this._acceptance.reject(new CancellationToken.CancellationError("disposed"))) {
// Don't crash node due to an unnoticed rejection when dispose was explicitly called.
caught(this.acceptance);
}

// For the pipes, we Complete *our* ends, and leave the user's ends alone.
// The completion will propagate when it's ready to.
this._duplex.end();
this._duplex.push(null);

this._completion.resolve();
await this._multiplexingStream.onChannelDisposed(this);

// Send the notification, but we can't await the result of this.
caught(this._multiplexingStream.onChannelDisposed(this));
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { timeout } from "./Timeout";
import { Channel } from "../Channel";
import CancellationToken from "cancellationtoken";
import * as assert from "assert";
import { nextTick } from "process";

[1, 2, 3].forEach(protocolMajorVersion => {
describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
Expand Down Expand Up @@ -114,6 +115,24 @@ import * as assert from "assert";
await assert.rejects(offer);
});

it("Channel offer is canceled by sender after receiver gets it", async () => {
// Arrange to cancel the offer only after the remote party receives it (but before they accept it.)
const cts = CancellationToken.create();
mx2.on('channelOffered', args => {
cts.cancel('rescind offer');
});
const offer = mx1.offerChannelAsync("test", undefined, cts.token);
await expectAsync(offer).toBeRejected()

// Give time for the termination fram to arrive *before* we try to accept the channel.
for (let i = 0; i < 100; i++) {
await new Promise<void>(resolve => nextTick(() => resolve()));
}

// We expect this to timeout. But we need this for the test to fail if we have unobserved promise rejections.
await expectAsync(timeout(mx2.acceptChannelAsync('test'), 1000)).toBeRejected();
});

it("Channel offer is rejected by event handler", async () => {
const handler = new Deferred<void>();
mx2.on("channelOffered", (args) => {
Expand Down

0 comments on commit c25c827

Please sign in to comment.