Skip to content

Commit 16d63e3

Browse files
committed
Add support for multi-schema migrations in Postgres
There is lock conflict on parallel migrations in different postgres schemas. To avoid this conflicts function GenerateAdvisoryLockId added variadic params to change lock id with schema name. Schema name taked with postgres CURRENT_SCHEMA function. Null byte used as separator between database and schema name, because any other symbol may be used in both of it. Closes #118
1 parent 9f5e1bd commit 16d63e3

File tree

4 files changed

+80
-4
lines changed

4 files changed

+80
-4
lines changed

database/postgres/postgres.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ var (
3535
type Config struct {
3636
MigrationsTable string
3737
DatabaseName string
38+
SchemaName string
3839
}
3940

4041
type Postgres struct {
@@ -68,6 +69,18 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
6869

6970
config.DatabaseName = databaseName
7071

72+
query = `SELECT CURRENT_SCHEMA()`
73+
var schemaName string
74+
if err := instance.QueryRow(query).Scan(&schemaName); err != nil {
75+
return nil, &database.Error{OrigErr: err, Query: []byte(query)}
76+
}
77+
78+
if len(schemaName) == 0 {
79+
return nil, ErrNoSchema
80+
}
81+
82+
config.SchemaName = schemaName
83+
7184
if len(config.MigrationsTable) == 0 {
7285
config.MigrationsTable = DefaultMigrationsTable
7386
}
@@ -133,7 +146,7 @@ func (p *Postgres) Lock() error {
133146
return database.ErrLocked
134147
}
135148

136-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
149+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.SchemaName)
137150
if err != nil {
138151
return err
139152
}

database/postgres/postgres_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,56 @@ func TestWithSchema(t *testing.T) {
178178
})
179179
}
180180

181+
func TestParallelSchema(t *testing.T) {
182+
mt.ParallelTest(t, versions, isReady,
183+
func(t *testing.T, i mt.Instance) {
184+
p := &Postgres{}
185+
addr := pgConnectionString(i.Host(), i.Port())
186+
d, err := p.Open(addr)
187+
if err != nil {
188+
t.Fatalf("%v", err)
189+
}
190+
defer d.Close()
191+
192+
// create foo and bar schemas
193+
if err := d.Run(bytes.NewReader([]byte("CREATE SCHEMA foo AUTHORIZATION postgres"))); err != nil {
194+
t.Fatal(err)
195+
}
196+
if err := d.Run(bytes.NewReader([]byte("CREATE SCHEMA bar AUTHORIZATION postgres"))); err != nil {
197+
t.Fatal(err)
198+
}
199+
200+
// re-connect using that schemas
201+
dfoo, err := p.Open(fmt.Sprintf("postgres://postgres@%v:%v/postgres?sslmode=disable&search_path=foo", i.Host(), i.Port()))
202+
if err != nil {
203+
t.Fatalf("%v", err)
204+
}
205+
defer dfoo.Close()
206+
207+
dbar, err := p.Open(fmt.Sprintf("postgres://postgres@%v:%v/postgres?sslmode=disable&search_path=bar", i.Host(), i.Port()))
208+
if err != nil {
209+
t.Fatalf("%v", err)
210+
}
211+
defer dbar.Close()
212+
213+
if err := dfoo.Lock(); err != nil {
214+
t.Fatal(err)
215+
}
216+
217+
if err := dbar.Lock(); err != nil {
218+
t.Fatal(err)
219+
}
220+
221+
if err := dbar.Unlock(); err != nil {
222+
t.Fatal(err)
223+
}
224+
225+
if err := dfoo.Unlock(); err != nil {
226+
t.Fatal(err)
227+
}
228+
})
229+
}
230+
181231
func TestWithInstance(t *testing.T) {
182232

183233
}

database/util.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package database
22

33
import (
4+
"bytes"
45
"fmt"
56
"hash/crc32"
67
)
78

89
const advisoryLockIdSalt uint = 1486364155
910

1011
// GenerateAdvisoryLockId inspired by rails migrations, see https://goo.gl/8o9bCT
11-
func GenerateAdvisoryLockId(databaseName string) (string, error) {
12-
sum := crc32.ChecksumIEEE([]byte(databaseName))
12+
func GenerateAdvisoryLockId(databaseName string, additionalNames ...string) (string, error) {
13+
buf := bytes.NewBufferString(databaseName)
14+
for _, name := range additionalNames {
15+
buf.WriteByte(0)
16+
buf.WriteString(name)
17+
}
18+
sum := crc32.ChecksumIEEE(buf.Bytes())
1319
sum = sum * uint32(advisoryLockIdSalt)
1420
return fmt.Sprintf("%v", sum), nil
1521
}

database/util_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,21 @@ import (
77
func TestGenerateAdvisoryLockId(t *testing.T) {
88
testcases := []struct {
99
dbname string
10+
schema string
1011
expectedID string // empty string signifies that an error is expected
1112
}{
1213
{dbname: "database_name", expectedID: "1764327054"},
14+
{dbname: "database_name", schema: "schema_name_1", expectedID: "3244152297"},
15+
{dbname: "database_name", schema: "schema_name_2", expectedID: "810103531"},
1316
}
1417

1518
for _, tc := range testcases {
1619
t.Run(tc.dbname, func(t *testing.T) {
17-
if id, err := GenerateAdvisoryLockId("database_name"); err == nil {
20+
names := []string{}
21+
if len(tc.schema) > 0 {
22+
names = append(names, tc.schema)
23+
}
24+
if id, err := GenerateAdvisoryLockId(tc.dbname, names...); err == nil {
1825
if id != tc.expectedID {
1926
t.Error("Generated incorrect ID:", id, "!=", tc.expectedID)
2027
}

0 commit comments

Comments
 (0)