@@ -2,13 +2,6 @@ import { Worker, WorkerConfig } from './Worker.ts';
2
2
import spawnNewEdgeFunction from './spawnNewEdgeFunction.ts' ;
3
3
import { Json } from './types.ts' ;
4
4
5
- /**
6
- * Extracts the edge function name from the request URL
7
- */
8
- function extractFunctionName ( req : Request ) {
9
- return new URL ( req . url ) . pathname . replace ( / ^ \/ + | \/ + $ / g, '' ) ;
10
- }
11
-
12
5
export type SupaworkerConfig = Omit < WorkerConfig , 'connectionString' > ;
13
6
14
7
export class Supaworker {
@@ -18,29 +11,52 @@ export class Supaworker {
18
11
handler : ( message : MessagePayload ) => Promise < unknown > | unknown ,
19
12
config : SupaworkerConfig = { }
20
13
) {
14
+ this . ensureFirstCall ( ) ;
15
+ const connectionString = this . getConnectionString ( ) ;
16
+
17
+ const worker = this . initializeWorker ( handler , {
18
+ ...config ,
19
+ connectionString,
20
+ } ) ;
21
+
22
+ this . setupShutdownHandler ( worker ) ;
23
+ this . setupRequestHandler ( worker ) ;
24
+ }
25
+
26
+ private static ensureFirstCall ( ) {
21
27
if ( this . wasCalled ) {
22
28
throw new Error ( 'Supaworker can only be called once' ) ;
23
29
}
24
30
this . wasCalled = true ;
31
+ }
25
32
33
+ private static getConnectionString ( ) : string {
26
34
// @ts -ignore - TODO: fix the types
27
- const DB_POOL_URL = Deno . env . get ( 'DB_POOL_URL' ) ;
28
-
29
- if ( ! DB_POOL_URL ) {
35
+ const connectionString = Deno . env . get ( 'DB_POOL_URL' ) ;
36
+ if ( ! connectionString ) {
30
37
throw new Error ( 'DB_POOL_URL is not set' ) ;
31
38
}
39
+ return connectionString ;
40
+ }
32
41
33
- const worker = new Worker < MessagePayload > (
42
+ private static initializeWorker < MessagePayload extends Json > (
43
+ handler : ( message : MessagePayload ) => Promise < unknown > | unknown ,
44
+ config : WorkerConfig
45
+ ) : Worker < MessagePayload > {
46
+ return new Worker < MessagePayload > (
34
47
async ( message ) => {
35
48
await handler ( message ) ;
36
49
} ,
37
50
{
38
- connectionString : DB_POOL_URL ,
39
51
queueName : config . queueName || 'tasks' ,
40
52
...config ,
41
53
}
42
54
) ;
55
+ }
43
56
57
+ private static setupShutdownHandler < MessagePayload extends Json > (
58
+ worker : Worker < MessagePayload >
59
+ ) {
44
60
globalThis . onbeforeunload = ( ) => {
45
61
worker . stop ( ) ;
46
62
@@ -52,9 +68,13 @@ export class Supaworker {
52
68
// use waitUntil to prevent the function from exiting
53
69
// @ts -ignore: TODO: fix the types
54
70
EdgeRuntime . waitUntil ( new Promise ( ( ) => { } ) ) ;
71
+ }
55
72
56
- Deno . serve ( ( req ) => {
57
- const edgeFunctionName = extractFunctionName ( req ) ;
73
+ private static setupRequestHandler < MessagePayload extends Json > (
74
+ worker : Worker < MessagePayload >
75
+ ) {
76
+ Deno . serve ( { } , ( req ) => {
77
+ const edgeFunctionName = this . extractFunctionName ( req ) ;
58
78
59
79
worker . startOnlyOnce ( {
60
80
edgeFunctionName,
@@ -67,4 +87,9 @@ export class Supaworker {
67
87
} ) ;
68
88
} ) ;
69
89
}
90
+
91
+ private static extractFunctionName ( req : Request ) : string {
92
+ return new URL ( req . url ) . pathname . replace ( / ^ \/ + | \/ + $ / g, '' ) ;
93
+ }
70
94
}
95
+
0 commit comments