Skip to content

Commit 010c934

Browse files
committed
Merge branch 'v8.11.5-enhanced' into 'v8'
V8.11.5 enhanced See merge request dbops/Redis!1
2 parents cae6772 + f33ae11 commit 010c934

11 files changed

Lines changed: 2536 additions & 24 deletions

cluster.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type ClusterOptions struct {
7474
// PoolSize applies per cluster node and not for the whole cluster.
7575
PoolSize int
7676
MinIdleConns int
77+
MaxIdleConns int
7778
MaxConnAge time.Duration
7879
PoolTimeout time.Duration
7980
IdleTimeout time.Duration
@@ -226,6 +227,10 @@ func (n *clusterNode) MarkAsFailing() {
226227
}
227228

228229
func (n *clusterNode) Failing() bool {
230+
if !n.Client.Allow() {
231+
return true
232+
}
233+
229234
const timeout = 15 // 15 seconds
230235

231236
failing := atomic.LoadUint32(&n.failing)
@@ -1068,7 +1073,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
10681073
}
10691074

10701075
for _, node := range nodes {
1071-
_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
1076+
_, err := node.Client.connPool.(pool.Reaper).ReapStaleConns()
10721077
if err != nil {
10731078
internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
10741079
}

cluster_dynamic.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package redis
2+
3+
import (
4+
"crypto/tls"
5+
"runtime"
6+
"time"
7+
8+
"github.com/go-redis/redis/v8/internal"
9+
)
10+
11+
// NewDynamicClusterClient is similar to NewClusterClient, but it supports dynamic connection pool management
12+
// in exiting clients if NewClient option is not specific in ClusterOptions
13+
func NewDynamicClusterClient(opt *ClusterOptions) *ClusterClient {
14+
if opt.NewClient == nil {
15+
opt.NewClient = NewDynamicClient
16+
}
17+
18+
return NewClusterClient(opt)
19+
}
20+
21+
func (c *ClusterClient) SetMaxRedirects(maxRedirects int) {
22+
if maxRedirects == -1 {
23+
maxRedirects = 0
24+
} else if maxRedirects == 0 {
25+
maxRedirects = 3
26+
}
27+
28+
c.opt.MaxRedirects = maxRedirects
29+
}
30+
31+
func (c *ClusterClient) SetReadOnly(readOnly bool) {
32+
c.opt.ReadOnly = readOnly
33+
}
34+
35+
func (c *ClusterClient) SetRouteByLatency(routeByLatency bool) {
36+
if routeByLatency {
37+
c.opt.ReadOnly = true
38+
for _, node := range c.nodes.nodes {
39+
go node.updateLatency()
40+
}
41+
}
42+
c.opt.RouteByLatency = routeByLatency
43+
}
44+
45+
func (c *ClusterClient) SetRouteRandomly(routeRandomly bool) {
46+
if routeRandomly {
47+
c.opt.ReadOnly = true
48+
}
49+
c.opt.RouteRandomly = routeRandomly
50+
}
51+
52+
func (c *ClusterClient) SetUsername(username string) {
53+
if err := c.applyUpdateFn(func(client *Client) {
54+
client.SetUsername(username)
55+
}); err != nil {
56+
internal.Logger.Printf(c.Context(), "SetUsername failed: %s", err)
57+
return
58+
}
59+
60+
c.opt.Username = username
61+
}
62+
63+
func (c *ClusterClient) SetPassword(password string) {
64+
if err := c.applyUpdateFn(func(client *Client) {
65+
client.SetPassword(password)
66+
}); err != nil {
67+
internal.Logger.Printf(c.Context(), "SetPassword failed: %s", err)
68+
return
69+
}
70+
71+
c.opt.Password = password
72+
}
73+
74+
func (c *ClusterClient) SetMaxRetries(maxRetries int) {
75+
if maxRetries == 0 {
76+
maxRetries = -1
77+
}
78+
79+
if err := c.applyUpdateFn(func(client *Client) {
80+
client.SetMaxRetries(maxRetries)
81+
}); err != nil {
82+
internal.Logger.Printf(c.Context(), "SetMaxRetries failed: %s", err)
83+
return
84+
}
85+
86+
c.opt.MaxRetries = maxRetries
87+
}
88+
89+
func (c *ClusterClient) SetMinRetryBackoff(minRetryBackoff time.Duration) {
90+
if minRetryBackoff == -1 {
91+
minRetryBackoff = 0
92+
} else if minRetryBackoff == 0 {
93+
minRetryBackoff = 8 * time.Millisecond
94+
}
95+
96+
if err := c.applyUpdateFn(func(client *Client) {
97+
client.SetMinRetryBackoff(minRetryBackoff)
98+
}); err != nil {
99+
internal.Logger.Printf(c.Context(), "SetMinRetryBackoff failed: %s", err)
100+
return
101+
}
102+
103+
c.opt.MinRetryBackoff = minRetryBackoff
104+
}
105+
106+
func (c *ClusterClient) SetMaxRetryBackoff(maxRetryBackoff time.Duration) {
107+
if maxRetryBackoff == -1 {
108+
maxRetryBackoff = 0
109+
} else if maxRetryBackoff == 0 {
110+
maxRetryBackoff = 512 * time.Millisecond
111+
}
112+
113+
if err := c.applyUpdateFn(func(client *Client) {
114+
client.SetMaxRetryBackoff(maxRetryBackoff)
115+
}); err != nil {
116+
internal.Logger.Printf(c.Context(), "SetMaxRetryBackoff failed: %s", err)
117+
return
118+
}
119+
120+
c.opt.MaxRetryBackoff = maxRetryBackoff
121+
}
122+
123+
func (c *ClusterClient) SetDialTimeout(dialTimeout time.Duration) {
124+
if err := c.applyUpdateFn(func(client *Client) {
125+
client.SetDialTimeout(dialTimeout)
126+
}); err != nil {
127+
internal.Logger.Printf(c.Context(), "SetDialTimeout failed: %s", err)
128+
return
129+
}
130+
131+
c.opt.DialTimeout = dialTimeout
132+
}
133+
134+
func (c *ClusterClient) SetReadTimeout(readTimeout time.Duration) {
135+
if readTimeout == -1 {
136+
readTimeout = 0
137+
} else if readTimeout == 0 {
138+
readTimeout = 3 * time.Second
139+
}
140+
141+
if err := c.applyUpdateFn(func(client *Client) {
142+
client.SetReadTimeout(readTimeout)
143+
}); err != nil {
144+
internal.Logger.Printf(c.Context(), "SetReadTimeout failed: %s", err)
145+
return
146+
}
147+
148+
c.opt.ReadTimeout = readTimeout
149+
}
150+
151+
func (c *ClusterClient) SetWriteTimeout(writeTimeout time.Duration) {
152+
if writeTimeout == -1 {
153+
writeTimeout = 0
154+
} else if writeTimeout == 0 {
155+
writeTimeout = c.opt.ReadTimeout
156+
}
157+
158+
if err := c.applyUpdateFn(func(client *Client) {
159+
client.SetWriteTimeout(writeTimeout)
160+
}); err != nil {
161+
internal.Logger.Printf(c.Context(), "SetWriteTimeout failed: %s", err)
162+
return
163+
}
164+
165+
c.opt.WriteTimeout = writeTimeout
166+
}
167+
168+
func (c *ClusterClient) SetPoolFIFO(poolFIFO bool) {
169+
if err := c.applyUpdateFn(func(client *Client) {
170+
client.SetPoolFIFO(poolFIFO)
171+
}); err != nil {
172+
internal.Logger.Printf(c.Context(), "SetPoolFIFO failed: %s", err)
173+
return
174+
}
175+
176+
c.opt.PoolFIFO = poolFIFO
177+
}
178+
179+
func (c *ClusterClient) SetPoolSize(poolSize int) {
180+
if poolSize == 0 {
181+
poolSize = 5 * runtime.GOMAXPROCS(0)
182+
}
183+
184+
if err := c.applyUpdateFn(func(client *Client) {
185+
client.SetPoolSize(poolSize)
186+
}); err != nil {
187+
internal.Logger.Printf(c.Context(), "SetPoolSize failed: %s", err)
188+
return
189+
}
190+
191+
c.opt.PoolSize = poolSize
192+
}
193+
194+
func (c *ClusterClient) SetMinIdleConns(minIdleConns int) {
195+
if err := c.applyUpdateFn(func(client *Client) {
196+
client.SetMinIdleConns(minIdleConns)
197+
}); err != nil {
198+
internal.Logger.Printf(c.Context(), "SetMinIdleConns failed: %s", err)
199+
return
200+
}
201+
202+
c.opt.MinIdleConns = minIdleConns
203+
}
204+
205+
func (c *ClusterClient) SetMaxIdleConns(maxIdleConns int) {
206+
if err := c.applyUpdateFn(func(client *Client) {
207+
client.SetMaxIdleConns(maxIdleConns)
208+
}); err != nil {
209+
internal.Logger.Printf(c.Context(), "SetMaxIdleConns failed: %s", err)
210+
return
211+
}
212+
213+
c.opt.MaxIdleConns = maxIdleConns
214+
}
215+
216+
func (c *ClusterClient) SetMaxConnAge(maxConnAge time.Duration) {
217+
if err := c.applyUpdateFn(func(client *Client) {
218+
client.SetMaxConnAge(maxConnAge)
219+
}); err != nil {
220+
internal.Logger.Printf(c.Context(), "SetMaxConnAge failed: %s", err)
221+
return
222+
}
223+
224+
c.opt.MaxConnAge = maxConnAge
225+
}
226+
227+
func (c *ClusterClient) SetPoolTimeout(poolTimeout time.Duration) {
228+
if err := c.applyUpdateFn(func(client *Client) {
229+
client.SetPoolTimeout(poolTimeout)
230+
}); err != nil {
231+
internal.Logger.Printf(c.Context(), "SetPoolTimeout failed: %s", err)
232+
return
233+
}
234+
235+
c.opt.PoolTimeout = poolTimeout
236+
}
237+
238+
func (c *ClusterClient) SetIdleTimeout(idleTimeout time.Duration) {
239+
if err := c.applyUpdateFn(func(client *Client) {
240+
client.SetIdleTimeout(idleTimeout)
241+
}); err != nil {
242+
internal.Logger.Printf(c.Context(), "SetIdleTimeout failed: %s", err)
243+
return
244+
}
245+
246+
c.opt.IdleTimeout = idleTimeout
247+
}
248+
249+
func (c *ClusterClient) SetIdleCheckFrequency(idleCheckFrequency time.Duration) {
250+
if err := c.applyUpdateFn(func(client *Client) {
251+
client.SetIdleCheckFrequency(idleCheckFrequency)
252+
}); err != nil {
253+
internal.Logger.Printf(c.Context(), "SetIdleCheckFrequency failed: %s", err)
254+
return
255+
}
256+
257+
c.opt.IdleCheckFrequency = idleCheckFrequency
258+
}
259+
260+
func (c *ClusterClient) SetTLSConfig(tlsConfig *tls.Config) {
261+
if err := c.applyUpdateFn(func(client *Client) {
262+
client.SetTLSConfig(tlsConfig)
263+
}); err != nil {
264+
internal.Logger.Printf(c.Context(), "SetTLSConfig failed: %s", err)
265+
return
266+
}
267+
268+
c.opt.TLSConfig = tlsConfig
269+
}
270+
271+
func (c *ClusterClient) applyUpdateFn(fn func(client *Client)) error {
272+
nodes, err := c.nodes.All()
273+
if err != nil {
274+
return err
275+
}
276+
277+
for _, node := range nodes {
278+
fn(node.Client)
279+
}
280+
return nil
281+
}

cluster_dynamic_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package redis_test
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/go-redis/redis/v8"
9+
. "github.com/onsi/ginkgo"
10+
. "github.com/onsi/gomega"
11+
)
12+
13+
var _ = Describe("Dynamic ClusterClient", func() {
14+
var client *redis.ClusterClient
15+
16+
Context("retry", func() {
17+
timePingRequest := func() time.Duration {
18+
now := time.Now()
19+
err := client.Ping(ctx).Err()
20+
Expect(err).To(HaveOccurred())
21+
return time.Since(now)
22+
}
23+
24+
const normalRedirects, normalMinBackoff, normalMaxBackoff = 3, time.Millisecond * 8, time.Millisecond * 512
25+
26+
BeforeEach(func() {
27+
client = redis.NewDynamicClusterClient(&redis.ClusterOptions{
28+
Addrs: []string{":1234"},
29+
MaxRedirects: normalRedirects,
30+
MinRetryBackoff: normalMinBackoff,
31+
MaxRetryBackoff: normalMaxBackoff,
32+
ClusterSlots: func(ctx context.Context) ([]redis.ClusterSlot, error) {
33+
return []redis.ClusterSlot{{
34+
Start: 0,
35+
End: 16383,
36+
Nodes: []redis.ClusterNode{{
37+
Addr: ":1234",
38+
},
39+
}}}, nil
40+
},
41+
})
42+
})
43+
44+
AfterEach(func() {
45+
Expect(client.Close()).NotTo(HaveOccurred())
46+
})
47+
48+
It("maxRedirects update", func() {
49+
elapseNormalRedirect := timePingRequest()
50+
51+
client.SetMaxRedirects(-1)
52+
elapseNoRedirects := timePingRequest()
53+
Expect(elapseNormalRedirect).To(BeNumerically(">", elapseNoRedirects, 10*time.Millisecond))
54+
55+
client.SetMaxRedirects(normalRedirects * 2)
56+
elapseMoreRedirects := timePingRequest()
57+
Expect(elapseMoreRedirects).To(BeNumerically(">", elapseNormalRedirect, 10*time.Millisecond))
58+
59+
log.Println("redirect elapsed", elapseNoRedirects, elapseNormalRedirect, elapseMoreRedirects)
60+
})
61+
62+
It("retries backoff update", func() {
63+
elapseNormalRedirect := timePingRequest()
64+
65+
client.SetMinRetryBackoff(normalMinBackoff / 2)
66+
client.SetMaxRetryBackoff(normalMaxBackoff / 2)
67+
elapseLessRetryBackoff := timePingRequest()
68+
69+
client.SetMinRetryBackoff(normalMinBackoff * 2)
70+
client.SetMaxRetryBackoff(normalMaxBackoff * 2)
71+
elapseMoreRedirectBackoff := timePingRequest()
72+
73+
log.Println("backoff elapsed", elapseLessRetryBackoff, elapseNormalRedirect, elapseMoreRedirectBackoff)
74+
Expect(elapseNormalRedirect).To(BeNumerically(">", elapseLessRetryBackoff, 10*time.Millisecond))
75+
Expect(elapseMoreRedirectBackoff).To(BeNumerically(">", elapseNormalRedirect, 10*time.Millisecond))
76+
})
77+
})
78+
})

0 commit comments

Comments
 (0)