Skip to content

Commit e8e8842

Browse files
committed
Add tests, fix tests, fix build
1 parent cfe3d6a commit e8e8842

14 files changed

Lines changed: 475 additions & 350 deletions

.changeset/cuddly-cows-attack.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@queuedash/client": major
3+
"@queuedash/api": major
4+
"@queuedash/ui": major
5+
---
6+
7+
Job scheduler support
8+
Per-job logs
9+
Global pause & resume

packages/api/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"dev": "pnpm run build --watch",
88
"test": "pnpm run test:bull && pnpm run test:bullmq && pnpm run test:bee",
99
"test:bull": "QUEUE_TYPE=bull vitest run",
10-
"test:bullmq": "QUEUE_TYPE=bull vitest run",
10+
"test:bullmq": "QUEUE_TYPE=bullmq vitest run",
1111
"test:bee": "QUEUE_TYPE=bee vitest run",
1212
"lint": "eslint ./ --fix"
1313
},
@@ -53,7 +53,7 @@
5353
"prettier": "^3.5.3",
5454
"rollup-plugin-typescript-paths": "^1.5.0",
5555
"typescript": "^5.8.3",
56-
"vite": "^6.2.5",
56+
"vite": "^5.4.19",
5757
"vitest": "^3.1.1"
5858
},
5959
"license": "MIT"

packages/api/src/routers/job.ts

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -185,24 +185,6 @@ export const jobRouter = router({
185185

186186
return logs;
187187
}),
188-
listSchedulers: procedure
189-
.input(
190-
z.object({
191-
queueName: z.string(),
192-
}),
193-
)
194-
.query(async ({ input: { queueName }, ctx: { queues } }) => {
195-
const queueInCtx = findQueueInCtxOrFail({ queues, queueName });
196-
197-
if (queueInCtx.type !== "bullmq") {
198-
throw new TRPCError({
199-
code: "BAD_REQUEST",
200-
message: "Only BullMQ queues support job schedulers",
201-
});
202-
}
203-
204-
return queueInCtx.queue.getJobSchedulers();
205-
}),
206188
list: procedure
207189
.input(
208190
z.object({

packages/api/src/routers/scheduler.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export const schedulerRouter = router({
2020
});
2121
}
2222

23-
return await queueInCtx.queue.getJobSchedulers();
23+
return queueInCtx.queue.getJobSchedulers();
2424
}),
2525

2626
add: procedure
@@ -89,4 +89,54 @@ export const schedulerRouter = router({
8989
}
9090
},
9191
),
92+
93+
bulkRemove: procedure
94+
.input(
95+
z.object({
96+
queueName: z.string(),
97+
jobSchedulerIds: z.array(z.string()),
98+
}),
99+
)
100+
.mutation(
101+
async ({ input: { jobSchedulerIds, queueName }, ctx: { queues } }) => {
102+
const queueInCtx = findQueueInCtxOrFail({
103+
queues,
104+
queueName,
105+
});
106+
107+
if (queueInCtx.type !== "bullmq") {
108+
throw new TRPCError({
109+
code: "BAD_REQUEST",
110+
message: "Scheduled jobs are only supported for BullMQ queues",
111+
});
112+
}
113+
114+
try {
115+
return Promise.all(
116+
jobSchedulerIds.map(async (jobSchedulerId) => {
117+
const scheduler =
118+
await queueInCtx.queue.getJobScheduler(jobSchedulerId);
119+
120+
if (!scheduler) {
121+
throw new TRPCError({
122+
code: "BAD_REQUEST",
123+
});
124+
}
125+
await queueInCtx.queue.removeJobScheduler(jobSchedulerId);
126+
127+
return scheduler;
128+
}),
129+
);
130+
} catch (e) {
131+
if (e instanceof TRPCError) {
132+
throw e;
133+
} else {
134+
throw new TRPCError({
135+
code: "INTERNAL_SERVER_ERROR",
136+
message: e instanceof Error ? e.message : undefined,
137+
});
138+
}
139+
}
140+
},
141+
),
92142
});

packages/api/src/tests/jobs.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,9 @@ test("retry job", async () => {
4141
queueName: firstQueue.queue.name,
4242
jobId: job.id,
4343
});
44-
4544
expect(newJob).toMatchObject({
4645
id: job.id,
47-
retriedAt: expect.any(Date),
46+
...(firstQueue.type === "bull" && { retriedAt: expect.any(Date) }),
4847
});
4948
} catch (e) {
5049
expect(e).toBeInstanceOf(TRPCError);
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { appRouter } from "../routers/_app";
2+
import { expect, test } from "vitest";
3+
import { initRedisInstance, NUM_OF_SCHEDULERS } from "./test.utils";
4+
import { TRPCError } from "@trpc/server";
5+
6+
test("list schedulers", async () => {
7+
const { ctx, firstQueue } = await initRedisInstance();
8+
try {
9+
const caller = appRouter.createCaller(ctx);
10+
const list = await caller.scheduler.list({
11+
queueName: firstQueue.queue.name,
12+
});
13+
14+
expect(list.length).toBe(NUM_OF_SCHEDULERS);
15+
} catch (e) {
16+
if (firstQueue.type !== "bullmq") {
17+
expect(e).toBeInstanceOf(TRPCError);
18+
if (e instanceof TRPCError) {
19+
expect(e.code).toBe("BAD_REQUEST");
20+
}
21+
} else {
22+
throw e;
23+
}
24+
}
25+
});
26+
27+
test("remove scheduler", async () => {
28+
const { ctx, firstQueue } = await initRedisInstance();
29+
try {
30+
const caller = appRouter.createCaller(ctx);
31+
32+
const schedulers = await caller.scheduler.list({
33+
queueName: firstQueue.queue.name,
34+
});
35+
const scheduler = schedulers[0];
36+
37+
await caller.scheduler.remove({
38+
queueName: firstQueue.queue.name,
39+
jobSchedulerId: scheduler.key,
40+
});
41+
42+
const list = await caller.scheduler.list({
43+
queueName: firstQueue.queue.name,
44+
});
45+
46+
expect(list.length).toBe(NUM_OF_SCHEDULERS - 1);
47+
} catch (e) {
48+
if (firstQueue.type !== "bullmq") {
49+
expect(e).toBeInstanceOf(TRPCError);
50+
if (e instanceof TRPCError) {
51+
expect(e.code).toBe("BAD_REQUEST");
52+
}
53+
} else {
54+
throw e;
55+
}
56+
}
57+
});
58+
59+
test("bulk remove schedulers", async () => {
60+
const { ctx, firstQueue } = await initRedisInstance();
61+
try {
62+
const caller = appRouter.createCaller(ctx);
63+
64+
const schedulers = await caller.scheduler.list({
65+
queueName: firstQueue.queue.name,
66+
});
67+
68+
await caller.scheduler.bulkRemove({
69+
queueName: firstQueue.queue.name,
70+
jobSchedulerIds: schedulers.map((scheduler) => scheduler.key),
71+
});
72+
73+
const list = await caller.scheduler.list({
74+
queueName: firstQueue.queue.name,
75+
});
76+
77+
expect(list.length).toBe(0);
78+
} catch (e) {
79+
if (firstQueue.type !== "bullmq") {
80+
expect(e).toBeInstanceOf(TRPCError);
81+
if (e instanceof TRPCError) {
82+
expect(e.code).toBe("BAD_REQUEST");
83+
}
84+
} else {
85+
throw e;
86+
}
87+
}
88+
});

packages/api/src/tests/test.utils.ts

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import BullMQ from "bullmq";
55
import BeeQueue from "bee-queue";
66

77
export const NUM_OF_JOBS = 20;
8+
export const NUM_OF_SCHEDULERS = 3;
89
export const NUM_OF_COMPLETED_JOBS = NUM_OF_JOBS / 2;
910
export const NUM_OF_FAILED_JOBS = NUM_OF_JOBS / 2;
1011
const QUEUE_NAME_PREFIX = "flight-bookings";
@@ -19,7 +20,7 @@ export const sleep = (t: number) =>
1920
type QueueType = "bull" | "bullmq" | "bee";
2021

2122
export const type: QueueType =
22-
(process.env.QUEUE_TYPE as unknown as QueueType) || "bee";
23+
(process.env.QUEUE_TYPE as unknown as QueueType) || "bullmq";
2324

2425
export const initRedisInstance = async () => {
2526
switch (type) {
@@ -64,13 +65,19 @@ export const initRedisInstance = async () => {
6465
type: "bullmq" as const,
6566
};
6667

67-
new BullMQ.Worker(flightBookingsQueue.queue.name, async (job) => {
68-
if (job.data.index > NUM_OF_COMPLETED_JOBS) {
69-
throw new Error("Generic error");
70-
}
71-
72-
return Promise.resolve();
73-
});
68+
new BullMQ.Worker(
69+
flightBookingsQueue.queue.name,
70+
async (job) => {
71+
if (job.data.index > NUM_OF_COMPLETED_JOBS) {
72+
throw new Error("Generic error");
73+
}
74+
75+
return Promise.resolve();
76+
},
77+
{
78+
connection: {},
79+
},
80+
);
7481

7582
await flightBookingsQueue.queue.addBulk(
7683
[...new Array(NUM_OF_JOBS)].map((_, index) => {
@@ -83,6 +90,30 @@ export const initRedisInstance = async () => {
8390
}),
8491
);
8592

93+
const schedulers = [...new Array(NUM_OF_SCHEDULERS)].map(() => {
94+
return {
95+
name: faker.person.fullName(),
96+
template: {
97+
name: faker.person.fullName(),
98+
data: {
99+
name: faker.person.fullName(),
100+
},
101+
},
102+
opts: {
103+
pattern: "0 0 * * *",
104+
tz: "America/Los_Angeles",
105+
},
106+
};
107+
});
108+
109+
for (const scheduler of schedulers) {
110+
await flightBookingsQueue.queue.upsertJobScheduler(
111+
scheduler.name,
112+
scheduler.opts,
113+
scheduler.template,
114+
);
115+
}
116+
86117
await sleep(200);
87118

88119
return {

packages/client/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
"postcss": "^8.5.3",
2828
"prettier": "^3.5.3",
2929
"rollup-plugin-typescript-paths": "^1.5.0",
30-
"tailwindcss": "^4.1.3",
30+
"tailwindcss": "^3.4.16",
3131
"typescript": "^5.8.3",
32-
"vite": "^6.2.5"
32+
"vite": "^5.4.19"
3333
},
3434
"license": "MIT"
3535
}

packages/ui/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
"tailwindcss": "^3.4.16",
6363
"tailwindcss-radix": "^3.0.5",
6464
"typescript": "^5.8.3",
65-
"vite": "^6.2.5"
65+
"vite": "^5.4.19"
6666
},
6767
"peerDependencies": {
6868
"react": ">=18",

packages/ui/src/components/JobActionMenu.tsx

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
HobbyKnifeIcon,
77
TrashIcon,
88
} from "@radix-ui/react-icons";
9+
import { useEffect } from "react";
910

1011
type JobActionMenuProps = {
1112
job: Job;
@@ -17,15 +18,20 @@ export const JobActionMenu = ({
1718
queueName,
1819
onRemove,
1920
}: JobActionMenuProps) => {
20-
const opts = {
21-
onSuccess: () => {
21+
const { mutate: retry, isSuccess: retrySuccess } =
22+
trpc.job.retry.useMutation();
23+
const { mutate: discard, isSuccess: discardSuccess } =
24+
trpc.job.discard.useMutation();
25+
const { mutate: rerun, isSuccess: rerunSuccess } =
26+
trpc.job.rerun.useMutation();
27+
const { mutate: remove, isSuccess: removeSuccess } =
28+
trpc.job.remove.useMutation();
29+
30+
useEffect(() => {
31+
if (retrySuccess || discardSuccess || rerunSuccess || removeSuccess) {
2232
onRemove?.();
23-
},
24-
};
25-
const { mutate: retry } = trpc.job.retry.useMutation(opts);
26-
const { mutate: discard } = trpc.job.discard.useMutation(opts);
27-
const { mutate: rerun } = trpc.job.rerun.useMutation(opts);
28-
const { mutate: remove } = trpc.job.remove.useMutation(opts);
33+
}
34+
}, [retrySuccess, discardSuccess, rerunSuccess, removeSuccess, onRemove]);
2935

3036
const input = {
3137
queueName,

0 commit comments

Comments
 (0)