🦋 Changeset detected
Latest commit: ccb1ee9
The changes in this PR will be included in the next version bump. This PR includes changesets to release 3 packages.
Pull request overview
This PR adds support for job groups across the queue management system, allowing jobs to be organized and filtered by group identifiers. The implementation includes UI components for viewing and managing groups, API endpoints for retrieving group information, and adapter-level support particularly for GroupMQ with extraction support for BullMQ Pro.
Key changes:
- Added groups support to the adapter layer with a new `getGroups()` method and `GroupInfo` type
- Implemented groups UI with filtering, bulk operations (retry/delete), and visual group cards
- Enhanced job display to show group ID, progress, and attempt information
- Added `bulkRetry` mutation for retrying multiple failed jobs at once
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| packages/api/src/queue-adapters/base.adapter.ts | Adds GroupInfo type and getGroups() method signature; extends AdaptedJob with groupId, progress, and attemptsMade fields |
| packages/api/src/queue-adapters/groupmq.adapter.ts | Implements getGroups() with native GroupMQ methods and fallback aggregation; adds groupId and attemptsMade to job adaptation |
| packages/api/src/queue-adapters/bullmq.adapter.ts | Extracts groupId from BullMQ Pro's group option; adds progress and attemptsMade to job adaptation |
| packages/api/src/queue-adapters/bull.adapter.ts | Changes promote support to true; adds progress() call and attemptsMade to job adaptation |
| packages/api/src/queue-adapters/bee.adapter.ts | Improves timestamp extraction from job metadata; adds progress and attemptsMade fields (as undefined) |
| packages/api/src/routers/queue.ts | Adds groups endpoint to fetch group information with count and status |
| packages/api/src/routers/job.ts | Adds bulkRetry mutation, byId query, and groupId filtering to job list endpoint |
| packages/ui/src/components/GroupsSection.tsx | New component for displaying groups as filterable cards with bulk delete functionality |
| packages/ui/src/components/QueueStatusTabs.tsx | Adds "Retry all" button for failed jobs with bulk retry functionality |
| packages/ui/src/components/JobModal.tsx | Displays groupId, attemptsMade, and progress badges; implements expandable stacktrace viewer |
| packages/ui/src/pages/QueuePage.tsx | Integrates GroupsSection component and passes group filter state to job queries |
Comments suppressed due to low confidence (1)
packages/api/src/routers/job.ts:378
- When filtering jobs by groupId, the pagination logic becomes incorrect. The totalCount and hasNextPage calculations are based on the unfiltered job count from the adapter, but jobs are filtered in-memory after fetching. This causes two issues:
  - The totalCount will show the total number of jobs in the status, not the number of jobs matching the groupId filter
  - The hasNextPage calculation will be incorrect because it compares the cursor position against the unfiltered total count, but the actual returned jobs are filtered

  This will result in misleading pagination information and potentially incorrect "load more" behavior for the UI. Consider either implementing groupId filtering at the adapter level or recalculating totalCount after filtering.
// Filter by groupId if provided
if (groupId) {
jobs = jobs.filter((job) => job.groupId === groupId);
}
const counts = await queueInCtx.adapter.getJobCounts();
const totalCount = counts[status] || 0;
const hasNextPage = jobs.length > 0 && cursor + limit < totalCount;
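The second option (recalculating after filtering) could look roughly like the sketch below. It is not the PR's code: `JobLike`, `Page`, and `paginateFiltered` are hypothetical names, and it assumes every job for the status is already in memory, which may not hold for large queues (adapter-level filtering would avoid that).

```typescript
// Hypothetical minimal job shape; the real AdaptedJob has more fields.
interface JobLike {
  id: string;
  groupId?: string;
}

interface Page<T> {
  jobs: T[];
  totalCount: number;
  hasNextPage: boolean;
}

// Filter first, then paginate, so totalCount and hasNextPage describe
// the same (filtered) set that the returned jobs come from.
function paginateFiltered<T extends JobLike>(
  allJobs: T[],
  cursor: number,
  limit: number,
  groupId?: string,
): Page<T> {
  const matching = groupId
    ? allJobs.filter((job) => job.groupId === groupId)
    : allJobs;
  const jobs = matching.slice(cursor, cursor + limit);
  const totalCount = matching.length;
  return { jobs, totalCount, hasNextPage: cursor + limit < totalCount };
}
```

With this ordering, a group of three jobs paged two at a time reports a total of three and a next page only on the first request.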
| const jobs = await this.queue.getJobsByStatus([status], 0, 1000); | ||
| for (const job of jobs) { | ||
| if (job.groupId) { | ||
| groupCounts.set( | ||
| job.groupId, | ||
| (groupCounts.get(job.groupId) || 0) + 1, | ||
| ); | ||
| } |
The fallback implementation has a hardcoded limit of 1000 jobs per status. If there are more than 1000 jobs in any single status, groups beyond that limit will be missed in the count aggregation. This could lead to incomplete or incorrect group information being displayed to users.
Consider either increasing this limit significantly, implementing pagination for the fallback, or documenting this limitation.
| const jobs = await this.queue.getJobsByStatus([status], 0, 1000); | |
| for (const job of jobs) { | |
| if (job.groupId) { | |
| groupCounts.set( | |
| job.groupId, | |
| (groupCounts.get(job.groupId) || 0) + 1, | |
| ); | |
| } | |
| const batchSize = 1000; | |
| let start = 0; | |
| while (true) { | |
| const end = start + batchSize - 1; | |
| const jobs = await this.queue.getJobsByStatus( | |
| [status], | |
| start, | |
| end, | |
| ); | |
| if (!jobs || jobs.length === 0) { | |
| break; | |
| } | |
| for (const job of jobs) { | |
| if (job.groupId) { | |
| groupCounts.set( | |
| job.groupId, | |
| (groupCounts.get(job.groupId) || 0) + 1, | |
| ); | |
| } | |
| } | |
| if (jobs.length < batchSize) { | |
| // No more jobs beyond this batch for this status | |
| break; | |
| } | |
| start += batchSize; |
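The batching idea in the suggestion can also be written as a standalone function, shown here as a rough sketch. `GroupedJob`, `FetchRange`, and `countGroups` are hypothetical names; `fetchRange` stands in for a call such as `getJobsByStatus([status], start, end)`, and the inclusive `end` index is an assumption taken from the suggestion's `start + batchSize - 1`.

```typescript
// Hypothetical job shape; only groupId matters for counting.
interface GroupedJob {
  groupId?: string;
}

// Hypothetical fetcher: returns jobs in the inclusive range [start, end].
type FetchRange = (start: number, end: number) => Promise<GroupedJob[]>;

async function countGroups(
  fetchRange: FetchRange,
  batchSize = 1000,
): Promise<Map<string, number>> {
  const groupCounts = new Map<string, number>();
  for (let start = 0; ; start += batchSize) {
    const jobs = await fetchRange(start, start + batchSize - 1);
    for (const job of jobs) {
      if (job.groupId) {
        groupCounts.set(job.groupId, (groupCounts.get(job.groupId) ?? 0) + 1);
      }
    }
    // A short (or empty) batch means there is nothing left to read.
    if (jobs.length < batchSize) break;
  }
  return groupCounts;
}
```

Jobs that change status while the loop is running may still be missed or counted twice, so the result is best treated as approximate.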
| const { mutate: bulkRemove, isPending: isDeleting } = | ||
| trpc.job.bulkRemove.useMutation(); |
The bulkRemove mutation is missing error handling callbacks (onSuccess/onError) and does not provide user feedback via toast notifications. Users won't know whether the bulk delete operation succeeded or failed. Additionally, there's no query invalidation to refresh the UI after deletion.
Compare with the bulkRetry mutation in QueueStatusTabs.tsx which properly implements onSuccess and onError handlers. Consider adding similar handlers here along with appropriate toast notifications.
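As a rough illustration of what such handlers might look like, the sketch below builds them from injected dependencies so it can run on its own. `BulkResult`, `HandlerDeps`, and `makeBulkRemoveHandlers` are hypothetical names, and the `{ succeeded, failed }` result shape is assumed by analogy with bulkRetry; the actual bulkRemove response may differ.

```typescript
// Hypothetical result shape, assumed to match bulkRetry's response.
interface BulkResult {
  succeeded: number;
  failed: number;
}

// Hypothetical dependencies, injected so the sketch has no UI imports.
interface HandlerDeps {
  notifySuccess: (message: string) => void;
  notifyError: (message: string) => void;
  invalidate: () => void; // e.g. refetch the job list and groups queries
}

function makeBulkRemoveHandlers(deps: HandlerDeps) {
  return {
    onSuccess(data: BulkResult): void {
      const plural = data.succeeded !== 1 ? "s" : "";
      const failures = data.failed > 0 ? `, ${data.failed} failed` : "";
      deps.notifySuccess(`Deleted ${data.succeeded} job${plural}${failures}`);
      // Refresh cached data so deleted jobs disappear from the UI.
      deps.invalidate();
    },
    onError(error: { message?: string }): void {
      deps.notifyError(error.message || "Failed to delete jobs");
    },
  };
}
```

The returned object has the same `onSuccess`/`onError` shape that the bulkRetry mutation passes to `useMutation`, so in the component the toast functions and a query-invalidation call would take the place of the injected dependencies.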
| const { mutate: bulkRetry, status: bulkRetryStatus } = | ||
| trpc.job.bulkRetry.useMutation({ | ||
| onSuccess(data) { | ||
| toast.success( | ||
| `Retried ${data.succeeded} job${data.succeeded !== 1 ? "s" : ""}${data.failed > 0 ? `, ${data.failed} failed` : ""}`, | ||
| ); | ||
| }, | ||
| onError(error) { | ||
| toast.error(error.message || "Failed to retry jobs"); | ||
| }, | ||
| }); |
The bulkRetry mutation is missing query invalidation to refresh the job list after retrying jobs. When jobs are retried, they move from "failed" status to "waiting" status, but the UI won't automatically reflect this change. Users would need to manually refresh or wait for the polling interval.
Similar mutations in the codebase (like in JobActionMenu) rely on callbacks to trigger UI updates. Consider adding query invalidation or a callback mechanism to refresh the job list after successful bulk retry.
| createdAt, | ||
| processedAt, | ||
| finishedAt, | ||
| failedReason: undefined, |
The Bee adapter's failedReason is now set to undefined instead of an empty string, which is inconsistent with other adapters (BullMQ, GroupMQ, Bull) that use empty string or the actual error message. This inconsistency could cause issues in the UI where failedReason is expected to be a string.
For consistency, consider using an empty string "" when there's no failed reason, matching the pattern used in other adapters.
| failedReason: undefined, | |
| failedReason: "", |
| groups: procedure | ||
| .input( | ||
| z.object({ | ||
| queueName: z.string(), | ||
| }), | ||
| ) | ||
| .query(async ({ input: { queueName }, ctx }) => { | ||
| const internalCtx = transformContext(ctx); | ||
| const queueInCtx = findQueueInCtxOrFail({ | ||
| queues: internalCtx.queues, | ||
| queueName, | ||
| }); | ||
| if (!queueInCtx.adapter.supports.groups) { | ||
| return []; | ||
| } | ||
| try { | ||
| return await queueInCtx.adapter.getGroups(); | ||
| } catch (e) { | ||
| throw new TRPCError({ | ||
| code: "INTERNAL_SERVER_ERROR", | ||
| message: e instanceof Error ? e.message : undefined, | ||
| }); | ||
| } | ||
| }), |
The new groups endpoint and getGroups() method lack test coverage. The test suite includes comprehensive tests for other features like bulkRemove and schedulers, but no tests are present for the groups functionality.
Consider adding tests for:
- Fetching groups when groups are supported
- Handling empty groups list
- Verifying group counts are accurate
- Testing the fallback implementation path
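A test for the first two cases might start from fake adapters like the ones sketched below. All names here (`GroupInfoLike`, `GroupAdapterLike`, `fetchGroups`) are hypothetical stand-ins, the `{ id, count }` shape is an assumption about `GroupInfo`, and `fetchGroups` only mirrors the endpoint's branching rather than calling the real router.

```typescript
// Hypothetical shapes; the real GroupInfo and adapter types have more fields.
interface GroupInfoLike {
  id: string;
  count: number;
}

interface GroupAdapterLike {
  supports: { groups: boolean };
  getGroups(): Promise<GroupInfoLike[]>;
}

// Mirrors the endpoint's branching: unsupported adapters yield an empty list.
async function fetchGroups(adapter: GroupAdapterLike): Promise<GroupInfoLike[]> {
  if (!adapter.supports.groups) return [];
  return adapter.getGroups();
}

// Fake adapter that reports one group.
const withGroups: GroupAdapterLike = {
  supports: { groups: true },
  getGroups: async () => [{ id: "tenant-a", count: 3 }],
};

// Fake adapter without group support; getGroups must never be reached.
const withoutGroups: GroupAdapterLike = {
  supports: { groups: false },
  getGroups: async () => {
    throw new Error("should not be called");
  },
};
```

In the project's own test suite, the same fakes would be passed through the tRPC caller instead of `fetchGroups`.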
| bulkRetry: procedure | ||
| .input( | ||
| z.object({ | ||
| queueName: z.string(), | ||
| jobIds: z.array(z.string()), | ||
| }), | ||
| ) | ||
| .mutation(async ({ input: { jobIds, queueName }, ctx }) => { | ||
| const internalCtx = transformContext(ctx); | ||
| const queueInCtx = findQueueInCtxOrFail({ | ||
| queues: internalCtx.queues, | ||
| queueName, | ||
| }); | ||
| if (!queueInCtx.adapter.supports.retry) { | ||
| throw new TRPCError({ | ||
| code: "BAD_REQUEST", | ||
| message: `${queueInCtx.adapter.getType()} does not support retrying jobs`, | ||
| }); | ||
| } | ||
| try { | ||
| const results = await Promise.allSettled( | ||
| jobIds.map(async (jobId) => { | ||
| await queueInCtx.adapter.retryJob(jobId); | ||
| return jobId; | ||
| }), | ||
| ); | ||
| const succeeded = results | ||
| .filter((r) => r.status === "fulfilled") | ||
| .map((r) => (r as PromiseFulfilledResult<string>).value); | ||
| const failed = results.filter((r) => r.status === "rejected").length; | ||
| return { succeeded: succeeded.length, failed }; | ||
| } catch (e) { | ||
| if (e instanceof TRPCError) { | ||
| throw e; | ||
| } else { | ||
| throw new TRPCError({ | ||
| code: "INTERNAL_SERVER_ERROR", | ||
| message: e instanceof Error ? e.message : undefined, | ||
| }); | ||
| } | ||
| } | ||
| }), |
The new bulkRetry endpoint lacks test coverage. The test suite includes comprehensive tests for other bulk operations like bulkRemove (including error cases with non-existent job IDs), but no tests are present for bulkRetry functionality.
Consider adding tests for:
- Successfully retrying multiple jobs
- Handling partial failures (some jobs succeed, some fail)
- Verifying the returned succeeded/failed counts
- Testing with non-existent job IDs
- Verifying jobs are moved to the correct state after retry
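The partial-failure and non-existent-ID cases could be exercised with a fake adapter along these lines. This is only a sketch: `RetryAdapterLike`, `retryAll`, and `fakeAdapter` are hypothetical, and `retryAll` reproduces just the `Promise.allSettled` counting from the endpoint, not the tRPC wiring or error mapping.

```typescript
// Hypothetical stand-in for the adapter; only retryJob is modeled.
interface RetryAdapterLike {
  retryJob(jobId: string): Promise<void>;
}

// Same counting approach as the endpoint: settle every retry, then tally.
async function retryAll(
  adapter: RetryAdapterLike,
  jobIds: string[],
): Promise<{ succeeded: number; failed: number }> {
  const results = await Promise.allSettled(
    jobIds.map((jobId) => adapter.retryJob(jobId)),
  );
  const succeeded = results.filter((r) => r.status === "fulfilled").length;
  return { succeeded, failed: results.length - succeeded };
}

// Fake adapter that only knows about two jobs and rejects anything else.
const knownJobIds = new Set(["1", "2"]);
const fakeAdapter: RetryAdapterLike = {
  async retryJob(jobId) {
    if (!knownJobIds.has(jobId)) {
      throw new Error(`Job ${jobId} not found`);
    }
  },
};
```

Calling `retryAll(fakeAdapter, ["1", "2", "missing"])` covers a mixed batch in one go; a real test would additionally check the job states in the queue afterwards.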
| ? new Date(jobWithRetry.retriedOn) | ||
| : null, | ||
| returnValue: job.returnvalue, | ||
| progress: typeof job.progress() === "number" ? (job.progress() as number) : undefined, |
The job.progress() call is made without checking if it's a function first. In Bull, progress can be either a function or a number property, and calling it when it's not a function will throw a runtime error.
Consider adding a check like typeof job.progress === 'function' before calling it, or handle both cases (function and direct value).
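A defensive reader that handles both cases might look like the sketch below. `readProgress` is a hypothetical helper, not part of the PR or of Bull, and it accepts any object with an optional `progress` member so it can be tried without a queue. It also reads the value once instead of calling `progress()` twice as the diff does.

```typescript
// Hypothetical helper: accepts progress as a method, a plain value, or absent.
function readProgress(job: { progress?: unknown }): number | undefined {
  const p = job.progress;
  // Call with the job as `this` in case the method depends on it.
  const raw: unknown = typeof p === "function" ? p.call(job) : p;
  // Non-numeric progress (e.g. an object) is reported as undefined.
  return typeof raw === "number" ? raw : undefined;
}
```

In the adapter this would reduce the field to `progress: readProgress(job)`.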
| }, | ||
| retry: true, | ||
| promote: false, | ||
| promote: true, |
The promote support was changed from false to true, but there's no indication in the PR description or commit message explaining why Bull now supports promote when it previously didn't. This could be an unintended change that might cause runtime errors if the Bull library doesn't actually support this operation.
If this change is intentional, consider verifying that Bull actually supports the promote operation and that the promoteJob method implementation is correct.
| promote: true, | |
| promote: false, |
No description provided.