Skip to content

Commit b5240cd

Browse files
committed
simple connection reusage round robin
1 parent 040a60f commit b5240cd

File tree

1 file changed

+30
-72
lines changed

1 file changed

+30
-72
lines changed

pool.go

Lines changed: 30 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ package rethinkdb
22

33
import (
44
"errors"
5-
"fmt"
6-
"net"
75
"sync"
6+
"sync/atomic"
87

98
"golang.org/x/net/context"
109
"gopkg.in/fatih/pool.v2"
@@ -19,7 +18,8 @@ type Pool struct {
1918
host Host
2019
opts *ConnectOpts
2120

22-
pool pool.Pool
21+
conns []*Connection
22+
pointer int32
2323

2424
mu sync.RWMutex // protects following fields
2525
closed bool
@@ -36,36 +36,31 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) {
3636

3737
maxOpen := opts.MaxOpen
3838
if maxOpen <= 0 {
39-
maxOpen = 2
39+
maxOpen = 1
4040
}
4141

42-
p, err := pool.NewChannelPool(initialCap, maxOpen, func() (net.Conn, error) {
43-
conn, err := NewConnection(host.String(), opts)
42+
conns := make([]*Connection, maxOpen)
43+
var err error
44+
for i := range conns {
45+
conns[i], err = NewConnection(host.String(), opts)
4446
if err != nil {
4547
return nil, err
4648
}
47-
48-
return conn, err
49-
})
50-
if err != nil {
51-
return nil, err
5249
}
5350

5451
return &Pool{
55-
pool: p,
56-
host: host,
57-
opts: opts,
52+
conns: conns,
53+
pointer: -1,
54+
host: host,
55+
opts: opts,
5856
}, nil
5957
}
6058

6159
// Ping verifies a connection to the database is still alive,
6260
// establishing a connection if necessary.
6361
func (p *Pool) Ping() error {
64-
_, pc, err := p.conn()
65-
if err != nil {
66-
return err
67-
}
68-
return pc.Close()
62+
_, _, err := p.conn()
63+
return err
6964
}
7065

7166
// Close closes the database, releasing any open resources.
@@ -79,37 +74,32 @@ func (p *Pool) Close() error {
7974
return nil
8075
}
8176

82-
p.pool.Close()
77+
for _, c := range p.conns {
78+
err := c.Close()
79+
if err != nil {
80+
return err
81+
}
82+
}
8383

8484
return nil
8585
}
8686

8787
func (p *Pool) conn() (*Connection, *pool.PoolConn, error) {
8888
p.mu.RLock()
89-
defer p.mu.RUnlock()
9089

9190
if p.closed {
91+
p.mu.RUnlock()
9292
return nil, nil, errPoolClosed
9393
}
94+
p.mu.RUnlock()
9495

95-
nc, err := p.pool.Get()
96-
if err != nil {
97-
return nil, nil, err
98-
}
99-
100-
pc, ok := nc.(*pool.PoolConn)
101-
if !ok {
102-
// This should never happen!
103-
return nil, nil, fmt.Errorf("Invalid connection in pool")
104-
}
105-
106-
conn, ok := pc.Conn.(*Connection)
107-
if !ok {
108-
// This should never happen!
109-
return nil, nil, fmt.Errorf("Invalid connection in pool")
96+
pos := atomic.AddInt32(&p.pointer, 1)
97+
if pos == int32(len(p.conns)) {
98+
atomic.StoreInt32(&p.pointer, 0)
11099
}
100+
pos = pos % int32(len(p.conns))
111101

112-
return conn, pc, nil
102+
return p.conns[pos], nil, nil
113103
}
114104

115105
// SetInitialPoolCap sets the initial capacity of the connection pool.
@@ -138,67 +128,35 @@ func (p *Pool) SetMaxOpenConns(n int) {
138128

139129
// Exec executes a query without waiting for any response.
140130
func (p *Pool) Exec(ctx context.Context, q Query) error {
141-
c, pc, err := p.conn()
131+
c, _, err := p.conn()
142132
if err != nil {
143133
return err
144134
}
145-
defer pc.Close()
146135

147136
_, _, err = c.Query(ctx, q)
148-
149-
if c.isBad() {
150-
pc.MarkUnusable()
151-
}
152-
153137
return err
154138
}
155139

156140
// Query executes a query and waits for the response
157141
func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) {
158-
c, pc, err := p.conn()
142+
c, _, err := p.conn()
159143
if err != nil {
160144
return nil, err
161145
}
162146

163147
_, cursor, err := c.Query(ctx, q)
164-
165-
if err == nil {
166-
cursor.releaseConn = releaseConn(c, pc)
167-
} else {
168-
if c.isBad() {
169-
pc.MarkUnusable()
170-
}
171-
pc.Close()
172-
}
173-
174148
return cursor, err
175149
}
176150

177151
// Server returns the server name and server UUID being used by a connection.
178152
func (p *Pool) Server() (ServerResponse, error) {
179153
var response ServerResponse
180154

181-
c, pc, err := p.conn()
155+
c, _, err := p.conn()
182156
if err != nil {
183157
return response, err
184158
}
185-
defer pc.Close()
186159

187160
response, err = c.Server()
188-
189-
if c.isBad() {
190-
pc.MarkUnusable()
191-
}
192-
193161
return response, err
194162
}
195-
196-
func releaseConn(c *Connection, pc *pool.PoolConn) func() error {
197-
return func() error {
198-
if c.isBad() {
199-
pc.MarkUnusable()
200-
}
201-
202-
return pc.Close()
203-
}
204-
}

0 commit comments

Comments
 (0)