Skip to content

Commit 6254847

Browse files
committed
Set connection status to ERROR when closed due to protocol error
Amends DuplexConnection#close to accept an optional error indicating that the connection is being closed due to that error. Updates implementations to handle this error and report it to consumers. Updates RSocketMachine to pass protocol-level connection errors to close Adds/updates tests to check for handling this parameter
1 parent a85a4db commit 6254847

File tree

13 files changed

+206
-63
lines changed

13 files changed

+206
-63
lines changed

packages/rsocket-core/src/RSocketMachine.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -598,8 +598,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
598598
};
599599

600600
_handleConnectionError(error: Error): void {
601-
this._handleError(error);
602-
this._connection.close();
601+
this._connection.close(error);
603602
const errorHandler = this._errorHandler;
604603
if (errorHandler) {
605604
errorHandler(error);

packages/rsocket-core/src/RSocketResumableTransport.js

+13-8
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection {
156156
this._statusSubscribers = new Set();
157157
}
158158

159-
close(): void {
160-
this._close();
159+
close(error?: Error): void {
160+
this._close(error);
161161
}
162162

163163
connect(): void {
@@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection {
275275
if (this._isTerminated()) {
276276
return;
277277
}
278-
if (error) {
279-
this._setConnectionStatus({error, kind: 'ERROR'});
280-
} else {
281-
this._setConnectionStatus(CONNECTION_STATUS.CLOSED);
282-
}
278+
279+
const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED;
280+
this._setConnectionStatus(status);
281+
283282
const receivers = this._receivers;
284-
receivers.forEach(r => r.onComplete());
283+
receivers.forEach(subscriber => {
284+
if (error) {
285+
subscriber.onError(error);
286+
} else {
287+
subscriber.onComplete();
288+
}
289+
});
285290
receivers.clear();
286291

287292
const senders = this._senders;

packages/rsocket-core/src/ReassemblyDuplexConnection.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection {
3737
.lift(actual => new ReassemblySubscriber(actual));
3838
}
3939

40-
close(): void {
41-
this._source.close();
40+
close(error?: Error): void {
41+
this._source.close(error);
4242
}
4343

4444
connect(): void {

packages/rsocket-core/src/__mocks__/MockDuplexConnection.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ export function genMockConnection() {
2828
let closed = false;
2929

3030
const connection = {
31-
close: jest.fn(() => {
32-
connection.mock.close();
31+
close: jest.fn(error => {
32+
if (error) {
33+
connection.mock.closeWithError(error);
34+
} else {
35+
connection.mock.close();
36+
}
3337
}),
3438
connect: jest.fn(),
3539
connectionStatus: jest.fn(() => status),

packages/rsocket-core/src/__tests__/RSocketClient-test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ describe('RSocketClient', () => {
310310
expect(errors.values().next().value).toEqual(
311311
`No keep-alive acks for ${keepAliveTimeout} millis`,
312312
);
313-
expect(status.kind).toEqual('CLOSED');
313+
expect(status.kind).toEqual('ERROR');
314314

315315
jest.advanceTimersByTime(keepAliveTimeout);
316316
});

packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js

+55
Original file line numberDiff line numberDiff line change
@@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => {
687687
expect(currentTransport.sendOne.mock.calls.length).toBe(0);
688688
});
689689
});
690+
691+
describe('post-connect() APIs', () => {
692+
beforeEach(() => {
693+
resumableTransport.connect();
694+
currentTransport.mock.connect();
695+
});
696+
697+
describe('close()', () => {
698+
describe('given an error', () => {
699+
it('closes the transport', () => {
700+
resumableTransport.close(new Error());
701+
expect(currentTransport.close.mock.calls.length).toBe(1);
702+
});
703+
704+
it('sets the status to ERROR with the given error', () => {
705+
const error = new Error();
706+
resumableTransport.close(error);
707+
expect(resumableStatus.kind).toBe('ERROR');
708+
expect(resumableStatus.error).toBe(error);
709+
});
710+
711+
it('calls receive.onError with the given error', () => {
712+
const onError = jest.fn();
713+
const onSubscribe = subscription =>
714+
subscription.request(Number.MAX_SAFE_INTEGER);
715+
resumableTransport.receive().subscribe({onError, onSubscribe});
716+
const error = new Error();
717+
resumableTransport.close(error);
718+
expect(onError.mock.calls.length).toBe(1);
719+
expect(onError.mock.calls[0][0]).toBe(error);
720+
});
721+
});
722+
723+
describe('not given an error', () => {
724+
it('closes the transport', () => {
725+
resumableTransport.close();
726+
expect(currentTransport.close.mock.calls.length).toBe(1);
727+
});
728+
729+
it('sets the status to CLOSED', () => {
730+
resumableTransport.close();
731+
expect(resumableStatus.kind).toBe('CLOSED');
732+
});
733+
734+
it('calls receive.onComplete', () => {
735+
const onComplete = jest.fn();
736+
const onSubscribe = subscription =>
737+
subscription.request(Number.MAX_SAFE_INTEGER);
738+
resumableTransport.receive().subscribe({onComplete, onSubscribe});
739+
resumableTransport.close();
740+
expect(onComplete.mock.calls.length).toBe(1);
741+
});
742+
});
743+
});
744+
});
690745
});

packages/rsocket-tcp-client/src/RSocketTcpClient.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection {
6060
}
6161
}
6262

63-
close(): void {
64-
this._close();
63+
close(error?: Error): void {
64+
this._close(error);
6565
}
6666

6767
connect(): void {

packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js

+52-19
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => {
9898
});
9999

100100
describe('close()', () => {
101-
it('closes the socket', () => {
102-
client.close();
103-
expect(socket.end.mock.calls.length).toBe(1);
104-
});
101+
describe('given an error', () => {
102+
it('closes the socket', () => {
103+
client.close(new Error());
104+
expect(socket.end.mock.calls.length).toBe(1);
105+
});
105106

106-
it('sets the status to CLOSED', () => {
107-
let status;
108-
client.connectionStatus().subscribe({
109-
onNext: _status => (status = _status),
110-
onSubscribe: subscription =>
111-
subscription.request(Number.MAX_SAFE_INTEGER),
107+
it('sets the status to ERROR with the given error', () => {
108+
let status;
109+
client.connectionStatus().subscribe({
110+
onNext: _status => (status = _status),
111+
onSubscribe: subscription =>
112+
subscription.request(Number.MAX_SAFE_INTEGER),
113+
});
114+
const error = new Error();
115+
client.close(error);
116+
expect(status.kind).toBe('ERROR');
117+
expect(status.error).toBe(error);
118+
});
119+
120+
it('calls receive.onError with the given error', () => {
121+
const onError = jest.fn();
122+
const onSubscribe = subscription =>
123+
subscription.request(Number.MAX_SAFE_INTEGER);
124+
client.receive().subscribe({onError, onSubscribe});
125+
const error = new Error();
126+
client.close(error);
127+
expect(onError.mock.calls.length).toBe(1);
128+
expect(onError.mock.calls[0][0]).toBe(error);
112129
});
113-
client.close();
114-
expect(status.kind).toBe('CLOSED');
115130
});
116131

117-
it('calls receive.onComplete', () => {
118-
const onComplete = jest.fn();
119-
const onSubscribe = subscription =>
120-
subscription.request(Number.MAX_SAFE_INTEGER);
121-
client.receive().subscribe({onComplete, onSubscribe});
122-
client.close();
123-
expect(onComplete.mock.calls.length).toBe(1);
132+
describe('not given an error', () => {
133+
it('closes the socket', () => {
134+
client.close();
135+
expect(socket.end.mock.calls.length).toBe(1);
136+
});
137+
138+
it('sets the status to CLOSED', () => {
139+
let status;
140+
client.connectionStatus().subscribe({
141+
onNext: _status => (status = _status),
142+
onSubscribe: subscription =>
143+
subscription.request(Number.MAX_SAFE_INTEGER),
144+
});
145+
client.close();
146+
expect(status.kind).toBe('CLOSED');
147+
});
148+
149+
it('calls receive.onComplete', () => {
150+
const onComplete = jest.fn();
151+
const onSubscribe = subscription =>
152+
subscription.request(Number.MAX_SAFE_INTEGER);
153+
client.receive().subscribe({onComplete, onSubscribe});
154+
client.close();
155+
expect(onComplete.mock.calls.length).toBe(1);
156+
});
124157
});
125158
});
126159

packages/rsocket-types/src/ReactiveSocketTypes.js

+4-3
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ export interface DuplexConnection {
118118
receive(): Flowable<Frame>,
119119

120120
/**
121-
* Close the underlying connection, emitting `onComplete` on the receive()
122-
* Publisher.
121+
* Close the underlying connection, optionally providing an error as reason.
122+
* If an error is passed, emits `onError` on the receive() Publisher.
123+
* If no error is passed, emits `onComplete` on the receive() Publisher.
123124
*/
124-
close(): void,
125+
close(error?: Error): void,
125126

126127
/**
127128
* Open the underlying connection. Throws if the connection is already in

packages/rsocket-websocket-client/src/RSocketWebSocketClient.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection {
6262
this._statusSubscribers = new Set();
6363
}
6464

65-
close(): void {
66-
this._close();
65+
close(error?: Error): void {
66+
this._close(error);
6767
}
6868

6969
connect(): void {

packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js

+52-19
Original file line numberDiff line numberDiff line change
@@ -93,29 +93,62 @@ describe('RSocketWebSocketClient', () => {
9393
});
9494

9595
describe('close()', () => {
96-
it('closes the socket', () => {
97-
client.close();
98-
expect(socket.close.mock.calls.length).toBe(1);
99-
});
96+
describe('given an error', () => {
97+
it('closes the socket', () => {
98+
client.close(new Error());
99+
expect(socket.close.mock.calls.length).toBe(1);
100+
});
100101

101-
it('sets the status to CLOSED', () => {
102-
let status;
103-
client.connectionStatus().subscribe({
104-
onNext: _status => (status = _status),
105-
onSubscribe: subscription =>
106-
subscription.request(Number.MAX_SAFE_INTEGER),
102+
it('sets the status to ERROR with the given error', () => {
103+
let status;
104+
client.connectionStatus().subscribe({
105+
onNext: _status => (status = _status),
106+
onSubscribe: subscription =>
107+
subscription.request(Number.MAX_SAFE_INTEGER),
108+
});
109+
const error = new Error();
110+
client.close(error);
111+
expect(status.kind).toBe('ERROR');
112+
expect(status.error).toBe(error);
113+
});
114+
115+
it('calls receive.onError with the given error', () => {
116+
const onError = jest.fn();
117+
const onSubscribe = subscription =>
118+
subscription.request(Number.MAX_SAFE_INTEGER);
119+
client.receive().subscribe({onError, onSubscribe});
120+
const error = new Error();
121+
client.close(error);
122+
expect(onError.mock.calls.length).toBe(1);
123+
expect(onError.mock.calls[0][0]).toBe(error);
107124
});
108-
client.close();
109-
expect(status.kind).toBe('CLOSED');
110125
});
111126

112-
it('calls receive.onComplete', () => {
113-
const onComplete = jest.fn();
114-
const onSubscribe = subscription =>
115-
subscription.request(Number.MAX_SAFE_INTEGER);
116-
client.receive().subscribe({onComplete, onSubscribe});
117-
client.close();
118-
expect(onComplete.mock.calls.length).toBe(1);
127+
describe('not given an error', () => {
128+
it('closes the socket', () => {
129+
client.close();
130+
expect(socket.close.mock.calls.length).toBe(1);
131+
});
132+
133+
it('sets the status to CLOSED', () => {
134+
let status;
135+
client.connectionStatus().subscribe({
136+
onNext: _status => (status = _status),
137+
onSubscribe: subscription =>
138+
subscription.request(Number.MAX_SAFE_INTEGER),
139+
});
140+
client.close();
141+
expect(status.kind).toBe('CLOSED');
142+
});
143+
144+
it('calls receive.onComplete', () => {
145+
const onComplete = jest.fn();
146+
const onSubscribe = subscription =>
147+
subscription.request(Number.MAX_SAFE_INTEGER);
148+
client.receive().subscribe({onComplete, onSubscribe});
149+
client.close();
150+
expect(onComplete.mock.calls.length).toBe(1);
151+
});
119152
});
120153
});
121154

packages/rsocket-websocket-server/src/RSocketWebSocketServer.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,13 @@ class WSDuplexConnection implements DuplexConnection {
237237
});
238238
}
239239

240-
close(): void {
241-
this._socket.emit('close');
240+
close(error?: Error): void {
241+
if (error) {
242+
this._socket.emit('error', error);
243+
} else {
244+
this._socket.emit('close');
245+
}
246+
242247
this._socket.close();
243248
}
244249

packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,18 @@ describe('RSocketWebSocketServer', () => {
7474
expect(status.error).toBe(error);
7575
});
7676

77-
it('returns CLOSED if explicitly closed', () => {
77+
it('returns CLOSED if explicitly closed with no error', () => {
7878
connection.receive().subscribe(() => {});
7979
connection.close();
8080
expect(status.kind).toBe('CLOSED');
8181
});
82+
83+
it('returns ERROR if explicitly closed with an error', () => {
84+
connection.receive().subscribe(() => {});
85+
const error = new Error();
86+
connection.close(error);
87+
expect(status.kind).toBe('ERROR');
88+
expect(status.error).toBe(error);
89+
});
8290
});
8391
});

0 commit comments

Comments
 (0)