@@ -24,6 +24,8 @@ import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
24
24
import { envForSpawn } from "./misc" ;
25
25
26
26
import {
27
+ ExecuteCodeOutputAsync ,
28
+ ExecuteCodeOutputBlocking ,
27
29
isExecuteCodeOptionsAsyncGet ,
28
30
type ExecuteCodeFunctionWithCallback ,
29
31
type ExecuteCodeOptions ,
@@ -34,7 +36,7 @@ import {
34
36
35
37
const log = getLogger ( "execute-code" ) ;
36
38
37
- const asyncCache = new LRU < string , ExecuteCodeOutput > ( {
39
+ const asyncCache = new LRU < string , ExecuteCodeOutputAsync > ( {
38
40
max : 100 ,
39
41
ttl : 1000 * 60 * 60 ,
40
42
ttlAutopurge : true ,
@@ -64,6 +66,12 @@ export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
64
66
} ,
65
67
) ;
66
68
69
+ async function clean_up_tmp ( tempDir : string | undefined ) {
70
+ if ( tempDir ) {
71
+ await rm ( tempDir , { force : true , recursive : true } ) ;
72
+ }
73
+ }
74
+
67
75
// actual implementation, without the aggregate wrapper
68
76
async function executeCodeNoAggregate (
69
77
opts : ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet ,
@@ -77,16 +85,15 @@ async function executeCodeNoAggregate(
77
85
}
78
86
}
79
87
80
- if ( opts . args == null ) opts . args = [ ] ;
81
- if ( opts . timeout == null ) opts . timeout = 10 ;
82
- if ( opts . ulimit_timeout == null ) opts . ulimit_timeout = true ;
83
- if ( opts . err_on_exit == null ) opts . err_on_exit = true ;
84
- if ( opts . verbose == null ) opts . verbose = true ;
88
+ opts . args ?? = [ ] ;
89
+ opts . timeout ?? = 10 ;
90
+ opts . ulimit_timeout ?? = true ;
91
+ opts . err_on_exit ?? = true ;
92
+ opts . verbose ?? = true ;
85
93
86
94
if ( opts . verbose ) {
87
95
log . debug ( `input: ${ opts . command } ${ opts . args ?. join ( " " ) } ` ) ;
88
96
}
89
-
90
97
const s = opts . command . split ( / \s + / g) ; // split on whitespace
91
98
if ( opts . args ?. length === 0 && s . length > 1 ) {
92
99
opts . bash = true ;
@@ -141,49 +148,59 @@ async function executeCodeNoAggregate(
141
148
await chmod ( tempPath , 0o700 ) ;
142
149
}
143
150
144
- if ( opts . async_mode ) {
151
+ if ( opts . async_call ) {
145
152
// we return an ID, the caller can then use it to query the status
146
153
opts . max_output ??= 1024 * 1024 ; // we limit how much we keep in memory, to avoid problems;
147
154
opts . timeout ??= 10 * 60 ;
148
- const id = uuid ( ) ;
155
+ const job_id = uuid ( ) ;
149
156
const start = new Date ( ) ;
150
- const started : ExecuteCodeOutput = {
157
+ const started : ExecuteCodeOutputAsync = {
158
+ type : "async" ,
151
159
stdout : `Process started running at ${ start . toISOString ( ) } ` ,
152
160
stderr : "" ,
153
161
exit_code : 0 ,
154
- async_start : start . getTime ( ) ,
155
- async_id : id ,
156
- async_status : "running" ,
162
+ start : start . getTime ( ) ,
163
+ job_id ,
164
+ status : "running" ,
157
165
} ;
158
- asyncCache . set ( id , started ) ;
159
-
160
- doSpawn ( { ...opts , origCommand, async_id : id } , ( err , result ) => {
161
- const started = asyncCache . get ( id ) ?. async_start ?? 0 ;
162
- const info : Partial < ExecuteCodeOutput > = {
163
- elapsed_s : ( Date . now ( ) - started ) / 1000 ,
164
- async_start : start . getTime ( ) ,
165
- async_status : "error" ,
166
- } ;
167
- if ( err ) {
168
- asyncCache . set ( id , {
169
- stdout : "" ,
170
- stderr : `${ err } ` ,
171
- exit_code : 1 ,
172
- ...info ,
173
- } ) ;
174
- } else if ( result != null ) {
175
- asyncCache . set ( id , {
176
- ...result ,
177
- ...info ,
178
- ...{ async_status : "completed" } ,
179
- } ) ;
180
- } else {
181
- asyncCache . set ( id , {
182
- stdout : "" ,
183
- stderr : `No result` ,
184
- exit_code : 1 ,
185
- ...info ,
186
- } ) ;
166
+ asyncCache . set ( job_id , started ) ;
167
+
168
+ doSpawn ( { ...opts , origCommand, job_id } , async ( err , result ) => {
169
+ try {
170
+ const started = asyncCache . get ( job_id ) ?. start ?? 0 ;
171
+ const info : Omit <
172
+ ExecuteCodeOutputAsync ,
173
+ "stdout" | "stderr" | "exit_code"
174
+ > = {
175
+ job_id,
176
+ type : "async" ,
177
+ elapsed_s : ( Date . now ( ) - started ) / 1000 ,
178
+ start : start . getTime ( ) ,
179
+ status : "error" ,
180
+ } ;
181
+ if ( err ) {
182
+ asyncCache . set ( job_id , {
183
+ stdout : "" ,
184
+ stderr : `${ err } ` ,
185
+ exit_code : 1 ,
186
+ ...info ,
187
+ } ) ;
188
+ } else if ( result != null ) {
189
+ asyncCache . set ( job_id , {
190
+ ...result ,
191
+ ...info ,
192
+ ...{ status : "completed" } ,
193
+ } ) ;
194
+ } else {
195
+ asyncCache . set ( job_id , {
196
+ stdout : "" ,
197
+ stderr : `No result` ,
198
+ exit_code : 1 ,
199
+ ...info ,
200
+ } ) ;
201
+ }
202
+ } finally {
203
+ await clean_up_tmp ( tempDir ) ;
187
204
}
188
205
} ) ;
189
206
@@ -193,28 +210,26 @@ async function executeCodeNoAggregate(
193
210
return await callback ( doSpawn , { ...opts , origCommand } ) ;
194
211
}
195
212
} finally {
196
- // clean up
197
- if ( tempDir ) {
198
- await rm ( tempDir , { force : true , recursive : true } ) ;
199
- }
213
+ // do not delete the tempDir in async mode!
214
+ if ( ! opts . async_call ) await clean_up_tmp ( tempDir ) ;
200
215
}
201
216
}
202
217
203
218
function update_async (
204
- async_id : string | undefined ,
219
+ job_id : string | undefined ,
205
220
stream : "stdout" | "stderr" ,
206
221
data : string ,
207
222
) {
208
- if ( ! async_id ) return ;
209
- const obj = asyncCache . get ( async_id ) ;
223
+ if ( ! job_id ) return ;
224
+ const obj = asyncCache . get ( job_id ) ;
210
225
if ( obj != null ) {
211
226
obj [ stream ] = data ;
212
227
}
213
228
}
214
229
215
230
function doSpawn (
216
231
opts ,
217
- cb : ( err : string | undefined , result ?: ExecuteCodeOutput ) => void ,
232
+ cb : ( err : string | undefined , result ?: ExecuteCodeOutputBlocking ) => void ,
218
233
) {
219
234
const start_time = walltime ( ) ;
220
235
@@ -278,7 +293,7 @@ function doSpawn(
278
293
} else {
279
294
stdout += data ;
280
295
}
281
- update_async ( opts . async_id , "stdout" , stdout ) ;
296
+ update_async ( opts . job_id , "stdout" , stdout ) ;
282
297
} ) ;
283
298
284
299
r . stderr . on ( "data" , ( data ) => {
@@ -290,7 +305,7 @@ function doSpawn(
290
305
} else {
291
306
stderr += data ;
292
307
}
293
- update_async ( opts . async_id , "stderr" , stderr ) ;
308
+ update_async ( opts . job_id , "stderr" , stderr ) ;
294
309
} ) ;
295
310
296
311
let stderr_is_done = false ;
@@ -363,12 +378,17 @@ function doSpawn(
363
378
const x = opts . origCommand
364
379
? opts . origCommand
365
380
: `'${ opts . command } ' (args=${ opts . args ?. join ( " " ) } )` ;
366
- cb (
367
- `command '${ x } ' exited with nonzero code ${ exit_code } -- stderr='${ trunc (
368
- stderr ,
369
- 1024 ,
370
- ) } '`,
371
- ) ;
381
+ if ( opts . job_id ) {
382
+ cb ( stderr ) ;
383
+ } else {
384
+ // sync behavor, like it was before
385
+ cb (
386
+ `command '${ x } ' exited with nonzero code ${ exit_code } -- stderr='${ trunc (
387
+ stderr ,
388
+ 1024 ,
389
+ ) } '`,
390
+ ) ;
391
+ }
372
392
} else if ( ! ran_code ) {
373
393
// regardless of opts.err_on_exit !
374
394
const x = opts . origCommand
@@ -390,7 +410,7 @@ function doSpawn(
390
410
// if exit-code not set, may have been SIGKILL so we set it to 1
391
411
exit_code = 1 ;
392
412
}
393
- cb ( undefined , { stdout, stderr, exit_code } ) ;
413
+ cb ( undefined , { type : "blocking" , stdout, stderr, exit_code } ) ;
394
414
}
395
415
} ;
396
416
0 commit comments