Skip to content

Commit 7c0b3ad

Browse files
nickscamaramogery
andauthored
(feat/conc) Move fully to a concurrency limit system (#1045)
* Nick: conc limits init * Nick: test suite plans * fix(email_notification): move expiry check to redis * fix(email_notification): add db check in case redis resets * Update rate-limiter.ts * Update queue-jobs.ts * Create concurrency-limit.test.ts * Update concurrency-limit.test.ts * Create queue-concurrency-integration.test.ts * Update queue-concurrency-integration.test.ts --------- Co-authored-by: Móricz Gergő <[email protected]>
1 parent e2c3932 commit 7c0b3ad

12 files changed

+764
-24
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
import { redisConnection } from "../services/queue-service";
2+
import {
3+
cleanOldConcurrencyLimitEntries,
4+
getConcurrencyLimitActiveJobs,
5+
pushConcurrencyLimitActiveJob,
6+
removeConcurrencyLimitActiveJob,
7+
takeConcurrencyLimitedJob,
8+
pushConcurrencyLimitedJob,
9+
getConcurrencyQueueJobsCount,
10+
ConcurrencyLimitedJob,
11+
} from "../lib/concurrency-limit";
12+
import { CONCURRENCY_LIMIT, getConcurrencyLimitMax } from "../services/rate-limiter";
13+
import { PlanType } from "../types";
14+
15+
// Mock Redis client
16+
jest.mock("../services/queue-service", () => ({
17+
redisConnection: {
18+
zremrangebyscore: jest.fn(),
19+
zrangebyscore: jest.fn(),
20+
zadd: jest.fn(),
21+
zrem: jest.fn(),
22+
zmpop: jest.fn(),
23+
zcard: jest.fn(),
24+
},
25+
}));
26+
27+
describe("Concurrency Limit", () => {
28+
const mockTeamId = "test-team-id";
29+
const mockJobId = "test-job-id";
30+
const mockNow = 1000000;
31+
32+
beforeEach(() => {
33+
jest.clearAllMocks();
34+
});
35+
36+
describe("cleanOldConcurrencyLimitEntries", () => {
37+
it("should remove entries older than current timestamp", async () => {
38+
await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);
39+
40+
expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
41+
"concurrency-limiter:test-team-id",
42+
-Infinity,
43+
mockNow
44+
);
45+
});
46+
});
47+
48+
describe("getConcurrencyLimitActiveJobs", () => {
49+
it("should return active jobs after given timestamp", async () => {
50+
const mockActiveJobs = ["job1", "job2"];
51+
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(mockActiveJobs);
52+
53+
const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
54+
55+
expect(result).toEqual(mockActiveJobs);
56+
expect(redisConnection.zrangebyscore).toHaveBeenCalledWith(
57+
"concurrency-limiter:test-team-id",
58+
mockNow,
59+
Infinity
60+
);
61+
});
62+
63+
it("should return empty array when no active jobs", async () => {
64+
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);
65+
66+
const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
67+
68+
expect(result).toEqual([]);
69+
});
70+
});
71+
72+
describe("pushConcurrencyLimitActiveJob", () => {
73+
it("should add job with expiration timestamp", async () => {
74+
await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, mockNow);
75+
76+
expect(redisConnection.zadd).toHaveBeenCalledWith(
77+
"concurrency-limiter:test-team-id",
78+
mockNow + 2 * 60 * 1000, // stalledJobTimeoutMs
79+
mockJobId
80+
);
81+
});
82+
});
83+
84+
describe("removeConcurrencyLimitActiveJob", () => {
85+
it("should remove job from active jobs", async () => {
86+
await removeConcurrencyLimitActiveJob(mockTeamId, mockJobId);
87+
88+
expect(redisConnection.zrem).toHaveBeenCalledWith(
89+
"concurrency-limiter:test-team-id",
90+
mockJobId
91+
);
92+
});
93+
});
94+
95+
describe("Queue Operations", () => {
96+
const mockJob: ConcurrencyLimitedJob = {
97+
id: mockJobId,
98+
data: { test: "data" },
99+
opts: {},
100+
priority: 1,
101+
};
102+
103+
describe("takeConcurrencyLimitedJob", () => {
104+
it("should return null when queue is empty", async () => {
105+
(redisConnection.zmpop as jest.Mock).mockResolvedValue(null);
106+
107+
const result = await takeConcurrencyLimitedJob(mockTeamId);
108+
109+
expect(result).toBeNull();
110+
});
111+
112+
it("should return and remove the highest priority job", async () => {
113+
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
114+
"key",
115+
[[JSON.stringify(mockJob)]],
116+
]);
117+
118+
const result = await takeConcurrencyLimitedJob(mockTeamId);
119+
120+
expect(result).toEqual(mockJob);
121+
expect(redisConnection.zmpop).toHaveBeenCalledWith(
122+
1,
123+
"concurrency-limit-queue:test-team-id",
124+
"MIN"
125+
);
126+
});
127+
});
128+
129+
describe("pushConcurrencyLimitedJob", () => {
130+
it("should add job to queue with priority", async () => {
131+
await pushConcurrencyLimitedJob(mockTeamId, mockJob);
132+
133+
expect(redisConnection.zadd).toHaveBeenCalledWith(
134+
"concurrency-limit-queue:test-team-id",
135+
mockJob.priority,
136+
JSON.stringify(mockJob)
137+
);
138+
});
139+
140+
it("should use default priority 1 when not specified", async () => {
141+
const jobWithoutPriority = { ...mockJob };
142+
delete jobWithoutPriority.priority;
143+
144+
await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority);
145+
146+
expect(redisConnection.zadd).toHaveBeenCalledWith(
147+
"concurrency-limit-queue:test-team-id",
148+
1,
149+
JSON.stringify(jobWithoutPriority)
150+
);
151+
});
152+
});
153+
154+
describe("getConcurrencyQueueJobsCount", () => {
155+
it("should return the number of jobs in queue", async () => {
156+
const mockCount = 5;
157+
(redisConnection.zcard as jest.Mock).mockResolvedValue(mockCount);
158+
159+
const result = await getConcurrencyQueueJobsCount(mockTeamId);
160+
161+
expect(result).toBe(mockCount);
162+
expect(redisConnection.zcard).toHaveBeenCalledWith(
163+
"concurrency-limit-queue:test-team-id"
164+
);
165+
});
166+
167+
it("should return 0 for empty queue", async () => {
168+
(redisConnection.zcard as jest.Mock).mockResolvedValue(0);
169+
170+
const result = await getConcurrencyQueueJobsCount(mockTeamId);
171+
172+
expect(result).toBe(0);
173+
});
174+
});
175+
});
176+
177+
describe("getConcurrencyLimitMax", () => {
178+
it("should return correct limit for free plan", () => {
179+
const result = getConcurrencyLimitMax("free");
180+
expect(result).toBe(2);
181+
});
182+
183+
it("should return correct limit for standard plan", () => {
184+
const result = getConcurrencyLimitMax("standard");
185+
expect(result).toBe(CONCURRENCY_LIMIT.standard);
186+
});
187+
188+
it("should return correct limit for scale plan", () => {
189+
const result = getConcurrencyLimitMax("scale");
190+
expect(result).toBe(CONCURRENCY_LIMIT.scale);
191+
});
192+
193+
it("should return default limit for unknown plan", () => {
194+
const result = getConcurrencyLimitMax("unknown" as PlanType);
195+
expect(result).toBe(10);
196+
});
197+
198+
it("should handle special team IDs", () => {
199+
process.env.DEV_B_TEAM_ID = "dev-b-team";
200+
const result = getConcurrencyLimitMax("free", "dev-b-team");
201+
expect(result).toBe(120);
202+
});
203+
});
204+
205+
describe("Integration Scenarios", () => {
206+
it("should handle complete job lifecycle", async () => {
207+
const mockJob: ConcurrencyLimitedJob = {
208+
id: "lifecycle-test",
209+
data: { test: "lifecycle" },
210+
opts: {},
211+
};
212+
213+
// Push job to queue
214+
await pushConcurrencyLimitedJob(mockTeamId, mockJob);
215+
expect(redisConnection.zadd).toHaveBeenCalled();
216+
217+
// Take job from queue
218+
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
219+
"key",
220+
[[JSON.stringify(mockJob)]],
221+
]);
222+
const takenJob = await takeConcurrencyLimitedJob(mockTeamId);
223+
expect(takenJob).toEqual(mockJob);
224+
225+
// Add to active jobs
226+
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, mockNow);
227+
expect(redisConnection.zadd).toHaveBeenCalled();
228+
229+
// Verify active jobs
230+
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([mockJob.id]);
231+
const activeJobs = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
232+
expect(activeJobs).toContain(mockJob.id);
233+
234+
// Remove from active jobs
235+
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
236+
expect(redisConnection.zrem).toHaveBeenCalled();
237+
});
238+
});
239+
});

0 commit comments

Comments
 (0)