forked from microsoft/FluidFramework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloaderContainerTracker.ts
599 lines (545 loc) · 26.3 KB
/
loaderContainerTracker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
/* eslint-disable @typescript-eslint/strict-boolean-expressions */
import { assert } from "@fluidframework/common-utils";
import { IContainer, IDeltaQueue, IHostLoader } from "@fluidframework/container-definitions";
import { Container } from "@fluidframework/container-loader";
import { canBeCoalescedByService } from "@fluidframework/driver-utils";
import { IDocumentMessage, ISequencedDocumentMessage, MessageType } from "@fluidframework/protocol-definitions";
import { debug } from "./debug";
import { IOpProcessingController } from "./testObjectProvider";
import { timeoutAwait, timeoutPromise } from "./timeoutUtils";
// Per-op message tracing (enabled via the "ops" debug namespace).
const debugOp = debug.extend("ops");
// Tracing of what the synchronization helpers are currently waiting on.
const debugWait = debug.extend("wait");
// Maximum time (in milliseconds) any single synchronization wait may take when no explicit
// timeout is given: 5 minutes. (Previously written as `5 * 6000`, which is only 30 seconds.)
const defaultMaxTimeout = 5 * 60 * 1000;
/**
 * Bookkeeping that LoaderContainerTracker keeps for every container it tracks.
 */
interface ContainerRecord {
    /** Small sequential number used to label the container in debug output. */
    index: number;
    /** Whether LoaderContainerTracker has paused this container's inbound/outbound queues. */
    paused: boolean;
    /**
     * Trailing no-ops may or may not be acked by the server, so they are tracked in order to be
     * discounted (see issue #5629). This is the clientSequenceNumber of the first trailing no-op
     * still being tracked.
     */
    startTrailingNoOps: number;
    /** How many trailing no-ops have been sent and not yet seen coming back. */
    trailingNoOps: number;
    /** Last proposal, tracked to ensure there is no unresolved proposal. */
    lastProposal: number;
}
/**
 * Tracks every container created through the loaders registered with `add`, and provides helpers
 * to pause/resume their delta queues and to wait until all tracked containers are synchronized.
 */
export class LoaderContainerTracker implements IOpProcessingController {
    // Tracked containers and their per-container bookkeeping.
    private readonly containers = new Map<IContainer, ContainerRecord>();
    // Highest sequence number of any "codeDetailsProposed" proposal seen on a tracked container.
    private lastProposalSeqNum: number = 0;

    /**
     * @param syncSummarizerClients - when false (default), containers and quorum members whose client
     * capabilities are not interactive (e.g. summarizers) are ignored; when true they are tracked too.
     */
    constructor(private readonly syncSummarizerClients: boolean = false) {}

    /**
     * Add a loader to start to track any container created from them
     * @param loader - loader to start tracking any container created.
     */
    public add<LoaderType extends IHostLoader>(loader: LoaderType) {
        // TODO: Expose Loader API to able to intercept container creation (See issue #5114)
        // Wrap a loader method so that every container it returns is registered with this tracker.
        const patch = <T, C extends IContainer>(fn: (...args) => Promise<C>) => {
            const boundFn = fn.bind(loader);
            return async (...args: T[]) => {
                const container = await boundFn(...args);
                this.addContainer(container);
                return container;
            };
        };
        /* eslint-disable @typescript-eslint/unbound-method */
        loader.resolve = patch(loader.resolve);
        loader.createDetachedContainer = patch(loader.createDetachedContainer);
        loader.rehydrateDetachedContainerFromSnapshot = patch(loader.rehydrateDetachedContainerFromSnapshot);
        /* eslint-enable @typescript-eslint/unbound-method */
    }

    /**
     * Utility function to add container to be tracked.
     * Non-interactive containers are skipped unless syncSummarizerClients is set, and containers that
     * are already tracked are ignored.
     *
     * @param container - container to add
     */
    private addContainer(container: IContainer) {
        // ignore summarizer
        if (!container.deltaManager.clientDetails.capabilities.interactive && !this.syncSummarizerClients) { return; }
        // don't add container that is already tracked
        if (this.containers.has(container)) { return; }
        const record: ContainerRecord = {
            index: this.containers.size,
            paused: false,
            startTrailingNoOps: 0,
            trailingNoOps: 0,
            lastProposal: 0,
        };
        this.containers.set(container, record);
        this.trackTrailingNoOps(container, record);
        this.trackLastProposal(container);
        this.setupTrace(container, record.index);
    }

    /**
     * Keep track of the trailing NoOps that were sent so we can discount them in the clientSequenceNumber tracking.
     * The server might coalesce them into other ops or a single NoOp, or delay them if it doesn't think they
     * are necessary.
     *
     * @param container - the container to track
     * @param record - the record to update the trailing op information
     */
    private trackTrailingNoOps(container: IContainer, record: ContainerRecord) {
        container.deltaManager.outbound.on("op", (messages) => {
            for (const msg of messages) {
                if (canBeCoalescedByService(msg)) {
                    // Track the NoOp that was sent.
                    if (record.trailingNoOps === 0) {
                        // record the starting sequence number of the trailing no ops if we haven't been tracking yet.
                        record.startTrailingNoOps = msg.clientSequenceNumber;
                    }
                    record.trailingNoOps++;
                } else {
                    // Other ops have been sent. We would like to see those ack'ed, so no more need to track NoOps
                    record.trailingNoOps = 0;
                }
            }
        });

        container.deltaManager.inbound.on("push", (message) => {
            // Received the no op back, update the record if we are tracking
            if (canBeCoalescedByService(message)
                && message.clientId === (container as Container).clientId
                && record.trailingNoOps !== 0
                && record.startTrailingNoOps <= message.clientSequenceNumber
            ) {
                // NoOp might have coalesced and skipped ahead some sequence number
                // update the record and skip ahead as well
                const oldStartTrailingNoOps = record.startTrailingNoOps;
                record.startTrailingNoOps = message.clientSequenceNumber + 1;
                record.trailingNoOps -= (record.startTrailingNoOps - oldStartTrailingNoOps);
            }
        });

        container.on("disconnected", () => {
            // reset on disconnect.
            record.trailingNoOps = 0;
        });
    }

    /**
     * Record the highest proposal sequence number seen on the container, so that synchronization can
     * wait until the minimum sequence number has moved past it.
     */
    private trackLastProposal(container: IContainer) {
        container.on("codeDetailsProposed", (value, proposal) => {
            if (proposal.sequenceNumber > this.lastProposalSeqNum) {
                this.lastProposalSeqNum = proposal.sequenceNumber;
            }
        });
    }

    /**
     * Reset the tracker, closing all containers and stop tracking them.
     */
    public reset() {
        this.lastProposalSeqNum = 0;
        for (const container of this.containers.keys()) {
            container.close();
        }
        this.containers.clear();

        // REVIEW: do we need to unpatch the loaders?
    }

    /**
     * Ensure all tracked containers are synchronized
     */
    public async ensureSynchronized(...containers: IContainer[]): Promise<void> {
        await this.processSynchronized(undefined, ...containers);
    }

    /**
     * Ensure all tracked containers are synchronized with a time limit
     */
    public async ensureSynchronizedWithTimeout?(timeoutDuration: number | undefined, ...containers: IContainer[]) {
        await this.processSynchronized(timeoutDuration, ...containers);
    }

    /**
     * Make sure all the tracked containers are synchronized.
     *
     * No isDirty (non-readonly) containers
     *
     * No extra clientId in quorum of any container that is not tracked and still opened.
     *
     * - i.e. no pending Join/Leave message.
     *
     * No unresolved proposal (minSeqNum \>= lastProposalSeqNum)
     *
     * lastSequenceNumber of all container is the same
     *
     * clientSequenceNumberObserved is the same as clientSequenceNumber sent
     *
     * - this overlaps with !isDirty, but include task scheduler ops.
     *
     * - Trailing NoOp is tracked and don't count as pending ops.
     *
     * @param timeoutDuration - overall time budget in ms; when undefined (or 0) each individual wait
     * uses defaultMaxTimeout instead.
     * @param containers - containers to synchronize; all tracked containers when empty.
     */
    private async processSynchronized(timeoutDuration: number | undefined, ...containers: IContainer[]) {
        const start = Date.now();
        // Time left in the overall budget, or the per-wait default when no budget was given.
        const remainingTime = () => timeoutDuration ? timeoutDuration - (Date.now() - start) : defaultMaxTimeout;
        const resumed = this.resumeProcessing(...containers);

        let waitingSequenceNumberSynchronized = false;
        // eslint-disable-next-line no-constant-condition
        while (true) {
            const containersToApply = this.getContainers(containers);
            if (containersToApply.length === 0) { break; }

            // Ignore readonly dirty containers, because they can't send ops and nothing can be done about them being dirty
            const dirtyContainers = containersToApply.filter((c) => {
                const { deltaManager, isDirty } = c;
                return deltaManager.readOnlyInfo.readonly !== true && isDirty;
            });
            if (dirtyContainers.length === 0) {
                // Wait for all the leave messages
                const pendingClients = this.getPendingClients(containersToApply);
                if (pendingClients.length === 0) {
                    if (this.isSequenceNumberSynchronized(containersToApply)) {
                        // done, we are in sync
                        break;
                    }
                    if (!waitingSequenceNumberSynchronized) {
                        // Log and block on an inbound op only the first time; later iterations just
                        // yield a turn (below) and re-check.
                        waitingSequenceNumberSynchronized = true;
                        debugWait("Waiting for sequence number synchronized");
                        await timeoutAwait(this.waitForAnyInboundOps(containersToApply), {
                            durationMs: remainingTime(),
                            errorMsg: "Timeout on waiting for sequence number synchronized",
                        });
                    }
                } else {
                    waitingSequenceNumberSynchronized = false;
                    await timeoutAwait(this.waitForPendingClients(pendingClients), {
                        durationMs: remainingTime(),
                        errorMsg: "Timeout on waiting for pending join or leave op",
                    });
                }
            } else {
                // Wait for all the containers to be saved (or closed)
                debugWait(`Waiting container to be saved ${dirtyContainers.map((c) => this.containers.get(c)?.index)}`);
                waitingSequenceNumberSynchronized = false;
                const remainedDuration = remainingTime();
                await Promise.all(dirtyContainers.map(async (c) => this.waitForSavedOrClosed(c, remainedDuration)));
            }

            // yield a turn to allow side effect of the ops we just processed execute before we check again
            await new Promise<void>((resolve) => { setTimeout(resolve, 0); });
        }

        // Pause all container that was resumed
        // don't call pause if resumed is empty and pause everything, which is not what we want
        if (resumed.length !== 0) {
            await timeoutAwait(this.pauseProcessing(...resumed), {
                durationMs: remainingTime(),
                errorMsg: "Timeout on waiting for pausing all resumed containers",
            });
        }

        debugWait("Synchronized");
    }

    /**
     * Wait until the container emits "saved" or "closed", whichever comes first, failing after durationMs.
     * Whichever listener fires removes the other one, so repeated waits do not accumulate listeners,
     * and closing settles the timeout promise instead of leaving it pending.
     *
     * @param container - the dirty container to wait on
     * @param durationMs - time budget before rejecting with a timeout error
     */
    private async waitForSavedOrClosed(container: IContainer, durationMs: number) {
        await timeoutPromise(
            (resolve) => {
                const onSaved = () => {
                    container.off("closed", onClosed);
                    resolve();
                };
                const onClosed = () => {
                    container.off("saved", onSaved);
                    resolve();
                };
                container.once("saved", onSaved);
                container.once("closed", onClosed);
            },
            {
                durationMs,
                errorMsg: "Timeout on waiting a container to be saved",
            },
        );
    }

    /**
     * Utility to calculate the set of clientId per container in quorum that is NOT associated with
     * any container we tracked, indicating there is a pending join or leave op that we need to wait.
     *
     * @param containersToApply - the set of containers to check
     * @returns pairs of [container, clientIds in its quorum that no open tracked container owns];
     * containers with no such clientIds are omitted.
     */
    private getPendingClients(containersToApply: IContainer[]) {
        // All the clientId we track should be a superset of the quorum, otherwise, we are missing
        // leave messages
        const openedClientIds = new Set(
            Array.from(this.containers.keys())
                .filter((c) => !c.closed)
                .map((container) => (container as Container).clientId),
        );

        const pendingClients: [IContainer, Set<string>][] = [];
        containersToApply.forEach((container) => {
            const pendingClientId = new Set<string>();
            const quorum = container.getQuorum();
            quorum.getMembers().forEach((client, clientId) => {
                // ignore summarizer
                if (!client.client.details.capabilities.interactive && !this.syncSummarizerClients) { return; }
                if (!openedClientIds.has(clientId)) {
                    pendingClientId.add(clientId);
                }
            });

            if (pendingClientId.size !== 0) {
                pendingClients.push([container, pendingClientId]);
            }
        });
        return pendingClients;
    }

    /**
     * Utility to check synchronization based on sequence number
     * See ensureSynchronized for more detail
     *
     * @param containersToApply - the set of containers to check (must be non-empty)
     */
    private isSequenceNumberSynchronized(containersToApply: IContainer[]) {
        // clientSequenceNumber check detects ops in flight, both on the wire and in the outbound queue
        // We need both client sequence number and isDirty check because:
        // - Currently isDirty flag ignores ops for task scheduler, so we need the client sequence number check
        // - But isDirty flags include ops during forceReadonly and disconnected, because we don't submit
        //   the ops in the first place, clientSequenceNumber is not assigned

        const isClientSequenceNumberSynchronized = containersToApply.every((container) => {
            if (container.deltaManager.readOnlyInfo.readonly === true) {
                // Ignore readonly container. the clientSeqNum and clientSeqNumObserved might be out of sync
                // because we transition to readonly when outbound is not empty or the in transit op got lost
                return true;
            }
            // Note that in read only mode, the op won't be submitted
            let deltaManager = (container.deltaManager as any);
            // Untracked containers have no trailing no-op bookkeeping; treat them as having none.
            const trailingNoOps = this.containers.get(container)?.trailingNoOps ?? 0;
            // Back-compat: clientSequenceNumber & clientSequenceNumberObserved moved to ConnectionManager in 0.53
            if (!("clientSequenceNumber" in deltaManager)) {
                deltaManager = deltaManager.connectionManager;
            }
            assert("clientSequenceNumber" in deltaManager, "no clientSequenceNumber");
            assert("clientSequenceNumberObserved" in deltaManager, "no clientSequenceNumberObserved");
            return deltaManager.clientSequenceNumber ===
                (deltaManager.clientSequenceNumberObserved as number) + trailingNoOps;
        });

        if (!isClientSequenceNumberSynchronized) {
            return false;
        }

        const minSeqNum = containersToApply[0].deltaManager.minimumSequenceNumber;
        if (minSeqNum < this.lastProposalSeqNum) {
            // There is an unresolved proposal
            return false;
        }

        // Check to see if all the container has process the same number of ops.
        const seqNum = containersToApply[0].deltaManager.lastSequenceNumber;
        return containersToApply.every((c) => c.deltaManager.lastSequenceNumber === seqNum);
    }

    /**
     * Utility to wait for any clientId in quorum that is NOT associated with any container we
     * tracked, indicating there is a pending join or leave op that we need to wait.
     * Each wait resolves once all its pending clientIds have either connected on an unconnected
     * tracked container or been removed from the quorum, or when the container closes. All
     * listeners (including "closed") are removed when the wait resolves.
     *
     * Note that this function doesn't account for container that got added after we started waiting
     *
     * @param pendingClients - [container, pending clientIds] pairs as returned by getPendingClients
     */
    private async waitForPendingClients(pendingClients: [IContainer, Set<string>][]) {
        const unconnectedClients =
            Array.from(this.containers.keys()).filter((c) => !c.closed && !(c as Container).connected);
        return Promise.all(pendingClients.map(async ([container, pendingClientId]) => {
            return new Promise<void>((resolve) => {
                const cleanup = () => {
                    unconnectedClients.forEach((c) => c.off("connected", handler));
                    container.getQuorum().off("removeMember", handler);
                    container.off("closed", closedHandler);
                };
                const handler = (clientId: string) => {
                    pendingClientId.delete(clientId);
                    if (pendingClientId.size === 0) {
                        cleanup();
                        resolve();
                    }
                };
                const closedHandler = () => {
                    cleanup();
                    resolve();
                };
                const index = this.containers.get(container)?.index;
                debugWait(`${index}: Waiting for pending clients ${Array.from(pendingClientId.keys())}`);
                unconnectedClients.forEach((c) => c.on("connected", handler));
                container.getQuorum().on("removeMember", handler);
                container.on("closed", closedHandler);
            });
        }));
    }

    /**
     * Utility to wait for any inbound ops from a set of containers.
     * Resolves on the first inbound "push" from any of them and then removes all the listeners.
     *
     * @param containersToApply - the set of containers to wait for any inbound ops for
     */
    private async waitForAnyInboundOps(containersToApply: IContainer[]) {
        return new Promise<void>((resolve) => {
            const handler = () => {
                containersToApply.forEach((c) => {
                    c.deltaManager.inbound.off("push", handler);
                });
                resolve();
            };
            containersToApply.forEach((c) => {
                c.deltaManager.inbound.on("push", handler);
            });
        });
    }

    /**
     * Resume all queue activities on all paused tracked containers and return them
     */
    public resumeProcessing(...containers: IContainer[]) {
        const resumed: IContainer[] = [];
        const containersToApply = this.getContainers(containers);
        for (const container of containersToApply) {
            const record = this.containers.get(container);
            if (record?.paused === true) {
                debugWait(`${record.index}: container resumed`);
                container.deltaManager.inbound.resume();
                container.deltaManager.outbound.resume();
                resumed.push(container);
                record.paused = false;
            }
        }
        return resumed;
    }

    /**
     * Pause all queue activities on the containers given, or all tracked containers
     * Any containers given that is not tracked will be ignored.
     */
    public async pauseProcessing(...containers: IContainer[]) {
        const pauseP: Promise<void>[] = [];
        const containersToApply = this.getContainers(containers);
        for (const container of containersToApply) {
            const record = this.containers.get(container);
            if (record !== undefined && !record.paused) {
                debugWait(`${record.index}: container paused`);
                pauseP.push(container.deltaManager.inbound.pause());
                pauseP.push(container.deltaManager.outbound.pause());
                record.paused = true;
            }
        }
        await Promise.all(pauseP);
    }

    /**
     * Pause all queue activities on all tracked containers, and resume only
     * inbound to process ops until it is idle. All queues are left in the paused state
     * after the function
     */
    public async processIncoming(...containers: IContainer[]) {
        return this.processQueue(containers, (container) => container.deltaManager.inbound);
    }

    /**
     * Pause all queue activities on all tracked containers, and resume only
     * outbound to process ops until it is idle. All queues are left in the paused state
     * after the function
     */
    public async processOutgoing(...containers: IContainer[]) {
        return this.processQueue(containers, (container) => container.deltaManager.outbound);
    }

    /**
     * Implementation of processIncoming and processOutgoing.
     * Listener cleanup and re-pausing run in `finally`, so queues are left paused and the in/out
     * trackers are removed even if waiting throws.
     */
    private async processQueue<U>(containers: IContainer[], getQueue: (container: IContainer) => IDeltaQueue<U>) {
        await this.pauseProcessing(...containers);
        const resumed: IDeltaQueue<U>[] = [];

        const containersToApply = this.getContainers(containers);

        const inflightTracker = new Map<IContainer, number>();
        const cleanup: (() => void)[] = [];
        try {
            for (const container of containersToApply) {
                const queue = getQueue(container);

                // track the outgoing ops (if any) to make sure they make the round trip to at least to the same client
                // to make sure they are sequenced.
                cleanup.push(this.setupInOutTracker(container, inflightTracker));
                queue.resume();
                resumed.push(queue);
            }

            while (resumed.some((queue) => !queue.idle)) {
                debugWait("Wait until queue is idle");
                await new Promise<void>((resolve) => { setTimeout(resolve, 0); });
            }

            // Make sure all the op that we sent out are acked first
            // This is no op if we are processing incoming
            if (inflightTracker.size) {
                debugWait("Wait for inflight ops");
                do {
                    await this.waitForAnyInboundOps(containersToApply);
                } while (inflightTracker.size);
            }
        } finally {
            // remove the handlers
            cleanup.forEach((clean) => clean());
            await Promise.all(resumed.map(async (queue) => queue.pause()));
        }
    }

    /**
     * Utility to set up listener to track the outbound ops until it round trip back
     * Returns a function to remove the handler after it is done.
     *
     * @param container - the container to setup
     * @param inflightTracker - a map to track the clientSequenceNumber per container it expect to get ops back
     */
    private setupInOutTracker(container: IContainer, inflightTracker: Map<IContainer, number>) {
        const outHandler = (messages: IDocumentMessage[]) => {
            for (const message of messages) {
                if (!canBeCoalescedByService(message)) {
                    inflightTracker.set(container, message.clientSequenceNumber);
                }
            }
        };
        const inHandler = (message: ISequencedDocumentMessage) => {
            if (!canBeCoalescedByService(message)
                && message.clientId === (container as Container).clientId
                && inflightTracker.get(container) === message.clientSequenceNumber) {
                inflightTracker.delete(container);
            }
        };

        container.deltaManager.outbound.on("op", outHandler);
        container.deltaManager.inbound.on("push", inHandler);

        return () => {
            container.deltaManager.outbound.off("op", outHandler);
            container.deltaManager.inbound.off("push", inHandler);
        };
    }

    /**
     * Setup debug traces for connection and ops.
     * Listeners are only attached when the "ops" debug namespace is enabled at the time the container is added.
     */
    private setupTrace(container: IContainer, index: number) {
        if (debugOp.enabled) {
            const getContentsString = (type: string, msgContents: any) => {
                try {
                    if (type !== MessageType.Operation) {
                        if (typeof msgContents === "string") { return msgContents; }
                        return JSON.stringify(msgContents);
                    }
                    let address = "";

                    // contents comes in the wire as JSON string ("push" event)
                    // But already parsed when apply ("op" event)
                    let contents = typeof msgContents === "string" ?
                        JSON.parse(msgContents) : msgContents;
                    // Walk down nested envelopes, accumulating each address into a path.
                    while (contents !== undefined && contents !== null) {
                        if (contents.contents?.address !== undefined) {
                            address += `/${contents.contents.address}`;
                            contents = contents.contents.contents;
                        } else if (contents.content?.address !== undefined) {
                            address += `/${contents.content.address}`;
                            contents = contents.content.contents;
                        } else {
                            break;
                        }
                    }
                    if (address) {
                        return `${address} ${JSON.stringify(contents)}`;
                    }
                    return JSON.stringify(contents);
                } catch (e: any) {
                    return `${e.message}: ${e.stack}`;
                }
            };
            debugOp(`${index}: ADD: clientId: ${(container as Container).clientId}`);
            container.deltaManager.outbound.on("op", (messages) => {
                for (const msg of messages) {
                    debugOp(`${index}: OUT:   `
                        + `cli: ${msg.clientSequenceNumber.toString().padStart(3)} `
                        + `rsq: ${msg.referenceSequenceNumber.toString().padStart(3)} `
                        + `${msg.type} ${getContentsString(msg.type, msg.contents)}`);
                }
            });
            const getInboundHandler = (type: string) => {
                return (msg: ISequencedDocumentMessage) => {
                    const clientSeq = msg.clientId === (container as Container).clientId ?
                        `cli: ${msg.clientSequenceNumber.toString().padStart(3)}` : "         ";

                    debugOp(`${index}: ${type}: seq: ${msg.sequenceNumber.toString().padStart(3)} `
                        + `${clientSeq} min: ${msg.minimumSequenceNumber.toString().padStart(3)} `
                        + `${msg.type} ${getContentsString(msg.type, msg.contents)}`);
                };
            };
            container.deltaManager.inbound.on("push", getInboundHandler("IN "));
            container.deltaManager.inbound.on("op", getInboundHandler("OP "));
            container.deltaManager.on("connect", (details) => {
                debugOp(`${index}: CON: clientId: ${details.clientId}`);
            });
            container.deltaManager.on("disconnect", (reason) => {
                debugOp(`${index}: DIS: ${reason}`);
            });
        }
    }

    /**
     * Filter out the opened containers based on param.
     * @param containers - The container to filter to. If the array is empty, it means don't filter and return
     * all open containers.
     */
    private getContainers(containers: IContainer[]) {
        const containersToApply = containers.length === 0 ? Array.from(this.containers.keys()) : containers;
        return containersToApply.filter((container) => !container.closed);
    }
}