Skip to content

Commit

Permalink
chore: debug problems with worker getting stuck
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 0e7f1c7e85bcb7dbfdc737e5e12cc59315ea13a3
Author: Wojtek Majewski <[email protected]>
Date:   Wed Jan 22 16:18:51 2025 +0100

    chore(deps): update deno.lock with p-queue dependency

    Add npm p-queue package to project dependencies
    Update lock file with new package and its transitive dependencies

commit aa17c507bb840461e0ca2a55795d27ec044098e2
Author: Wojtek Majewski <[email protected]>
Date:   Wed Jan 22 16:18:28 2025 +0100

    refactor(executor): Simplify message execution and error handling

    Modify message execution flow to use direct signal abort check and
    update archiving method.
    Remove complex Promise.race implementation
    in favor of a more straightforward execution approach.

commit 6c1b1c278319951b6e1814ab5a056da7226cc4ef
Author: Wojtek Majewski <[email protected]>
Date:   Wed Jan 22 16:15:47 2025 +0100

    refactor(edge-worker): Adjust max concurrency and delay settings

    Modify max concurrent workers and randomize delay time for test sequence
    increment to improve worker behavior and testing
  • Loading branch information
jumski committed Jan 22, 2025
1 parent 97b7e3e commit da12151
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
20 changes: 19 additions & 1 deletion pkgs/edge-worker/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 6 additions & 16 deletions pkgs/edge-worker/src/MessageExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,16 @@ export class MessageExecutor<MessagePayload extends Json> {
throw new AbortError();
}

await Promise.race([
this.messageHandler(this.record.message!),
new Promise((_, reject) => {
this.signal.addEventListener(
'abort',
() => {
console.log(
`################ MessageExecutor aborted during execution: ${this.msgId} ##`
);
reject(new AbortError());
},
{ once: true }
);
}),
]);
// Check if already aborted before starting
this.signal.throwIfAborted();

await this.messageHandler(this.record.message!);

console.log(
`[MessageExecutor] Task ${this.msgId} completed successfully, archiving...`
);
await this.batchArchiver.add(this.msgId);
await this.queue.archive(this.msgId);
// await this.batchArchiver.add(this.msgId);
} catch (error) {
await this.handleExecutionError(error);
}
Expand Down
8 changes: 4 additions & 4 deletions pkgs/edge-worker/supabase/functions/max_concurrency/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL);
const sql = postgres(EDGE_WORKER_DB_URL, { prepare: true });

async function incrementSeq() {
await delay(100);
// const randTimeMs = Math.floor(Math.random() * 10);
// await delay(randTimeMs);
// await delay(1000);
const randTimeMs = Math.floor(Math.random() * 100 + 50);
await delay(randTimeMs);
console.log(
'[max_concurrency] last_val =',
await sql`SELECT nextval('test_seq')`
Expand All @@ -19,6 +19,6 @@ async function incrementSeq() {

EdgeWorker.start(incrementSeq, {
queueName: 'max_concurrency',
maxConcurrent: 40,
maxConcurrent: 10,
maxPgConnections: 4,
});

0 comments on commit da12151

Please sign in to comment.