Skip to content

Commit 930eaaf

Browse files
committed
util: convert aggregate from js to ts and tweak execute-code to run async #7666
1 parent 72b6ecf commit 930eaaf

File tree

6 files changed

+89
-24
lines changed

6 files changed

+89
-24
lines changed

src/packages/backend/execute-code.ts

+62-11
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
// Execute code in a subprocess.
77

88
import { callback } from "awaiting";
9-
import { spawn } from "node:child_process";
9+
import LRU from "lru-cache";
10+
import {
11+
ChildProcessWithoutNullStreams,
12+
spawn,
13+
SpawnOptionsWithoutStdio,
14+
} from "node:child_process";
1015
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
1116
import { tmpdir } from "node:os";
1217
import { join } from "node:path";
@@ -15,7 +20,7 @@ import shellEscape from "shell-escape";
1520
import getLogger from "@cocalc/backend/logger";
1621
import { aggregate } from "@cocalc/util/aggregate";
1722
import { callback_opts } from "@cocalc/util/async-utils";
18-
import { to_json, trunc, walltime } from "@cocalc/util/misc";
23+
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
1924
import { envForSpawn } from "./misc";
2025

2126
import type {
@@ -27,6 +32,15 @@ import type {
2732

2833
const log = getLogger("execute-code");
2934

35+
const asyncCache = new LRU<string, ExecuteCodeOutput>({
36+
max: 100,
37+
ttl: 1000 * 60 * 60,
38+
ttlAutopurge: true,
39+
allowStale: true,
40+
updateAgeOnGet: true,
41+
updateAgeOnHas: true,
42+
});
43+
3044
// Async/await interface to executing code.
3145
export async function executeCode(
3246
opts: ExecuteCodeOptions,
@@ -52,6 +66,13 @@ export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
5266
async function executeCodeNoAggregate(
5367
opts: ExecuteCodeOptions,
5468
): Promise<ExecuteCodeOutput> {
69+
if (typeof opts.async_get === "string") {
70+
const s = asyncCache.get(opts.async_get);
71+
if (s != null) {
72+
return s;
73+
}
74+
}
75+
5576
if (opts.args == null) opts.args = [];
5677
if (opts.timeout == null) opts.timeout = 10;
5778
if (opts.ulimit_timeout == null) opts.ulimit_timeout = true;
@@ -115,7 +136,32 @@ async function executeCodeNoAggregate(
115136
await writeFile(tempPath, cmd);
116137
await chmod(tempPath, 0o700);
117138
}
118-
return await callback(doSpawn, { ...opts, origCommand });
139+
140+
if (opts.async_exec) {
141+
// we return an ID, the caller can then use it to query the status
142+
const async_limit = 1024 * 1024; // we limit how much we keep in memory, to avoid problems
143+
opts.max_output = Math.min(async_limit, opts.max_output ?? async_limit);
144+
const id = uuid();
145+
asyncCache.set(id, {
146+
stdout: `Process started running at ${Date.now()}`,
147+
stderr: "",
148+
exit_code: 0,
149+
async_id: id,
150+
});
151+
152+
doSpawn({ ...opts, origCommand }, (err, result) => {
153+
if (err) {
154+
asyncCache.set(id, { stdout: "", stderr: `${err}`, exit_code: 1 });
155+
} else if (result != null) {
156+
asyncCache.set(id, result);
157+
} else {
158+
asyncCache.set(id, { stdout: "", stderr: `No result`, exit_code: 1 });
159+
}
160+
});
161+
} else {
162+
// This is the blocking variant
163+
return await callback(doSpawn, { ...opts, origCommand });
164+
}
119165
} finally {
120166
// clean up
121167
if (tempDir) {
@@ -124,7 +170,10 @@ async function executeCodeNoAggregate(
124170
}
125171
}
126172

127-
function doSpawn(opts, cb) {
173+
function doSpawn(
174+
opts,
175+
cb: (err: string | undefined, result?: ExecuteCodeOutput) => void,
176+
) {
128177
const start_time = walltime();
129178

130179
if (opts.verbose) {
@@ -138,7 +187,7 @@ function doSpawn(opts, cb) {
138187
"seconds",
139188
);
140189
}
141-
const spawnOptions = {
190+
const spawnOptions: SpawnOptionsWithoutStdio = {
142191
detached: true, // so we can kill the entire process group if it times out
143192
cwd: opts.path,
144193
...(opts.uid ? { uid: opts.uid } : undefined),
@@ -150,11 +199,11 @@ function doSpawn(opts, cb) {
150199
},
151200
};
152201

153-
let r,
154-
ran_code = false;
202+
let r: ChildProcessWithoutNullStreams;
203+
let ran_code = false;
155204
try {
156205
r = spawn(opts.command, opts.args, spawnOptions);
157-
if (r.stdout == null || r.stderr == null) {
206+
if (r.stdout == null || r.stderr == null || r.pid == null) {
158207
// The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
159208
// suggest that r.stdout and r.stderr are always defined. However, this is
160209
// definitely NOT the case in edge cases, as we have observed.
@@ -215,7 +264,7 @@ function doSpawn(opts, cb) {
215264
});
216265

217266
r.on("exit", (code) => {
218-
exit_code = code;
267+
exit_code = code != null ? code : undefined;
219268
finish();
220269
});
221270

@@ -317,8 +366,10 @@ function doSpawn(opts, cb) {
317366
);
318367
}
319368
try {
320-
killed = true;
321-
process.kill(-r.pid, "SIGKILL"); // this should kill process group
369+
killed = true; // we set the kill flag in any case – i.e. process will no longer exist
370+
if (r.pid != null) {
371+
process.kill(-r.pid, "SIGKILL"); // this should kill process group
372+
}
322373
} catch (err) {
323374
// Exceptions can happen, which left uncaught messes up calling code big time.
324375
if (opts.verbose) {

src/packages/next/lib/api/schema/exec.ts

+13-6
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ export const ExecInputSchema = z
1010
.object({
1111
project_id: ProjectIdSchema,
1212
compute_server_id: ComputeServerIdSchema.describe(
13-
`If provided, the desired shell command will be run on the compute server whose id
13+
`If provided, the desired shell command will be run on the compute server whose id
1414
is specified in this field (if available).`,
1515
).optional(),
1616
filesystem: z
1717
.boolean()
1818
.optional()
1919
.describe(
20-
`If \`true\`, this shell command runs in the fileserver container on the compute
20+
`If \`true\`, this shell command runs in the fileserver container on the compute
2121
server; otherwise, it runs on the main compute container.`,
2222
),
2323
path: z
@@ -46,7 +46,7 @@ export const ExecInputSchema = z
4646
.boolean()
4747
.optional()
4848
.describe(
49-
`If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell
49+
`If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell
5050
command is written to a file and then executed via the \`bash\` command.`,
5151
),
5252
aggregate: z
@@ -57,16 +57,16 @@ export const ExecInputSchema = z
5757
])
5858
.optional()
5959
.describe(
60-
`If provided, this shell command is aggregated as in
60+
`If provided, this shell command is aggregated as in
6161
\`src/packages/util/aggregate.ts\`. This parameter allows one to specify
62-
multiple callbacks to be executed against the output of the same command
62+
multiple callbacks to be executed against the output of the same command
6363
(given identical arguments) within a 60-second window.`,
6464
),
6565
err_on_exit: z
6666
.boolean()
6767
.optional()
6868
.describe(
69-
`When \`true\`, this call will throw an error whenever the provided shell command
69+
`When \`true\`, this call will throw an error whenever the provided shell command
7070
exits with a non-zero exit code.`,
7171
),
7272
env: z
@@ -75,6 +75,13 @@ export const ExecInputSchema = z
7575
.describe(
7676
"Environment variables to be passed to the shell command upon execution.",
7777
),
78+
async_exec: z.boolean().optional()
79+
.describe(`If \`true\`, the execution happens asynchronously.
80+
This means the API call does not block and returns an ID.
81+
Later, use that ID in a call to \`async_get\` to eventually get the result`),
82+
async_get: z.string().optional()
83+
.describe(`For a given ID returned by \`async_exec\`,
84+
return the status, or the result as if it were called synchronously. Results are only cached temporarily!`),
7885
})
7986
.describe("Perform arbitrary shell commands in a compute server or project.");
8087

src/packages/next/pages/api/v2/exec.ts

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ async function get(req) {
3838
aggregate,
3939
err_on_exit,
4040
env,
41+
async_exec,
42+
async_get,
4143
} = getParams(req);
4244

4345
if (!(await isCollaborator({ account_id, project_id }))) {
@@ -61,6 +63,8 @@ async function get(req) {
6163
aggregate,
6264
err_on_exit,
6365
env,
66+
async_exec,
67+
async_get,
6468
},
6569
});
6670
// event and id don't make sense for http post api

src/packages/util/aggregate.d.ts

-1
This file was deleted.

src/packages/util/aggregate.js renamed to src/packages/util/aggregate.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,17 @@ Where options is an object.
6363
6464
*/
6565

66-
const { copy_without, field_cmp } = require("./misc");
6766
const json_stable = require("json-stable-stringify");
6867

68+
import { copy_without, field_cmp } from "./misc";
69+
6970
// To avoid using up too much memory, results are cached at most this long
7071
// (so long as function is called periodically to clear the cache... if not,
7172
// no point in clearing, since won't grow much.)
7273
const DONE_CACHE_TIMEOUT_MS = 60000;
7374

7475
function clear_old(done) {
75-
const now = new Date();
76+
const now = Date.now();
7677
for (let key in done) {
7778
const s = done[key];
7879
if (now - s.time >= DONE_CACHE_TIMEOUT_MS) {
@@ -93,7 +94,7 @@ function leq(a, b) {
9394
return a <= b;
9495
}
9596

96-
exports.aggregate = function (options, f) {
97+
export function aggregate(options, f?: any) {
9798
if (f == null) {
9899
f = options;
99100
options = undefined;
@@ -120,7 +121,7 @@ exports.aggregate = function (options, f) {
120121

121122
function aggregate_call_f(opts) {
122123
// Key is a string that determines the inputs to f **that matter**.
123-
const key = json_stable(copy_without(opts, omitted_fields));
124+
const key: string = json_stable(copy_without(opts, omitted_fields));
124125
// Check state
125126
const current = state[key];
126127
const recent = done[key];
@@ -190,4 +191,4 @@ exports.aggregate = function (options, f) {
190191
aggregate_call_f(opts);
191192
}
192193
};
193-
};
194+
}

src/packages/util/types/execute-code.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export interface ExecuteCodeOutput {
22
stdout: string;
33
stderr: string;
44
exit_code: number;
5+
async_id?: string;
56
}
67

78
export interface ExecuteCodeOptions {
@@ -20,12 +21,14 @@ export interface ExecuteCodeOptions {
2021
env?: object; // if given, added to exec environment
2122
aggregate?: string | number; // if given, aggregates multiple calls with same sequence number into one -- see @cocalc/util/aggregate; typically make this a timestamp for compiling code (e.g., latex).
2223
verbose?: boolean; // default true -- impacts amount of logging
24+
async_exec?: boolean; // default false -- if true, return an ID and execute it asynchronously
25+
async_get?: string; // if set, everything else is ignored and the status/output of the async call is returned
2326
}
2427

2528
export interface ExecuteCodeOptionsWithCallback extends ExecuteCodeOptions {
2629
cb?: (err: undefined | Error, output?: ExecuteCodeOutput) => void;
2730
}
2831

2932
export type ExecuteCodeFunctionWithCallback = (
30-
opts: ExecuteCodeOptionsWithCallback
33+
opts: ExecuteCodeOptionsWithCallback,
3134
) => void;

0 commit comments

Comments
 (0)