Skip to content

Commit 72df7aa

Browse files
author
Cody Jones
committed
Make Pool safe for use by multiple goroutines
Entries are acquired with "non blocking mutexes" to promote concurrency. Remote queries are not fast compared to the instructions on the local processor. It would be preferable to use sync.Cond so goroutines waiting for a connection could sleep until one becomes available. However, that introduces a race condition I wasn't able to resolve without a (blocking) mutex. The busy wait logic can be overriden by the library user, since since applications have different workloads (batch loading is quite different from individual queries serving a UI!) This way a fixed delay, exponential delay, etc. can be introduced in tandem with delay constants appropriate for each application.
1 parent 3d6444f commit 72df7aa

22 files changed

+595
-166
lines changed

.travis.yml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ go:
77

88
go_import_path: gopkg.in/rethinkdb/rethinkdb-go.v6
99

10-
install: go get -t ./...
11-
1210
before_script:
13-
- source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
14-
- wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add -
11+
- source /etc/lsb-release && echo "deb https://download.rethinkdb.com/repository/ubuntu-$TRAVIS_DIST $TRAVIS_DIST main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
12+
- wget -qO- https://download.rethinkdb.com/repository/raw/pubkey.gpg | sudo apt-key add -
1513
- sudo apt-get update
1614
- sudo apt-get install rethinkdb
1715
- rethinkdb > /dev/null 2>&1 &
@@ -20,6 +18,7 @@ before_script:
2018
- rethinkdb --port-offset 3 --directory rethinkdb_data3 --join localhost:29016 > /dev/null 2>&1 &
2119

2220
script:
23-
- go test -race .
24-
- go test -tags='cluster' -short -race -v ./...
25-
- GOMODULE111=off go test .
21+
- GO111MODULE=on go test -race .
22+
- GO111MODULE=on go test -tags='cluster' -short -race -v ./...
23+
- GO111MODULE=on go test .
24+

cluster.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ type Cluster struct {
4646
// NewCluster creates a new cluster by connecting to the given hosts.
4747
func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
4848
c := &Cluster{
49-
hp: newHostPool(opts),
50-
seeds: hosts,
51-
opts: opts,
52-
closed: clusterWorking,
53-
connFactory: NewConnection,
49+
hp: newHostPool(opts),
50+
seeds: hosts,
51+
opts: opts,
52+
closed: clusterWorking,
53+
connFactory: func(host string, opts *ConnectOpts) (connection, error) {
54+
return NewConnection(host, opts)
55+
},
5456
}
5557

5658
err := c.run()
@@ -78,7 +80,8 @@ func (c *Cluster) run() error {
7880
return nil
7981
}
8082

81-
// Query executes a ReQL query using the cluster to connect to the database
83+
// Query executes a ReQL query using the cluster to connect to the database.
84+
// The returned cursor must be closed (either directly or indirectly) when it is no longer needed.
8285
func (c *Cluster) Query(ctx context.Context, q Query) (cursor *Cursor, err error) {
8386
for i := 0; i < c.numRetries(); i++ {
8487
var node *Node

cluster_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ import (
55
"encoding/binary"
66
"encoding/json"
77
"fmt"
8+
"io"
9+
"net"
10+
"time"
11+
812
"github.com/stretchr/testify/mock"
913
test "gopkg.in/check.v1"
1014
"gopkg.in/rethinkdb/rethinkdb-go.v6/encoding"
1115
p "gopkg.in/rethinkdb/rethinkdb-go.v6/ql2"
12-
"io"
13-
"net"
14-
"time"
1516
)
1617

1718
type ClusterSuite struct{}
@@ -360,20 +361,20 @@ type mockDial struct {
360361
}
361362

362363
func mockedConnectionFactory(dial *mockDial) connFactory {
363-
return func(host string, opts *ConnectOpts) (connection *Connection, err error) {
364+
return func(host string, opts *ConnectOpts) (connection, error) {
364365
args := dial.MethodCalled("Dial", host)
365-
err = args.Error(1)
366+
err := args.Error(1)
366367
if err != nil {
367368
return nil, err
368369
}
369370

370-
connection = newConnection(args.Get(0).(net.Conn), host, opts)
371-
done := runConnection(connection)
371+
conn := newConnection(args.Get(0).(net.Conn), host, opts)
372+
done := runConnection(conn)
372373

373374
m := args.Get(0).(*connMock)
374375
m.setDone(done)
375376

376-
return connection, nil
377+
return conn, nil
377378
}
378379
}
379380

connection.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package rethinkdb
22

33
import (
4+
"bytes"
45
"crypto/tls"
56
"encoding/binary"
67
"encoding/json"
78
"fmt"
89
"net"
10+
"sync"
911
"sync/atomic"
1012
"time"
1113

12-
"bytes"
1314
"github.com/opentracing/opentracing-go"
1415
"github.com/opentracing/opentracing-go/ext"
1516
"github.com/opentracing/opentracing-go/log"
1617
"golang.org/x/net/context"
1718
p "gopkg.in/rethinkdb/rethinkdb-go.v6/ql2"
18-
"sync"
1919
)
2020

2121
const (
@@ -61,6 +61,16 @@ type Connection struct {
6161
mu sync.Mutex
6262
}
6363

64+
type connection interface {
65+
Server() (ServerResponse, error)
66+
Query(context.Context, Query) (*Response, *Cursor, error)
67+
Close() error
68+
isBad() bool
69+
isClosed() bool
70+
}
71+
72+
type connFactory func(host string, opts *ConnectOpts) (connection, error)
73+
6474
type responseAndError struct {
6575
response *Response
6676
err error

connection_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ package rethinkdb
33
import (
44
"encoding/binary"
55
"encoding/json"
6+
"io"
7+
"sync"
8+
"time"
9+
610
"github.com/opentracing/opentracing-go"
711
"github.com/opentracing/opentracing-go/mocktracer"
812
"github.com/stretchr/testify/mock"
913
"golang.org/x/net/context"
1014
test "gopkg.in/check.v1"
1115
p "gopkg.in/rethinkdb/rethinkdb-go.v6/ql2"
12-
"io"
13-
"sync"
14-
"time"
1516
)
1617

1718
func runConnection(c *Connection) <-chan struct{} {

cursor.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
var (
1717
errNilCursor = errors.New("cursor is nil")
18-
errCursorClosed = errors.New("connection connClosed, cannot read cursor")
18+
errCursorClosed = errors.New("connection closed, cannot read cursor")
1919
)
2020

2121
func newCursor(ctx context.Context, conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
@@ -61,7 +61,7 @@ func newCursor(ctx context.Context, conn *Connection, cursorType string, token i
6161
type Cursor struct {
6262
releaseConn func() error
6363

64-
conn *Connection
64+
conn connection
6565
connOpts *ConnectOpts
6666
token int64
6767
cursorType string
@@ -120,23 +120,30 @@ func (c *Cursor) Err() error {
120120
}
121121

122122
// Close closes the cursor, preventing further enumeration. If the end is
123-
// encountered, the cursor is connClosed automatically. Close is idempotent.
124-
func (c *Cursor) Close() error {
123+
// encountered, the cursor is closed automatically. Close is idempotent.
124+
func (c *Cursor) Close() (result error) {
125125
if c == nil {
126126
return errNilCursor
127127
}
128128

129129
c.mu.Lock()
130130
defer c.mu.Unlock()
131131

132-
var err error
133-
134-
// If cursor is already connClosed return immediately
132+
// If cursor is already closed return immediately
135133
closed := c.closed
136134
if closed {
137135
return nil
138136
}
139137

138+
if c.releaseConn != nil {
139+
defer func() {
140+
err := c.releaseConn()
141+
if err != nil {
142+
result = err
143+
}
144+
}()
145+
}
146+
140147
// Get connection and check its valid, don't need to lock as this is only
141148
// set when the cursor is created
142149
conn := c.conn
@@ -148,16 +155,12 @@ func (c *Cursor) Close() error {
148155
}
149156

150157
// Stop any unfinished queries
158+
var err error
159+
151160
if !c.finished {
152161
_, _, err = conn.Query(c.ctx, newStopQuery(c.token))
153162
}
154163

155-
if c.releaseConn != nil {
156-
if err := c.releaseConn(); err != nil {
157-
return err
158-
}
159-
}
160-
161164
if span := opentracing.SpanFromContext(c.ctx); span != nil {
162165
span.Finish()
163166
}
@@ -601,7 +604,7 @@ func (c *Cursor) seekCursor(bufferResponse bool) error {
601604
}
602605

603606
// Loop over loading data, applying skips as necessary and loading more data as needed
604-
// until either the cursor is connClosed or finished, or we have applied all outstanding
607+
// until either the cursor is closed or finished, or we have applied all outstanding
605608
// skips and data is available
606609
for {
607610
c.applyPendingSkips(bufferResponse) // if we are buffering the responses, skip can drain from the buffer

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@ require (
1111
github.com/onsi/ginkgo v1.12.0 // indirect
1212
github.com/onsi/gomega v1.9.0 // indirect
1313
github.com/opentracing/opentracing-go v1.1.0
14+
github.com/silentred/gid v1.0.1
1415
github.com/sirupsen/logrus v1.0.6
1516
github.com/stretchr/objx v0.2.0 // indirect
1617
github.com/stretchr/testify v1.5.1
1718
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
1819
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
20+
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
1921
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
2022
gopkg.in/cenkalti/backoff.v2 v2.2.1
2123
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f
2224
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
2325
gopkg.in/yaml.v2 v2.2.8 // indirect
2426
)
2527

26-
go 1.14
28+
go 1.12

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsq
3535
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
3636
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3737
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
38+
github.com/silentred/gid v1.0.1 h1:yiTiT3hpcyOsAU+QyeDjHKAfHxRvayjml9PWxZI80tw=
39+
github.com/silentred/gid v1.0.1/go.mod h1:DMQPn66uY+3ed7rWfzOVET7VbDBAhjz+6AmmlixUK08=
3840
github.com/sirupsen/logrus v1.0.6 h1:hcP1GmhGigz/O7h1WVUM5KklBp1JoNS9FggWKdj/j3s=
3941
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
4042
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -50,6 +52,7 @@ golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPh
5052
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
5153
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
5254
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
55+
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
5356
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
5457
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
5558
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

internal/integration/reql_tests/common.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func runAndAssert(suite suite.Suite, expected, v interface{}, session *r.Session
5353
}
5454

5555
assertExpected(suite, expected, cursor, err)
56+
57+
if cursor != nil {
58+
cursor.Close()
59+
}
5660
}
5761

5862
func fetchAndAssert(suite suite.Suite, expected, result interface{}, count int) {
@@ -81,6 +85,14 @@ func fetchAndAssert(suite suite.Suite, expected, result interface{}, count int)
8185
}
8286

8387
assertExpected(suite, expected, cursor, err)
88+
89+
/* Cursors should ordinarily be closed when they are no longer needed. However, some of the generated
90+
changefeed tests expect the cursor will return additional changes after fetchAndAssert() is called.
91+
This means we must leave test cursors hanging open. Fortunately the generated tests are run in a
92+
single goroutine, otherwise this would be a bigger problem.
93+
94+
// if cursor != nil { cursor.Close() }
95+
*/
8496
}
8597

8698
func maybeLen(v interface{}) interface{} {

internal/integration/reql_tests/reql_changefeeds_edge_test.go

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/integration/reql_tests/reql_changefeeds_idxcopy_test.go

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/integration/reql_tests/reql_changefeeds_include_states_test.go

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)