Skip to content

Commit 7caee22

Browse files
committed
web/scheduler: worker pipeline sequencing, file exchange between workers
1 parent d15f1ec commit 7caee22

File tree

5 files changed

+77
-34
lines changed

5 files changed

+77
-34
lines changed

web/src/lib/queen-bee/run-worker.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import RemuxWorker from "$lib/workers/remux?worker";
22
import FetchWorker from "$lib/workers/fetch?worker";
33

4-
import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue";
54
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
5+
import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
66

77
import type { CobaltQueue } from "$lib/types/queue";
88
import type { CobaltPipelineItem } from "$lib/types/workers";
@@ -78,7 +78,7 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
7878

7979
if (eventData.render) {
8080
killWorker(worker, unsubscribe, startCheck);
81-
return itemDone(
81+
return pipelineTaskDone(
8282
parentId,
8383
workerId,
8484
new File([eventData.render], eventData.filename, {
@@ -124,7 +124,7 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st
124124

125125
if (eventData.file) {
126126
killWorker(worker, unsubscribe);
127-
return itemDone(
127+
return pipelineTaskDone(
128128
parentId,
129129
workerId,
130130
eventData.file,

web/src/lib/queen-bee/scheduler.ts

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,65 @@
11
import { get } from "svelte/store";
2-
import { itemRunning, queue } from "$lib/state/queen-bee/queue";
32
import { startWorker } from "$lib/queen-bee/run-worker";
3+
import { itemDone, itemError, itemRunning, queue } from "$lib/state/queen-bee/queue";
44
import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks";
5+
import type { CobaltPipelineItem } from "$lib/types/workers";
6+
7+
const startPipeline = (pipelineItem: CobaltPipelineItem) => {
8+
addWorkerToQueue(pipelineItem.workerId, {
9+
type: pipelineItem.worker,
10+
parentId: pipelineItem.parentId,
11+
});
12+
13+
itemRunning(
14+
pipelineItem.parentId,
15+
pipelineItem.workerId,
16+
);
17+
18+
startWorker(pipelineItem);
19+
}
520

621
export const checkTasks = () => {
722
const queueItems = get(queue);
8-
const ongoingTasks = get(currentTasks)
23+
const ongoingTasks = get(currentTasks);
924

25+
// TODO (?): task concurrency
1026
if (Object.keys(ongoingTasks).length > 0) return;
1127

1228
for (const item of Object.keys(queueItems)) {
1329
const task = queueItems[item];
1430

1531
if (task.state === "running") {
16-
break;
17-
}
32+
// if the running worker isn't completed and wait to be called again
33+
// (on worker completion)
34+
if (!task.completedWorkers?.includes(task.runningWorker)) {
35+
break;
36+
}
37+
38+
// if all workers are completed, then return the final file and go to next task
39+
if (task.completedWorkers.length === task.pipeline.length) {
40+
const finalFile = task.pipelineResults?.pop();
41+
if (finalFile) {
42+
itemDone(task.id, finalFile);
43+
continue;
44+
} else {
45+
itemError(task.id, task.runningWorker, "no final file");
46+
continue;
47+
}
48+
}
1849

19-
if (task.state === "waiting") {
50+
// if current worker is completed, but there are more workers,
51+
// then start the next one and wait to be called again
2052
for (let i = 0; i < task.pipeline.length; i++) {
21-
// TODO: loop here and pass the file between pipelines
22-
// or schedule several tasks one after another but within
23-
// one parent & pipeline
24-
const pipelineItem = task.pipeline[i];
25-
26-
addWorkerToQueue(pipelineItem.workerId, {
27-
type: pipelineItem.worker,
28-
parentId: task.id,
29-
});
30-
31-
itemRunning(
32-
task.id,
33-
pipelineItem.workerId
34-
);
35-
36-
startWorker(pipelineItem);
37-
break;
53+
if (!task.completedWorkers.includes(task.pipeline[i].workerId)) {
54+
startPipeline(task.pipeline[i]);
55+
break;
56+
}
3857
}
58+
}
59+
60+
// start the nearest waiting task and wait to be called again
61+
if (task.state === "waiting" && task.pipeline.length > 0) {
62+
startPipeline(task.pipeline[0]);
3963
break;
4064
}
4165
}

web/src/lib/state/queen-bee/queue.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@ export function itemError(id: string, workerId: string, error: string) {
3535
checkTasks();
3636
}
3737

38-
export function itemDone(id: string, workerId: string, file: File) {
38+
export function itemDone(id: string, file: File) {
3939
update(queueData => {
4040
if (queueData[id]) {
41+
if (queueData[id].state === "running" && queueData[id].pipelineResults) {
42+
delete queueData[id].pipelineResults;
43+
}
44+
4145
queueData[id] = {
4246
...queueData[id],
4347
state: "done",
@@ -47,6 +51,18 @@ export function itemDone(id: string, workerId: string, file: File) {
4751
return queueData;
4852
});
4953

54+
checkTasks();
55+
}
56+
57+
export function pipelineTaskDone(id: string, workerId: string, file: File) {
58+
update(queueData => {
59+
if (queueData[id] && queueData[id].state === "running") {
60+
queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file];
61+
queueData[id].completedWorkers = [...queueData[id].completedWorkers || [], workerId];
62+
}
63+
return queueData;
64+
});
65+
5066
removeWorkerFromQueue(workerId);
5167
checkTasks();
5268
}

web/src/lib/types/queue.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ export type CobaltQueueItemWaiting = CobaltQueueBaseItem & {
1818
export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
1919
state: "running",
2020
runningWorker: string,
21+
completedWorkers?: string[],
22+
pipelineResults?: File[],
2123
};
2224

2325
export type CobaltQueueItemDone = CobaltQueueBaseItem & {

web/vite.config.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
import { defineConfig, searchForWorkspaceRoot, type PluginOption } from "vite";
2-
import { sveltekit } from "@sveltejs/kit/vite";
1+
import mime from "mime";
32
import basicSSL from "@vitejs/plugin-basic-ssl";
3+
44
import { glob } from "glob";
5-
import mime from "mime";
6-
import { createSitemap } from 'svelte-sitemap/src/index'
5+
import { sveltekit } from "@sveltejs/kit/vite";
6+
import { createSitemap } from "svelte-sitemap/src/index";
7+
import { defineConfig, searchForWorkspaceRoot, type PluginOption } from "vite";
78

8-
import { cp, readdir, mkdir } from "node:fs/promises";
9-
import { createReadStream } from "node:fs";
109
import { join, basename } from "node:path";
10+
import { createReadStream } from "node:fs";
11+
import { cp, readdir, mkdir } from "node:fs/promises";
1112

1213
const exposeLibAV: PluginOption = (() => {
1314
const IMPUT_MODULE_DIR = join(__dirname, 'node_modules/@imput');
@@ -20,7 +21,7 @@ const exposeLibAV: PluginOption = (() => {
2021
const filename = basename(req.url).split('?')[0];
2122
if (!filename) return next();
2223

23-
const [ file ] = await glob(join(IMPUT_MODULE_DIR, '/**/dist/', filename));
24+
const [file] = await glob(join(IMPUT_MODULE_DIR, '/**/dist/', filename));
2425
if (!file) return next();
2526

2627
const fileType = mime.getType(filename);
@@ -114,6 +115,6 @@ export default defineConfig({
114115
proxy: {}
115116
},
116117
optimizeDeps: {
117-
exclude: [ "@imput/libav.js-remux-cli" ]
118+
exclude: ["@imput/libav.js-remux-cli"]
118119
},
119120
});

0 commit comments

Comments
 (0)