Skip to content

Commit

Permalink
refactor(migrations): consolidate edge worker SQL migrations
Browse files Browse the repository at this point in the history
Merge and simplify SQL migration files for edge worker, including:
- Combine initial schema and read_with_poll function into a single migration file
- Remove redundant migration files
- Preserve pgmq extension and edge_worker schema setup
- Retain custom read_with_poll function implementation for compatibility with older pgmq version
  • Loading branch information
jumski committed Jan 21, 2025
1 parent 21d1df6 commit de8006a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 90 deletions.
86 changes: 86 additions & 0 deletions pkgs/edge-worker/migrations/000_initial.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
create extension if not exists pgmq version '1.4.4';

create schema if not exists edge_worker;

-------------------------------------------------------------------------------
-- Workers Table --------------------------------------------------------------
-------------------------------------------------------------------------------
create table if not exists edge_worker.workers (
worker_id UUID not null primary key,
queue_name TEXT not null,
function_name TEXT not null,
started_at TIMESTAMPTZ not null default now(),
stopped_at TIMESTAMPTZ,
last_heartbeat_at TIMESTAMPTZ not null default now()
);

--------------------------------------------------------------------------------
-- Read With Poll --------------------------------------------------------------
-- --
-- This is a backport of the pgmq.read_with_poll function from version 1.5.0 --
-- It is required because it fixes a bug with high CPU usage and Supabase --
-- is still using version 1.4.4. --
-- --
-- It is slightly modified (removed headers which are not available in 1.4.1) --
-- --
-- This will be removed once Supabase upgrades to 1.5.0 or higher. --
--------------------------------------------------------------------------------
create function edge_worker.read_with_poll(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
max_poll_seconds INTEGER default 5,
poll_interval_ms INTEGER default 100,
conditional JSONB default '{}'
)
returns setof pgmq.message_record as $$
DECLARE
r pgmq.message_record;
stop_at TIMESTAMP;
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
LOOP
IF (SELECT clock_timestamp() >= stop_at) THEN
RETURN;
END IF;

sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);

FOR r IN
EXECUTE sql USING qty
LOOP
RETURN NEXT r;
END LOOP;
IF FOUND THEN
RETURN;
ELSE
PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
END IF;
END LOOP;
END;
$$ language plpgsql;
18 changes: 0 additions & 18 deletions pkgs/edge-worker/migrations/000_schema.sql

This file was deleted.

72 changes: 0 additions & 72 deletions pkgs/edge-worker/migrations/010_read_with_poll.sql
Original file line number Diff line number Diff line change
@@ -1,72 +0,0 @@
set search_path to edge_worker;

--------------------------------------------------------------------------------
-- Read With Poll --------------------------------------------------------------
-- --
-- This is a backport of the pgmq.read_with_poll function from version 1.5.0 --
-- It is required because it fixes a bug with high CPU usage and Supabase --
-- is still using version 1.4.4. --
-- --
-- It is slightly modified (removed headers which are not available in 1.4.1) --
-- --
-- This will be removed once Supabase upgrades to 1.5.0 or higher. --
--------------------------------------------------------------------------------
create function edge_worker.read_with_poll(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
max_poll_seconds INTEGER default 5,
poll_interval_ms INTEGER default 100,
conditional JSONB default '{}'
)
returns setof pgmq.message_record as $$
DECLARE
r pgmq.message_record;
stop_at TIMESTAMP;
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
LOOP
IF (SELECT clock_timestamp() >= stop_at) THEN
RETURN;
END IF;

sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);

FOR r IN
EXECUTE sql USING qty
LOOP
RETURN NEXT r;
END LOOP;
IF FOUND THEN
RETURN;
ELSE
PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
END IF;
END LOOP;
END;
$$ language plpgsql;

0 comments on commit de8006a

Please sign in to comment.