chore: improve worker reliability in various ways
Squashed commit of the following:

commit 06edb2f
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 14 13:55:09 2025 +0100

    refactor(worker): Introduce WorkerState enum for improved state management

    Replace string literals with strongly-typed WorkerState enum to enhance type safety and
    readability of worker state transitions.
    Modify worker state checks and assignments to use the new enum, improving code maintainability
    and reducing potential runtime errors.

    - Introduced WorkerState enum with states: Idle, Starting, Running, Stopping
    - Replaced string literal state checks with enum comparisons
    - Updated worker state assignments to use enum values
    - Enhanced error handling in start method by throwing an error instead of silently returning
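
    The Worker.ts changes themselves are not shown in this squash, so as a rough
    sketch only (the string values and the shape of start() are assumptions, not
    code from this commit), the refactor amounts to something like:

        enum WorkerState {
          Idle = 'idle',
          Starting = 'starting',
          Running = 'running',
          Stopping = 'stopping',
        }

        class Worker {
          private state = WorkerState.Idle;

          async start() {
            // Invalid transitions now throw instead of silently returning.
            if (this.state !== WorkerState.Idle) {
              throw new Error(`Cannot start worker in state: ${this.state}`);
            }
            this.state = WorkerState.Starting;
            // ...startup work...
            this.state = WorkerState.Running;
          }
        }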

commit 78eb845
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 14 13:20:26 2025 +0100

    refactor(executor): Improve execution and lifecycle management

    Refactor MessageExecutor and ExecutionController to enhance execution
    tracking and error handling:
    - Introduce Promise.withResolvers for better promise management
    - Move executor tracking and semaphore release to more robust lifecycle
    - Ensure cleanup happens consistently via finally() handler
    - Add hasStarted flag to prevent multiple execution attempts
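
    For reference, Promise.withResolvers() (available in recent Deno releases)
    returns a promise together with its resolve/reject functions, which is what
    lets MessageExecutor create executionPromise in its constructor, before
    execution starts. A minimal standalone illustration, not code from this
    commit:

        const { promise, resolve, reject } = Promise.withResolvers<string>();

        // The promise can be handed out immediately (like executionPromise)...
        promise.then((value) => console.log('settled with', value));

        // ...and settled later, once the actual work finishes or fails.
        setTimeout(() => resolve('done'), 100);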

commit 78bb094
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 14 12:48:33 2025 +0100

    feat(supaworker): Implement batch archiving mechanism for message processing

    Enhance message handling with BatchArchiver class to support batched message archiving.
    Added methods for adding messages, scheduling archives, and flushing pending messages.
    Updated ExecutionController and MessageExecutor to integrate batch archiving functionality.
    Included a temporary helper function for testing batch archiver synchronization.

commit ff168dd
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 14 12:23:36 2025 +0100

    feat(worker): Add batch archiving and improve execution logging

    Implement BatchArchiver for efficient message batch processing
    Enhance ExecutionController with batch archiving and detailed logging
    Update message execution flow to support new archiving mechanism

    - Create BatchArchiver class for configurable batch message archiving
    - Modify ExecutionController to integrate BatchArchiver
    - Add logging for message execution start and synchronous failure
    - Improve type generics for better type safety

commit 51535f9
Author: Wojtek Majewski <[email protected]>
Date:   Tue Jan 14 09:24:06 2025 +0100

    feat(supaworker): adjust max concurrency and delay settings

    Modify Supabase worker function to:
    - Increase max concurrent workers from 20 to 40
    - Reduce delay from random duration to fixed 100ms
    - Remove unnecessary sequence creation and extra nextval call

commit 0d466ee
Author: Wojtek Majewski <[email protected]>
Date:   Mon Jan 13 23:51:46 2025 +0100

    chore(config): Increase Postgres connection pool settings

    Doubled default pool size and max client connections for improved database performance
jumski committed Jan 14, 2025
1 parent 02b5905 commit af902bf
Showing 12 changed files with 328 additions and 155 deletions.
110 changes: 110 additions & 0 deletions pkgs/supaworker/src/BatchArchiver.ts
@@ -0,0 +1,110 @@
import { Queue } from './Queue.ts';
import { Json } from './types.ts';

interface BatchConfig {
  batchSize?: number;
  timeoutMs?: number;
}

/**
 * A class that manages the archiving of messages in batches.
 */
export class BatchArchiver<MessagePayload extends Json> {
  private pending = new Set<number>();
  private timeoutId?: number;
  private config: Required<BatchConfig>;

  constructor(private queue: Queue<MessagePayload>, config: BatchConfig = {}) {
    this.config = {
      batchSize: 100,
      timeoutMs: 1000,
      ...config,
    };
  }

  /**
   * Adds a message ID to the pending set, then either archives the batch
   * immediately or schedules an archive for later.
   * @param msgId The message ID to add to the pending set
   * @returns Promise that resolves once the message has been added and any
   *          immediate archive has completed
   */
  async add(msgId: number): Promise<void> {
    this.pending.add(msgId);
    await this.archiveImmediatelyOrSchedule();
  }

  /**
   * Clears any pending timeout.
   */
  private clearTimeout() {
    if (this.timeoutId) {
      clearTimeout(this.timeoutId);
      this.timeoutId = undefined;
    }
  }

  /**
   * Archives the current batch of pending messages immediately once the number
   * of pending messages reaches the configured batch size.
   *
   * Otherwise, schedules an archive for the next batch.
   */
  private async archiveImmediatelyOrSchedule(): Promise<void> {
    if (this.pending.size >= this.config.batchSize) {
      this.clearTimeout();
      await this.archiveBatch();
    } else {
      this.setupTimeout();
    }
  }

  /**
   * Sets up a timeout to archive the current batch of pending messages.
   *
   * If a timeout is already scheduled, this is a no-op.
   */
  private setupTimeout() {
    if (this.timeoutId) return;

    this.timeoutId = setTimeout(async () => {
      this.clearTimeout();
      try {
        await this.archiveBatch();
      } catch (error) {
        console.error('Timeout-triggered archive failed:', error);
      }
    }, this.config.timeoutMs);
  }

  /**
   * Archives the current batch of pending message IDs using the queue.
   * Clears the pending set after successful archival.
   *
   * @throws Will throw an error if archiving fails
   * @returns Promise that resolves when the batch has been archived
   * @private
   */
  private async archiveBatch(): Promise<void> {
    if (this.pending.size === 0) return;

    const batch = Array.from(this.pending);

    try {
      await this.queue.archiveBatch(batch);
      this.pending.clear();
    } catch (error) {
      console.error('Failed to archive batch:', error);
      throw error;
    }
  }

  /**
   * Archives all pending messages immediately and cleans up any scheduled timeouts.
   *
   * Used during shutdown to ensure all messages are archived before stopping.
   * @returns Promise that resolves when all pending messages have been archived
   */
  async flush(): Promise<void> {
    this.clearTimeout();
    await this.archiveBatch();
  }
}
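
A usage sketch, not part of the diff: a consumer hands completed message IDs to
the archiver and flushes it on shutdown, roughly like this (the declared values
are assumptions; the real wiring lives in ExecutionController below):

  import { BatchArchiver } from './BatchArchiver.ts';
  import { Queue } from './Queue.ts';
  import { Json } from './types.ts';

  // Assumed to exist elsewhere - the Queue constructor is not shown in this diff.
  declare const queue: Queue<Json>;
  declare const msgId: number;

  const archiver = new BatchArchiver(queue, { batchSize: 50, timeoutMs: 500 });

  // Completed message IDs are collected instead of being archived one by one;
  // the batch is written once 50 IDs are pending or after 500 ms.
  await archiver.add(msgId);

  // On shutdown, force out whatever is still pending.
  await archiver.flush();
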
66 changes: 31 additions & 35 deletions pkgs/supaworker/src/ExecutionController.ts
@@ -3,51 +3,61 @@ import { Queue } from './Queue.ts';
 import { Json } from './types.ts';
 import { MessageRecord } from './Worker.ts';
 import { Sema } from 'npm:async-sema@^3.1.1';
+import { BatchArchiver } from './BatchArchiver.ts';
 
-export class ExecutionController<T extends Json> {
-  private executors = new Map<number, MessageExecutor<T>>();
+export class ExecutionController<MessagePayload extends Json> {
+  private executors = new Map<number, MessageExecutor<MessagePayload>>();
   private semaphore: Sema;
+  private archiver: BatchArchiver<MessagePayload>;
 
   constructor(
-    private queue: Queue<T>,
+    private queue: Queue<MessagePayload>,
     private signal: AbortSignal,
     maxConcurrent: number = 10
   ) {
     this.semaphore = new Sema(maxConcurrent);
+    this.archiver = new BatchArchiver(queue);
   }
 
   async start(
-    record: MessageRecord<T>,
-    handler: (message: T) => Promise<void>
+    record: MessageRecord<MessagePayload>,
+    handler: (message: MessagePayload) => Promise<void>
   ) {
     await this.semaphore.acquire();
 
-    let executor: MessageExecutor<T>;
+    let executor: MessageExecutor<MessagePayload>;
     try {
       // Create executor (could throw)
-      executor = new MessageExecutor(this.queue, record, handler, this.signal);
+      executor = new MessageExecutor(
+        this.queue,
+        record,
+        handler,
+        this.signal,
+        this.archiver
+      );
 
-      // Add to tracking map (could throw)
-      this.executors.set(executor.msgId, executor);
+      // Attach cleanup before any execution
+      executor.finally(() => {
+        this.executors.delete(executor.msgId);
+        this.semaphore.release();
+      });
 
       try {
-        // Start execution (could throw synchronously)
+        console.log(
+          `[ExecutionController] Starting execution for ${executor.msgId}`
+        );
+        // Only add to map after we've attached cleanup
+        this.executors.set(executor.msgId, executor);
         executor.execute();
       } catch (error) {
-        // Clean up if execute() throws synchronously
-        this.executors.delete(executor.msgId);
+        console.log(
+          `[ExecutionController] Execution failed synchronously for ${executor.msgId}, cleaning up`
+        );
+        // The finally handler will clean up the map and semaphore
         throw error;
       }
 
-      // Only attach finally() after successful execute()
-      executor.finally(() => {
-        this.executors.delete(executor.msgId);
-        this.semaphore.release();
-      });
-
       return executor;
     } catch (error) {
       // Release semaphore if anything fails during setup
       this.semaphore.release();
       throw error;
     }
@@ -59,20 +69,6 @@ export class ExecutionController<T extends Json> {
       Array.from(this.executors.values()).map((e) => e.executionPromise)
     );
-  }
-}
-
-  private async waitForAvailableExecutor(
-    record: MessageRecord<T>,
-    handler: (message: T) => Promise<void>
-  ) {
-    try {
-      await this.semaphore.acquire();
-
-      return new MessageExecutor(this.queue, record, handler, this.signal);
-    } catch (e) {
-      this.semaphore.release();
-
-      throw e;
-    }
+
+    await this.archiver.flush();
   }
 }
40 changes: 26 additions & 14 deletions pkgs/supaworker/src/MessageExecutor.ts
@@ -1,6 +1,7 @@
 import { Json } from './types.ts';
 import { type MessageRecord } from './Worker.ts';
 import { Queue } from './Queue.ts';
+import { BatchArchiver } from './BatchArchiver.ts';
 
 class AbortError extends Error {
   constructor() {
@@ -10,24 +11,36 @@ class AbortError extends Error {
 }
 
 export class MessageExecutor<MessagePayload extends Json> {
-  executionPromise?: Promise<void>;
+  private readonly executionPromise: Promise<void>;
+  private readonly resolve: (value: void | PromiseLike<void>) => void;
+  private readonly reject: (reason?: any) => void;
+  private hasStarted = false;
 
   constructor(
     private readonly queue: Queue<MessagePayload>,
     private readonly record: MessageRecord<MessagePayload>,
     private readonly messageHandler: (message: MessagePayload) => Promise<void>,
-    private readonly signal: AbortSignal
-  ) {}
+    private readonly signal: AbortSignal,
+    private readonly batchArchiver: BatchArchiver<MessagePayload>
+  ) {
+    const { promise, resolve, reject } = Promise.withResolvers<void>();
+    this.executionPromise = promise;
+    this.resolve = resolve;
+    this.reject = reject;
+  }
 
   get msgId() {
     return this.record.msg_id;
   }
 
-  execute(): this {
-    if (!this.executionPromise) {
-      this.executionPromise = this._execute();
+  execute(): Promise<void> {
+    if (!this.hasStarted) {
+      this.hasStarted = true;
+      this._execute().then(this.resolve, this.reject);
+    } else {
+      console.log('[MessageExecutor] Execution already started');
     }
-    return this;
+    return this.executionPromise;
   }
 
   private async _execute(): Promise<void> {
@@ -54,22 +67,21 @@ export class MessageExecutor<MessagePayload extends Json> {
         abortPromise,
       ]);
 
-      await this.queue.archive(this.record.msg_id);
+      this.batchArchiver.add(this.msgId);
     } catch (error: unknown) {
       if (error instanceof Error && error.name === 'AbortError') {
-        console.log('Message processing cancelled:', this.record.msg_id);
+        console.log(`[MessageExecutor] Aborted execution for ${this.msgId}`);
       } else {
         console.error('Error processing message:', error);
         // Re-queue the message on non-abort errors
         await this.queue.setVt(this.msgId, 2);
       }
     }
   }
 
-  finally(onfinally?: (() => void) | null): Promise<void> {
-    if (!this.executionPromise) {
-      throw new Error('Executor not started');
+  finally(onfinally?: (() => void) | null): this {
+    if (onfinally) {
+      this.executionPromise.finally(onfinally);
     }
-    return this.executionPromise.finally(onfinally);
+    return this;
   }
 }
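
The net effect is that the executor can be observed before it runs: cleanup can
be chained first and execution started afterwards. A sketch mirroring the
ExecutionController usage above (the declared values are assumptions; only the
executor API comes from the diff):

  import { MessageExecutor } from './MessageExecutor.ts';
  import { BatchArchiver } from './BatchArchiver.ts';
  import { Queue } from './Queue.ts';
  import { Json } from './types.ts';
  import { type MessageRecord } from './Worker.ts';

  declare const queue: Queue<Json>;
  declare const record: MessageRecord<Json>;
  declare const handler: (message: Json) => Promise<void>;
  declare const archiver: BatchArchiver<Json>;

  const controller = new AbortController();
  const executor = new MessageExecutor(queue, record, handler, controller.signal, archiver);

  // finally() can be attached before execution, because executionPromise now
  // exists from construction time (via Promise.withResolvers).
  executor.finally(() => console.log('cleaned up', executor.msgId));

  // execute() is guarded by hasStarted: a second call only logs a notice and
  // returns the same executionPromise instead of running the handler again.
  await executor.execute();
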
16 changes: 11 additions & 5 deletions pkgs/supaworker/src/Queue.ts
@@ -10,14 +10,20 @@ export class Queue<MessagePayload extends Json> {
 
   async archive(msgId: number): Promise<void> {
     await this.sql`
-      SELECT pgmq.archive(${this.queueName}, ${msgId}::bigint);
+      SELECT pgmq.archive(queue_name => ${this.queueName}, msg_id => ${msgId}::bigint);
     `;
   }
 
+  async archiveBatch(msgIds: number[]): Promise<void> {
+    await this.sql`
+      SELECT pgmq.archive(queue_name => ${this.queueName}, msg_ids => ${msgIds}::bigint[]);
+    `;
+  }
+
   async send(message: MessagePayload): Promise<void> {
     const msgJson = JSON.stringify(message);
     await this.sql`
-      SELECT pgmq.send(${this.queueName}, ${msgJson}::jsonb)
+      SELECT pgmq.send(queue_name => ${this.queueName}, msg => ${msgJson}::jsonb)
     `;
   }
 
@@ -49,9 +55,9 @@ export class Queue<MessagePayload extends Json> {
   ): Promise<MessageRecord<MessagePayload>> {
     const records = await this.sql<MessageRecord<MessagePayload>[]>`
       SELECT * FROM pgmq.set_vt(
-        ${this.queueName},
-        ${msgId}::bigint,
-        ${vtOffsetSeconds}::integer
+        queue_name => ${this.queueName},
+        msg_id => ${msgId}::bigint,
+        vt => ${vtOffsetSeconds}::integer
       );
     `;
     return records[0];