Skip to content

Make Pool safe for use by multiple goroutines #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ go:

go_import_path: gopkg.in/rethinkdb/rethinkdb-go.v6

install: go get -t ./...

before_script:
- source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
- wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add -
- source /etc/lsb-release && echo "deb https://download.rethinkdb.com/repository/ubuntu-$TRAVIS_DIST $TRAVIS_DIST main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
- wget -qO- https://download.rethinkdb.com/repository/raw/pubkey.gpg | sudo apt-key add -
- sudo apt-get update
- sudo apt-get install rethinkdb
- rethinkdb > /dev/null 2>&1 &
Expand All @@ -20,6 +18,7 @@ before_script:
- rethinkdb --port-offset 3 --directory rethinkdb_data3 --join localhost:29016 > /dev/null 2>&1 &

script:
- go test -race .
- go test -tags='cluster' -short -race -v ./...
- GOMODULE111=off go test .
- GO111MODULE=on go test -race .
- GO111MODULE=on go test -tags='cluster' -short -race -v ./...
- GO111MODULE=on go test .

15 changes: 9 additions & 6 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ type Cluster struct {
// NewCluster creates a new cluster by connecting to the given hosts.
func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
c := &Cluster{
hp: newHostPool(opts),
seeds: hosts,
opts: opts,
closed: clusterWorking,
connFactory: NewConnection,
hp: newHostPool(opts),
seeds: hosts,
opts: opts,
closed: clusterWorking,
connFactory: func(host string, opts *ConnectOpts) (connection, error) {
return NewConnection(host, opts)
},
}

err := c.run()
Expand Down Expand Up @@ -78,7 +80,8 @@ func (c *Cluster) run() error {
return nil
}

// Query executes a ReQL query using the cluster to connect to the database
// Query executes a ReQL query using the cluster to connect to the database.
// The returned cursor should be closed (either directly or indirectly) when it is no longer needed.
func (c *Cluster) Query(ctx context.Context, q Query) (cursor *Cursor, err error) {
for i := 0; i < c.numRetries(); i++ {
var node *Node
Expand Down
17 changes: 9 additions & 8 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
"time"

"github.com/stretchr/testify/mock"
test "gopkg.in/check.v1"
"gopkg.in/rethinkdb/rethinkdb-go.v6/encoding"
p "gopkg.in/rethinkdb/rethinkdb-go.v6/ql2"
"io"
"net"
"time"
)

type ClusterSuite struct{}
Expand Down Expand Up @@ -360,20 +361,20 @@ type mockDial struct {
}

func mockedConnectionFactory(dial *mockDial) connFactory {
return func(host string, opts *ConnectOpts) (connection *Connection, err error) {
return func(host string, opts *ConnectOpts) (connection, error) {
args := dial.MethodCalled("Dial", host)
err = args.Error(1)
err := args.Error(1)
if err != nil {
return nil, err
}

connection = newConnection(args.Get(0).(net.Conn), host, opts)
done := runConnection(connection)
conn := newConnection(args.Get(0).(net.Conn), host, opts)
done := runConnection(conn)

m := args.Get(0).(*connMock)
m.setDone(done)

return connection, nil
return conn, nil
}
}

Expand Down
14 changes: 12 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package rethinkdb

import (
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"bytes"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"golang.org/x/net/context"
p "gopkg.in/rethinkdb/rethinkdb-go.v6/ql2"
"sync"
)

const (
Expand Down Expand Up @@ -61,6 +61,16 @@ type Connection struct {
mu sync.Mutex
}

type connection interface {
Server() (ServerResponse, error)
Query(context.Context, Query) (*Response, *Cursor, error)
Close() error
isBad() bool
isClosed() bool
}

type connFactory func(host string, opts *ConnectOpts) (connection, error)

type responseAndError struct {
response *Response
err error
Expand Down
31 changes: 17 additions & 14 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var (
errNilCursor = errors.New("cursor is nil")
errCursorClosed = errors.New("connection connClosed, cannot read cursor")
errCursorClosed = errors.New("connection closed, cannot read cursor")
)

func newCursor(ctx context.Context, conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
Expand Down Expand Up @@ -61,7 +61,7 @@ func newCursor(ctx context.Context, conn *Connection, cursorType string, token i
type Cursor struct {
releaseConn func() error

conn *Connection
conn connection
connOpts *ConnectOpts
token int64
cursorType string
Expand Down Expand Up @@ -120,23 +120,30 @@ func (c *Cursor) Err() error {
}

// Close closes the cursor, preventing further enumeration. If the end is
// encountered, the cursor is connClosed automatically. Close is idempotent.
func (c *Cursor) Close() error {
// encountered, the cursor is closed automatically. Close is idempotent.
func (c *Cursor) Close() (result error) {
if c == nil {
return errNilCursor
}

c.mu.Lock()
defer c.mu.Unlock()

var err error

// If cursor is already connClosed return immediately
// If cursor is already closed return immediately
closed := c.closed
if closed {
return nil
}

if c.releaseConn != nil {
defer func() {
err := c.releaseConn()
if err != nil {
result = err
}
}()
}

// Get connection and check its valid, don't need to lock as this is only
// set when the cursor is created
conn := c.conn
Expand All @@ -148,16 +155,12 @@ func (c *Cursor) Close() error {
}

// Stop any unfinished queries
var err error

if !c.finished {
_, _, err = conn.Query(c.ctx, newStopQuery(c.token))
}

if c.releaseConn != nil {
if err := c.releaseConn(); err != nil {
return err
}
}

if span := opentracing.SpanFromContext(c.ctx); span != nil {
span.Finish()
}
Expand Down Expand Up @@ -601,7 +604,7 @@ func (c *Cursor) seekCursor(bufferResponse bool) error {
}

// Loop over loading data, applying skips as necessary and loading more data as needed
// until either the cursor is connClosed or finished, or we have applied all outstanding
// until either the cursor is closed or finished, or we have applied all outstanding
// skips and data is available
for {
c.applyPendingSkips(bufferResponse) // if we are buffering the responses, skip can drain from the buffer
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ require (
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect
github.com/opentracing/opentracing-go v1.1.0
github.com/silentred/gid v1.0.1
github.com/sirupsen/logrus v1.0.6
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/cenkalti/backoff.v2 v2.2.1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)

go 1.14
go 1.12
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsq
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/silentred/gid v1.0.1 h1:yiTiT3hpcyOsAU+QyeDjHKAfHxRvayjml9PWxZI80tw=
github.com/silentred/gid v1.0.1/go.mod h1:DMQPn66uY+3ed7rWfzOVET7VbDBAhjz+6AmmlixUK08=
github.com/sirupsen/logrus v1.0.6 h1:hcP1GmhGigz/O7h1WVUM5KklBp1JoNS9FggWKdj/j3s=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -50,6 +52,7 @@ golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
14 changes: 14 additions & 0 deletions internal/integration/reql_tests/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func runAndAssert(suite suite.Suite, expected, v interface{}, session *r.Session
}

assertExpected(suite, expected, cursor, err)

if cursor != nil {
cursor.Close()
}
}

func fetchAndAssert(suite suite.Suite, expected, result interface{}, count int) {
Expand Down Expand Up @@ -81,6 +85,16 @@ func fetchAndAssert(suite suite.Suite, expected, result interface{}, count int)
}

assertExpected(suite, expected, cursor, err)

if cursor != nil {
/* Cursors should ordinarily be closed when they are no longer needed, otherwise an application using
multiple goroutines might hang waiting for a free connection in the pool. However, some of the
generated changefeed tests expect the cursor will return additional changes after fetchAndAssert()
is called. This means we must leave test cursors hanging open for these tests to pass.
*/

//cursor.Close()
}
}

func maybeLen(v interface{}) interface{} {
Expand Down
8 changes: 8 additions & 0 deletions internal/integration/reql_tests/reql_changefeeds_edge_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion internal/integration/reql_tests/template.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,17 @@ func (suite *${module_name}Suite) TearDownSuite() {
suite.T().Log("Tearing down ${module_name}Suite")

if suite.session != nil {
/* TearDownSuite() will block if a cursor is left open by a test, because:
* the pool only contains one connection by default,
* that connection can only be used by the goroutine that acquired it (until it's released), and
* stretchr/testify/suite runs SetupTest() and TestCases() in a different goroutine than TearDownSuite().
*/
err := suite.session.Reconnect()
suite.Require().NoError(err, "Error returned when reconnecting to server")

r.DB("rethinkdb").Table("_debug_scratch").Delete().Exec(suite.session)
%for var_name in table_var_names:
r.DB("test").TableDrop("${var_name}").Exec(suite.session)
r.DB("test").TableDrop("${var_name}").Exec(suite.session)
%endfor
r.DBDrop("test").Exec(suite.session)

Expand Down
Loading