Skip to content

Commit

Permalink
chore: sort out sql files and sql tests
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit d25a50a
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 13:54:05 2025 +0100

    test(queue): refactor test utility and uncomment archive test cases

    Renamed readAllMessages to peekAllMessages with visibility timeout adjustment
    Uncommented and activated previously commented archive test scenarios for single and batch
    message archiving
    Updated test steps to use new peek function and verify message archiving behavior

commit a8b9bc0
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 13:46:31 2025 +0100

    test(queue): Refactor queue test suite and improve message handling

    Enhance Queue test suite with improved type definitions, utility functions,
    and test coverage.
    Key changes include:
    - Renamed TestMessage to TestPayload with explicit type
    - Added readAllMessages and clearDb utility functions
    - Simplified test steps and improved message reading approach
    - Commented out some archive-related test steps
    - Imported additional assert functions

commit f455688
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 11:27:59 2025 +0100

    test(queue): update readWithPoll method parameters in tests

    Adjust test parameters for queue read operations to match expected method signature

commit 4a87e70
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 11:24:15 2025 +0100

    refactor(worker): Add generic type support for WorkerLifecycle

    Introduce generic type parameter for WorkerLifecycle and Queue to improve type safety and
    flexibility.
    Update Worker and test files to support generic message payload type.

    - Add generic type MessagePayload to WorkerLifecycle class
    - Update Queue type to support generic message types
    - Modify constructor signatures to use generic types
    - Remove unnecessary queueName parameter in test cases

commit f996c1b
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:52:09 2025 +0100

    test(sql): improve test utility functions and add transaction rollback support

    Add withRollback utility for transactional testing and update SQL test helpers to consistently
    truncate workers table before tests

commit bf71192
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:42:46 2025 +0100

    test(Queue): update test cases with simplified read message polling

    Modify queue read message tests by:
    - Simplifying readWithPoll method call
    - Commenting out message content verification
    - Removing unnecessary parameters in test steps

commit f5bcc19
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:40:53 2025 +0100

    feat(queue): Add queue management methods and comprehensive tests

    Implement new queue operations including safeCreate, archive, and batch processing methods.
    Add extensive unit tests covering queue creation, message sending, reading, and archiving
    scenarios with different parameters.

    Adds methods:
    - safeCreate: Safely create queue if not exists
    - archive: Archive individual messages
    - archiveBatch: Archive multiple messages
    - Comprehensive test coverage for queue operations

commit d7005ee
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:22:05 2025 +0100

    refactor(worker): Simplify worker lifecycle and queue constructor

    Refactored WorkerLifecycle and Queue class constructors to improve dependency injection and
    reduce complexity.
    Updated constructor signatures and added a queueName getter to WorkerLifecycle.

    Changes include:
    - Simplified Queue constructor parameters
    - Modified WorkerLifecycle constructor to accept Queue directly
    - Added queueName getter in WorkerLifecycle
    - Updated Worker initialization to pass Queue to WorkerLifecycle

commit 837829a
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:20:54 2025 +0100

    chore(migrations): Update migration file naming and project configuration

    Rename SQL migration files to ensure correct ordering and update project.json to include
    additional migration files

commit ecf4f95
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:12:03 2025 +0100

    refactor(database): Move worker views from migration to dedicated SQL file

    Extracted active and inactive worker views from migration script to a separate views.sql file
    for better organization and maintainability

commit b636f7c
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 21 10:09:45 2025 +0100

    refactor(migration): relocate spawn_worker SQL file to sql directory

    Move migration utility SQL file to a more appropriate project structure
  • Loading branch information
jumski committed Jan 21, 2025
1 parent 4b67995 commit 21d1df6
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 103 deletions.
32 changes: 0 additions & 32 deletions pkgs/edge-worker/migrations/000_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,3 @@ create table if not exists edge_worker.workers (
stopped_at TIMESTAMPTZ,
last_heartbeat_at TIMESTAMPTZ not null default now()
);

-------------------------------------------------------------------------------
-- Active Workers View --------------------------------------------------------
-------------------------------------------------------------------------------
create or replace view edge_worker.active_workers as
select
worker_id,
queue_name,
function_name,
started_at,
stopped_at,
last_heartbeat_at
from edge_worker.workers
where
stopped_at is null
and last_heartbeat_at > now() - make_interval(secs => 6);

-------------------------------------------------------------------------------
-- Inactive Workers View ------------------------------------------------------
-------------------------------------------------------------------------------
create or replace view edge_worker.inactive_workers as
select
worker_id,
queue_name,
function_name,
started_at,
stopped_at,
last_heartbeat_at
from edge_worker.workers
where
stopped_at is null
and last_heartbeat_at < now() - make_interval(secs => 6);
1 change: 1 addition & 0 deletions pkgs/edge-worker/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"commands": [
"rm supabase/migrations/*.sql",
"cp migrations/*.sql supabase/migrations/",
"cp sql/*_*.sql supabase/migrations/",
"supabase db reset"
],
"parallel": false
Expand Down
File renamed without changes.
29 changes: 29 additions & 0 deletions pkgs/edge-worker/sql/995_views.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Active workers are workers that have not stopped
-- and have not sent a heartbeat in the last 6 seconds
create or replace view edge_worker.active_workers as
select
worker_id,
queue_name,
function_name,
started_at,
stopped_at,
last_heartbeat_at
from edge_worker.workers
where
stopped_at is null
and last_heartbeat_at > now() - make_interval(secs => 6);

-- Inactive workers are workers that have stopped
-- or have not sent a heartbeat in the last 6 seconds
create or replace view edge_worker.inactive_workers as
select
worker_id,
queue_name,
function_name,
started_at,
stopped_at,
last_heartbeat_at
from edge_worker.workers
where
stopped_at is null
and last_heartbeat_at < now() - make_interval(secs => 6);
15 changes: 11 additions & 4 deletions pkgs/edge-worker/src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@ import { type Json } from './types.ts';
import { MessageRecord } from './types.ts';

export class Queue<MessagePayload extends Json> {
constructor(
private readonly sql: postgres.Sql,
private readonly queueName: string
) {}
constructor(private readonly sql: postgres.Sql, readonly queueName: string) {}

/**
* Creates a queue if it doesn't exist.
* If the queue already exists, this method does nothing.
*/
async safeCreate(): Promise<void> {
await this.sql`
select * from pgmq.create(${this.queueName});
`;
}

async archive(msgId: number): Promise<void> {
await this.sql`
Expand Down
8 changes: 5 additions & 3 deletions pkgs/edge-worker/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ export class Worker<MessagePayload extends Json> {
const queue = new Queue<MessagePayload>(this.sql, this.config.queueName);
const queries = new Queries(this.sql);

this.lifecycle = new WorkerLifecycle(queries, this.logger, {
queueName: this.config.queueName,
});
this.lifecycle = new WorkerLifecycle<MessagePayload>(
queries,
queue,
this.logger
);

this.executionController = new ExecutionController<MessagePayload>(
queue,
Expand Down
15 changes: 10 additions & 5 deletions pkgs/edge-worker/src/WorkerLifecycle.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import { Heartbeat } from './Heartbeat.ts';
import { Logger } from './Logger.ts';
import { Queries } from './Queries.ts';
import { WorkerBootstrap, WorkerRow } from './types.ts';
import { Queue } from './Queue.ts';
import { Json, WorkerBootstrap, WorkerRow } from './types.ts';
import { States, WorkerState } from './WorkerState.ts';

export interface LifecycleConfig {
queueName: string;
}

export class WorkerLifecycle {
export class WorkerLifecycle<MessagePayload extends Json> {
private workerState: WorkerState = new WorkerState();
private heartbeat?: Heartbeat;
private logger: Logger;
private queries: Queries;
private readonly queueName: string;
private queue: Queue<MessagePayload>;
private workerRow?: WorkerRow;

constructor(queries: Queries, logger: Logger, config: LifecycleConfig) {
constructor(queries: Queries, queue: Queue<MessagePayload>, logger: Logger) {
this.queries = queries;
this.logger = logger;
this.queueName = config.queueName;
this.queue = queue;
}

async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
Expand Down Expand Up @@ -70,6 +71,10 @@ export class WorkerLifecycle {
return this.workerRow?.function_name;
}

get queueName() {
return this.queue.queueName;
}

async sendHeartbeat() {
await this.heartbeat?.send();
}
Expand Down
22 changes: 20 additions & 2 deletions pkgs/edge-worker/tests/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,36 @@ const DB_URL = 'postgresql://postgres:[email protected]:50322/postgres';
export function createSql() {
return postgres(DB_URL, {
prepare: true,
onnotice(_) {
onnotice(_: unknown) {
// no-op to silence notices
},
});
}

export async function withRollback<T>(
callback: (sql: postgres.Sql) => Promise<T>
): Promise<T> {
const sql = createSql();
try {
const result = (await sql.begin(
'read write',
async (sqlTx: postgres.Sql) => {
const callbackResult = await callback(sqlTx);
await sqlTx`ROLLBACK`;
return callbackResult;
}
)) as T;
return result;
} finally {
await sql.end();
}
}

export async function withSql<T>(
callback: (sql: postgres.Sql) => Promise<T>
): Promise<T> {
const sql = createSql();
try {
await sql`TRUNCATE edge_worker.workers CASCADE`;
return await callback(sql);
} finally {
await sql.end();
Expand Down
7 changes: 6 additions & 1 deletion pkgs/edge-worker/tests/unit/Queries.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { assertEquals, assertExists, assertRejects } from 'jsr:@std/assert';
import { Queries } from '../../src/Queries.ts';
import { withSql } from '../sql.ts';
import { withSql, withRollback } from '../sql.ts';

Check failure on line 3 in pkgs/edge-worker/tests/unit/Queries.test.ts

View workflow job for this annotation

GitHub Actions / main

`withRollback` is never used
import { WorkerRow } from '../../src/types.ts';

const FAKE_UUID = '123e4567-e89b-12d3-a456-426614174000';

Deno.test('Queries.onWorkerStarted integration test', async () => {
await withSql(async (sql) => {
await sql`TRUNCATE edge_worker.workers CASCADE`;
const queries = new Queries(sql);
// Test data
const queueName = 'test_queue';
Expand All @@ -32,6 +33,7 @@ Deno.test('Queries.onWorkerStarted integration test', async () => {

Deno.test('Queries.onWorkerStarted throws on duplicate worker', async () => {
await withSql(async (sql) => {
await sql`TRUNCATE edge_worker.workers CASCADE`;
const queries = new Queries(sql);

const params = {
Expand Down Expand Up @@ -59,6 +61,7 @@ Deno.test(
'Queries.sendHeartbeat updates last_heartbeat_at for started worker',
() =>
withSql(async (sql) => {
await sql`TRUNCATE edge_worker.workers CASCADE`;
const queries = new Queries(sql);

// First create a worker
Expand Down Expand Up @@ -95,6 +98,7 @@ Deno.test(

Deno.test('Queries operations fail gracefully for non-existent worker', (t) =>
withSql(async (sql) => {
await sql`TRUNCATE edge_worker.workers CASCADE`;
return; // TODO: decide if we really want to throw for non-existent worker

const queries = new Queries(sql);

Check failure on line 104 in pkgs/edge-worker/tests/unit/Queries.test.ts

View workflow job for this annotation

GitHub Actions / main

This statement is unreachable
Expand Down Expand Up @@ -130,6 +134,7 @@ Deno.test(
'Queries.onWorkerStopped updates stopped_at and last_heartbeat_at',
() =>
withSql(async (sql) => {
await sql`TRUNCATE edge_worker.workers CASCADE`;
const queries = new Queries(sql);

// First create a worker
Expand Down
142 changes: 142 additions & 0 deletions pkgs/edge-worker/tests/unit/Queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { assertEquals, assertExists } from '@std/assert';
import { Queue } from '../../src/Queue.ts';
import { type postgres, withSql } from '../sql.ts';
import { MessageRecord } from '../../src/types.ts';

// Test message type
type TestPayload = {
id: string;
data: string;
};

async function peekAllMessages(sql: postgres.Sql, queueName: string) {
return await sql<MessageRecord<TestPayload>[]>`
SELECT * FROM pgmq.read(
queue_name => ${queueName},
vt => 0,
qty => 9999
)
`;
}

async function clearDb(sql: postgres.Sql, queueName: string) {
await sql`SELECT * FROM pgmq.purge_queue(${queueName})`;
await sql`DELETE FROM edge_worker.workers`;
}

Deno.test(
'Queue#safeCreate creates queue and handles duplicate creation',
async () => {
await withSql(async (sql) => {
const queue = new Queue<TestPayload>(sql, 'test_queue_safe_create');

// First creation should succeed
await queue.safeCreate();

// Second creation should not throw
await queue.safeCreate();

// Verify queue exists using pgmq.metrics()
const metrics = await sql`
SELECT * FROM pgmq.metrics('test_queue_safe_create')
`;

assertEquals(metrics.length, 1);
assertEquals(metrics[0].queue_name, 'test_queue_safe_create');
});
}
);

Deno.test('Queue operations integration test', async (t) => {
await withSql(async (sql) => {
await clearDb(sql, 'test_queue');
const queue = new Queue<TestPayload>(sql, 'test_queue');
await queue.safeCreate();
const testMessage: TestPayload = {
id: '123',
data: 'test data',
};

await queue.send(testMessage);
const messages = await peekAllMessages(sql, 'test_queue');

assertEquals(messages.length, 1);
assertExists(messages[0].msg_id);

await t.step('set visibility timeout', async () => {
const message = messages[0];
const updatedMessage = await queue.setVt(message.msg_id, 10);
assertExists(updatedMessage);
assertEquals(updatedMessage.message, message.message);
});

await t.step('archive single message', async () => {
await queue.send(testMessage);
const [message] = await peekAllMessages(sql, 'test_queue');
await queue.archive(message.msg_id);

// Verify message is no longer available
const newMessages = await peekAllMessages(sql, 'test_queue');
assertEquals(newMessages.length, 0);
});
});
});

Deno.test('Queue batch operations', async (t) => {
await withSql(async (sql) => {
await clearDb(sql, 'test_queue_batch');
const queue = new Queue<TestPayload>(sql, 'test_queue_batch');
await queue.safeCreate();
const testMessages: TestPayload[] = [
{ id: '1', data: 'test 1' },
{ id: '2', data: 'test 2' },
{ id: '3', data: 'test 3' },
];

await t.step('send multiple messages', async () => {
for (const msg of testMessages) {
await queue.send(msg);
}
});

await t.step('read multiple messages', async () => {
const messages = await peekAllMessages(sql, 'test_queue_batch');
assertEquals(messages.length, 3);
});

await t.step('archive batch', async () => {
const messages = await peekAllMessages(sql, 'test_queue');
const msgIds = messages.map((m) => m.msg_id);
await queue.archiveBatch(msgIds);

// Verify messages are no longer available
const newMessages = await peekAllMessages(sql, 'test_queue');
assertEquals(newMessages.length, 0);
});
});
});

// Deno.test('Queue readWithPoll with different parameters', async () => {
// await withSql(async (sql) => {
// const queue = new Queue<TestPayload>(sql, 'test_queue_params');
// await queue.safeCreate();
// const testMessage: TestPayload = {
// id: '123',
// data: 'test data',
// };
//
// // Send a message
// await queue.send(testMessage);
//
// // Test different read parameters
// const messages = await queue.readWithPoll(
// 5, // batch size
// 30, // visibility timeout
// 2, // max poll seconds
// 500 // poll interval ms
// );
//
// assertEquals(messages.length, 1);
// assertEquals(messages[0].message, testMessage);
// });
// });
Loading

0 comments on commit 21d1df6

Please sign in to comment.