Skip to content

Commit 5d4473d

Browse files
committed
Add BigQuery driver.
1 parent 555501f commit 5d4473d

13 files changed

+594
-73
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
2-
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
2+
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite bigquery
33
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
44
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
55
TEST_FLAGS ?=

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
4444
* [Firebird](database/firebird)
4545
* [MS SQL Server](database/sqlserver)
4646
* [rqlite](database/rqlite)
47+
* [BigQuery (Alpha)](database/bigquery)
4748

4849
### Database URLs
4950

database/bigquery/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
tmp/
2+
!tmp/.gitkeep

database/bigquery/README.md

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# BigQuery (Alpha)
2+
3+
* Driver works with Google Cloud BigQuery
4+
* [Examples](./examples)
5+
6+
### Usage
7+
`bigquery://https://www.googleapis.com/bigquery/v2/?x-statement-timeout=0&credentials_filename=./myproject-XXXXXXXXXXXXX-XXXXXXXXXXXX.json&project_id=myproject-XXXXXXXXXXXXX&dataset_id=mydataset`
8+
9+
| Key | WithInstance Config | Default | Description |
10+
|------------------------|---------------------|--------------------------|--------------------------------------------------------------------------------------------------|
11+
| `x-migrations-table` | `MigrationsTable` | schema_migrations | Name of the migrations table |
12+
| `x-statement-timeout` | `StatementTimeout` | 0 | Abort any statement that takes more than the specified number of milliseconds |
13+
| `credentials_filename` | - | - | The location of a credential JSON file. |
14+
| `project_id` | - | - | The current Google Cloud project ID. |
15+
| `dataset_id` | `DatasetID` | - | ID of the default dataset in the current project. |
16+
17+
### Environment variables:
18+
- https://cloud.google.com/docs/authentication/application-default-credentials#GAC
19+
20+
| Key | Description |
21+
|--------------------------------|--------------------------------------------------|
22+
| GOOGLE_APPLICATION_CREDENTIALS | The location of a credential JSON file. |
23+
24+
### Data definition language (DDL) statements in Google Standard SQL
25+
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
26+
27+
### Work with multi-statement queries
28+
https://cloud.google.com/bigquery/docs/multi-statement-queries
29+

database/bigquery/bigquery.go

+329
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
package bigquery
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
nurl "net/url"
9+
"os"
10+
"strconv"
11+
"time"
12+
13+
"cloud.google.com/go/bigquery"
14+
"github.com/hashicorp/go-multierror"
15+
"go.uber.org/atomic"
16+
"google.golang.org/api/googleapi"
17+
"google.golang.org/api/option"
18+
19+
"github.com/golang-migrate/migrate/v4/database"
20+
)
21+
22+
func init() {
23+
database.Register("bigquery", &BigQuery{})
24+
}
25+
26+
// DefaultMigrationsTable is the migrations table name used when the
// configuration does not specify one.
const DefaultMigrationsTable = "schema_migrations"
29+
30+
// Sentinel errors returned while constructing the driver.
var (
	// ErrNoClient is returned by WithInstance when it is given a nil client.
	ErrNoClient = fmt.Errorf("no client")

	// ErrNoDatasetID is returned by WithInstance when the configuration has
	// an empty DatasetID.
	ErrNoDatasetID = fmt.Errorf("no dataset id")
)
34+
35+
// Config holds the settings of a BigQuery migration driver.
type Config struct {
	// MigrationsTable is the name of the table that stores the current
	// migration version. WithInstance replaces an empty value with
	// DefaultMigrationsTable.
	MigrationsTable string

	// StatementTimeout bounds each migration executed by Run. A zero value
	// means no timeout is applied.
	StatementTimeout time.Duration

	// DatasetID identifies the dataset that holds the migrations table and is
	// used as the default dataset for migration queries. It is required.
	DatasetID string
}
40+
41+
// VersionInfo is the row shape of the migrations table. It is used both to
// infer the table schema and to decode the stored row when reading the
// current version.
type VersionInfo struct {
	// Version is the migration version recorded in the table.
	Version int `bigquery:"version"`

	// Dirty is the dirty flag recorded alongside the version.
	Dirty bool `bigquery:"dirty"`
}
45+
46+
type BigQuery struct {
47+
client *bigquery.Client
48+
49+
// Locking and unlocking need to use the same connection
50+
isLocked atomic.Bool
51+
52+
// Open and WithInstance need to guarantee that config is never nil
53+
config *Config
54+
}
55+
56+
func WithInstance(ctx context.Context, client *bigquery.Client, config *Config) (database.Driver, error) {
57+
if client == nil {
58+
return nil, ErrNoClient
59+
}
60+
61+
if config == nil {
62+
config = &Config{}
63+
}
64+
65+
job, err := client.Query("SELECT 1").Run(ctx)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
_, err = job.Read(ctx)
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
if len(config.DatasetID) == 0 {
76+
return nil, ErrNoDatasetID
77+
}
78+
79+
if len(config.MigrationsTable) == 0 {
80+
config.MigrationsTable = DefaultMigrationsTable
81+
}
82+
83+
bx := &BigQuery{
84+
client: client,
85+
config: config,
86+
}
87+
88+
if err := bx.ensureVersionTable(); err != nil {
89+
return nil, err
90+
}
91+
92+
return bx, nil
93+
}
94+
95+
func (b *BigQuery) Open(url string) (database.Driver, error) {
96+
ctx := context.Background()
97+
98+
purl, err := nurl.Parse(url)
99+
if err != nil {
100+
return nil, err
101+
}
102+
103+
config := &Config{}
104+
105+
opts := make([]option.ClientOption, 0)
106+
107+
q := purl.Query()
108+
109+
if q.Has("x-migrations-table") {
110+
config.MigrationsTable = q.Get("x-migrations-table")
111+
}
112+
113+
if q.Has("x-statement-timeout") {
114+
statementTimeoutString := q.Get("x-statement-timeout")
115+
if statementTimeoutString != "" {
116+
statementTimeout, err := strconv.Atoi(statementTimeoutString)
117+
if err != nil {
118+
return nil, err
119+
}
120+
config.StatementTimeout = time.Duration(statementTimeout)
121+
}
122+
}
123+
124+
if q.Has("credentials_filename") {
125+
opts = append(opts, option.WithCredentialsFile(q.Get("credentials_filename")))
126+
} else if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") != "" {
127+
opts = append(opts, option.WithCredentialsFile(os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")))
128+
} else {
129+
opts = append(opts, option.WithoutAuthentication())
130+
}
131+
132+
projectID := bigquery.DetectProjectID
133+
if q.Has("project_id") {
134+
projectID = q.Get("project_id")
135+
}
136+
137+
if q.Has("dataset_id") {
138+
config.DatasetID = q.Get("dataset_id")
139+
}
140+
141+
opts = append(opts, option.WithEndpoint(fmt.Sprintf("%s%s", purl.Host, purl.Path)))
142+
143+
client, err := bigquery.NewClient(ctx, projectID, opts...)
144+
if err != nil {
145+
return nil, err
146+
}
147+
148+
bx, err := WithInstance(ctx, client, config)
149+
if err != nil {
150+
return nil, err
151+
}
152+
153+
return bx, nil
154+
}
155+
156+
func (b *BigQuery) Close() error {
157+
err := b.client.Close()
158+
if err != nil {
159+
return err
160+
}
161+
162+
return nil
163+
}
164+
165+
func (b *BigQuery) Lock() error {
166+
if !b.isLocked.CAS(false, true) {
167+
return database.ErrLocked
168+
}
169+
return nil
170+
}
171+
172+
func (b *BigQuery) Unlock() error {
173+
if !b.isLocked.CAS(true, false) {
174+
return database.ErrNotLocked
175+
}
176+
return nil
177+
}
178+
179+
func (b *BigQuery) Run(migration io.Reader) error {
180+
migr, err := io.ReadAll(migration)
181+
if err != nil {
182+
return fmt.Errorf("error on Run: %w", err)
183+
}
184+
185+
statement := migr[:]
186+
187+
ctx := context.Background()
188+
if b.config.StatementTimeout != 0 {
189+
var cancel context.CancelFunc
190+
ctx, cancel = context.WithTimeout(ctx, b.config.StatementTimeout)
191+
defer cancel()
192+
}
193+
194+
query := b.client.Query(string(statement))
195+
query.DefaultDatasetID = b.config.DatasetID
196+
197+
job, err := query.Run(ctx)
198+
if err != nil {
199+
return fmt.Errorf("error on Run: %w", err)
200+
}
201+
202+
_, err = job.Read(ctx)
203+
if err != nil {
204+
var gErr *googleapi.Error
205+
if errors.As(err, &gErr) {
206+
return fmt.Errorf("error on Run: %w\n%s", gErr, string(statement))
207+
}
208+
return fmt.Errorf("error on Run: %w", err)
209+
}
210+
211+
return nil
212+
}
213+
214+
func (b *BigQuery) SetVersion(version int, dirty bool) error {
215+
ctx := context.Background()
216+
217+
query := fmt.Sprintf(`
218+
BEGIN TRANSACTION;
219+
DELETE FROM `+"`%[1]s.%[2]s`"+` WHERE true;
220+
INSERT INTO `+"`%[1]s.%[2]s`"+` (version, dirty) VALUES (%[3]d, %[4]t);
221+
COMMIT TRANSACTION;
222+
`, b.config.DatasetID, b.config.MigrationsTable, version, dirty)
223+
224+
job, err := b.client.Query(query).Run(ctx)
225+
if err != nil {
226+
return fmt.Errorf("error on SetVersion: %w", err)
227+
}
228+
229+
_, err = job.Read(ctx)
230+
if err != nil {
231+
return fmt.Errorf("error on SetVersion: %w", err)
232+
}
233+
234+
return nil
235+
}
236+
237+
func (b *BigQuery) Version() (int, bool, error) {
238+
ctx := context.Background()
239+
240+
it := b.getVersionTable().Read(ctx)
241+
242+
versionInfo := VersionInfo{}
243+
if err := it.Next(&versionInfo); err != nil {
244+
if err.Error() != "no more items in iterator" {
245+
return database.NilVersion, false, fmt.Errorf("error on Version: %w", err)
246+
}
247+
return database.NilVersion, false, nil
248+
}
249+
250+
return versionInfo.Version, versionInfo.Dirty, nil
251+
}
252+
253+
func (b *BigQuery) Drop() error {
254+
ctx := context.Background()
255+
256+
it := b.getDataset().Tables(ctx)
257+
258+
for {
259+
table, err := it.Next()
260+
if err != nil {
261+
if err.Error() == "no more items in iterator" {
262+
break
263+
}
264+
return fmt.Errorf("error on Drop: %w", err)
265+
}
266+
267+
err = table.Delete(ctx)
268+
if err != nil {
269+
return fmt.Errorf("error on Drop: %w", err)
270+
}
271+
}
272+
273+
return nil
274+
}
275+
276+
// ensureVersionTable checks if versions table exists and, if not, creates it.
277+
func (b *BigQuery) ensureVersionTable() (err error) {
278+
if err = b.Lock(); err != nil {
279+
return err
280+
}
281+
282+
defer func() {
283+
if e := b.Unlock(); e != nil {
284+
if err == nil {
285+
err = e
286+
} else {
287+
err = multierror.Append(err, e)
288+
}
289+
}
290+
}()
291+
292+
ctx := context.Background()
293+
294+
table := b.getVersionTable()
295+
296+
// This block checks whether the `MigrationsTable` already exists.
297+
// This is useful because it allows read only users to also check the current version.
298+
md, err := table.Metadata(ctx)
299+
if err != nil {
300+
var gErr *googleapi.Error
301+
if !errors.As(err, &gErr) || gErr.Code != 404 {
302+
return err
303+
}
304+
}
305+
306+
if md != nil {
307+
return nil
308+
}
309+
310+
schema, err := bigquery.InferSchema(VersionInfo{})
311+
if err != nil {
312+
return err
313+
}
314+
315+
md = &bigquery.TableMetadata{Schema: schema}
316+
if err := table.Create(ctx, md); err != nil {
317+
return err
318+
}
319+
320+
return nil
321+
}
322+
323+
func (b *BigQuery) getDataset() *bigquery.Dataset {
324+
return b.client.Dataset(b.config.DatasetID)
325+
}
326+
327+
func (b *BigQuery) getVersionTable() *bigquery.Table {
328+
return b.getDataset().Table(b.config.MigrationsTable)
329+
}

0 commit comments

Comments
 (0)