Skip to content

implement horizontally scalable changefeed handling #8267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/packages/backend/nats/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getEnv } from "@cocalc/backend/nats/env";
import { createOpenFiles, type OpenFiles } from "@cocalc/nats/sync/open-files";
export { inventory } from "@cocalc/nats/sync/inventory";

export type { Stream, DStream, KV, DKV, DKO };
export type { Stream, DStream, KV, DKV, DKO, AKV };

export async function stream<T = any>(opts): Promise<Stream<T>> {
return await createStream<T>({ env: await getEnv(), ...opts });
Expand Down
100 changes: 78 additions & 22 deletions src/packages/database/nats/changefeeds.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
/*

What this does:

A backend server gets a request that a given changefeed (e.g., "messages" or
"projects" for a given user) needs to be managed. For a while, the server will
watch the datϨabase and put entries in a NATS jetstream kv that represents the
data. The browser also periodically pings the backend saying "I'm still
interested in this changefeed" and the backend server keeps up watching postgres
for changes. When the user is gone for long enough (5 minutes?) the backend
stops watching and just leaves the data as is in NATS.

When the user comes back, they immediately get the last version of the data
straight from NATS, and their browser says "I'm interested in this changefeed".
The changefeed then gets updated (hopefully 1-2 seconds later) and periodically
updated after that.


DEVELOPMENT:

1. turn off nats-server handling for the hub by sending this message from a browser as an admin:

await cc.client.nats_client.hub.system.terminate({service:'db'})
Expand Down Expand Up @@ -28,6 +46,7 @@ import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { uuid } from "@cocalc/util/misc";
import { delay } from "awaiting";
import { Svcm } from "@nats-io/services";
import { Coordinator, now } from "./coordinator";

const logger = getLogger("database:nats:changefeeds");

Expand All @@ -36,24 +55,29 @@ const jc = JSONCodec();
let api: any | null = null;
export async function init() {
const subject = "hub.*.*.db";
logger.debug(`init -- subject='${subject}', options=`, {
queue: "0",
});
logger.debug(`init -- subject='${subject}', options=`);
const coordinator = new Coordinator();
const nc = await getConnection();

// @ts-ignore
const svcm = new Svcm(nc);

const service = await svcm.add({
name: "db-server",
version: "0.1.0",
version: "0.2.0",
description: "CoCalc Database Service (changefeeds)",
queue: "0",
});

api = service.addEndpoint("api", { subject });
global.x = { coordinator };

for await (const mesg of api) {
handleRequest(mesg, nc);
try {
for await (const mesg of api) {
handleRequest({ mesg, nc, coordinator });
}
} finally {
coordinator.close();
}
}

Expand All @@ -65,11 +89,12 @@ export function terminate() {
cancelAllChangefeeds();
}

async function handleRequest(mesg, nc) {
async function handleRequest({ mesg, nc, coordinator }) {
let resp;
try {
const { account_id, project_id } = getUserId(mesg.subject);
const { name, args } = jc.decode(mesg.data) ?? ({} as any);
//console.log(`got request: "${JSON.stringify({ name, args })}"`);
// logger.debug(`got request: "${JSON.stringify({ name, args })}"`);
if (!name) {
throw Error("api endpoint name must be given in message");
Expand All @@ -79,23 +104,37 @@ async function handleRequest(mesg, nc) {
// project_id,
// name,
// });
resp = await getResponse({ name, args, account_id, project_id, nc });
resp = await getResponse({
name,
args,
account_id,
project_id,
nc,
coordinator,
});
} catch (err) {
// logger.debug("ERROR", err);
logger.debug(`ERROR -- ${err}`);
resp = { error: `${err}` };
}
// logger.debug(`Responding with "${JSON.stringify(resp)}"`);
mesg.respond(jc.encode(resp));
}

async function getResponse({ name, args, account_id, project_id, nc }) {
async function getResponse({
name,
args,
account_id,
project_id,
nc,
coordinator,
}) {
if (name == "userQuery") {
const opts = { ...args[0], account_id, project_id };
if (!opts.changes) {
// a normal query
return await userQuery(opts);
} else {
return await createChangefeed(opts, nc);
return await createChangefeed(opts, nc, coordinator);
}
} else {
throw Error(`name='${name}' not implemented`);
Expand Down Expand Up @@ -135,7 +174,7 @@ function cancelAllChangefeeds() {
// async function, but then handle (and don't return)
// the subsequent calls to cb generated by the changefeed.
const createChangefeed = reuseInFlight(
async (opts, nc) => {
async (opts, nc, coordinator) => {
const query = opts.query;
// the query *AND* the user making it define the thing:
const user = { account_id: opts.account_id, project_id: opts.project_id };
Expand All @@ -144,15 +183,28 @@ const createChangefeed = reuseInFlight(
...user,
});
const hash = sha1(desc);
const now = Date.now();
const manager = await coordinator.getManager(hash);
if (manager && coordinator.managerId != manager) {
// somebody else owns it and they are active -- express interest
await coordinator.userInterest(hash);
return;
}
// take it
await coordinator.takeManagement(hash);

if (changefeedInterest[hash]) {
changefeedInterest[hash] = now;
changefeedInterest[hash] = now();
logger.debug("using existing changefeed for", queryTable(query), user);
return;
}
logger.debug("creating new changefeed for", queryTable(query), user);
const changes = uuid();
changefeedHashes[changes] = hash;
logger.debug(
"managing ",
Object.keys(changefeedHashes).length,
"changefeeds",
);
const env = { nc, jc, sha1 };
// If you change any settings below (i.e., atomic or immutable), you might also have to change them in
// src/packages/sync/table/changefeed-nats.ts
Expand Down Expand Up @@ -236,22 +288,26 @@ const createChangefeed = reuseInFlight(
try {
await callback(f);
// it's running successfully
changefeedInterest[hash] = Date.now();
changefeedInterest[hash] = now();

const watch = async () => {
// it's all setup and running. If there's no interest for a while, stop watching
while (true) {
await delay(CHANGEFEED_INTEREST_PERIOD_MS);
if (
Date.now() - changefeedInterest[hash] >
now() - changefeedInterest[hash] >
CHANGEFEED_INTEREST_PERIOD_MS
) {
logger.debug(
"insufficient interest in the changefeed, so we stop it.",
query,
);
cancelChangefeed(changes);
return;
// we check both the local known interest *AND* interest recorded by any other servers!
const last = await coordinator.lastUserInterest(hash);
if (now() - last >= CHANGEFEED_INTEREST_PERIOD_MS) {
logger.debug(
"insufficient interest in the changefeed, so we stop it.",
query,
);
cancelChangefeed(changes);
return;
}
}
}
};
Expand Down
108 changes: 108 additions & 0 deletions src/packages/database/nats/coordinator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
This is for managing who is responsible for each changefeed.

It stores:

- for each changefeed id, the managerId of who is manging it
- for each manager id, time when it last checked in

The manger checks in 2.5x every timeout period.
If a manger doesn't check in for the entire timeout period, then
they are considered gone.

DEVELOPMENT:

~/cocalc/src/packages/database/nats$ node
> m1 = new (require('@cocalc/database/nats/coordinator').Coordinator)(); m2 = new (require('@cocalc/database/nats/coordinator').Coordinator)(); null
null
> await m1.owner('foo')
undefined
> await m1.take('foo')
undefined
> await m1.owner('foo')
'l7ZjMufim9'
> await m2.owner('foo')
'l7ZjMufim9'
> await m2.take('foo')
Uncaught Error: foo is locked by another manager
> await m1.free('foo')
> await m2.take('foo')
undefined
> await m1.owner('foo')
'So01mDRsbs'
*/

import { akv, type AKV } from "@cocalc/backend/nats/sync";
import { randomId } from "@cocalc/nats/names";
import getTime from "@cocalc/nats/time";

//const TIMEOUT = 15000;
const TIMEOUT = 3000;

export const now = () => getTime({ noError: true });

export class Coordinator {
public readonly managerId: string;
public readonly akv: AKV;
public readonly timeout: number;
private interval;

constructor({ timeout = TIMEOUT }: { timeout?: number } = {}) {
this.managerId = randomId();
this.akv = akv({ name: "changefeeds" });
this.timeout = timeout;
this.interval = setInterval(this.checkin, this.timeout / 2.5);
this.checkin();
}

close = async () => {
if (this.interval) {
clearInterval(this.interval);
delete this.interval;
}
await this.akv.delete(this.managerId);
};

private checkin = async () => {
await this.akv.set(this.managerId, now());
};

getManager = async (id: string): Promise<string | undefined> => {
const { managerId } = (await this.akv.get(id)) ?? {};
if (!managerId) {
return undefined;
}
const time = await this.akv.get(managerId);
if (!time) {
return undefined;
}
if (time < now() - this.timeout) {
return undefined;
}
return managerId;
};

// use expresses interest in changefeed with given id,
// which we may or may not be the manager of.
userInterest = async (id: string) => {
const x = await this.akv.get(id);
if (!x) {
return;
}
x.time = now();
await this.akv.set(id, x);
};

lastUserInterest = async (id: string): Promise<number> => {
const { time } = (await this.akv.get(id)) ?? { time: 0 };
return time;
};

takeManagement = async (id: string) => {
const cur = await this.getManager(id);
if (cur && cur != this.managerId) {
throw Error(`${id} is locked by another manager`);
}
await this.akv.set(id, { time: now(), managerId: this.managerId });
};
}
6 changes: 6 additions & 0 deletions src/packages/nats/sync/akv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ is not updated and limits are not enforced. Of course chunking (storing large
values properly) is supported.

There is no need to close this because it is stateless.

DEVELOPMENT:

~/cocalc/src/packages/backend$ node
> t = require("@cocalc/backend/nats/sync").akv({name:'test'})

*/

import { GeneralKV } from "./general-kv";
Expand Down
7 changes: 6 additions & 1 deletion src/packages/nats/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,14 @@ export function getLastSkew(): number | null {
return skew;
}

export default function getTime(): number {
export default function getTime({
noError,
}: { noError?: boolean } = {}): number {
if (skew == null) {
init();
if (noError) {
return Date.now();
}
throw Error("clock skew not known");
}
return Date.now() - skew;
Expand Down