Skip to content

Commit 0815e2d

Browse files
authored
Merge pull request #992 from fortnox-andreas/master
Add support for locking table in pgx-driver
2 parents 5aa4670 + 091ad5d commit 0815e2d

File tree

3 files changed

+182
-19
lines changed

3 files changed

+182
-19
lines changed

database/pgx/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ This package is for [pgx/v4](https://pkg.go.dev/github.com/jackc/pgx/v4). A back
1111
| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds |
1212
| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) |
1313
| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) |
14+
| `x-lock-strategy` | `LockStrategy` | Strategy used for locking during migration (default: advisory) |
15+
| `x-lock-table` | `LockTable` | Name of the table which maintains the migration lock (default: schema_lock) |
1416
| `dbname` | `DatabaseName` | The name of the database to connect to |
1517
| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. |
1618
| `user` | | The user to sign in as |

database/pgx/pgx.go

Lines changed: 154 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ import (
2323
"github.com/jackc/pgconn"
2424
"github.com/jackc/pgerrcode"
2525
_ "github.com/jackc/pgx/v4/stdlib"
26+
"github.com/lib/pq"
27+
)
28+
29+
const (
30+
LockStrategyAdvisory = "advisory"
31+
LockStrategyTable = "table"
2632
)
2733

2834
func init() {
@@ -36,6 +42,8 @@ var (
3642

3743
DefaultMigrationsTable = "schema_migrations"
3844
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
45+
DefaultLockTable = "schema_lock"
46+
DefaultLockStrategy = LockStrategyAdvisory
3947
)
4048

4149
var (
@@ -49,6 +57,8 @@ type Config struct {
4957
MigrationsTable string
5058
DatabaseName string
5159
SchemaName string
60+
LockTable string
61+
LockStrategy string
5262
migrationsSchemaName string
5363
migrationsTableName string
5464
StatementTimeout time.Duration
@@ -108,6 +118,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
108118
config.MigrationsTable = DefaultMigrationsTable
109119
}
110120

121+
if len(config.LockTable) == 0 {
122+
config.LockTable = DefaultLockTable
123+
}
124+
125+
if len(config.LockStrategy) == 0 {
126+
config.LockStrategy = DefaultLockStrategy
127+
}
128+
111129
config.migrationsSchemaName = config.SchemaName
112130
config.migrationsTableName = config.MigrationsTable
113131
if config.MigrationsTableQuoted {
@@ -133,6 +151,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
133151
config: config,
134152
}
135153

154+
if err := px.ensureLockTable(); err != nil {
155+
return nil, err
156+
}
157+
136158
if err := px.ensureVersionTable(); err != nil {
137159
return nil, err
138160
}
@@ -196,13 +218,18 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
196218
}
197219
}
198220

221+
lockStrategy := purl.Query().Get("x-lock-strategy")
222+
lockTable := purl.Query().Get("x-lock-table")
223+
199224
px, err := WithInstance(db, &Config{
200225
DatabaseName: purl.Path,
201226
MigrationsTable: migrationsTable,
202227
MigrationsTableQuoted: migrationsTableQuoted,
203228
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
204229
MultiStatementEnabled: multiStatementEnabled,
205230
MultiStatementMaxSize: multiStatementMaxSize,
231+
LockStrategy: lockStrategy,
232+
LockTable: lockTable,
206233
})
207234

208235
if err != nil {
@@ -221,36 +248,116 @@ func (p *Postgres) Close() error {
221248
return nil
222249
}
223250

224-
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
225251
func (p *Postgres) Lock() error {
226252
return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
227-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
228-
if err != nil {
229-
return err
230-
}
231-
232-
// This will wait indefinitely until the lock can be acquired.
233-
query := `SELECT pg_advisory_lock($1)`
234-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
235-
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
253+
switch p.config.LockStrategy {
254+
case LockStrategyAdvisory:
255+
return p.applyAdvisoryLock()
256+
case LockStrategyTable:
257+
return p.applyTableLock()
258+
default:
259+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
236260
}
237-
return nil
238261
})
239262
}
240263

241264
func (p *Postgres) Unlock() error {
242265
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
243-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
244-
if err != nil {
245-
return err
266+
switch p.config.LockStrategy {
267+
case LockStrategyAdvisory:
268+
return p.releaseAdvisoryLock()
269+
case LockStrategyTable:
270+
return p.releaseTableLock()
271+
default:
272+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
246273
}
274+
})
275+
}
247276

248-
query := `SELECT pg_advisory_unlock($1)`
249-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
250-
return &database.Error{OrigErr: err, Query: []byte(query)}
277+
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
278+
func (p *Postgres) applyAdvisoryLock() error {
279+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
280+
if err != nil {
281+
return err
282+
}
283+
284+
// This will wait indefinitely until the lock can be acquired.
285+
query := `SELECT pg_advisory_lock($1)`
286+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
287+
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
288+
}
289+
return nil
290+
}
291+
292+
func (p *Postgres) applyTableLock() error {
293+
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
294+
if err != nil {
295+
return &database.Error{OrigErr: err, Err: "transaction start failed"}
296+
}
297+
defer func() {
298+
errRollback := tx.Rollback()
299+
if errRollback != nil {
300+
err = multierror.Append(err, errRollback)
251301
}
252-
return nil
253-
})
302+
}()
303+
304+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
305+
if err != nil {
306+
return err
307+
}
308+
309+
query := "SELECT * FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1"
310+
rows, err := tx.Query(query, aid)
311+
if err != nil {
312+
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
313+
}
314+
315+
defer func() {
316+
if errClose := rows.Close(); errClose != nil {
317+
err = multierror.Append(err, errClose)
318+
}
319+
}()
320+
321+
// If row exists at all, lock is present
322+
locked := rows.Next()
323+
if locked {
324+
return database.ErrLocked
325+
}
326+
327+
query = "INSERT INTO " + pq.QuoteIdentifier(p.config.LockTable) + " (lock_id) VALUES ($1)"
328+
if _, err := tx.Exec(query, aid); err != nil {
329+
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
330+
}
331+
332+
return tx.Commit()
333+
}
334+
335+
func (p *Postgres) releaseAdvisoryLock() error {
336+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
337+
if err != nil {
338+
return err
339+
}
340+
341+
query := `SELECT pg_advisory_unlock($1)`
342+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
343+
return &database.Error{OrigErr: err, Query: []byte(query)}
344+
}
345+
346+
return nil
347+
}
348+
349+
func (p *Postgres) releaseTableLock() error {
350+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
351+
if err != nil {
352+
return err
353+
}
354+
355+
query := "DELETE FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1"
356+
if _, err := p.db.Exec(query, aid); err != nil {
357+
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
358+
}
359+
360+
return nil
254361
}
255362

256363
func (p *Postgres) Run(migration io.Reader) error {
@@ -414,6 +521,12 @@ func (p *Postgres) Drop() (err error) {
414521
if err := tables.Scan(&tableName); err != nil {
415522
return err
416523
}
524+
525+
// do not drop lock table
526+
if tableName == p.config.LockTable && p.config.LockStrategy == LockStrategyTable {
527+
continue
528+
}
529+
417530
if len(tableName) > 0 {
418531
tableNames = append(tableNames, tableName)
419532
}
@@ -478,6 +591,28 @@ func (p *Postgres) ensureVersionTable() (err error) {
478591
return nil
479592
}
480593

594+
func (p *Postgres) ensureLockTable() error {
595+
if p.config.LockStrategy != LockStrategyTable {
596+
return nil
597+
}
598+
599+
var count int
600+
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1`
601+
if err := p.db.QueryRow(query, p.config.LockTable).Scan(&count); err != nil {
602+
return &database.Error{OrigErr: err, Query: []byte(query)}
603+
}
604+
if count == 1 {
605+
return nil
606+
}
607+
608+
query = `CREATE TABLE ` + pq.QuoteIdentifier(p.config.LockTable) + ` (lock_id BIGINT NOT NULL PRIMARY KEY)`
609+
if _, err := p.db.Exec(query); err != nil {
610+
return &database.Error{OrigErr: err, Query: []byte(query)}
611+
}
612+
613+
return nil
614+
}
615+
481616
// Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611
482617
func quoteIdentifier(name string) string {
483618
end := strings.IndexRune(name, 0)

database/pgx/pgx_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,32 @@ func TestMigrate(t *testing.T) {
134134
})
135135
}
136136

137+
func TestMigrateLockTable(t *testing.T) {
138+
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
139+
ip, port, err := c.FirstPort()
140+
if err != nil {
141+
t.Fatal(err)
142+
}
143+
144+
addr := pgConnectionString(ip, port, "x-lock-strategy=table", "x-lock-table=lock_table")
145+
p := &Postgres{}
146+
d, err := p.Open(addr)
147+
if err != nil {
148+
t.Fatal(err)
149+
}
150+
defer func() {
151+
if err := d.Close(); err != nil {
152+
t.Error(err)
153+
}
154+
}()
155+
m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "pgx", d)
156+
if err != nil {
157+
t.Fatal(err)
158+
}
159+
dt.TestMigrate(t, m)
160+
})
161+
}
162+
137163
func TestMultipleStatements(t *testing.T) {
138164
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
139165
ip, port, err := c.FirstPort()

0 commit comments

Comments
 (0)