Skip to content

Commit df19c89

Browse files
committed
Observe acceptance rejection so it doesn't crash the process
1 parent 8ced31a commit df19c89

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

src/nerdbank-streams/src/Channel.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@ export class ChannelClass extends Channel {
203203
// or even expect it to be recognized by anyone else.
204204
// The acceptance promise rejection is observed by the offer channel method.
205205
caught(this._completion.promise);
206+
207+
// Inform the remote side that the offer is rescinded.
208+
this.dispose();
206209
}
207210

208211
public onAccepted(acceptanceParameter: AcceptanceParameters): boolean {
@@ -248,7 +251,10 @@ export class ChannelClass extends Channel {
248251
if (!this.isDisposed) {
249252
super.dispose();
250253

251-
this._acceptance.reject(new CancellationToken.CancellationError("disposed"));
254+
if (this._acceptance.reject(new CancellationToken.CancellationError("disposed"))) {
255+
// Don't crash node due to an unnoticed rejection when dispose was explicitly called.
256+
caught(this.acceptance);
257+
}
252258

253259
// For the pipes, we Complete *our* ends, and leave the user's ends alone.
254260
// The completion will propagate when it's ready to.

src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { timeout } from "./Timeout";
88
import { Channel } from "../Channel";
99
import CancellationToken from "cancellationtoken";
1010
import * as assert from "assert";
11+
import { nextTick } from "process";
1112

1213
[1, 2, 3].forEach(protocolMajorVersion => {
1314
describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
@@ -114,6 +115,24 @@ import * as assert from "assert";
114115
await assert.rejects(offer);
115116
});
116117

118+
it("Channel offer is canceled by sender after receiver gets it", async () => {
119+
// Arrange to cancel the offer only after the remote party receives it (but before they accept it.)
120+
const cts = CancellationToken.create();
121+
mx2.on('channelOffered', args => {
122+
cts.cancel('rescind offer');
123+
});
124+
const offer = mx1.offerChannelAsync("test", undefined, cts.token);
125+
await expectAsync(offer).toBeRejected()
126+
127+
// Give time for the termination fram to arrive *before* we try to accept the channel.
128+
for (let i = 0; i < 100; i++) {
129+
await new Promise<void>(resolve => nextTick(() => resolve()));
130+
}
131+
132+
// We expect this to timeout. But we need this for the test to fail if we have unobserved promise rejections.
133+
await expectAsync(timeout(mx2.acceptChannelAsync('test'), 1000)).toBeRejected();
134+
});
135+
117136
it("Channel offer is rejected by event handler", async () => {
118137
const handler = new Deferred<void>();
119138
mx2.on("channelOffered", (args) => {

0 commit comments

Comments
 (0)