Skip to content

Commit 20c651d

Browse files
authored
fix: don't send CLIENT CACHING YES in BCAST and OPTOUT mode (#801)
Signed-off-by: Rueian <[email protected]>
1 parent f56a674 commit 20c651d

File tree

8 files changed

+260
-10
lines changed

8 files changed

+260
-10
lines changed

client_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"strings"
1111
"testing"
1212
"time"
13+
14+
"github.com/redis/rueidis/internal/cmds"
1315
)
1416

1517
type mockConn struct {
@@ -191,6 +193,10 @@ func (m *mockConn) Addr() string {
191193
return ""
192194
}
193195

196+
func (m *mockConn) OptInCmd() cmds.Completed {
197+
return cmds.OptInCmd
198+
}
199+
194200
func TestNewSingleClientNoNode(t *testing.T) {
195201
defer ShouldNotLeaked(SetupLeakDetection())
196202
if _, err := newSingleClient(

cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ func askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *redis
899899
commands := make([]Completed, 0, len(multi)*6)
900900
for _, cmd := range multi {
901901
ck, _ := cmds.CacheKey(cmd.Cmd)
902-
commands = append(commands, cmds.OptInCmd, cmds.AskingCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(cmd.Cmd), cmds.ExecCmd)
902+
commands = append(commands, cc.OptInCmd(), cmds.AskingCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(cmd.Cmd), cmds.ExecCmd)
903903
}
904904
results := resultsp.Get(0, len(multi))
905905
resps := cc.DoMulti(ctx, commands...)

internal/cmds/cmds.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ const (
77
blockTag = uint16(1 << 14)
88
readonly = uint16(1 << 13)
99
noRetTag = uint16(1<<12) | readonly | pipeTag // make noRetTag can also be retried and auto pipelining
10-
mtGetTag = uint16(1<<11) | readonly // make mtGetTag can also be retried
11-
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
10+
mtGetTag = uint16(1<<11) | readonly // make mtGetTag can also be retried
11+
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
1212
unsubTag = uint16(1<<9) | noRetTag
13-
pipeTag = uint16(1<<8) // make blocking mode request can use auto pipelining
13+
pipeTag = uint16(1 << 8) // make blocking mode request can use auto pipelining
1414
// InitSlot indicates that the command be sent to any redis node in cluster
1515
InitSlot = uint16(1 << 14)
1616
// NoSlot indicates that the command has no key slot specified
@@ -23,6 +23,11 @@ var (
2323
cs: newCommandSlice([]string{"CLIENT", "CACHING", "YES"}),
2424
cf: optInTag,
2525
}
26+
// OptInNopCmd is a predefined alternative for CLIENT CACHING YES in BCAST/OPTOUT mode.
27+
OptInNopCmd = Completed{
28+
cs: newCommandSlice([]string{"ECHO", ""}),
29+
cf: optInTag,
30+
}
2631
// MultiCmd is predefined MULTI
2732
MultiCmd = Completed{
2833
cs: newCommandSlice([]string{"MULTI"}),

internal/cmds/cmds_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ func TestCompleted_IsOptIn(t *testing.T) {
9090
if cmd := OptInCmd; !cmd.IsOptIn() {
9191
t.Fatalf("should be opt in command")
9292
}
93+
if cmd := OptInNopCmd; !cmd.IsOptIn() {
94+
t.Fatalf("should be opt in command")
95+
}
9396
}
9497

9598
func TestCompleted_NoReply(t *testing.T) {

mux.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"net"
66
"runtime"
7+
"strings"
78
"sync"
89
"sync/atomic"
910
"time"
@@ -41,6 +42,7 @@ type conn interface {
4142
Store(w wire)
4243
Addr() string
4344
SetOnCloseHook(func(error))
45+
OptInCmd() cmds.Completed
4446
}
4547

4648
var _ conn = (*mux)(nil)
@@ -60,6 +62,7 @@ type mux struct {
6062
maxm int
6163

6264
usePool bool
65+
optin bool
6366
}
6467

6568
func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
@@ -95,6 +98,7 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
9598
maxm: option.BlockingPipeline,
9699

97100
usePool: option.DisableAutoPipelining,
101+
optin: isOptIn(option.ClientTrackingOptions),
98102
}
99103
m.clhks.Store(emptyclhks)
100104
for i := 0; i < len(m.wire); i++ {
@@ -106,6 +110,22 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
106110
return m
107111
}
108112

113+
func isOptIn(opts []string) bool {
114+
for _, opt := range opts {
115+
if opt := strings.ToUpper(opt); opt == "BCAST" || opt == "OPTOUT" {
116+
return false
117+
}
118+
}
119+
return true
120+
}
121+
122+
func (m *mux) OptInCmd() cmds.Completed {
123+
if m.optin {
124+
return cmds.OptInCmd
125+
}
126+
return cmds.OptInNopCmd
127+
}
128+
109129
func (m *mux) SetOnCloseHook(fn func(error)) {
110130
m.clhks.Store(fn)
111131
}

mux_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,26 @@ func TestMuxAddr(t *testing.T) {
126126
}
127127
}
128128

129+
func TestMuxOptInCmd(t *testing.T) {
130+
defer ShouldNotLeaked(SetupLeakDetection())
131+
132+
if m := makeMux("dst1", &ClientOption{
133+
ClientTrackingOptions: []string{"OPTOUT"},
134+
}, nil); m.OptInCmd() != cmds.OptInNopCmd {
135+
t.Fatalf("unexpected OptInCmd")
136+
}
137+
if m := makeMux("dst1", &ClientOption{
138+
ClientTrackingOptions: []string{"PREFIX", "a", "BCAST"},
139+
}, nil); m.OptInCmd() != cmds.OptInNopCmd {
140+
t.Fatalf("unexpected OptInCmd")
141+
}
142+
if m := makeMux("dst1", &ClientOption{
143+
ClientTrackingOptions: nil,
144+
}, nil); m.OptInCmd() != cmds.OptInCmd {
145+
t.Fatalf("unexpected OptInCmd")
146+
}
147+
}
148+
129149
func TestMuxDialSuppress(t *testing.T) {
130150
defer ShouldNotLeaked(SetupLeakDetection())
131151
var wires, waits, done int64

pipe.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type pipe struct {
8989
recvs int32
9090
r2ps bool // identify this pipe is used for resp2 pubsub or not
9191
noNoDelay bool
92+
optin bool
9293
}
9394

9495
type pipeFn func(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error)
@@ -116,7 +117,8 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
116117
maxFlushDelay: option.MaxFlushDelay,
117118
noNoDelay: option.DisableTCPNoDelay,
118119

119-
r2ps: r2ps,
120+
r2ps: r2ps,
121+
optin: isOptIn(option.ClientTrackingOptions),
120122
}
121123
if !nobg {
122124
p.queue = newRing(option.RingScaleEachConn)
@@ -1300,6 +1302,13 @@ next:
13001302
return m, nil
13011303
}
13021304

1305+
func (p *pipe) optInCmd() cmds.Completed {
1306+
if p.optin {
1307+
return cmds.OptInCmd
1308+
}
1309+
return cmds.OptInNopCmd
1310+
}
1311+
13031312
func (p *pipe) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult {
13041313
if p.cache == nil {
13051314
return p.Do(ctx, Completed(cmd))
@@ -1319,7 +1328,7 @@ func (p *pipe) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) Re
13191328
}
13201329
resp := p.DoMulti(
13211330
ctx,
1322-
cmds.OptInCmd,
1331+
p.optInCmd(),
13231332
cmds.MultiCmd,
13241333
cmds.NewCompleted([]string{"PTTL", ck}),
13251334
Completed(cmd),
@@ -1387,7 +1396,7 @@ func (p *pipe) doCacheMGet(ctx context.Context, cmd Cacheable, ttl time.Duration
13871396
}
13881397

13891398
multi := make([]Completed, 0, keys+4)
1390-
multi = append(multi, cmds.OptInCmd, cmds.MultiCmd)
1399+
multi = append(multi, p.optInCmd(), cmds.MultiCmd)
13911400
for _, key := range rewritten.Commands()[1 : keys+1] {
13921401
multi = append(multi, builder.Pttl().Key(key).Build())
13931402
}
@@ -1473,7 +1482,7 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
14731482
for _, i := range missed {
14741483
ct := multi[i]
14751484
ck, _ := cmds.CacheKey(ct.Cmd)
1476-
missing = append(missing, cmds.OptInCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(ct.Cmd), cmds.ExecCmd)
1485+
missing = append(missing, p.optInCmd(), cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(ct.Cmd), cmds.ExecCmd)
14771486
}
14781487
} else {
14791488
for i, ct := range multi {
@@ -1487,7 +1496,7 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
14871496
entries.e[i] = entry // store entries for later entry.Wait() to avoid MGET deadlock each others.
14881497
continue
14891498
}
1490-
missing = append(missing, cmds.OptInCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(ct.Cmd), cmds.ExecCmd)
1499+
missing = append(missing, p.optInCmd(), cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(ct.Cmd), cmds.ExecCmd)
14911500
}
14921501
}
14931502

0 commit comments

Comments
 (0)