diff --git a/src/nerdbank-streams/src/Channel.ts b/src/nerdbank-streams/src/Channel.ts index eae435dd..79171273 100644 --- a/src/nerdbank-streams/src/Channel.ts +++ b/src/nerdbank-streams/src/Channel.ts @@ -56,7 +56,7 @@ export abstract class Channel implements IDisposableObservable { /** * Closes this channel. */ - public dispose() { + public dispose(): void { // The interesting stuff is in the derived class. this._isDisposed = true; } @@ -203,6 +203,9 @@ export class ChannelClass extends Channel { // or even expect it to be recognized by anyone else. // The acceptance promise rejection is observed by the offer channel method. caught(this._completion.promise); + + // Inform the remote side that the offer is rescinded. + this.dispose(); } public onAccepted(acceptanceParameter: AcceptanceParameters): boolean { @@ -244,11 +247,14 @@ export class ChannelClass extends Channel { } } - public async dispose() { + public dispose(): void { if (!this.isDisposed) { super.dispose(); - this._acceptance.reject(new CancellationToken.CancellationError("disposed")); + if (this._acceptance.reject(new CancellationToken.CancellationError("disposed"))) { + // Don't crash node due to an unnoticed rejection when dispose was explicitly called. + caught(this.acceptance); + } // For the pipes, we Complete *our* ends, and leave the user's ends alone. // The completion will propagate when it's ready to. @@ -256,7 +262,9 @@ export class ChannelClass extends Channel { this._duplex.push(null); this._completion.resolve(); - await this._multiplexingStream.onChannelDisposed(this); + + // Send the notification, but we can't await the result of this. + caught(this._multiplexingStream.onChannelDisposed(this)); } } diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts index e78cc49a..5ad2ea3f 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts @@ -8,6 +8,7 @@ import { timeout } from "./Timeout"; import { Channel } from "../Channel"; import CancellationToken from "cancellationtoken"; import * as assert from "assert"; +import { nextTick } from "process"; [1, 2, 3].forEach(protocolMajorVersion => { describe(`MultiplexingStream v${protocolMajorVersion}`, () => { @@ -114,6 +115,24 @@ import * as assert from "assert"; await assert.rejects(offer); }); + it("Channel offer is canceled by sender after receiver gets it", async () => { + // Arrange to cancel the offer only after the remote party receives it (but before they accept it.) + const cts = CancellationToken.create(); + mx2.on('channelOffered', args => { + cts.cancel('rescind offer'); + }); + const offer = mx1.offerChannelAsync("test", undefined, cts.token); + await expectAsync(offer).toBeRejected() + + // Give time for the termination fram to arrive *before* we try to accept the channel. + for (let i = 0; i < 100; i++) { + await new Promise(resolve => nextTick(() => resolve())); + } + + // We expect this to timeout. But we need this for the test to fail if we have unobserved promise rejections. + await expectAsync(timeout(mx2.acceptChannelAsync('test'), 1000)).toBeRejected(); + }); + it("Channel offer is rejected by event handler", async () => { const handler = new Deferred(); mx2.on("channelOffered", (args) => {