Skip to content

Commit

Permalink
feat(worker): enhance serial worker with sequence tracking and concur…
Browse files Browse the repository at this point in the history
…rency control

Modify serial sleep worker to add database sequence tracking and improve worker configuration.
Add end-to-end test to verify maxConcurrent behavior with precise timing checks.

- Update worker to use postgres sequence for tracking message processing
- Modify sleep function to include sequence value retrieval
- Expand worker configuration with additional parameters
- Add comprehensive test for maxConcurrent functionality
  • Loading branch information
jumski committed Jan 9, 2025
1 parent dd71b92 commit b5a65d2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
17 changes: 15 additions & 2 deletions pkgs/supaworker/supabase/functions/serial-sleep-worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { Supaworker } from '../_supaworker/index.ts';
import { delay } from 'jsr:@std/async';
import postgres from 'postgres';

const sleep1s = () => delay(1000);
const DB_POOL_URL = Deno.env.get('DB_POOL_URL')!;
const sql = postgres(DB_POOL_URL);
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
await sql`ALTER SEQUENCE test_seq RESTART WITH 1`;

Supaworker.start(sleep1s, { maxConcurrent: 1 });
const sleep1s = async () => {
await sql`SELECT nextval('test_seq')`;
await delay(1000);
};

Supaworker.start(sleep1s, {
batchSize: 1,
maxConcurrent: 1,
visibilityTimeout: 1,
});
2 changes: 1 addition & 1 deletion pkgs/supaworker/tests/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ functions/

✅ Concurrency

- [ ] Worker respects maxConcurrent (does not read new messages if there are no empty slots, reads max empty-slots-count of messages)
- [x] Worker respects maxConcurrent and processes messages in serial when set to 1
43 changes: 43 additions & 0 deletions pkgs/supaworker/tests/e2e/maxConcurrent_works.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { sql } from '../sql.ts';
import {
assertEquals,
assertGreaterOrEqual,
assertLess,
} from 'jsr:@std/assert';
import { delay } from 'jsr:@std/async';
import { seqLastValue, waitForSeqValue } from './_helpers.ts';

Deno.test('worker respect maxConcurrent settings', async () => {
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
await sql`ALTER SEQUENCE test_seq RESTART WITH 1`;
await sql`DELETE FROM pgmq.q_pgflow`;
await sql`DELETE FROM pgmq.a_pgflow`;
await sql`SELECT supaworker.spawn('serial-sleep-worker')`;

try {
// worker sleeps for 1s for each message
// se we will expect roughly 1 message per second
const startTime = Date.now();

await sql`SELECT pgmq.send_batch('pgflow', ARRAY['{}', '{}', '{}', '{}', '{}']::jsonb[])`;

await waitForSeqValue(5);

const endTime = Date.now();
const totalMs = Math.floor(endTime - startTime);

assertGreaterOrEqual(
totalMs,
5000,
'Should take at least 5 seconds to process all messages'
);
assertLess(
totalMs,
7000,
'Should take less than 7 seconds to process all messages'
);
} finally {
// Clean up connection
await sql.end();
}
});

0 comments on commit b5a65d2

Please sign in to comment.