Skip to content

Commit e4792e6

Browse files
committed
optimize delete for streams by batching; use this in conat router
- after several 4 days in production with significant load, we hit a scalability issue with trimming the interest stream. This should fully address that by making it 200K times faster...
1 parent 358fec8 commit e4792e6

File tree

6 files changed

+89
-8
lines changed

6 files changed

+89
-8
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { dstream } from "@cocalc/backend/conat/sync";
2+
import { before, after, client, wait } from "@cocalc/backend/conat/test/setup";
3+
4+
const log = process.env.VERBOSE ? console.log : (..._args) => {};
5+
6+
beforeAll(before);
7+
8+
jest.setTimeout(10000);
9+
10+
describe("a stress test", () => {
11+
const name = `test-${Math.random()}`;
12+
const pushCount = 2000;
13+
let s;
14+
it(`creates an ephemeral stream and pushes ${pushCount} messages`, async () => {
15+
const start = Date.now();
16+
s = await dstream({
17+
client,
18+
name,
19+
noAutosave: true,
20+
ephemeral: true,
21+
});
22+
for (let i = 0; i < pushCount; i++) {
23+
s.push({ i });
24+
}
25+
expect(s.length).toBe(pushCount);
26+
// NOTE: warning -- this is **MUCH SLOWER**, e.g., 10x slower,
27+
// running under jest, hence why count is small.
28+
await s.save();
29+
expect(s.length).toBe(pushCount);
30+
log(
31+
"write",
32+
Math.round((1000 * pushCount) / (Date.now() - start)),
33+
"messages per second",
34+
);
35+
});
36+
37+
it("deletes all of the messages we just wrote", async () => {
38+
const start = Date.now();
39+
await s.delete({ seqs: s.seqs() });
40+
await s.save();
41+
await wait({ until: () => s.length == 0 });
42+
expect(s.length).toBe(0);
43+
log(
44+
"delete",
45+
Math.round((1000 * pushCount) / (Date.now() - start)),
46+
"messages per second",
47+
);
48+
});
49+
});
50+
51+
afterAll(after);

src/packages/backend/conat/test/sync/dstream.test.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ describe("create two dstreams and observe sync between them", () => {
110110
});
111111
});
112112

113-
describe.only("get sequence number and time of message", () => {
113+
describe("get sequence number and time of message", () => {
114114
let s;
115115

116116
it("creates stream and write message", async () => {
@@ -413,6 +413,17 @@ describe("test delete of messages from stream", () => {
413413
await wait({ until: () => s1.length == 2 });
414414
expect(s1.get()).toEqual(["x", "y"]);
415415
});
416+
417+
it("delete an array of sequence numbers", async () => {
418+
s1.push("x", "y", "z");
419+
await s1.save();
420+
const v = await s1.getAll();
421+
const seqs0 = [s1.seq(0), s1.seq(2)];
422+
const { seqs } = await s1.delete({ seqs: seqs0 });
423+
expect(seqs).toEqual(seqs0);
424+
await wait({ until: () => s1.length == v.length - 2 });
425+
expect(s1.getAll()).toEqual([v[1]].concat(v.slice(3)));
426+
});
416427
});
417428

418429
afterAll(after);

src/packages/conat/core/cluster.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,8 @@ export async function trimClusterStreams(
296296
if (seqs.length > 0) {
297297
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
298298
logger.debug("trimClusterStream: trimming interest", { seqs });
299-
for (const seq of seqs) {
300-
await interest.delete({ seq });
301-
}
299+
await interest.delete({ seqs });
300+
logger.debug("trimClusterStream: successfully trimmed interest", { seqs });
302301
}
303302

304303
// Next deal with sticky -- trim ones where the pattern is no longer of interest.
@@ -335,9 +334,8 @@ export async function trimClusterStreams(
335334
if (seqs2.length > 0) {
336335
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
337336
logger.debug("trimClusterStream: trimming sticky", { seqs2 });
338-
for (const seq of seqs2) {
339-
await sticky.delete({ seq });
340-
}
337+
await sticky.delete({ seqs: seqs2 });
338+
logger.debug("trimClusterStream: successfully trimmed sticky", { seqs2 });
341339
}
342340

343341
return { seqsInterest: seqs, seqsSticky: seqs2 };

src/packages/conat/persist/client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,13 @@ class PersistStreamClient extends EventEmitter {
278278
delete = async ({
279279
timeout,
280280
seq,
281+
seqs,
281282
last_seq,
282283
all,
283284
}: {
284285
timeout?: number;
285286
seq?: number;
287+
seqs?: number[];
286288
last_seq?: number;
287289
all?: boolean;
288290
}): Promise<{ seqs: number[] }> => {
@@ -291,6 +293,7 @@ class PersistStreamClient extends EventEmitter {
291293
headers: {
292294
cmd: "delete",
293295
seq,
296+
seqs,
294297
last_seq,
295298
all,
296299
timeout,

src/packages/conat/persist/storage.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,9 @@ export class PersistentStream extends EventEmitter {
367367
try {
368368
await this.db.backup(path);
369369
} catch (err) {
370-
console.log(err);
370+
if (!process.env.COCALC_TEST_MODE) {
371+
console.log(err);
372+
}
371373
logger.debug("WARNING: error creating a backup", path, err);
372374
}
373375
});
@@ -537,10 +539,12 @@ export class PersistentStream extends EventEmitter {
537539

538540
delete = ({
539541
seq,
542+
seqs: seqs0,
540543
last_seq,
541544
all,
542545
}: {
543546
seq?: number;
547+
seqs?: number[];
544548
last_seq?: number;
545549
all?: boolean;
546550
}): { seqs: number[] } => {
@@ -565,6 +569,15 @@ export class PersistentStream extends EventEmitter {
565569
.all(seq)
566570
.map((row: any) => row.seq);
567571
this.db.prepare("DELETE FROM messages WHERE seq=?").run(seq);
572+
} else if (seqs0) {
573+
const statement = this.db.prepare("DELETE FROM messages WHERE seq=?");
574+
const transaction = this.db.transaction((seqs) => {
575+
for (const s of seqs) {
576+
statement.run(s);
577+
}
578+
});
579+
transaction(seqs0);
580+
seqs = seqs0;
568581
}
569582
this.emit("change", { op: "delete", seqs });
570583
this.throttledBackup();

src/packages/conat/sync/core-stream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,7 @@ export class CoreStream<T = any> extends EventEmitter {
936936
seq,
937937
last_seq,
938938
key,
939+
seqs,
939940
}: {
940941
// give exactly ONE parameter -- by default nothing happens with no params
941942
// all: delete everything
@@ -944,6 +945,8 @@ export class CoreStream<T = any> extends EventEmitter {
944945
last_index?: number;
945946
// seq: delete message with this sequence number
946947
seq?: number;
948+
// seqs: delete the messages in this array of sequence numbers
949+
seqs?: number[];
947950
// last_seq: delete everything up to and including this sequence number
948951
last_seq?: number;
949952
// key: delete the message with this key
@@ -966,6 +969,8 @@ export class CoreStream<T = any> extends EventEmitter {
966969
}
967970
} else if (seq != null) {
968971
opts = { seq };
972+
} else if (seqs != null) {
973+
opts = { seqs };
969974
} else if (last_seq != null) {
970975
opts = { last_seq };
971976
} else if (key != null) {

0 commit comments

Comments
 (0)