From da12151fbae6ecde0911a2fe6f76df43b4677304 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Wed, 22 Jan 2025 16:19:05 +0100 Subject: [PATCH] chore: debug problems with worker getting stuck Squashed commit of the following: commit 0e7f1c7e85bcb7dbfdc737e5e12cc59315ea13a3 Author: Wojtek Majewski Date: Wed Jan 22 16:18:51 2025 +0100 chore(deps): update deno.lock with p-queue dependency Add npm p-queue package to project dependencies Update lock file with new package and its transitive dependencies commit aa17c507bb840461e0ca2a55795d27ec044098e2 Author: Wojtek Majewski Date: Wed Jan 22 16:18:28 2025 +0100 refactor(executor): Simplify message execution and error handling Modify message execution flow to use direct signal abort check and update archiving method. Remove complex Promise.race implementation in favor of a more straightforward execution approach. commit 6c1b1c278319951b6e1814ab5a056da7226cc4ef Author: Wojtek Majewski Date: Wed Jan 22 16:15:47 2025 +0100 refactor(edge-worker): Adjust max concurrency and delay settings Modify max concurrent workers and randomize delay time for test sequence increment to improve worker behavior and testing --- pkgs/edge-worker/deno.lock | 20 ++++++++++++++++- pkgs/edge-worker/src/MessageExecutor.ts | 22 +++++-------------- .../functions/max_concurrency/index.ts | 8 +++---- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/pkgs/edge-worker/deno.lock b/pkgs/edge-worker/deno.lock index 0d98690..470fec3 100644 --- a/pkgs/edge-worker/deno.lock +++ b/pkgs/edge-worker/deno.lock @@ -7,7 +7,8 @@ "jsr:@std/async": "jsr:@std/async@1.0.9", "jsr:@std/fmt@1.0.3": "jsr:@std/fmt@1.0.3", "jsr:@std/internal@^1.0.5": "jsr:@std/internal@1.0.5", - "jsr:@std/io@0.225.0": "jsr:@std/io@0.225.0" + "jsr:@std/io@0.225.0": "jsr:@std/io@0.225.0", + "npm:p-queue": "npm:p-queue@8.0.1" }, "jsr": { "@deno-library/progress@1.5.1": { @@ -35,6 +36,23 @@ "@std/io@0.225.0": { "integrity": "c1db7c5e5a231629b32d64b9a53139445b2ca640d828c26bf23e1c55f8c079b3" } + }, + "npm": { + "eventemitter3@5.0.1": { + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", + "dependencies": {} + }, + "p-queue@8.0.1": { + "integrity": "sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==", + "dependencies": { + "eventemitter3": "eventemitter3@5.0.1", + "p-timeout": "p-timeout@6.1.3" + } + }, + "p-timeout@6.1.3": { + "integrity": "sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==", + "dependencies": {} + } } }, "remote": { diff --git a/pkgs/edge-worker/src/MessageExecutor.ts b/pkgs/edge-worker/src/MessageExecutor.ts index 7effda8..e59f9cd 100644 --- a/pkgs/edge-worker/src/MessageExecutor.ts +++ b/pkgs/edge-worker/src/MessageExecutor.ts @@ -39,26 +39,16 @@ export class MessageExecutor { throw new AbortError(); } - await Promise.race([ - this.messageHandler(this.record.message!), - new Promise((_, reject) => { - this.signal.addEventListener( - 'abort', - () => { - console.log( - `################ MessageExecutor aborted during execution: ${this.msgId} ##` - ); - reject(new AbortError()); - }, - { once: true } - ); - }), - ]); + // Check if already aborted before starting + this.signal.throwIfAborted(); + + await this.messageHandler(this.record.message!); console.log( `[MessageExecutor] Task ${this.msgId} completed successfully, archiving...` ); - await this.batchArchiver.add(this.msgId); + await this.queue.archive(this.msgId); + // await this.batchArchiver.add(this.msgId); } catch (error) { await this.handleExecutionError(error); } diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts index 7d14eb7..029ca3d 100644 --- a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts +++ b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts @@ -8,9 +8,9 @@ console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); const sql = postgres(EDGE_WORKER_DB_URL, { prepare: true }); async function incrementSeq() { - await delay(100); - // const randTimeMs = Math.floor(Math.random() * 10); - // await delay(randTimeMs); + // await delay(1000); + const randTimeMs = Math.floor(Math.random() * 100 + 50); + await delay(randTimeMs); console.log( '[max_concurrency] last_val =', await sql`SELECT nextval('test_seq')` @@ -19,6 +19,6 @@ async function incrementSeq() { EdgeWorker.start(incrementSeq, { queueName: 'max_concurrency', - maxConcurrent: 40, + maxConcurrent: 10, maxPgConnections: 4, });