Skip to content

Commit

Permalink
chore: Improve tests reliability and logging
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 1b5a02c05014481076e7b6d8b08f25ccc3d3064d
Author: Wojtek Majewski <[email protected]>
Date:   Sun Jan 12 00:12:19 2025 +0100

    style(logging): Improve console log formatting across Supabase functions

    Standardized log messages in multiple Supaworker functions by:
    - Adding function-specific prefixes to log messages
    - Updating log message formatting for consistency
    - Maintaining existing functionality while enhancing readability

commit 4dffa6417182ba738bc16c1a7297a9e5b3b14496
Author: Wojtek Majewski <[email protected]>
Date:   Sat Jan 11 23:26:45 2025 +0100

    test(restarts): improve error handling and assertions in worker restart test

    Add error handling for queue drop and enhance worker spawn verification
    - Wrap queue drop in try-catch to prevent test failures
    - Update sequence value assertion method
    - Improve worker spawn length check

commit 21803e735e96100a04abef6f693586bd2205dbe0
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 23:47:19 2025 +0100

    chore(supaworker): reduce max Postgres connections

    Lowered max Postgres connections from 50 to 20 in Supaworker configuration

commit b4c4a00b2f8e244fc8b5a66b9cfd88535d795272
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 23:46:12 2025 +0100

    test(restarts): simplify worker spawn test assertions

    Refactored test code to use async/await and remove redundant variable assignments, improving
    test readability and reducing unnecessary code

commit 0cc3d65450597fbc3a306e4bfec241142b37a98a
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 17:36:17 2025 +0100

    chore(deno): add configuration files for max_concurrency function

    Initialize Deno project configuration with dependency imports and lock file for Supabase function

commit 27978340534151bb83cffbbe3012ecbaee995aba
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 17:32:23 2025 +0100

    refactor(deps): update dependencies and improve logging

    Update Deno standard library to version 0.224.0
    Replace manual ANSI color codes with dim() color utility
    Enhance worker spawn logging with progress bar improvements

commit ebf918463f59cf05284e763a1bf0f52b102544fd
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 17:30:51 2025 +0100

    chore(config): increase PostgreSQL connection pool default size

    Adjust default pool size from 20 to 100 connections to support higher concurrency requirements

commit d33b5d6a411b9c0016fbce5b3b3e807a2c14dda8
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 15:24:05 2025 +0100

    refactor(tests): standardize logging and import helper function

    Replaced console.log calls with a new log helper function across multiple test files to improve
    logging consistency and readability

commit 7666c483e05b3321f6fe3ec88cbd6389e5ffeca6
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 15:02:19 2025 +0100

    fix(sql): silence postgres notices by adding onnotice handler

    Suppress unnecessary database notices during testing by adding a no-op handler to the postgres
    connection configuration

commit 4c56dc7b698e428ada5d93073eb8e7d9f2b62cc9
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 14:19:17 2025 +0100

    refactor(supaworker): Make tests more reliable

    - rename functions
    - make each use separate queue and maintain it
    - send and assert on particular queues
    - wait for exact number of workers in maxConcurrent

commit db576cc4d5add1e8ba09cb035ff30247accbb712
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 12:55:29 2025 +0100

    test(worker): update CPU-intensive worker test and logging

    Modify test case for worker spawning with reduced message count and timeout
    Simplify CPU-intensive task logging and sequence value retrieval
    Remove unnecessary assertions and debug code from test

    Reduces test complexity and improves worker test reliability

commit 3ae76ff929cf8fc165c217d5e0a5c5aa12f020b9
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 12:43:46 2025 +0100

    chore(deps): Add Deno configuration and lock files for CPU-intensive function

    Configure Deno project dependencies for Supabase function, including PostgreSQL library import
    and dependency lockfile

commit 51781a6c58eced35e2a64c40fc2f92be7a7ab83b
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 12:31:47 2025 +0100

    test(worker): enhance worker spawn test with improved assertions and timeout

    Improve end-to-end test for worker spawning by:
    - Adding fetchWorkers helper function
    - Increasing timeout for sequence increment
    - Adding assertion to verify multiple workers are spawned
    - Cleaning up inactive workers before test
    - Replacing assertGreaterOrEqual with assertGreater for worker count check

commit d22e575118ca63bb79150e13ea2970bcb85a4186
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 11:37:26 2025 +0100

    test(helpers): update sequence wait helper function name

    Rename waitForSeqValue to waitForSeqToIncrementBy in test helper functions
    Update import statements in two test files to reflect the new helper function name

commit a51fa24c4d2a67ca7ab8f77f84a86534a8f75da0
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 11:31:10 2025 +0100

    refactor(testing): improve async utility functions and sequence handling

    Refactored test helper functions to use a more generic waitFor utility with configurable options.
    Updated sequence value tracking and added more robust timeout and polling mechanisms for async
    testing scenarios.

    Key changes:
    - Introduced generic waitFor function with configurable polling and timeout
    - Refactored waitForSeqToIncrementBy to use new waitFor utility
    - Improved sequence value retrieval logic
    - Simplified active worker waiting mechanism
    - Added more descriptive error handling for async operations

commit f6958fc58d60b61be5766c07c006e6f5a8eaefdd
Author: Wojtek Majewski <[email protected]>
Date:   Fri Jan 10 00:59:28 2025 +0100

    feat(tests): improve worker testing and progress tracking

    Update test helpers with progress bar, optimize worker spawning, and adjust test parameters

    - Add progress bar for sequence value waiting
    - Reduce polling interval and modify test batch size
    - Simplify worker startup process
    - Update dependencies in deno.lock
    - Remove unnecessary console logs

commit ea266cfcd2583fe97c31d65cd81aa35fe3f7b07e
Author: Wojtek Majewski <[email protected]>
Date:   Thu Jan 9 23:56:33 2025 +0100

    test(supaworker): Enhance e2e test helpers and test cases

    Improve test utility functions with more flexible sequence and worker tracking:
    - Add sendBatch helper function for sending test messages
    - Extend seqLastValue to support custom sequence names
    - Implement waitForSeqValue with configurable polling and timeout
    - Add waitForActiveWorker to check worker availability
    - Update test cases to use new helper functions
    - Improve message batch sending and sequence value assertions

commit 2cf9fa04b6c6bc2b7a38f29f98d08dfe36abfb03
Author: Wojtek Majewski <[email protected]>
Date:   Thu Jan 9 23:56:08 2025 +0100

    refactor(worker): remove sequence restart command

    Remove unnecessary sequence restart operation in serial sleep worker

commit 9ed343a96a7f2696a4940502e54ecbd66e70ed1f
Author: Wojtek Majewski <[email protected]>
Date:   Thu Jan 9 23:55:25 2025 +0100

    refactor(sequence): simplify sequence increment function

    Streamline sequence creation and logging by removing unnecessary console logs
    and simplifying the incrementCounter function implementation

commit fd116e9b6bf0457d485a22ca91cee180e52e8e03
Author: Wojtek Majewski <[email protected]>
Date:   Thu Jan 9 23:52:43 2025 +0100

    chore(deps): downgrade supabase dependency to v2.1.4

    Fixes problem with console.log output in Deno

commit d5dc436a7f95851e6f32a7f89d34e5fe328cd9f7
Author: Wojtek Majewski <[email protected]>
Date:   Thu Jan 9 23:52:21 2025 +0100

    chore(nx): streamline Supabase and project configuration

    Simplified project.json commands by removing unnecessary array brackets and consolidating
    Supabase-related task configurations.
    Added new 'supabase:status' task and updated dependency and command structures for various
    project tasks.
  • Loading branch information
jumski committed Jan 11, 2025
1 parent e8edd21 commit 1a29f43
Show file tree
Hide file tree
Showing 24 changed files with 973 additions and 161 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"jsdom": "~22.1.0",
"nx": "20.2.2",
"prettier": "^2.6.2",
"supabase": "^2.2.1",
"supabase": "^2.1.4",
"tslib": "^2.3.0",
"typescript": "~5.6.2",
"typescript-eslint": "^8.13.0",
Expand Down
2 changes: 1 addition & 1 deletion pkgs/supaworker/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"executor": "nx:run-commands",
"options": {
"cwd": "pkgs/supaworker",
"commands": ["supabase db start"],
"commands": ["supabase start"],
"parallel": false
}
},
Expand Down
2 changes: 1 addition & 1 deletion pkgs/supaworker/supabase/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ major_version = 15
enabled = true
port = 50329
pool_mode = "transaction"
default_pool_size = 20
default_pool_size = 100
max_client_conn = 100

[db.seed]
Expand Down
294 changes: 294 additions & 0 deletions pkgs/supaworker/supabase/functions/cpu_intensive/deno.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions pkgs/supaworker/supabase/functions/cpu_intensive/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Supaworker } from '../_supaworker/index.ts';
import postgres from 'postgres';
import { crypto } from 'jsr:@std/crypto';

const DB_POOL_URL = Deno.env.get('DB_POOL_URL')!;
console.log('DB_POOL_URL', DB_POOL_URL);

const sql = postgres(DB_POOL_URL);
await sql`SELECT pgmq.create('cpu_intensive')`;
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;

async function cpuIntensiveTask() {
let data = new TextEncoder().encode('burn');
for (let i = 0; i < 10000; i++) {
data = new Uint8Array(await crypto.subtle.digest('SHA-256', data));
}
console.log(
'[cpu_intensive] last_val = ',
await sql`SELECT nextval('test_seq')`
);
}

Supaworker.start(cpuIntensiveTask, { queueName: 'cpu_intensive' });
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ console.log('DB_POOL_URL', DB_POOL_URL);

const sql = postgres(DB_POOL_URL);
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
await sql`ALTER SEQUENCE test_seq RESTART WITH 1`;
await sql`SELECT pgmq.create('increment_sequence')`;

async function incrementCounter() {
await sql`SELECT nextval('test_seq')`;
console.log(
'[increment_sequence] next_seq =',
await sql`SELECT nextval('test_seq')`
);
}

Supaworker.start(incrementCounter);
Supaworker.start(incrementCounter, { queueName: 'increment_sequence' });
5 changes: 5 additions & 0 deletions pkgs/supaworker/supabase/functions/max_concurrency/deno.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"imports": {
"postgres": "https://deno.land/x/[email protected]/mod.js"
}
}
294 changes: 294 additions & 0 deletions pkgs/supaworker/supabase/functions/max_concurrency/deno.lock

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions pkgs/supaworker/supabase/functions/max_concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Supaworker } from '../_supaworker/index.ts';
import postgres from 'postgres';

const DB_POOL_URL = Deno.env.get('DB_POOL_URL')!;
console.log('DB_POOL_URL', DB_POOL_URL);

const sql = postgres(DB_POOL_URL);
await sql`SELECT pgmq.create('max_concurrency')`;
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;

async function incrementSeq() {
console.log('last_val is ', await sql`SELECT nextval('test_seq')`);
}

Supaworker.start(incrementSeq, {
queueName: 'max_concurrency',
batchSize: 50,
maxConcurrent: 50,
maxPgConnections: 20,
});
5 changes: 5 additions & 0 deletions pkgs/supaworker/supabase/functions/serial_sleep/deno.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"imports": {
"postgres": "https://deno.land/x/[email protected]/mod.js"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import postgres from 'postgres';
const DB_POOL_URL = Deno.env.get('DB_POOL_URL')!;
const sql = postgres(DB_POOL_URL);
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
await sql`ALTER SEQUENCE test_seq RESTART WITH 1`;
await sql`SELECT pgmq.create('serial_sleep')`;

const sleep1s = async () => {
await sql`SELECT nextval('test_seq')`;
console.time('Task time');
const lastVal = await sql`SELECT nextval('test_seq')`;
console.log('[serial_sleep] lastVal =', lastVal);
await delay(1000);
console.timeEnd('Task time');
};

Supaworker.start(sleep1s, {
queueName: 'serial_sleep',
batchSize: 1,
maxConcurrent: 1,
visibilityTimeout: 1,
Expand Down
134 changes: 105 additions & 29 deletions pkgs/supaworker/tests/e2e/_helpers.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,69 @@
import { sql } from '../sql.ts';
import { delay } from 'jsr:@std/async';
import ProgressBar from 'jsr:@deno-library/progress';
import { dim } from 'https://deno.land/[email protected]/fmt/colors.ts';

interface WaitForOptions {
pollIntervalMs?: number;
timeoutMs?: number;
description?: string;
}

export async function log(message: string, ...args: any[]) {
console.log(dim(` -> ${message}`), ...args);
}

export async function waitFor<T>(
predicate: () => Promise<T | false>,
options: WaitForOptions = {}
): Promise<T> {
const {
pollIntervalMs = 250,
timeoutMs = 30000,
description = 'condition',
} = options;

const startTime = Date.now();

while (true) {
const result = await predicate();

if (result) return result;

if (Date.now() - startTime > timeoutMs) {
throw new Error(
`Timeout after ${timeoutMs}ms waiting for ${description}`
);
}

await delay(pollIntervalMs);
}
}

export async function sendBatch(count: number, queueName: string) {
return await sql`
SELECT pgmq.send_batch(
${queueName},
ARRAY(
SELECT '{}'::jsonb
FROM generate_series(1, ${count}::integer)
)
)`;
}

export async function seqLastValue(
seqName: string = 'test_seq'
): Promise<number> {
const seqResult = await sql`SELECT last_value::integer FROM ${sql(seqName)}`;
// Postgres sequences are initialized with a value of 1,
// but incrementing them for the first time does not increment the last_value,
// only sets is_called to true
const seqResult = await sql`
SELECT
CASE
WHEN is_called THEN last_value::integer
ELSE 0
END as last_value
FROM ${sql(seqName)}`;
return seqResult[0].last_value;
}

Expand All @@ -14,46 +73,63 @@ interface WaitForSeqValueOptions {
timeoutMs?: number;
}

export async function waitForSeqValue(
export async function waitForSeqToIncrementBy(
value: number,
options: WaitForSeqValueOptions = {}
): Promise<number> {
const {
pollIntervalMs = 1000,
seqName = 'test_seq',
timeoutMs = 30000,
} = options;
const { seqName = 'test_seq' } = options;

const startTime = Date.now();
let lastVal = 0;
let perSecond = 0;

while (lastVal < value) {
if (Date.now() - startTime > timeoutMs) {
throw new Error(
`Timeout waiting for sequence ${seqName} to reach value ${value}`
);
const progress = new ProgressBar({
title: `${seqName} (${perSecond}/s)`,
total: value,
width: 20,
display: dim(
` -> incrementing "${seqName}": :completed/:total (:eta left) [:bar] :percent`
),
prettyTime: true,
});

const startVal = await seqLastValue(seqName);
let lastVal = startVal;

return await waitFor(
async () => {
lastVal = await seqLastValue(seqName);
progress.render(lastVal);
const incrementedBy = lastVal - startVal;

return incrementedBy >= value ? lastVal : false;
},
{
...options,
description: `sequence ${seqName} to reach value ${value}`,
}
);
}

console.log(`Polling ${seqName}... current value:`, lastVal);
await delay(pollIntervalMs);
lastVal = await seqLastValue(seqName);
}
return lastVal;
export async function waitForActiveWorker() {
return await waitFor(
async () => {
const [{ has_active: hasActiveWorker }] =
await sql`SELECT count(*) > 0 AS has_active FROM supaworker.active_workers`;
log('waiting for active worker ', hasActiveWorker);
return hasActiveWorker;
},
{
pollIntervalMs: 300,
description: 'active worker',
}
);
}

export async function fetchWorkers(workerName: string) {
return await sql`SELECT * FROM supaworker.workers`;
}

export async function startWorker(workerName: string, seconds: number = 5) {
await sql`SELECT supaworker.spawn(${workerName})`;

let workers = await fetchWorkers(workerName);
console.log('Waiting for worker to spawn...');

while (workers.length === 0) {
await delay(500);
workers = await fetchWorkers(workerName);
}
console.log('Worker spawned!');
await sql`SELECT supaworker.spawn(${workerName}::text)`;
await waitForActiveWorker();
log('worker spawned!');
}
56 changes: 56 additions & 0 deletions pkgs/supaworker/tests/e2e/maxConcurrent.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { sql } from '../sql.ts';
import { assertGreaterOrEqual } from 'jsr:@std/assert';
import {
waitFor,
sendBatch,
waitForSeqToIncrementBy,
startWorker,
log,
} from './_helpers.ts';

const MESSAGES_TO_SEND = 5;
const WORKER_NAME = 'serial_sleep';

Deno.test('worker respect maxConcurrent settings', async () => {
await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
await sql`ALTER SEQUENCE test_seq RESTART WITH 1`;
await sql`SELECT pgmq.create(${WORKER_NAME})`;
await sql`SELECT pgmq.drop_queue(${WORKER_NAME})`;
await sql`SELECT pgmq.create(${WORKER_NAME})`;
await startWorker(WORKER_NAME);
await waitFor(
async () => {
const [{ worker_count }] = await sql`
SELECT COUNT(*)::integer AS worker_count
FROM supaworker.active_workers
WHERE edge_fn_name = ${WORKER_NAME}
`;

log('worker_count', worker_count);
return worker_count === 1;
},
{ description: 'Waiting for exacly one worker' }
);

try {
// worker sleeps for 1s for each message
// se we will expect roughly 1 message per second
const startTime = Date.now();

await sendBatch(MESSAGES_TO_SEND, WORKER_NAME);
await waitForSeqToIncrementBy(MESSAGES_TO_SEND, {
timeoutMs: MESSAGES_TO_SEND * 1000 + 1000,
});

const endTime = Date.now();
const totalMs = Math.round(endTime - startTime);

assertGreaterOrEqual(
totalMs,
(MESSAGES_TO_SEND - 1) * 1000,
`Should take at least ${MESSAGES_TO_SEND}s to process all messages, took ${totalMs}ms instead`
);
} finally {
await sql.end();
}
});
43 changes: 0 additions & 43 deletions pkgs/supaworker/tests/e2e/maxConcurrent_works.test.ts

This file was deleted.

Loading

0 comments on commit 1a29f43

Please sign in to comment.