Skip to content

Commit 8cb6d5b

Browse files
committed
util: convert aggregate from js to ts and tweak execute-code to run async #7666
1 parent 72b6ecf commit 8cb6d5b

File tree

7 files changed

+142
-26
lines changed

7 files changed

+142
-26
lines changed

src/packages/backend/execute-code.test.ts

+26
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { ExecuteCodeOptionsAsyncGet } from "@cocalc/util/types/execute-code";
12
import { executeCode } from "./execute-code";
23

34
describe("hello world", () => {
@@ -88,4 +89,29 @@ describe("test timeout", () => {
8889
expect(err).toContain("killed command");
8990
}
9091
});
92+
93+
it("returns ID for async execution", async () => {
94+
const retID = await executeCode({
95+
command: "sh",
96+
args: ["-c", "sleep .1; echo foo;"],
97+
bash: false,
98+
timeout: 10,
99+
async_exec: true,
100+
});
101+
const id = retID["async_id"];
102+
expect(typeof id).toEqual("string");
103+
if (typeof id === "string") {
104+
await new Promise((done) => setTimeout(done, 1000));
105+
const status = await executeCode({ async_get: id });
106+
console.log("status", status)
107+
expect(status.stdout).toEqual("foo\n");
108+
expect(status.elapsed_s).toBeGreaterThan(0.1)
109+
}
110+
});
91111
});
112+
113+
export function isExecuteCodeOptionsAsyncGet(
114+
opts: unknown,
115+
): opts is ExecuteCodeOptionsAsyncGet {
116+
return typeof (opts as any)?.async_get === "string";
117+
}

src/packages/backend/execute-code.ts

+83-12
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
// Execute code in a subprocess.
77

88
import { callback } from "awaiting";
9-
import { spawn } from "node:child_process";
9+
import LRU from "lru-cache";
10+
import {
11+
ChildProcessWithoutNullStreams,
12+
spawn,
13+
SpawnOptionsWithoutStdio,
14+
} from "node:child_process";
1015
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
1116
import { tmpdir } from "node:os";
1217
import { join } from "node:path";
@@ -15,21 +20,32 @@ import shellEscape from "shell-escape";
1520
import getLogger from "@cocalc/backend/logger";
1621
import { aggregate } from "@cocalc/util/aggregate";
1722
import { callback_opts } from "@cocalc/util/async-utils";
18-
import { to_json, trunc, walltime } from "@cocalc/util/misc";
23+
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
1924
import { envForSpawn } from "./misc";
2025

2126
import type {
2227
ExecuteCodeFunctionWithCallback,
2328
ExecuteCodeOptions,
29+
ExecuteCodeOptionsAsyncGet,
2430
ExecuteCodeOptionsWithCallback,
2531
ExecuteCodeOutput,
2632
} from "@cocalc/util/types/execute-code";
33+
import { isExecuteCodeOptionsAsyncGet } from "./execute-code.test";
2734

2835
const log = getLogger("execute-code");
2936

37+
const asyncCache = new LRU<string, ExecuteCodeOutput>({
38+
max: 100,
39+
ttl: 1000 * 60 * 60,
40+
ttlAutopurge: true,
41+
allowStale: true,
42+
updateAgeOnGet: true,
43+
updateAgeOnHas: true,
44+
});
45+
3046
// Async/await interface to executing code.
3147
export async function executeCode(
32-
opts: ExecuteCodeOptions,
48+
opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
3349
): Promise<ExecuteCodeOutput> {
3450
return await callback_opts(execute_code)(opts);
3551
}
@@ -50,8 +66,17 @@ export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
5066

5167
// actual implementation, without the aggregate wrapper
5268
async function executeCodeNoAggregate(
53-
opts: ExecuteCodeOptions,
69+
opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
5470
): Promise<ExecuteCodeOutput> {
71+
if (isExecuteCodeOptionsAsyncGet(opts)) {
72+
const cached = asyncCache.get(opts.async_get);
73+
if (cached != null) {
74+
return cached;
75+
} else {
76+
throw new Error(`Status or result of '${opts.async_get}' not found.`);
77+
}
78+
}
79+
5580
if (opts.args == null) opts.args = [];
5681
if (opts.timeout == null) opts.timeout = 10;
5782
if (opts.ulimit_timeout == null) opts.ulimit_timeout = true;
@@ -115,7 +140,48 @@ async function executeCodeNoAggregate(
115140
await writeFile(tempPath, cmd);
116141
await chmod(tempPath, 0o700);
117142
}
118-
return await callback(doSpawn, { ...opts, origCommand });
143+
144+
if (opts.async_exec) {
145+
// we return an ID, the caller can then use it to query the status
146+
const async_limit = 1024 * 1024; // we limit how much we keep in memory, to avoid problems
147+
opts.max_output = Math.min(async_limit, opts.max_output ?? async_limit);
148+
const id = uuid();
149+
const start = new Date();
150+
const started = {
151+
stdout: `Process started running at ${start.toISOString()}`,
152+
stderr: "",
153+
exit_code: start.getTime(),
154+
async_id: id,
155+
};
156+
asyncCache.set(id, started);
157+
158+
doSpawn({ ...opts, origCommand }, (err, result) => {
159+
const started = asyncCache.get(id)?.exit_code ?? 0;
160+
const info = { elapsed_s: (Date.now() - started) / 1000 };
161+
if (err) {
162+
asyncCache.set(id, {
163+
stdout: "",
164+
stderr: `${err}`,
165+
exit_code: 1,
166+
...info,
167+
});
168+
} else if (result != null) {
169+
asyncCache.set(id, { ...result, ...info });
170+
} else {
171+
asyncCache.set(id, {
172+
stdout: "",
173+
stderr: `No result`,
174+
exit_code: 1,
175+
...info,
176+
});
177+
}
178+
});
179+
180+
return started;
181+
} else {
182+
// This is the blocking variant
183+
return await callback(doSpawn, { ...opts, origCommand });
184+
}
119185
} finally {
120186
// clean up
121187
if (tempDir) {
@@ -124,7 +190,10 @@ async function executeCodeNoAggregate(
124190
}
125191
}
126192

127-
function doSpawn(opts, cb) {
193+
function doSpawn(
194+
opts,
195+
cb: (err: string | undefined, result?: ExecuteCodeOutput) => void,
196+
) {
128197
const start_time = walltime();
129198

130199
if (opts.verbose) {
@@ -138,7 +207,7 @@ function doSpawn(opts, cb) {
138207
"seconds",
139208
);
140209
}
141-
const spawnOptions = {
210+
const spawnOptions: SpawnOptionsWithoutStdio = {
142211
detached: true, // so we can kill the entire process group if it times out
143212
cwd: opts.path,
144213
...(opts.uid ? { uid: opts.uid } : undefined),
@@ -150,8 +219,8 @@ function doSpawn(opts, cb) {
150219
},
151220
};
152221

153-
let r,
154-
ran_code = false;
222+
let r: ChildProcessWithoutNullStreams;
223+
let ran_code = false;
155224
try {
156225
r = spawn(opts.command, opts.args, spawnOptions);
157226
if (r.stdout == null || r.stderr == null) {
@@ -215,7 +284,7 @@ function doSpawn(opts, cb) {
215284
});
216285

217286
r.on("exit", (code) => {
218-
exit_code = code;
287+
exit_code = code != null ? code : undefined;
219288
finish();
220289
});
221290

@@ -317,8 +386,10 @@ function doSpawn(opts, cb) {
317386
);
318387
}
319388
try {
320-
killed = true;
321-
process.kill(-r.pid, "SIGKILL"); // this should kill process group
389+
killed = true; // we set the kill flag in any case – i.e. process will no longer exist
390+
if (r.pid != null) {
391+
process.kill(-r.pid, "SIGKILL"); // this should kill process group
392+
}
322393
} catch (err) {
323394
// Exceptions can happen, which left uncaught messes up calling code big time.
324395
if (opts.verbose) {

src/packages/next/lib/api/schema/exec.ts

+15-7
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ export const ExecInputSchema = z
1010
.object({
1111
project_id: ProjectIdSchema,
1212
compute_server_id: ComputeServerIdSchema.describe(
13-
`If provided, the desired shell command will be run on the compute server whose id
13+
`If provided, the desired shell command will be run on the compute server whose id
1414
is specified in this field (if available).`,
1515
).optional(),
1616
filesystem: z
1717
.boolean()
1818
.optional()
1919
.describe(
20-
`If \`true\`, this shell command runs in the fileserver container on the compute
20+
`If \`true\`, this shell command runs in the fileserver container on the compute
2121
server; otherwise, it runs on the main compute container.`,
2222
),
2323
path: z
@@ -46,7 +46,7 @@ export const ExecInputSchema = z
4646
.boolean()
4747
.optional()
4848
.describe(
49-
`If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell
49+
`If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell
5050
command is written to a file and then executed via the \`bash\` command.`,
5151
),
5252
aggregate: z
@@ -57,24 +57,32 @@ export const ExecInputSchema = z
5757
])
5858
.optional()
5959
.describe(
60-
`If provided, this shell command is aggregated as in
60+
`If provided, this shell command is aggregated as in
6161
\`src/packages/backend/aggregate.js\`. This parameter allows one to specify
62-
multiple callbacks to be executed against the output of the same command
62+
multiple callbacks to be executed against the output of the same command
6363
(given identical arguments) within a 60-second window.`,
6464
),
6565
err_on_exit: z
6666
.boolean()
6767
.optional()
6868
.describe(
69-
`When \`true\`, this call will throw an error whenever the provided shell command
70-
exits with a non-zero exit code.`,
69+
`When \`true\`, this call will throw an error whenever the provided shell command
70+
exits with a non-zero exit code.`,
7171
),
7272
env: z
7373
.record(z.string(), z.string())
7474
.optional()
7575
.describe(
7676
"Environment variables to be passed to the shell command upon execution.",
7777
),
78+
async_exec: z.boolean().optional()
79+
.describe(`If \`true\`, the execution happens asynchroneously.
80+
This means it the API call does not block and returns an ID (\`async_id\`).
81+
Later, use that ID in a call to \`async_get\` to eventually get the result`),
82+
async_get: z.string().optional()
83+
.describe(`For a given \`async_id\` returned by \`async\`,
84+
retun the status, or the result as if it is called synchroneously.
85+
Results are only cached temporarily!`),
7886
})
7987
.describe("Perform arbitrary shell commands in a compute server or project.");
8088

src/packages/next/pages/api/v2/exec.ts

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ async function get(req) {
3838
aggregate,
3939
err_on_exit,
4040
env,
41+
async_exec,
42+
async_get,
4143
} = getParams(req);
4244

4345
if (!(await isCollaborator({ account_id, project_id }))) {
@@ -61,6 +63,8 @@ async function get(req) {
6163
aggregate,
6264
err_on_exit,
6365
env,
66+
async_exec,
67+
async_get,
6468
},
6569
});
6670
// event and id don't make sense for http post api

src/packages/util/aggregate.d.ts

-1
This file was deleted.

src/packages/util/aggregate.js renamed to src/packages/util/aggregate.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,17 @@ Where options is an object.
6363
6464
*/
6565

66-
const { copy_without, field_cmp } = require("./misc");
6766
const json_stable = require("json-stable-stringify");
6867

68+
import { copy_without, field_cmp } from "./misc";
69+
6970
// To avoid using up too much memory, results are cached at most this long
7071
// (so long as function is called periodically to clear the cache... if not,
7172
// no point in clearing, since won't grow much.)
7273
const DONE_CACHE_TIMEOUT_MS = 60000;
7374

7475
function clear_old(done) {
75-
const now = new Date();
76+
const now = Date.now();
7677
for (let key in done) {
7778
const s = done[key];
7879
if (now - s.time >= DONE_CACHE_TIMEOUT_MS) {
@@ -93,7 +94,7 @@ function leq(a, b) {
9394
return a <= b;
9495
}
9596

96-
exports.aggregate = function (options, f) {
97+
export function aggregate(options, f?: any) {
9798
if (f == null) {
9899
f = options;
99100
options = undefined;
@@ -120,7 +121,7 @@ exports.aggregate = function (options, f) {
120121

121122
function aggregate_call_f(opts) {
122123
// Key is a string that determines the inputs to f **that matter**.
123-
const key = json_stable(copy_without(opts, omitted_fields));
124+
const key: string = json_stable(copy_without(opts, omitted_fields));
124125
// Check state
125126
const current = state[key];
126127
const recent = done[key];
@@ -190,4 +191,4 @@ exports.aggregate = function (options, f) {
190191
aggregate_call_f(opts);
191192
}
192193
};
193-
};
194+
}

src/packages/util/types/execute-code.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ export interface ExecuteCodeOutput {
22
stdout: string;
33
stderr: string;
44
exit_code: number;
5+
async_id?: string;
6+
elapsed_s?: number; // how long it took, async execution
57
}
68

79
export interface ExecuteCodeOptions {
@@ -20,12 +22,17 @@ export interface ExecuteCodeOptions {
2022
env?: object; // if given, added to exec environment
2123
aggregate?: string | number; // if given, aggregates multiple calls with same sequence number into one -- see @cocalc/util/aggregate; typically make this a timestamp for compiling code (e.g., latex).
2224
verbose?: boolean; // default true -- impacts amount of logging
25+
async_exec?: boolean; // default false -- if true, return an ID and execute it asynchroneously
26+
}
27+
28+
export interface ExecuteCodeOptionsAsyncGet {
29+
async_get: string; // if set, everything else is ignored and the status/output of the async call is returned
2330
}
2431

2532
export interface ExecuteCodeOptionsWithCallback extends ExecuteCodeOptions {
2633
cb?: (err: undefined | Error, output?: ExecuteCodeOutput) => void;
2734
}
2835

2936
export type ExecuteCodeFunctionWithCallback = (
30-
opts: ExecuteCodeOptionsWithCallback
37+
opts: ExecuteCodeOptionsWithCallback,
3138
) => void;

0 commit comments

Comments
 (0)