Skip to content

Commit a4aea25

Browse files
authored
Ensure context isn't exhausted via concurrent query as opposed to sentinel query (#3334)
1 parent e2149b0 commit a4aea25

File tree

2 files changed

+68
-18
lines changed

2 files changed

+68
-18
lines changed

sentinel.go

+49-18
Original file line numberDiff line numberDiff line change
@@ -566,29 +566,60 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
566566
}
567567
}
568568

569-
for i, sentinelAddr := range c.sentinelAddrs {
570-
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
569+
var (
570+
masterAddr string
571+
wg sync.WaitGroup
572+
once sync.Once
573+
errCh = make(chan error, len(c.sentinelAddrs))
574+
)
571575

572-
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
573-
if err != nil {
574-
_ = sentinel.Close()
575-
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
576-
return "", err
577-
}
578-
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
579-
c.opt.MasterName, err)
580-
continue
581-
}
576+
ctx, cancel := context.WithCancel(ctx)
577+
defer cancel()
582578

583-
// Push working sentinel to the top.
584-
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
585-
c.setSentinel(ctx, sentinel)
579+
for i, sentinelAddr := range c.sentinelAddrs {
580+
wg.Add(1)
581+
go func(i int, addr string) {
582+
defer wg.Done()
583+
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
584+
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
585+
if err != nil {
586+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
587+
// Report immediately and return
588+
errCh <- err
589+
return
590+
}
591+
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
592+
addr, c.opt.MasterName, err)
593+
_ = sentinelCli.Close()
594+
return
595+
}
586596

587-
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
588-
return addr, nil
597+
once.Do(func() {
598+
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
599+
// Push working sentinel to the top
600+
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
601+
c.setSentinel(ctx, sentinelCli)
602+
internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
603+
cancel()
604+
})
605+
}(i, sentinelAddr)
589606
}
590607

591-
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
608+
done := make(chan struct{})
609+
go func() {
610+
wg.Wait()
611+
close(done)
612+
}()
613+
614+
select {
615+
case <-done:
616+
if masterAddr != "" {
617+
return masterAddr, nil
618+
}
619+
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
620+
case err := <-errCh:
621+
return "", err
622+
}
592623
}
593624

594625
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {

sentinel_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis_test
33
import (
44
"context"
55
"net"
6+
"time"
67

78
. "github.com/bsm/ginkgo/v2"
89
. "github.com/bsm/gomega"
@@ -32,6 +33,24 @@ var _ = Describe("Sentinel PROTO 2", func() {
3233
})
3334
})
3435

36+
var _ = Describe("Sentinel resolution", func() {
37+
It("should resolve master without context exhaustion", func() {
38+
shortCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
39+
defer cancel()
40+
41+
client := redis.NewFailoverClient(&redis.FailoverOptions{
42+
MasterName: sentinelName,
43+
SentinelAddrs: sentinelAddrs,
44+
MaxRetries: -1,
45+
})
46+
47+
err := client.Ping(shortCtx).Err()
48+
Expect(err).NotTo(HaveOccurred(), "expected master to resolve without context exhaustion")
49+
50+
_ = client.Close()
51+
})
52+
})
53+
3554
var _ = Describe("Sentinel", func() {
3655
var client *redis.Client
3756
var master *redis.Client

0 commit comments

Comments
 (0)