This repository was archived by the owner on Nov 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathcodestream_message_test.js
373 lines (339 loc) · 12.2 KB
/
codestream_message_test.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
'use strict';
const Assert = require('assert');
const CodeStreamAPITest = require(process.env.CSSVC_BACKEND_ROOT + '/api_server/lib/test_base/codestream_api_test');
const BoundAsync = require(process.env.CSSVC_BACKEND_ROOT + '/shared/server_utils/bound_async');
const PubNub = require('pubnub');
const MockPubnub = require(process.env.CSSVC_BACKEND_ROOT + '/shared/server_utils/pubnub/mock_pubnub');
const PubNubClient = require(process.env.CSSVC_BACKEND_ROOT + '/shared/server_utils/pubnub/pubnub_client_async');
const SocketClusterClient = require(process.env.CSSVC_BACKEND_ROOT + '/shared/server_utils/socketcluster/socketcluster_client');
const RandomString = require('randomstring');
const OS = require('os');
/* eslint no-console: 0 */
class CodeStreamMessageTest extends CodeStreamAPITest {
constructor (options) {
super(options);
this.reallySendMessages = true; // we suppress messages ordinarily, but since we're actually testing them...
this.messageParts = {};
}
// before the test, set up messaging clients and start listening
before (callback) {
this.broadcasterClientsForUser = {};
BoundAsync.series(this, [
super.before,
this.makeData, // make whatever data we need to be in the database to proceed
this.makeBroadcasterForServer, // make broadcaster client simulating server to send
this.makeBroadcasterForClient, // make broadcaster client simulating client to receive
this.setChannelName, // set the channel name that we'll listen for
this.wait // wait a bit for access privileges to be set
], callback);
}
// after the test runs, unsubscribe from all channels
after (callback) {
Object.keys(this.broadcasterClientsForUser || []).forEach(userId => {
this.broadcasterClientsForUser[userId].unsubscribeAll();
this.broadcasterClientsForUser[userId].disconnect();
delete this.broadcasterClientsForUser[userId];
});
if (this.broadcasterForServer) {
this.broadcasterForServer.unsubscribeAll();
this.broadcasterForServer.disconnect();
delete this.broadcasterForServer;
}
super.after(callback);
}
// during the test, we send a message and wait for it to arrive
run (callback) {
if (this.mockMode && this.wantServer && !this.usingSocketCluster) {
console.warn('NOTE - THIS TEST CAN NOT SIMULATE A SERVER IN MOCK MODE, PASSING SUPERFICIALLY');
this.testDidNotRun = true;
return callback();
}
BoundAsync.series(this, [
this.listenOnClient, // start listening first
this.waitForSubscribe, // after listening, wait a bit till we generate the message
this.doGenerateMessage, // now trigger whatever request will cause the message to be sent
this.waitForMessage, // wait for the message to arrive
this.clearTimer // once the message arrives, stop waiting
], callback);
}
makeBroadcasterForServer (callback) {
if (!this.wantServer) {
return callback();
}
if (this.usingSocketCluster) {
return this.makeSocketClusterClientForServer(callback);
}
if (this.mockMode) {
this.testLog('Cannot make PubNub client for server in mock mode');
return callback();
}
// all we have to do here is provide the full config, which includes the secretKey
this.testLog('Making PubNub client for server...');
let config = Object.assign({}, this.apiConfig.broadcastEngine.pubnub);
config.uuid = `API-${OS.hostname()}-${this.testNum}`;
let client = new PubNub(config);
this.broadcasterForServer = new PubNubClient({
pubnub: client
});
this.broadcasterForServer.init();
callback();
}
makeSocketClusterClientForServer (callback) {
const broadcasterConfig = this.apiConfig.broadcastEngine.codestreamBroadcaster;
this.testLog('Making SocketCluster client for server...');
(async () => {
const config = Object.assign({},
{
// formerly the socketCluster object
host: broadcasterConfig.host,
port: broadcasterConfig.port,
authKey: broadcasterConfig.secrets.api,
ignoreHttps: broadcasterConfig.ignoreHttps,
strictSSL: broadcasterConfig.sslCert.requireStrictSSL,
apiSecret: broadcasterConfig.secrets.api
},
{ uid: 'API' }
);
this.broadcasterForServer = new SocketClusterClient(config);
try {
await this.broadcasterForServer.init();
}
catch (error) {
return callback(error);
}
callback();
})();
}
makeBroadcasterForClient (callback) {
if (this.usingSocketCluster) {
return this.makeSocketClusterClientForClient(callback);
}
let listeningUser;
if (this.listeningUserIndex !== undefined) {
listeningUser = this.users[this.listeningUserIndex];
} else if (this.listeningUser) {
listeningUser = this.listeningUser;
} else {
listeningUser = this.currentUser;
}
// we remove the secretKey, which clients should NEVER have, and the publishKey, which we won't be using
const user = listeningUser.user;
const token = listeningUser.broadcasterToken || listeningUser.user.id;
let clientConfig = Object.assign({}, this.apiConfig.broadcastEngine.pubnub);
delete clientConfig.secretKey;
delete clientConfig.publishKey;
clientConfig.uuid = user._pubnubUuid || user.id;
clientConfig.authKey = token;
if (this.mockMode) {
clientConfig.ipc = this.ipc;
clientConfig.serverId = this.apiConfig.apiServer.ipc.serverId;
}
let client = this.mockMode ? new MockPubnub(clientConfig) : new PubNub(clientConfig);
this.broadcasterClientsForUser[user.id] = new PubNubClient({
pubnub: client
});
this.broadcasterClientsForUser[user.id].init();
this.testLog(`Made PubNub client for user ${user.id}, token ${token}`);
callback();
}
makeSocketClusterClientForClient (callback) {
let listeningUser;
if (this.listeningUserIndex !== undefined) {
listeningUser = this.users[this.listeningUserIndex];
} else {
listeningUser = this.currentUser;
}
const { user, broadcasterToken } = listeningUser;
const broadcasterConfig = this.apiConfig.broadcastEngine.codestreamBroadcaster;
const config = Object.assign({},
{
// formerly the socketCluster object
host: broadcasterConfig.host,
port: broadcasterConfig.port,
authKey: broadcasterConfig.secrets.api,
ignoreHttps: broadcasterConfig.ignoreHttps,
strictSSL: broadcasterConfig.requireStrictSSL,
apiSecret: broadcasterConfig.secrets.api
},
{
uid: user.id,
authKey: broadcasterToken,
}
);
if (this.cheatOnSubscription) {
config.subscriptionCheat = this.apiConfig.sharedSecrets.subscriptionCheat;
}
(async () => {
this.broadcasterClientsForUser[user.id] = new SocketClusterClient(config);
try {
await this.broadcasterClientsForUser[user.id].init();
this.testLog(`Made SocketCluster client for user ${user.id}, token ${broadcasterToken}`);
}
catch (error) {
return callback(error);
}
callback();
})();
}
// make whatever data we need to set up our messaging, this should be overridden for specific tests
makeData (callback) {
callback();
}
// set the channel name of interest, this should be overridden for specific tests
setChannelName (callback) {
callback('setChannelName should be overridden');
}
// wait for permissions to be set through pubnub PAM
wait (callback) {
const time = this.usingSocketCluster ? 0 : (this.mockMode ? 100 : 5000);
this.testLog(`Waiting ${time} for message...`);
setTimeout(callback, time);
}
// begin listening on the simulated client
listenOnClient (callback) {
// we'll time out after 5 seconds
const timeout = this.messageReceiveTimeout || 5000;
this.testLog(`Client listening on ${this.channelName}, will time out after ${timeout} ms...`);
this.messageTimer = setTimeout(
this.messageTimeout.bind(this, this.channelName),
timeout
);
(async () => {
// subscribe to the channel of interest
let listeningUser;
if (this.listeningUserIndex !== undefined) {
listeningUser = this.users[this.listeningUserIndex];
} else if (this.listeningUser) {
listeningUser = this.listeningUser;
} else {
listeningUser = this.currentUser;
}
const broadcaster = this.broadcasterClientsForUser[listeningUser.user.id];
try {
await broadcaster.subscribe(
this.channelName,
this.messageReceived.bind(this),
{
//withPresence: this.withPresence,
onFail: this.onSubscribeFail ? this.onSubscribeFail.bind(this) : undefined
}
);
this.testLog(`Subscribed to ${this.channelName}`);
}
catch (error) {
this.testLog(`Failed to subscribe to ${this.channelName}`);
return callback(error);
}
callback();
})();
}
// wait some period after we subscribe before generating the test message
// in most cases, we don't need to wait, override this to wait longer
waitForSubscribe (callback) {
this.testLog('Waiting 0 for subscribe...');
setTimeout(callback, 0);
}
// called if message doesn't arrive after timeout
messageTimeout (channel) {
Assert.fail('message never arrived for ' + channel);
}
// called when a message has been received, assert that it matches expectations
messageReceived (error, message) {
if (error) { return this.messageCallback(error); }
if (message.channel !== this.channelName) {
this.testLog(`Received message ${message.messageId} on ${message.channel}, ignoring:\n${JSON.stringify(message, 0, 10)}`);
return; // ignore
}
// if message is big enough to be split into parts, receive the part, and only proceed
// if this gives us the full message
if (message.message.part !== undefined) {
message = this.receiveMessagePart(message);
if (!message) { return; }
}
if (!this.validateMessage(message)) {
this.testLog(`Received message ${message.messageId} on ${message.channel}, but was not validated:\n${JSON.stringify(message, 0, 10)}`);
return; // ignore
}
// the message can actually arrive before we are waiting for it, so in that case signal that we already got it
if (this.messageCallback) {
this.testLog(`Message ${message.messageId} validated`);
this.messageCallback();
}
else {
this.testLog(`Message ${message.messageId} already received`);
this.messageAlreadyReceived = true;
}
}
// receive part of a multi-part message
receiveMessagePart (message) {
const { part, totalParts, fullMessageId } = message.message;
this.messageParts[fullMessageId] = this.messageParts[fullMessageId] || {};
const messageEntry = this.messageParts[fullMessageId];
messageEntry[part] = message.message.message;
if (Object.keys(messageEntry).length === totalParts) {
let fullMessage = '';
for (let i = 0; i < totalParts; i++) {
fullMessage += messageEntry[i.toString()];
}
delete this.messageParts[fullMessageId];
const json = JSON.parse(fullMessage);
return { message: json };
}
}
// validate the message received against expectations
validateMessage (message) {
if (typeof message.message === 'object') {
Assert(message.message.requestId, 'received message has no requestId');
this.message.requestId = message.message.requestId; // don't care what it is
Assert(message.message.messageId, 'received message has no messageId');
this.message.messageId = message.message.messageId; // don't care what it is
}
try {
Assert.deepStrictEqual(message.message, this.message, 'received message doesn\'t match');
}
catch (e) {
// I have no clue why I need to do this, shouldn't Assert just throw???
console.warn('Message assertion failed:', e);
throw e;
}
return true;
}
// generate the message, and log when we are doing it
doGenerateMessage (callback) {
this.testLog('Generating message...');
this.generateMessage(callback);
}
// generate the message, this could be overriden but by default it just sends a random message
generateMessage (callback) {
this.sendFromServer(callback);
}
// send a random message from the server
sendFromServer (callback) {
this.message = RandomString.generate(100);
this.testLog('Publishing message from server...');
this.broadcasterForServer.publish(
this.message,
this.channelName
);
callback();
}
// wait for the message to arrive
waitForMessage (callback) {
if (this.messageAlreadyReceived) {
return callback();
}
else {
this.messageCallback = callback;
// do nothing until we get the message or a timeout...
}
}
// clear out timer
clearTimer (callback) {
if (this.messageTimer) {
clearTimeout(this.messageTimer);
delete this.messageTimer;
}
callback();
}
}
module.exports = CodeStreamMessageTest;