Skip to content

Commit 94b9cf3

Browse files
author
Runze Cui
committed
Add support for cluster mode client and pool unit test
1 parent d424c75 commit 94b9cf3

File tree

5 files changed

+71
-25
lines changed

5 files changed

+71
-25
lines changed

cluster.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,6 @@ func (opt *ClusterOptions) init() {
128128
opt.MaxRetryBackoff = 512 * time.Millisecond
129129
}
130130

131-
if opt.ConnReqQueueSize <= 0 {
132-
// If ConnReqQueueSize is less than or equal to 0, set it to 1,000,000
133-
opt.ConnReqQueueSize = 1000000
134-
}
135-
if opt.ConnReqQueueSize <= 100*opt.PoolSize {
136-
// If ConnReqQueueSize is less than or equal to 100 times PoolSize, set it to 100 times PoolSize
137-
opt.ConnReqQueueSize = 100 * opt.PoolSize
138-
}
139-
140131
if opt.NewClient == nil {
141132
opt.NewClient = NewClient
142133
}
@@ -167,6 +158,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
167158
PoolTimeout: opt.PoolTimeout,
168159
IdleTimeout: opt.IdleTimeout,
169160
IdleCheckFrequency: disableIdleCheck,
161+
ConnReqQueueSize: opt.ConnReqQueueSize,
170162

171163
TLSConfig: opt.TLSConfig,
172164
// If ClusterSlots is populated, then we probably have an artificial

internal/pool/pool_dynamic.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
)
1111

1212
const (
13+
// connReqsQueueSize: buffered chan size of connection opener, this value should be larger than the maximum typical
14+
// value used for poolSize, otherwise it might block ALL calls in Pool until pending connection request is satisfied
15+
connReqsQueueSize = 1000000
16+
1317
minIdleCheckFrequency = time.Second
1418

1519
// tryDialFrequency: frequency used to try dial underlying resource to check if it is recovered after a consecutive
@@ -62,10 +66,19 @@ type DynamicConnPool struct {
6266
var _ DynamicPooler = (*DynamicConnPool)(nil)
6367

6468
func NewDynamicConnPool(opt *Options) *DynamicConnPool {
69+
connReqQueueSize := opt.ConnReqQueueSize
70+
if connReqQueueSize <= 0 {
71+
connReqQueueSize = connReqsQueueSize
72+
}
73+
// the buffer size should be at least 100x of the pool size to handle spiking
74+
if connReqQueueSize <= 100*opt.PoolSize {
75+
connReqQueueSize = 100 * opt.PoolSize
76+
}
77+
6578
p := &DynamicConnPool{
6679
opt: opt,
6780
closedCh: make(chan struct{}),
68-
connReqs: make(chan ctxConnChan, opt.ConnReqQueueSize),
81+
connReqs: make(chan ctxConnChan, connReqQueueSize),
6982
}
7083

7184
p.connsMu.Lock()
@@ -198,10 +211,12 @@ func (p *DynamicConnPool) IdleLen() int {
198211
return p.idleConnsLen
199212
}
200213

214+
// QueueLen returns number of connections in the queue
201215
func (p *DynamicConnPool) QueueLen() int {
202216
return len(p.connReqs)
203217
}
204218

219+
// QueueCap returns the capacity of the queue
205220
func (p *DynamicConnPool) QueueCap() int {
206221
return cap(p.connReqs)
207222
}

internal/pool/pool_dynamic_test.go

+45-5
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import (
1313
"gitlab.myteksi.net/dbops/Redis/v8/internal/pool"
1414
)
1515

16-
func assertPoolStats(pooler pool.DynamicPooler, hits, misses, timeouts, totals, idles, stales uint32) {
16+
func assertPoolStats(pooler pool.DynamicPooler, hits, misses, timeouts, totals, idles, stales, capacity uint32) {
1717
Expect(pooler.Stats()).To(Equal(&pool.Stats{
1818
Hits: hits,
1919
Misses: misses,
2020
Timeouts: timeouts,
2121
TotalConns: totals,
2222
IdleConns: idles,
2323
StaleConns: stales,
24+
QueueCap: capacity,
2425
}))
2526
}
2627

@@ -63,21 +64,60 @@ var _ = Describe("ConnPool", func() {
6364
PoolTimeout: time.Hour,
6465
IdleTimeout: time.Millisecond,
6566
IdleCheckFrequency: time.Millisecond * 500,
67+
ConnReqQueueSize: 1000000,
6668
})
6769
wg.Wait()
6870

6971
// MinIdleConns should optimistically increase poolSize though actual conn creation is pending
70-
assertPoolStats(connPool, 0, 0, 0, 10, 10, 0)
72+
assertPoolStats(connPool, 0, 0, 0, 10, 10, 0, 1000000)
7173

7274
// no error since no idle conn queued yet during pool close and stats remains
7375
Expect(connPool.Close()).NotTo(HaveOccurred())
74-
assertPoolStats(connPool, 0, 0, 0, 10, 10, 0)
76+
assertPoolStats(connPool, 0, 0, 0, 10, 10, 0, 1000000)
7577

7678
close(closedChan) // release the channel to make all dail success
7779

7880
// wait for 5ms and all conn put back idle list should fail and decrease pool size
7981
time.Sleep(time.Millisecond * 5)
80-
assertPoolStats(connPool, 0, 0, 0, 0, 0, 0)
82+
assertPoolStats(connPool, 0, 0, 0, 0, 0, 0, 1000000)
83+
})
84+
85+
It("should create idle connections then safe close and queue size is bigger than 100x of pool size", func() {
86+
const minIdleConns = 10
87+
88+
var (
89+
wg sync.WaitGroup
90+
closedChan = make(chan struct{})
91+
)
92+
wg.Add(minIdleConns)
93+
connPool = pool.NewDynamicConnPool(&pool.Options{
94+
Dialer: func(ctx context.Context) (net.Conn, error) {
95+
wg.Done()
96+
<-closedChan
97+
return &net.TCPConn{}, nil
98+
},
99+
PoolSize: 10,
100+
MinIdleConns: minIdleConns,
101+
MaxIdleConns: 10,
102+
PoolTimeout: time.Hour,
103+
IdleTimeout: time.Millisecond,
104+
IdleCheckFrequency: time.Millisecond * 500,
105+
ConnReqQueueSize: 100,
106+
})
107+
wg.Wait()
108+
109+
// MinIdleConns should optimistically increase poolSize though actual conn creation is pending
110+
assertPoolStats(connPool, 0, 0, 0, 10, 10, 0, 1000)
111+
112+
// no error since no idle conn queued yet during pool close and stats remains
113+
Expect(connPool.Close()).NotTo(HaveOccurred())
114+
assertPoolStats(connPool, 0, 0, 0, 10, 10, 0, 1000)
115+
116+
close(closedChan) // release the channel to make all dail success
117+
118+
// wait for 5ms and all conn put back idle list should fail and decrease pool size
119+
time.Sleep(time.Millisecond * 5)
120+
assertPoolStats(connPool, 0, 0, 0, 0, 0, 0, 1000)
81121
})
82122

83123
It("should unblock client when conn is removed", func() {
@@ -642,7 +682,7 @@ var _ = Describe("dynamic update", func() {
642682
getConnsExpectNoErr(connPool, minIdleConns-1) // use all connections
643683

644684
// all get request should hit and use up all idle connections
645-
assertPoolStats(connPool, minIdleConns-1, 0, 0, minIdleConns-1, 0, 0)
685+
assertPoolStats(connPool, minIdleConns-1, 0, 0, minIdleConns-1, 0, 0, 1000000)
646686

647687
done := make(chan struct{})
648688
go func() {

options.go

+1-10
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ type Options struct {
115115
// Buffered chan size of connection opener, this value should be larger than the maximum typical
116116
// value used for poolSize, otherwise it might block ALL calls in Pool until pending connection request is satisfied
117117
// Default is 1,000,000 and minimum is 100x of the pool size
118-
// Expect to see memory usage increase when queue size is increasing
118+
// Expect memory usage increase when queue size is increasing, no dynamic reloading allowed
119119
ConnReqQueueSize int
120120

121121
// Enables read only queries on slave nodes.
@@ -202,15 +202,6 @@ func (opt *Options) init() {
202202
case 0:
203203
opt.MaxRetryBackoff = 512 * time.Millisecond
204204
}
205-
206-
if opt.ConnReqQueueSize <= 0 {
207-
// If ConnReqQueueSize is less than or equal to 0, set it to 1,000,000
208-
opt.ConnReqQueueSize = 1000000
209-
}
210-
if opt.ConnReqQueueSize <= 100*opt.PoolSize {
211-
// If ConnReqQueueSize is less than or equal to 100 times PoolSize, set it to 100 times PoolSize
212-
opt.ConnReqQueueSize = 100 * opt.PoolSize
213-
}
214205
}
215206

216207
func (opt *Options) clone() *Options {

redis_dynamic_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ var limiterError = fmt.Errorf("limiter error")
164164
type errorLimiter struct {
165165
}
166166

167+
func (l *errorLimiter) IsCBOpen() bool {
168+
return true
169+
}
170+
167171
func (l *errorLimiter) Execute(f func() error) error {
168172
return nil
169173
}
@@ -181,6 +185,10 @@ type normalLimiter struct {
181185
errors []error
182186
}
183187

188+
func (l *normalLimiter) IsCBOpen() bool {
189+
return false
190+
}
191+
184192
func (l *normalLimiter) Execute(f func() error) error {
185193
l.count++
186194
return f()

0 commit comments

Comments
 (0)