@@ -46,7 +46,7 @@ import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
4646import { uuid } from "@cocalc/util/misc" ;
4747import { delay } from "awaiting" ;
4848import { Svcm } from "@nats-io/services" ;
49- import { Coordinator } from "./coordinator" ;
49+ import { Coordinator , now } from "./coordinator" ;
5050
5151const logger = getLogger ( "database:nats:changefeeds" ) ;
5252
@@ -55,9 +55,7 @@ const jc = JSONCodec();
5555let api : any | null = null ;
5656export async function init ( ) {
5757 const subject = "hub.*.*.db" ;
58- logger . debug ( `init -- subject='${ subject } ', options=` , {
59- queue : "0" ,
60- } ) ;
58+ logger . debug ( `init -- subject='${ subject } ', options=` ) ;
6159 const coordinator = new Coordinator ( ) ;
6260 const nc = await getConnection ( ) ;
6361
@@ -68,12 +66,18 @@ export async function init() {
6866 name : "db-server" ,
6967 version : "0.2.0" ,
7068 description : "CoCalc Database Service (changefeeds)" ,
69+ queue : "0" ,
7170 } ) ;
7271
7372 api = service . addEndpoint ( "api" , { subject } ) ;
73+ global . x = { coordinator } ;
7474
75- for await ( const mesg of api ) {
76- handleRequest ( { mesg, nc, coordinator } ) ;
75+ try {
76+ for await ( const mesg of api ) {
77+ handleRequest ( { mesg, nc, coordinator } ) ;
78+ }
79+ } finally {
80+ coordinator . close ( ) ;
7781 }
7882}
7983
@@ -90,6 +94,7 @@ async function handleRequest({ mesg, nc, coordinator }) {
9094 try {
9195 const { account_id, project_id } = getUserId ( mesg . subject ) ;
9296 const { name, args } = jc . decode ( mesg . data ) ?? ( { } as any ) ;
97+ //console.log(`got request: "${JSON.stringify({ name, args })}"`);
9398 // logger.debug(`got request: "${JSON.stringify({ name, args })}"`);
9499 if ( ! name ) {
95100 throw Error ( "api endpoint name must be given in message" ) ;
@@ -99,23 +104,37 @@ async function handleRequest({ mesg, nc, coordinator }) {
99104 // project_id,
100105 // name,
101106 // });
102- resp = await getResponse ( { name, args, account_id, project_id, nc, coordinator } ) ;
107+ resp = await getResponse ( {
108+ name,
109+ args,
110+ account_id,
111+ project_id,
112+ nc,
113+ coordinator,
114+ } ) ;
103115 } catch ( err ) {
104- // logger.debug(" ERROR", err);
116+ logger . debug ( ` ERROR -- ${ err } ` ) ;
105117 resp = { error : `${ err } ` } ;
106118 }
107119 // logger.debug(`Responding with "${JSON.stringify(resp)}"`);
108120 mesg . respond ( jc . encode ( resp ) ) ;
109121}
110122
111- async function getResponse ( { name, args, account_id, project_id, nc } ) {
123+ async function getResponse ( {
124+ name,
125+ args,
126+ account_id,
127+ project_id,
128+ nc,
129+ coordinator,
130+ } ) {
112131 if ( name == "userQuery" ) {
113132 const opts = { ...args [ 0 ] , account_id, project_id } ;
114133 if ( ! opts . changes ) {
115134 // a normal query
116135 return await userQuery ( opts ) ;
117136 } else {
118- return await createChangefeed ( opts , nc ) ;
137+ return await createChangefeed ( opts , nc , coordinator ) ;
119138 }
120139 } else {
121140 throw Error ( `name='${ name } ' not implemented` ) ;
@@ -155,7 +174,7 @@ function cancelAllChangefeeds() {
155174// async function, but then handle (and don't return)
156175// the subsequent calls to cb generated by the changefeed.
157176const createChangefeed = reuseInFlight (
158- async ( opts , nc ) => {
177+ async ( opts , nc , coordinator ) => {
159178 const query = opts . query ;
160179 // the query *AND* the user making it define the thing:
161180 const user = { account_id : opts . account_id , project_id : opts . project_id } ;
@@ -164,15 +183,28 @@ const createChangefeed = reuseInFlight(
164183 ...user ,
165184 } ) ;
166185 const hash = sha1 ( desc ) ;
167- const now = Date . now ( ) ;
186+ const manager = await coordinator . getManager ( hash ) ;
187+ if ( manager && coordinator . managerId != manager ) {
188+ // somebody else owns it and they are active -- express interest
189+ await coordinator . userInterest ( hash ) ;
190+ return ;
191+ }
192+ // take it
193+ await coordinator . takeManagement ( hash ) ;
194+
168195 if ( changefeedInterest [ hash ] ) {
169- changefeedInterest [ hash ] = now ;
196+ changefeedInterest [ hash ] = now ( ) ;
170197 logger . debug ( "using existing changefeed for" , queryTable ( query ) , user ) ;
171198 return ;
172199 }
173200 logger . debug ( "creating new changefeed for" , queryTable ( query ) , user ) ;
174201 const changes = uuid ( ) ;
175202 changefeedHashes [ changes ] = hash ;
203+ logger . debug (
204+ "managing " ,
205+ Object . keys ( changefeedHashes ) . length ,
206+ "changefeeds" ,
207+ ) ;
176208 const env = { nc, jc, sha1 } ;
177209 // If you change any settings below (i.e., atomic or immutable), you might also have to change them in
178210 // src/packages/sync/table/changefeed-nats.ts
@@ -256,22 +288,26 @@ const createChangefeed = reuseInFlight(
256288 try {
257289 await callback ( f ) ;
258290 // it's running successfully
259- changefeedInterest [ hash ] = Date . now ( ) ;
291+ changefeedInterest [ hash ] = now ( ) ;
260292
261293 const watch = async ( ) => {
262294 // it's all setup and running. If there's no interest for a while, stop watching
263295 while ( true ) {
264296 await delay ( CHANGEFEED_INTEREST_PERIOD_MS ) ;
265297 if (
266- Date . now ( ) - changefeedInterest [ hash ] >
298+ now ( ) - changefeedInterest [ hash ] >
267299 CHANGEFEED_INTEREST_PERIOD_MS
268300 ) {
269- logger . debug (
270- "insufficient interest in the changefeed, so we stop it." ,
271- query ,
272- ) ;
273- cancelChangefeed ( changes ) ;
274- return ;
301+ // we check both the local known interest *AND* interest recorded by any other servers!
302+ const last = await coordinator . lastUserInterest ( hash ) ;
303+ if ( now ( ) - last >= CHANGEFEED_INTEREST_PERIOD_MS ) {
304+ logger . debug (
305+ "insufficient interest in the changefeed, so we stop it." ,
306+ query ,
307+ ) ;
308+ cancelChangefeed ( changes ) ;
309+ return ;
310+ }
275311 }
276312 }
277313 } ;
0 commit comments