Skip to content

Commit 6ba2add

Browse files
committed
feat: async tenant migrations
1 parent 46e7ff6 commit 6ba2add

File tree

12 files changed

+229
-49
lines changed

12 files changed

+229
-49
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ static/api.json
88
data/
99
bin/
1010
coverage/
11-
.idea/
11+
.idea/
12+
migrations/tenants-migration-hash.txt

docker-compose-infra.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ services:
4848
PGBOUNCER_STATS_USERS: postgres
4949

5050
supavisor:
51-
image: supabase/supavisor:1.1.6
51+
image: supabase/supavisor:1.1.23
5252
depends_on:
5353
multitenant_db:
5454
condition: service_healthy
@@ -78,7 +78,7 @@ services:
7878
command: sh -c "/app/bin/migrate && /app/bin/server"
7979

8080
supavisor_setup:
81-
image: supabase/supavisor:1.1.6
81+
image: supabase/supavisor:1.1.23
8282
command: |
8383
curl -X PUT \
8484
"http://supavisor:4000/api/tenants/bjhaohmqunupljrqypxz" \
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
ALTER TABLE tenants ADD COLUMN IF NOT EXISTS cursor_id SERIAL;
2+
ALTER TABLE tenants ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT current_timestamp;
3+
ALTER TABLE tenants ADD COLUMN IF NOT EXISTS migrations_version text null DEFAULT null;
4+
5+
create index if not exists tenants_migration_version_idx on tenants(cursor_id, migrations_version);

src/config.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import dotenv from 'dotenv'
22
import jwt from 'jsonwebtoken'
3+
import path from 'path'
34

45
export type StorageBackendType = 'file' | 's3'
56

@@ -29,6 +30,8 @@ type StorageConfigType = {
2930
dbRefreshMigrationHashesOnMismatch: boolean
3031
dbSuperUser: string
3132
dbSearchPath: string
33+
dbMigrationHash?: string
34+
dbDisableTenantMigrations: boolean
3235
databaseURL: string
3336
databaseSSLRootCert?: string
3437
databasePoolURL?: string
@@ -130,13 +133,15 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
130133

131134
dotenv.config()
132135

136+
const isMultitenant = getOptionalConfigFromEnv('MULTI_TENANT', 'IS_MULTITENANT') === 'true'
137+
133138
config = {
134139
// Tenant
135140
tenantId:
136141
getOptionalConfigFromEnv('PROJECT_REF') ||
137142
getOptionalConfigFromEnv('TENANT_ID') ||
138143
'storage-single-tenant',
139-
isMultitenant: getOptionalConfigFromEnv('MULTI_TENANT', 'IS_MULTITENANT') === 'true',
144+
isMultitenant,
140145

141146
// Server
142147
region: getOptionalConfigFromEnv('SERVER_REGION', 'REGION') || 'not-specified',
@@ -230,6 +235,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
230235
getOptionalConfigFromEnv('DB_ALLOW_MIGRATION_REFRESH') === 'false'
231236
),
232237
dbSuperUser: getOptionalConfigFromEnv('DB_SUPER_USER') || 'postgres',
238+
dbMigrationHash: getOptionalConfigFromEnv('DB_MIGRATION_HASH'),
239+
dbDisableTenantMigrations: getOptionalConfigFromEnv('DB_DISABLE_TENANT_MIGRATIONS') === 'true',
233240

234241
// Database - Connection
235242
dbSearchPath: getOptionalConfigFromEnv('DATABASE_SEARCH_PATH', 'DB_SEARCH_PATH') || '',

src/database/migrate.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import { validateMigrationHashes } from 'postgres-migrations/dist/validation'
77
import { runMigration } from 'postgres-migrations/dist/run-migration'
88
import SQL from 'sql-template-strings'
99
import { searchPath } from './connection'
10+
import { updateTenantMigrationVersion } from './tenant'
1011

1112
const {
13+
isMultitenant,
1214
multitenantDatabaseUrl,
1315
databaseSSLRootCert,
1416
dbAnonRole,

src/database/tenant.ts

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { getConfig } from '../config'
22
import { decrypt, verifyJWT } from '../auth'
3-
import { runMigrationsOnTenant } from './migrate'
43
import { knex } from './multitenant-db'
54
import { StorageBackendError } from '../storage'
65
import { JwtPayload } from 'jsonwebtoken'
76
import { PubSubAdapter } from '../pubsub'
7+
import { RunMigrationsEvent } from '../queue/events/run-migrations'
88

99
interface TenantConfig {
1010
anonKey?: string
@@ -26,7 +26,14 @@ export interface Features {
2626
}
2727
}
2828

29-
const { isMultitenant, dbServiceRole, serviceKey, jwtSecret } = getConfig()
29+
const {
30+
isMultitenant,
31+
dbServiceRole,
32+
serviceKey,
33+
jwtSecret,
34+
dbMigrationHash,
35+
dbDisableTenantMigrations,
36+
} = getConfig()
3037

3138
const tenantConfigCache = new Map<string, TenantConfig>()
3239

@@ -45,29 +52,77 @@ const singleTenantServiceKey:
4552
: undefined
4653

4754
/**
48-
* Runs migrations in a specific tenant
49-
* @param tenantId
50-
* @param databaseUrl
51-
* @param logOnError
55+
* List all tenants that have not run migrations yet
5256
*/
53-
export async function runMigrations(
54-
tenantId: string,
55-
databaseUrl: string,
56-
logOnError = false
57-
): Promise<void> {
57+
export async function* listTenantsToMigrate() {
58+
let lastCursor = 0
59+
60+
while (true) {
61+
const data = await knex
62+
.table('tenants')
63+
.select('id', 'cursor_id')
64+
.where('cursor_id', '>', lastCursor)
65+
.where((builder) => {
66+
builder
67+
.where('migrations_version', '=', dbMigrationHash || '')
68+
.orWhere('migrations_version', null)
69+
})
70+
.orderBy('cursor_id', 'desc')
71+
.limit(100)
72+
73+
yield data.map((tenant) => tenant.id)
74+
75+
if (data.length === 0) {
76+
break
77+
}
78+
79+
lastCursor = data[data.length - 1].cursor_id
80+
}
81+
}
82+
83+
/**
84+
* Runs migrations for all tenants
85+
*/
86+
export async function runMigrations() {
87+
if (dbDisableTenantMigrations) {
88+
return
89+
}
90+
const result = await knex.raw(`SELECT pg_try_advisory_lock(?);`, ['-8575985245963000605'])
91+
const lockAcquired = result.rows.shift()?.pg_try_advisory_lock || false
92+
93+
if (!lockAcquired) {
94+
return
95+
}
96+
5897
try {
59-
await runMigrationsOnTenant(databaseUrl)
60-
console.log(`${tenantId} migrations ran successfully`)
61-
} catch (error: any) {
62-
if (logOnError) {
63-
console.error(`${tenantId} migration error:`, error.message)
64-
return
65-
} else {
66-
throw error
98+
const tenants = listTenantsToMigrate()
99+
for await (const tenantBatch of tenants) {
100+
await Promise.allSettled(
101+
tenantBatch.map((tenant) => {
102+
return RunMigrationsEvent.send({
103+
tenantId: tenant,
104+
singletonKey: tenant,
105+
tenant: {
106+
ref: tenant,
107+
},
108+
})
109+
})
110+
)
67111
}
112+
} finally {
113+
try {
114+
await knex.raw(`SELECT pg_advisory_unlock(?);`, ['-8575985245963000605'])
115+
} catch (e) {}
68116
}
69117
}
70118

119+
export function updateTenantMigrationVersion(tenantIds: string[]) {
120+
return knex
121+
.table('tenants')
122+
.whereIn('id', tenantIds)
123+
.update({ migrations_version: dbMigrationHash })
124+
}
125+
71126
/**
72127
* Deletes tenants config from the in-memory cache
73128
* @param tenantId
@@ -124,7 +179,7 @@ export async function getTenantConfig(tenantId: string): Promise<TenantConfig> {
124179
},
125180
},
126181
}
127-
await cacheTenantConfigAndRunMigrations(tenantId, config)
182+
tenantConfigCache.set(tenantId, config)
128183
return config
129184
}
130185

@@ -195,12 +250,3 @@ export async function listenForTenantUpdate(pubSub: PubSubAdapter): Promise<void
195250
tenantConfigCache.delete(tenantId)
196251
})
197252
}
198-
199-
async function cacheTenantConfigAndRunMigrations(
200-
tenantId: string,
201-
config: TenantConfig,
202-
logOnError = false
203-
): Promise<void> {
204-
await runMigrations(tenantId, config.databaseUrl, logOnError)
205-
tenantConfigCache.set(tenantId, config)
206-
}

src/http/routes/tenant/index.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ import { FromSchema } from 'json-schema-to-ts'
33
import apiKey from '../../plugins/apikey'
44
import { decrypt, encrypt } from '../../../auth'
55
import { knex } from '../../../database/multitenant-db'
6-
import { deleteTenantConfig, runMigrations } from '../../../database/tenant'
6+
import { deleteTenantConfig } from '../../../database/tenant'
77
import { dbSuperUser, storage } from '../../plugins'
8+
import { runMigrationsOnTenant } from '../../../database/migrate'
89

910
const patchSchema = {
1011
body: {
@@ -150,7 +151,7 @@ export default async function routes(fastify: FastifyInstance) {
150151
maxConnections,
151152
} = request.body
152153

153-
await runMigrations(tenantId, databaseUrl)
154+
await runMigrationsOnTenant(databaseUrl)
154155
await knex('tenants').insert({
155156
id: tenantId,
156157
anon_key: encrypt(anonKey),
@@ -181,7 +182,7 @@ export default async function routes(fastify: FastifyInstance) {
181182
} = request.body
182183
const { tenantId } = request.params
183184
if (databaseUrl) {
184-
await runMigrations(tenantId, databaseUrl)
185+
await runMigrationsOnTenant(databaseUrl)
185186
}
186187
console.log(databasePoolUrl, databasePoolUrl === null)
187188
await knex('tenants')
@@ -216,7 +217,7 @@ export default async function routes(fastify: FastifyInstance) {
216217
maxConnections,
217218
} = request.body
218219
const { tenantId } = request.params
219-
await runMigrations(tenantId, databaseUrl)
220+
await runMigrationsOnTenant(databaseUrl)
220221

221222
const tenantInfo: tenantDBInterface = {
222223
id: tenantId,

src/queue/events/base-event.ts

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Queue } from '../queue'
2-
import { Job, SendOptions, WorkOptions } from 'pg-boss'
2+
import { BatchWorkOptions, Job, SendOptions, WorkOptions } from 'pg-boss'
33
import { getServiceKeyUser } from '../../database/tenant'
44
import { getPostgresConnection } from '../../database'
55
import { Storage } from '../../storage'
@@ -11,19 +11,33 @@ import { logger } from '../../monitoring'
1111

1212
export interface BasePayload {
1313
$version: string
14+
singletonKey?: string
1415
reqId?: string
1516
tenant: {
1617
ref: string
1718
host: string
1819
}
1920
}
2021

21-
export type StaticThis<T> = { new (...args: any): T }
22-
2322
const { pgQueueEnable, storageBackendType, storageS3Endpoint } = getConfig()
2423
const storageS3Protocol = storageS3Endpoint?.includes('http://') ? 'http' : 'https'
2524
const httpAgent = createAgent(storageS3Protocol)
2625

26+
type StaticThis<T extends BaseEvent<any>> = BaseEventConstructor<T>
27+
28+
interface BaseEventConstructor<Base extends BaseEvent<any>> {
29+
version: string
30+
31+
new (...args: any): Base
32+
33+
send(
34+
this: StaticThis<Base>,
35+
payload: Omit<Base['payload'], '$version'>
36+
): Promise<string | void | null>
37+
38+
eventName(): string
39+
}
40+
2741
export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
2842
public static readonly version: string = 'v1'
2943
protected static queueName = ''
@@ -46,7 +60,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
4660
return undefined
4761
}
4862

49-
static getWorkerOptions(): WorkOptions {
63+
static getWorkerOptions(): WorkOptions | BatchWorkOptions {
5064
return {}
5165
}
5266

@@ -55,7 +69,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
5569
payload: Omit<T['payload'], '$version'>
5670
) {
5771
if (!payload.$version) {
58-
;(payload as any).$version = (this as any).version
72+
;(payload as T['payload']).$version = this.version
5973
}
6074
const that = new this(payload)
6175
return that.send()
@@ -67,13 +81,13 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
6781
) {
6882
// eslint-disable-next-line @typescript-eslint/no-var-requires
6983
const { Webhook } = require('./webhook')
70-
const eventType = (this as any).eventName()
84+
const eventType = this.eventName()
7185

7286
try {
7387
await Webhook.send({
7488
event: {
7589
type: eventType,
76-
$version: (this as any).version,
90+
$version: this.version,
7791
applyTime: Date.now(),
7892
payload,
7993
},
@@ -85,7 +99,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
8599
error: e,
86100
event: {
87101
type: eventType,
88-
$version: (this as any).version,
102+
$version: this.version,
89103
applyTime: Date.now(),
90104
payload: JSON.stringify(payload),
91105
},
@@ -96,7 +110,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
96110
}
97111
}
98112

99-
static handle(job: Job<BaseEvent<any>['payload']>) {
113+
static handle(job: Job<BaseEvent<any>['payload']> | Job<BaseEvent<any>['payload']>[]) {
100114
throw new Error('not implemented')
101115
}
102116

@@ -122,6 +136,10 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
122136
return new Storage(storageBackend, db)
123137
}
124138

139+
singletonKey(payload: T) {
140+
return
141+
}
142+
125143
async send() {
126144
const constructor = this.constructor as typeof BaseEvent
127145

@@ -144,7 +162,10 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
144162
...this.payload,
145163
$version: constructor.version,
146164
},
147-
options: constructor.getQueueOptions(),
165+
options: {
166+
...constructor.getQueueOptions(),
167+
singletonKey: this.payload.singletonKey,
168+
},
148169
})
149170

150171
timer({

0 commit comments

Comments
 (0)