Skip to content

Commit

Permalink
feat: search objects v2
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 28, 2025
1 parent ddc5163 commit 831465a
Show file tree
Hide file tree
Showing 13 changed files with 546 additions and 76 deletions.
168 changes: 168 additions & 0 deletions migrations/tenant/0026-objects-prefixes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
--- Index Functions

CREATE OR REPLACE FUNCTION "storage"."get_level"("name" text)
RETURNS int
AS $func$
SELECT array_length(string_to_array("name", '/'), 1);
$func$ LANGUAGE SQL IMMUTABLE STRICT;

-- Table

CREATE TABLE IF NOT EXISTS "storage"."prefixes" (
"bucket_id" text,
"name" text COLLATE "C" NOT NULL,
"level" int GENERATED ALWAYS AS ("storage"."get_level"("name")) STORED,
"created_at" timestamptz DEFAULT now(),
"updated_at" timestamptz DEFAULT now(),
CONSTRAINT "prefixes_bucketId_fkey" FOREIGN KEY ("bucket_id") REFERENCES "storage"."buckets"("id"),
PRIMARY KEY ("bucket_id", "level", "name")
);

revoke all on storage.prefixes from anon, authenticated, service_role;

-- Functions

CREATE OR REPLACE FUNCTION "storage"."get_prefix"("name" text)
RETURNS text
AS $func$
SELECT
CASE WHEN strpos("name", '/') > 0 THEN
regexp_replace("name", '[\/]{1}[^\/]+\/?$', '')
ELSE
''
END;
$func$ LANGUAGE SQL IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION "storage"."get_prefixes"("name" text)
RETURNS text[]
AS $func$
DECLARE
parts text[];
prefixes text[];
prefix text;
BEGIN
-- Split the name into parts by '/'
parts := string_to_array("name", '/');
prefixes := '{}';

-- Construct the prefixes, stopping one level below the last part
FOR i IN 1..array_length(parts, 1) - 1 LOOP
prefix := array_to_string(parts[1:i], '/');
prefixes := array_append(prefixes, prefix);
END LOOP;

RETURN prefixes;
END;
$func$ LANGUAGE plpgsql IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION "storage"."add_prefixes"(
"_bucket_id" TEXT,
"_name" TEXT
)
RETURNS void
SECURITY DEFINER
AS $func$
DECLARE
prefixes text[];
BEGIN
prefixes := "storage"."get_prefixes"("_name");

IF array_length(prefixes, 1) > 0 THEN
INSERT INTO storage.prefixes (name, bucket_id)
SELECT UNNEST(prefixes) as name, "_bucket_id" ON CONFLICT DO NOTHING;
END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."delete_prefix" (
"_bucket_id" TEXT,
"_name" TEXT
) RETURNS boolean
SECURITY DEFINER
AS $func$
BEGIN
-- Check if we can delete the prefix
IF EXISTS(
SELECT FROM "storage"."prefixes"
WHERE "prefixes"."bucket_id" = "_bucket_id"
AND level = "storage"."get_level"("_name") + 1
AND "prefixes"."name" COLLATE "C" LIKE "_name" || '/%'
LIMIT 1
)
OR EXISTS(
SELECT FROM "storage"."objects"
WHERE "objects"."bucket_id" = "_bucket_id"
AND "storage"."get_level"("objects"."name") = "storage"."get_level"("_name") + 1
AND "objects"."name" COLLATE "C" LIKE "_name" || '/%'
LIMIT 1
) THEN
-- There are sub-objects, skip deletion
RETURN false;
ELSE
DELETE FROM "storage"."prefixes"
WHERE "prefixes"."bucket_id" = "_bucket_id"
AND level = "storage"."get_level"("_name")
AND "prefixes"."name" = "_name";
RETURN true;
END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;

-- Triggers
CREATE OR REPLACE FUNCTION "storage"."prefixes_insert_trigger"()
RETURNS trigger
AS $func$
BEGIN
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");
RETURN NEW;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."objects_insert_trigger"()
RETURNS trigger
AS $func$
BEGIN
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");

RETURN NEW;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."objects_delete_trigger"()
RETURNS trigger
AS $func$
DECLARE
prefix text;
BEGIN
prefix := "storage"."get_prefix"(OLD."name");

IF coalesce(prefix, '') != '' THEN
PERFORM "storage"."delete_prefix"(OLD."bucket_id", prefix);
END IF;

RETURN OLD;
END;
$func$ LANGUAGE plpgsql VOLATILE;

-- "storage"."prefixes"
CREATE OR REPLACE TRIGGER "prefixes_insert"
BEFORE INSERT ON "storage"."prefixes"
FOR EACH ROW
WHEN (pg_trigger_depth() < 1)
EXECUTE FUNCTION "storage"."prefixes_insert_trigger"();

CREATE OR REPLACE TRIGGER "prefixes_delete"
AFTER DELETE ON "storage"."prefixes"
FOR EACH ROW
EXECUTE FUNCTION "storage"."objects_delete_trigger"();

-- "storage"."objects"
CREATE OR REPLACE TRIGGER "objects_insert"
BEFORE INSERT ON "storage"."objects"
FOR EACH ROW
EXECUTE FUNCTION "storage"."objects_insert_trigger"();

CREATE OR REPLACE TRIGGER "objects_delete"
AFTER DELETE ON "storage"."objects"
FOR EACH ROW
EXECUTE FUNCTION "storage"."objects_delete_trigger"();
116 changes: 116 additions & 0 deletions migrations/tenant/0027-search-v2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@

-- Function to search prefixes
-- We use security definer to bypass the security invoker of the function
-- However, we filter by objects which the user has access to (see function object_exists_with_prefix)
CREATE OR REPLACE FUNCTION storage.search_prefixes (
prefix text,
bucket_name text,
limits int DEFAULT 100,
levels int default 1,
start_after text DEFAULT ''
) RETURNS TABLE (
key text,
name text,
id uuid,
updated_at timestamptz,
created_at timestamptz,
metadata jsonb
)
SECURITY DEFINER
AS $func$
BEGIN
RETURN query EXECUTE
$sql$
SELECT
split_part(name, '/', $4) AS key,
name || '/' AS name,
NULL::uuid AS id,
NULL::timestamptz AS updated_at,
NULL::timestamptz AS created_at,
NULL::jsonb AS metadata
FROM storage.prefixes
WHERE name COLLATE "C" LIKE $1 || '%'
AND bucket_id = $2
AND level = $4
AND name COLLATE "C" > $5
AND storage.object_exists_with_prefix(bucket_id, name)
ORDER BY name COLLATE "C" LIMIT $3;
$sql$
USING prefix, bucket_name, limits, levels, start_after;
END;
$func$ LANGUAGE plpgsql STABLE;


CREATE OR REPLACE FUNCTION storage.search_v2 (
prefix text,
bucket_name text,
limits int DEFAULT 100,
levels int default 1,
start_after text DEFAULT ''
) RETURNS TABLE (
key text,
name text,
id uuid,
updated_at timestamptz,
created_at timestamptz,
metadata jsonb
)
SECURITY INVOKER
AS $func$
BEGIN
RETURN query EXECUTE
$sql$
SELECT * FROM (
(SELECT * FROM storage.search_prefixes($1, $2, $3, $4, $5))
UNION ALL
(SELECT split_part(name, '/', $4) AS key,
name,
id,
updated_at,
created_at,
metadata
FROM storage.objects
WHERE name COLLATE "C" LIKE $1 || '%'
AND bucket_id = $2
AND "storage"."get_level"("name") = $4
AND name COLLATE "C" > $5
ORDER BY name COLLATE "C" LIMIT $3)
) obj
ORDER BY name COLLATE "C" LIMIT $3;
$sql$
USING prefix, bucket_name, limits, levels, start_after;
END;
$func$ LANGUAGE plpgsql STABLE;

-- Function to check if object with prefix exists
-- We use security invoker to make sure that we return only prefixes in which the user
-- has access to, if the user has bypassed RLS, we return true and don't need the check penalty
CREATE OR REPLACE FUNCTION storage.object_exists_with_prefix(
p_bucket_id TEXT,
p_name TEXT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
STABLE
SECURITY INVOKER
AS $$
DECLARE
can_bypass_rls BOOLEAN;
BEGIN
SELECT rolbypassrls
INTO can_bypass_rls
FROM pg_roles
WHERE rolname = current_user;

IF can_bypass_rls THEN
RETURN TRUE;
ELSE
RETURN EXISTS (
SELECT 1
FROM storage.objects o
WHERE o.bucket_id = p_bucket_id
AND o.name LIKE p_name || '%'
);
END IF;
END;
$$;
3 changes: 3 additions & 0 deletions migrations/tenant/0028-objects-level-index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- postgres-migrations disable-transaction
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "objects_bucket_id_level_idx"
ON "storage"."objects" ("bucket_id", "storage"."get_level"("name"), "name" COLLATE "C");
2 changes: 2 additions & 0 deletions migrations/tenant/0029-object-bucket-name-sorting.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- postgres-migrations disable-transaction
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique on storage.objects (name COLLATE "C", bucket_id);
47 changes: 47 additions & 0 deletions migrations/tenant/0030-create-prefixes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

-- Backfill prefixes table records
-- We run this with 50k batch size to avoid long running transaction
DO $$
DECLARE
batch_size INTEGER := 50000;
total_scanned INTEGER := 0;
row_returned INTEGER := 0;
last_name TEXT COLLATE "C" := NULL;
last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
LOOP
-- Fetch a batch of objects ordered by name COLLATE "C"
WITH batch as (
SELECT id, bucket_id, name, owner, storage.get_prefix(name) as prefix
FROM storage.objects
WHERE (last_name IS NULL OR (name COLLATE "C", bucket_id) > (last_name, last_bucket_id))
ORDER BY name COLLATE "C", bucket_id
LIMIT batch_size
),
batch_count as (
SELECT COUNT(*) as count FROM batch
),
cursor as (
SELECT name as last_name, bucket_id as last_bucket FROM batch b
ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1
),
insert_prefixes as (
INSERT INTO storage.prefixes (bucket_id, name)
SELECT bucket_id, prefix FROM batch
WHERE coalesce(prefix, '') != ''
ON CONFLICT DO NOTHING
)
SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

RAISE NOTICE 'Object Row returned: %', row_returned;
RAISE NOTICE 'Last Object: %', last_name;

total_scanned := total_scanned + row_returned;

IF row_returned IS NULL OR row_returned < batch_size THEN
RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
EXIT;
END IF;
END LOOP;
END;
$$;
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"format": "prettier -c --write src/**",
"lint": "prettier -v && prettier -c src/**",
"migration:run": "tsx src/scripts/migrate-call.ts",
"migrations:types": "tsx src/scripts/migrations-types.ts",
"docs:export": "tsx ./src/scripts/export-docs.ts",
"test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts",
"test": "npm run infra:restart && npm run test:dummy-data && jest --runInBand --forceExit",
Expand Down
Loading

0 comments on commit 831465a

Please sign in to comment.