Skip to content

Commit 680976a

Browse files
committed
adds reconnect example
1 parent 23da9b0 commit 680976a

File tree

1 file changed

+237
-0
lines changed

1 file changed

+237
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import {
2+
RSocketClient,
3+
BufferEncoders,
4+
encodeCompositeMetadata,
5+
TEXT_PLAIN,
6+
MESSAGE_RSOCKET_COMPOSITE_METADATA,
7+
MESSAGE_RSOCKET_ROUTING,
8+
MESSAGE_RSOCKET_AUTHENTICATION,
9+
encodeRoute,
10+
encodeSimpleAuthMetadata,
11+
} from 'rsocket-core';
12+
import type { ReactiveSocket, Payload, ISubscriber, ISubscription, DuplexConnection, Frame, ConnectionStatus } from 'rsocket-types';
13+
import { Flowable, Signle } from 'rsocket-flowable';
14+
import RSocketWebSocketClient from 'rsocket-websocket-client';
15+
import WebSocket from 'ws';
16+
17+
18+
class ResubscribeOperator<T> implements ISubscriber<T>, ISubscription {
19+
source: Flowable<T>;
20+
actual: ISubscriber<T>;
21+
22+
done: boolean;
23+
once: boolean;
24+
25+
upstream: ISubscription;
26+
27+
requested: number;
28+
29+
constructor(source: Flowable<T>, actual: ISubscriber<T>) {
30+
this.source = source;
31+
this.actual = actual;
32+
this.requested = 0;
33+
}
34+
35+
onSubscribe(subscription: ISubscription) {
36+
if (this.done) {
37+
subscription.cancel();
38+
return;
39+
}
40+
41+
this.upstream = subscription;
42+
43+
if (!this.once) {
44+
this.once = true;
45+
this.actual.onSubscribe(this);
46+
return;
47+
}
48+
49+
subscription.request(this.requested);
50+
}
51+
52+
onComplete() {
53+
if (this.done) {
54+
return;
55+
}
56+
57+
this.done = true;
58+
this.actual.onComplete();
59+
}
60+
61+
onError(error: Error) {
62+
if (this.done) {
63+
return;
64+
}
65+
66+
this.upstream = null;
67+
setTimeout(() => this.source.subscribe(this));
68+
}
69+
70+
onNext(value: T) {
71+
if (this.done) {
72+
return;
73+
}
74+
75+
this.requested--;
76+
this.actual.onNext(value);
77+
}
78+
79+
cancel() {
80+
if (this.done) {
81+
return;
82+
}
83+
84+
this.done = true;
85+
86+
if (this.upstream) {
87+
this.upstream = null;
88+
this.upstream.cancel();
89+
}
90+
}
91+
92+
request(n: number) {
93+
this.requested += n;
94+
if (this.upstream) {
95+
this.upstream.request(n);
96+
}
97+
}
98+
}
99+
100+
class ReconnectableRSocket<D, M> implements ReactiveSocket<D, M> {
101+
102+
socket: ReactiveSocket<D, M>;
103+
clientFactory: () => RSocketClient<D, M>;
104+
105+
constructor(clientFactory: () => RSocketClient<D, M>) {
106+
this.clientFactory = clientFactory;
107+
this.connect();
108+
}
109+
110+
connect() {
111+
this.clientFactory().connect().then(
112+
socket => {
113+
this.socket = socket;
114+
socket.connectionStatus().subscribe(event => {
115+
if (event.kind !== 'CONNECTED') {
116+
this.socket = null;
117+
this.connect();
118+
}
119+
});
120+
},
121+
error => this.connect()
122+
);
123+
}
124+
125+
fireAndForget(payload: Payload<D, M>): void {
126+
if (!this.socket) {
127+
throw new Error('Not Connected yet. Retry later');
128+
}
129+
130+
this.socket.fireAndForget(payload);
131+
}
132+
133+
requestResponse(payload: Payload<D, M>): Single<Payload<D, M>> {
134+
if (!this.socket) {
135+
return Single.error(new Error('Not Connected yet. Retry later'));
136+
}
137+
138+
return this.socket.requestResponse(payload);
139+
}
140+
141+
requestStream(payload: Payload<D, M>): Flowable<Payload<D, M>> {
142+
if (!this.socket) {
143+
return Flowable.error(new Error('Not Connected yet. Retry later'));
144+
}
145+
146+
return this.socket.requestStream(payload);
147+
}
148+
149+
requestChannel(payloads: Flowable<Payload<D, M>>): Flowable<Payload<D, M>> {
150+
if (!this.socket) {
151+
return Flowable.error(new Error('Not Connected yet. Retry later'));
152+
}
153+
154+
return this.socket.requestChannel(payloads);
155+
}
156+
157+
metadataPush(payload: Payload<D, M>): Single<void> {
158+
if (!this.socket) {
159+
return Single.error(new Error('Not Connected yet. Retry later'));
160+
}
161+
162+
return this.socket.metadataPush(payload);
163+
}
164+
165+
}
166+
167+
const maxRSocketRequestN = 2147483647;
168+
const keepAlive = 60000;
169+
const lifetime = 180000;
170+
const dataMimeType = 'application/octet-stream';
171+
const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string;
172+
const route = 'rsocket.request.stream';
173+
174+
const clientFactory: () => RSocketClient<Buffer, Buffer> = () => new RSocketClient({
175+
setup: {
176+
dataMimeType,
177+
keepAlive,
178+
lifetime,
179+
metadataMimeType,
180+
payload: {
181+
data: undefined,
182+
metadata: encodeCompositeMetadata([
183+
[TEXT_PLAIN, Buffer.from('Hello World')],
184+
[MESSAGE_RSOCKET_ROUTING, encodeRoute(route)],
185+
[
186+
MESSAGE_RSOCKET_AUTHENTICATION,
187+
encodeSimpleAuthMetadata('user', 'pass'),
188+
],
189+
['custom/test/metadata', Buffer.from([1, 2, 3])],
190+
]),
191+
},
192+
},
193+
transport: new RSocketWebSocketClient(
194+
{
195+
debug: true,
196+
url: 'ws://localhost:8080/rsocket',
197+
wsCreator: url => new WebSocket(url),
198+
},
199+
BufferEncoders,
200+
),
201+
});
202+
203+
204+
const socket = new ReconnectableRSocket(clientFactory);
205+
206+
207+
const request = new Flowable(subscriber => {
208+
socket
209+
.requestStream({
210+
data: Buffer.from('request-stream'),
211+
metadata: encodeCompositeMetadata([
212+
[TEXT_PLAIN, Buffer.from('Hello World')],
213+
[MESSAGE_RSOCKET_ROUTING, encodeRoute(route)],
214+
[
215+
MESSAGE_RSOCKET_AUTHENTICATION,
216+
encodeSimpleAuthMetadata('user', 'pass'),
217+
],
218+
['custom/test/metadata', Buffer.from([1, 2, 3])],
219+
]),
220+
})
221+
.subscribe(subscriber);
222+
});
223+
224+
request
225+
.map()
226+
.lift(actual => new ResubscribeOperator(request, actual))
227+
.subscribe({
228+
// eslint-disable-next-line no-console
229+
onComplete: () => console.log('Request-stream completed'),
230+
onError: error =>
231+
console.error(`Request-stream error:${error.message}`),
232+
// eslint-disable-next-line no-console
233+
onNext: value => console.log('%s %s', value.data, value.metadata),
234+
onSubscribe: sub => sub.request(maxRSocketRequestN),
235+
});
236+
237+
setTimeout(() => { }, 30000000);

0 commit comments

Comments
 (0)