Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 29, 2024
2 parents 6ba2add + 0fb6962 commit 9075ee7
Show file tree
Hide file tree
Showing 17 changed files with 401 additions and 344 deletions.
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
dist
.env
.env
migrations/tenants-migration-hash.txt
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ COPY migrations migrations
COPY ecosystem.config.js package.json ./
COPY --from=0 /app/node_modules node_modules
COPY --from=1 /app/dist dist

RUN node dist/scripts/migration-hash.js
RUN export DB_MIGRATION_HASH=$(echo "$(cut ./dist/migrations/tenants-migration-hash.txt)")

ENV DB_MIGRATION_HASH=$DB_MIGRATION_HASH

EXPOSE 5000
ENTRYPOINT ["docker-entrypoint.sh"]
CMD ["node", "dist/server.js"]
3 changes: 2 additions & 1 deletion src/admin-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const build = (opts: FastifyServerOptions = {}, appInstance?: FastifyInstance):
app.register(plugins.adminTenantId)
app.register(plugins.logTenantId)
app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics', '/health'] }))
app.register(routes.tenant, { prefix: 'tenants' })
app.register(routes.tenants, { prefix: 'tenants' })
app.register(routes.migrations, { prefix: 'migrations' })

let registriesToMerge: Registry[] = []

Expand Down
2 changes: 0 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type StorageConfigType = {
dbSuperUser: string
dbSearchPath: string
dbMigrationHash?: string
dbDisableTenantMigrations: boolean
databaseURL: string
databaseSSLRootCert?: string
databasePoolURL?: string
Expand Down Expand Up @@ -236,7 +235,6 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
),
dbSuperUser: getOptionalConfigFromEnv('DB_SUPER_USER') || 'postgres',
dbMigrationHash: getOptionalConfigFromEnv('DB_MIGRATION_HASH'),
dbDisableTenantMigrations: getOptionalConfigFromEnv('DB_DISABLE_TENANT_MIGRATIONS') === 'true',

// Database - Connection
dbSearchPath: getOptionalConfigFromEnv('DATABASE_SEARCH_PATH', 'DB_SEARCH_PATH') || '',
Expand Down
66 changes: 61 additions & 5 deletions src/database/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import { Client, ClientConfig } from 'pg'
import { loadMigrationFiles, MigrationError } from 'postgres-migrations'
import { getConfig } from '../config'
import { logger } from '../monitoring'
import { logger, logSchema } from '../monitoring'
import { BasicPgClient, Migration } from 'postgres-migrations/dist/types'
import { validateMigrationHashes } from 'postgres-migrations/dist/validation'
import { runMigration } from 'postgres-migrations/dist/run-migration'
import SQL from 'sql-template-strings'
import { searchPath } from './connection'
import { updateTenantMigrationVersion } from './tenant'
import { listTenantsToMigrate, updateTenantMigrationVersion } from './tenant'
import { knex } from './multitenant-db'
import { RunMigrationsOnTenants } from '../queue'

const {
isMultitenant,
multitenantDatabaseUrl,
pgQueueEnable,
databaseSSLRootCert,
dbAnonRole,
dbAuthenticatedRole,
Expand All @@ -34,6 +37,43 @@ const backportMigrations = [
},
]

/**
* Runs migrations for all tenants
* only one instance at the time is allowed to run
*/
export async function runMigrationsOnAllTenants() {
if (pgQueueEnable) {
return
}
const result = await knex.raw(`SELECT pg_try_advisory_lock(?);`, ['-8575985245963000605'])
const lockAcquired = result.rows.shift()?.pg_try_advisory_lock || false

if (!lockAcquired) {
return
}

try {
const tenants = listTenantsToMigrate()
for await (const tenantBatch of tenants) {
await Promise.allSettled(
tenantBatch.map((tenant) => {
return RunMigrationsOnTenants.send({
tenantId: tenant,
singletonKey: tenant,
tenant: {
ref: tenant,
},
})
})
)
}
} finally {
try {
await knex.raw(`SELECT pg_advisory_unlock(?);`, ['-8575985245963000605'])
} catch (e) {}
}
}

/**
* Runs multi-tenant migrations
*/
Expand All @@ -46,15 +86,20 @@ export async function runMultitenantMigrations(): Promise<void> {
/**
* Runs migrations on a specific tenant by providing its database DSN
* @param databaseUrl
* @param tenantId
*/
export async function runMigrationsOnTenant(databaseUrl: string): Promise<void> {
export async function runMigrationsOnTenant(databaseUrl: string, tenantId?: string): Promise<void> {
let ssl: ClientConfig['ssl'] | undefined = undefined

if (databaseSSLRootCert) {
ssl = { ca: databaseSSLRootCert }
}

await connectAndMigrate(databaseUrl, './migrations/tenant', ssl)
await connectAndMigrate(databaseUrl, './migrations/tenant', ssl, undefined, tenantId)

if (isMultitenant && tenantId) {
await updateTenantMigrationVersion([tenantId])
}
}

/**
Expand All @@ -63,12 +108,14 @@ export async function runMigrationsOnTenant(databaseUrl: string): Promise<void>
* @param migrationsDirectory
* @param ssl
* @param shouldCreateStorageSchema
* @param tenantId
*/
async function connectAndMigrate(
databaseUrl: string | undefined,
migrationsDirectory: string,
ssl?: ClientConfig['ssl'],
shouldCreateStorageSchema?: boolean
shouldCreateStorageSchema?: boolean,
tenantId?: string
) {
const dbConfig: ClientConfig = {
connectionString: databaseUrl,
Expand All @@ -78,6 +125,13 @@ async function connectAndMigrate(
}

const client = new Client(dbConfig)
client.on('error', (err) => {
logSchema.error(logger, 'Error on database connection', {
type: 'error',
error: err,
project: tenantId,
})
})
try {
await client.connect()
await migrate({ client }, migrationsDirectory, shouldCreateStorageSchema)
Expand Down Expand Up @@ -114,6 +168,8 @@ function runMigrations(migrationsDirectory: string, shouldCreateStorageSchema =
try {
const migrationTableName = 'migrations'

await client.query(`SET search_path TO ${searchPath.join(',')}`)

let appliedMigrations: Migration[] = []
if (await doesTableExist(client, migrationTableName)) {
const { rows } = await client.query(`SELECT * FROM ${migrationTableName} ORDER BY id`)
Expand Down
46 changes: 1 addition & 45 deletions src/database/tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { knex } from './multitenant-db'
import { StorageBackendError } from '../storage'
import { JwtPayload } from 'jsonwebtoken'
import { PubSubAdapter } from '../pubsub'
import { RunMigrationsEvent } from '../queue/events/run-migrations'

interface TenantConfig {
anonKey?: string
Expand All @@ -26,14 +25,7 @@ export interface Features {
}
}

const {
isMultitenant,
dbServiceRole,
serviceKey,
jwtSecret,
dbMigrationHash,
dbDisableTenantMigrations,
} = getConfig()
const { isMultitenant, dbServiceRole, serviceKey, jwtSecret, dbMigrationHash } = getConfig()

const tenantConfigCache = new Map<string, TenantConfig>()

Expand Down Expand Up @@ -80,42 +72,6 @@ export async function* listTenantsToMigrate() {
}
}

/**
* Runs migrations for all tenants
*/
export async function runMigrations() {
if (dbDisableTenantMigrations) {
return
}
const result = await knex.raw(`SELECT pg_try_advisory_lock(?);`, ['-8575985245963000605'])
const lockAcquired = result.rows.shift()?.pg_try_advisory_lock || false

if (!lockAcquired) {
return
}

try {
const tenants = listTenantsToMigrate()
for await (const tenantBatch of tenants) {
await Promise.allSettled(
tenantBatch.map((tenant) => {
return RunMigrationsEvent.send({
tenantId: tenant,
singletonKey: tenant,
tenant: {
ref: tenant,
},
})
})
)
}
} finally {
try {
await knex.raw(`SELECT pg_advisory_unlock(?);`, ['-8575985245963000605'])
} catch (e) {}
}
}

export function updateTenantMigrationVersion(tenantIds: string[]) {
return knex
.table('tenants')
Expand Down
2 changes: 1 addition & 1 deletion src/http/routes/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export { default as bucket } from './bucket'
export { default as object } from './object'
export { default as render } from './render'
export { default as tenant } from './tenant'
export { default as multiPart } from './tus'
export { default as healthcheck } from './health'
export * from './tenant'
Loading

0 comments on commit 9075ee7

Please sign in to comment.