Skip to content

Commit 129922a

Browse files
Added support for pgx locking table
In order to support running migrations through PgBouncer which does not support advisory locks.
1 parent 856ea12 commit 129922a

File tree

1 file changed

+139
-19
lines changed

1 file changed

+139
-19
lines changed

database/pgx/pgx.go

+139-19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ import (
2525
_ "github.com/jackc/pgx/v4/stdlib"
2626
)
2727

28+
const (
29+
LockStrategyAdvisory = "advisory"
30+
LockStrategyTable = "table"
31+
)
32+
2833
func init() {
2934
db := Postgres{}
3035
database.Register("pgx", &db)
@@ -36,6 +41,8 @@ var (
3641

3742
DefaultMigrationsTable = "schema_migrations"
3843
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
44+
DefaultLockTable = "schema_lock"
45+
DefaultLockStrategy = "advisory"
3946
)
4047

4148
var (
@@ -49,6 +56,8 @@ type Config struct {
4956
MigrationsTable string
5057
DatabaseName string
5158
SchemaName string
59+
LockTable string
60+
LockStrategy string
5261
migrationsSchemaName string
5362
migrationsTableName string
5463
StatementTimeout time.Duration
@@ -108,6 +117,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
108117
config.MigrationsTable = DefaultMigrationsTable
109118
}
110119

120+
if len(config.LockTable) == 0 {
121+
config.LockTable = DefaultLockTable
122+
}
123+
124+
if len(config.LockStrategy) == 0 {
125+
config.LockStrategy = DefaultLockStrategy
126+
}
127+
111128
config.migrationsSchemaName = config.SchemaName
112129
config.migrationsTableName = config.MigrationsTable
113130
if config.MigrationsTableQuoted {
@@ -133,6 +150,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
133150
config: config,
134151
}
135152

153+
if err := px.ensureLockTable(); err != nil {
154+
return nil, err
155+
}
156+
136157
if err := px.ensureVersionTable(); err != nil {
137158
return nil, err
138159
}
@@ -196,13 +217,16 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
196217
}
197218
}
198219

220+
lockStrategy := purl.Query().Get("x-lock-strategy")
221+
199222
px, err := WithInstance(db, &Config{
200223
DatabaseName: purl.Path,
201224
MigrationsTable: migrationsTable,
202225
MigrationsTableQuoted: migrationsTableQuoted,
203226
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
204227
MultiStatementEnabled: multiStatementEnabled,
205228
MultiStatementMaxSize: multiStatementMaxSize,
229+
LockStrategy: lockStrategy,
206230
})
207231

208232
if err != nil {
@@ -221,36 +245,110 @@ func (p *Postgres) Close() error {
221245
return nil
222246
}
223247

224-
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
225248
func (p *Postgres) Lock() error {
226249
return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
227-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
228-
if err != nil {
229-
return err
230-
}
231-
232-
// This will wait indefinitely until the lock can be acquired.
233-
query := `SELECT pg_advisory_lock($1)`
234-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
235-
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
250+
switch p.config.LockStrategy {
251+
case LockStrategyAdvisory:
252+
return p.applyAdvisoryLock()
253+
case LockStrategyTable:
254+
return p.applyTableLock()
255+
default:
256+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
236257
}
237-
return nil
238258
})
239259
}
240260

241261
func (p *Postgres) Unlock() error {
242262
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
243-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
244-
if err != nil {
245-
return err
263+
switch p.config.LockStrategy {
264+
case LockStrategyAdvisory:
265+
return p.releaseAdvisoryLock()
266+
case LockStrategyTable:
267+
return p.releaseTableLock()
268+
default:
269+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
246270
}
271+
})
272+
}
247273

248-
query := `SELECT pg_advisory_unlock($1)`
249-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
250-
return &database.Error{OrigErr: err, Query: []byte(query)}
274+
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
275+
func (p *Postgres) applyAdvisoryLock() error {
276+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
277+
if err != nil {
278+
return err
279+
}
280+
281+
// This will wait indefinitely until the lock can be acquired.
282+
query := `SELECT pg_advisory_lock($1)`
283+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
284+
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
285+
}
286+
return nil
287+
}
288+
289+
func (p *Postgres) applyTableLock() error {
290+
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
291+
if err != nil {
292+
return &database.Error{OrigErr: err, Err: "transaction start failed"}
293+
}
294+
295+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
296+
if err != nil {
297+
return err
298+
}
299+
300+
query := "SELECT * FROM " + p.config.LockTable + " WHERE lock_id = $1"
301+
rows, err := tx.Query(query, aid)
302+
if err != nil {
303+
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
304+
}
305+
306+
defer func() {
307+
if errClose := rows.Close(); errClose != nil {
308+
err = multierror.Append(err, errClose)
251309
}
252-
return nil
253-
})
310+
}()
311+
312+
// If row exists at all, lock is present
313+
locked := rows.Next()
314+
if locked {
315+
return database.ErrLocked
316+
}
317+
318+
query = "INSERT INTO " + p.config.LockTable + " (lock_id) VALUES ($1)"
319+
if _, err := tx.Exec(query, aid); err != nil {
320+
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
321+
}
322+
323+
return tx.Commit()
324+
}
325+
326+
func (p *Postgres) releaseAdvisoryLock() error {
327+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
328+
if err != nil {
329+
return err
330+
}
331+
332+
query := `SELECT pg_advisory_unlock($1)`
333+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
334+
return &database.Error{OrigErr: err, Query: []byte(query)}
335+
}
336+
337+
return nil
338+
}
339+
340+
func (p *Postgres) releaseTableLock() error {
341+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
342+
if err != nil {
343+
return err
344+
}
345+
346+
query := "DELETE FROM " + p.config.LockTable + " WHERE lock_id = $1"
347+
if _, err := p.db.Exec(query, aid); err != nil {
348+
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
349+
}
350+
351+
return nil
254352
}
255353

256354
func (p *Postgres) Run(migration io.Reader) error {
@@ -478,6 +576,28 @@ func (p *Postgres) ensureVersionTable() (err error) {
478576
return nil
479577
}
480578

579+
func (p *Postgres) ensureLockTable() error {
580+
if p.config.LockStrategy != LockStrategyTable {
581+
return nil
582+
}
583+
584+
var count int
585+
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1`
586+
if err := p.db.QueryRow(query, p.config.LockTable).Scan(&count); err != nil {
587+
return &database.Error{OrigErr: err, Query: []byte(query)}
588+
}
589+
if count == 1 {
590+
return nil
591+
}
592+
593+
query = `CREATE TABLE "` + p.config.LockTable + `" (lock_id BIGINT NOT NULL PRIMARY KEY)`
594+
if _, err := p.db.Exec(query); err != nil {
595+
return &database.Error{OrigErr: err, Query: []byte(query)}
596+
}
597+
598+
return nil
599+
}
600+
481601
// Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611
482602
func quoteIdentifier(name string) string {
483603
end := strings.IndexRune(name, 0)

0 commit comments

Comments
 (0)