This repository was archived by the owner on Feb 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwebsocket-handler.ts
151 lines (133 loc) · 4.33 KB
/
websocket-handler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import { URLExt } from '@jupyterlab/coreutils';
import { ServerConnection } from '@jupyterlab/services';
import { UUID } from '@lumino/coreutils';
import { requestAPI } from './handler';
import { ChatModel, IChatModel } from '../model';
import { ChatService } from '../services';
/**
* Path of the chat websocket endpoint; it is joined onto the server's
* websocket base URL when the connection is created.
*/
const CHAT_SERVICE_URL = 'api/chat';
/**
 * An implementation of the chat model that exchanges messages with the chat
 * backend over a WebSocket.
 */
export class WebSocketHandler extends ChatModel {
  /**
   * The server settings used to make API requests.
   */
  readonly serverSettings: ServerConnection.ISettings;

  /**
   * Create a new chat handler.
   *
   * @param options - Instantiation options. When `serverSettings` is omitted,
   * `ServerConnection.makeSettings()` is used.
   */
  constructor(options: WebSocketHandler.IOptions = {}) {
    super(options);
    this.serverSettings =
      options.serverSettings ?? ServerConnection.makeSettings();
  }

  /**
   * Initializes the WebSocket connection to the Chat backend. Promise is
   * resolved when server acknowledges connection and sends the client ID. This
   * must be awaited before calling any other method.
   *
   * The promise rejects if the socket reports an error or closes before the
   * connection message is received.
   */
  async initialize(): Promise<void> {
    await this._initialize();
  }

  /**
   * Sends a message across the WebSocket.
   *
   * A fresh UUID is assigned to `message.id` (the argument is mutated).
   *
   * @param message - The chat request to send.
   * @returns A promise resolving to `true` once the server echoes the message
   * back (same message id, sent by this client), or to `false` when the
   * message could not be sent (no socket, or socket closing/closed) or the
   * connection was closed or disposed before the acknowledgement arrived.
   */
  sendMessage(message: ChatService.ChatRequest): Promise<boolean> {
    const id = UUID.uuid4();
    message.id = id;
    return new Promise<boolean>(resolve => {
      const socket = this._socket;
      // `send()` on a closing/closed socket silently drops the data, so no
      // acknowledgement could ever arrive: settle right away instead of
      // leaving the caller waiting forever.
      if (
        !socket ||
        socket.readyState === socket.CLOSING ||
        socket.readyState === socket.CLOSED
      ) {
        resolve(false);
        return;
      }
      // If `send()` throws (e.g. socket still connecting) the promise rejects
      // and no resolver is registered.
      socket.send(JSON.stringify(message));
      this._sendResolverQueue.set(id, resolve);
    });
  }

  /**
   * Fetch the chat history through `requestAPI('history')` with a GET request.
   *
   * Any error raised by `requestAPI` is propagated as the promise rejection.
   *
   * NOTE(review): `requestAPI` is called without `this.serverSettings`;
   * confirm it targets the same server as the websocket.
   */
  async getHistory(): Promise<ChatService.ChatHistory> {
    const history: ChatService.ChatHistory = await requestAPI('history', {
      method: 'GET'
    });
    return history;
  }

  /**
   * Dispose the chat handler.
   *
   * Pending `sendMessage()` promises are resolved with `false`, the socket
   * event handlers are replaced by no-ops (so closing does not trigger a
   * reconnection) and the socket is closed.
   */
  dispose(): void {
    super.dispose();

    // Nothing will be acknowledged any more.
    this._settlePendingSends(false);

    // Clean up socket.
    const socket = this._socket;
    if (socket) {
      this._socket = null;
      socket.onopen = () => undefined;
      socket.onerror = () => undefined;
      socket.onmessage = () => undefined;
      socket.onclose = () => undefined;
      socket.close();
    }
  }

  /**
   * Handle a message received from the server.
   *
   * When the message is the echo of one sent by this client, the matching
   * `sendMessage()` promise is resolved with `true` and its resolver is
   * removed from the queue. The message is then forwarded to the base class.
   */
  onMessage(message: ChatService.IMessage): void {
    // resolve promise from `sendMessage()`
    if (message.type === 'msg' && message.sender.id === this.id) {
      const acknowledge = this._sendResolverQueue.get(message.id);
      if (acknowledge) {
        // Drop the entry so the queue does not grow with every sent message.
        this._sendResolverQueue.delete(message.id);
        acknowledge(true);
      }
    }
    super.onMessage(message);
  }

  /**
   * Handle the socket `close` event.
   *
   * Rejects the pending connection promise (a no-op if it already settled),
   * settles unacknowledged sends with `false` and, on abnormal closure,
   * schedules a reconnection attempt.
   */
  private _onClose(e: CloseEvent, reject: (reason?: unknown) => void): void {
    reject(new Error('Chat UI websocket disconnected'));
    console.error('Chat UI websocket disconnected');

    // Messages still waiting for their echo cannot be acknowledged on this
    // connection any more.
    this._settlePendingSends(false);

    // only attempt re-connect if there was an abnormal closure
    // WebSocket status codes defined in RFC 6455: https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
    if (e.code === 1006) {
      const delaySeconds = 1;
      console.info(`Will try to reconnect in ${delaySeconds} s.`);
      setTimeout(() => {
        // A failed attempt rejects the promise; handle it here to avoid an
        // unhandled rejection. The close handler of the new socket takes care
        // of scheduling the next attempt.
        this._initialize().catch(reason => {
          console.error('Chat UI websocket reconnection failed', reason);
        });
      }, delaySeconds * 1000);
    }
  }

  /**
   * Open a new WebSocket and wait for the server's `connection` message.
   *
   * Resolves once the client id has been received and stored; rejects on
   * socket error or close. If the handler is already disposed no socket is
   * created and the returned promise is left pending.
   */
  private _initialize(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      if (this.isDisposed) {
        return;
      }
      console.log('Creating a new websocket connection for chat...');
      const { token, WebSocket, wsUrl } = this.serverSettings;
      const url =
        URLExt.join(wsUrl, CHAT_SERVICE_URL) +
        (token ? `?token=${encodeURIComponent(token)}` : '');
      const socket = (this._socket = new WebSocket(url));

      const listenForConnection = (
        _: IChatModel,
        message: ChatService.IMessage
      ) => {
        if (message.type !== 'connection') {
          return;
        }
        this.id = message.client_id;
        resolve();
        this.incomingMessage.disconnect(listenForConnection);
      };

      // On failure also detach the connection listener, otherwise every
      // failed (re)connection attempt would leave a stale slot connected.
      const fail = (reason?: unknown): void => {
        this.incomingMessage.disconnect(listenForConnection);
        reject(reason);
      };

      socket.onclose = e => this._onClose(e, fail);
      socket.onerror = e => fail(e);
      socket.onmessage = msg =>
        msg.data && this.onMessage(JSON.parse(msg.data));

      this.incomingMessage.connect(listenForConnection);
    });
  }

  /**
   * Resolve every pending `sendMessage()` promise with the given value and
   * empty the queue.
   */
  private _settlePendingSends(acknowledged: boolean): void {
    this._sendResolverQueue.forEach(resolve => resolve(acknowledged));
    this._sendResolverQueue.clear();
  }

  private _socket: WebSocket | null = null;

  /**
   * Resolvers of the promises returned by `sendMessage()`, keyed by message
   * id, waiting for the server acknowledgement.
   */
  private _sendResolverQueue = new Map<string, (value: boolean) => void>();
}
/**
* The websocket namespace.
*/
export namespace WebSocketHandler {
/**
* The instantiation options for a websocket chat handler.
*/
export interface IOptions extends ChatModel.IOptions {
/**
* The server settings used by the handler. When omitted, the handler
* constructor falls back to `ServerConnection.makeSettings()`.
*/
serverSettings?: ServerConnection.ISettings;
}
}