Skip to content

Commit

Permalink
web/queue: add a remux worker to saving pipeline, use pipelineResults
Browse files Browse the repository at this point in the history
  • Loading branch information
wukko committed Jan 31, 2025
1 parent f2325bd commit 1590490
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
25 changes: 24 additions & 1 deletion web/src/lib/queen-bee/queue.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import mime from "mime";

import { addItem } from "$lib/state/queen-bee/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";
import type { CobaltLocalProcessingResponse } from "$lib/types/api";
Expand Down Expand Up @@ -49,7 +51,10 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse) => {
const parentId = crypto.randomUUID();
const pipeline: CobaltPipelineItem[] = [];

for (const tunnel of info.tunnel) {
// reverse is needed for audio (second item) to be downloaded first
const tunnels = info.tunnel.reverse();

for (const tunnel of tunnels) {
pipeline.push({
worker: "fetch",
workerId: crypto.randomUUID(),
Expand All @@ -60,6 +65,24 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse) => {
})
}

pipeline.push({
worker: "remux",
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
ffargs: [
"-c:v", "copy",
"-c:a", "copy"
],
output: {
// TODO: return mime type from api to avoid dragging a big ass package into web build
type: mime.getType(info.filename) || undefined,
extension: info.filename.split(".").pop(),
},
filename: info.filename,
},
})

addItem({
id: parentId,
state: "waiting",
Expand Down
8 changes: 8 additions & 0 deletions web/src/lib/queen-bee/run-worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import RemuxWorker from "$lib/workers/remux?worker";
import FetchWorker from "$lib/workers/fetch?worker";

import { get } from "svelte/store";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";

Expand Down Expand Up @@ -151,6 +152,13 @@ export const startWorker = async ({ worker, workerId, parentId, workerArgs }: Co
files = workerArgs.files;
}

if (files?.length === 0) {
const parent = get(queue)[parentId];
if (parent.state === "running" && parent.pipelineResults) {
files = parent.pipelineResults;
}
}

if (files.length > 0 && workerArgs.ffargs && workerArgs.output && workerArgs.filename) {
await runRemuxWorker(
workerId,
Expand Down
1 change: 1 addition & 0 deletions web/src/lib/workers/remux.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const remux = async (files: File[], args: string[], output: FileInfo, filename:
await ff.init();

try {
// probing just the first file in files array (usually audio) for duration progress
const file_info = await ff.probe(files[0]).catch((e) => {
if (e?.message?.toLowerCase().includes("out of memory")) {
console.error("uh oh! out of memory");
Expand Down

0 comments on commit 1590490

Please sign in to comment.