Skip to content

Commit 52fa36a

Browse files
committed
refactor: move the flatten map to internal/cache
Signed-off-by: Rueian <[email protected]>
1 parent 26ba6aa commit 52fa36a

File tree

7 files changed

+739
-163
lines changed

7 files changed

+739
-163
lines changed

cache.go

+30-163
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package rueidis
22

33
import (
44
"context"
5-
"runtime"
65
"sync"
6+
"sync/atomic"
77
"time"
88
"unsafe"
9+
10+
"github.com/redis/rueidis/internal/cache"
911
)
1012

1113
// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
@@ -229,212 +231,77 @@ func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) {
229231
return nil, false
230232
}
231233

232-
const lrBatchSize = 64
233-
const flattEntrySize = unsafe.Sizeof(flatentry{})
234-
235-
type lrBatch struct {
236-
m map[*flatentry]struct{}
237-
}
238-
239234
func NewFlattenCache(limit int) CacheStore {
240235
f := &flatten{
241-
flights: make(map[string]*adapterEntry),
242-
cache: make(map[string]*flatentry),
243-
head: &flatentry{},
244-
tail: &flatentry{},
245-
size: 0,
246-
limit: int64(limit),
236+
flights: cache.NewDoubleMap[*adapterEntry](64),
237+
cache: cache.NewLRUDoubleMap[[]byte](64, int64(limit)),
247238
}
248-
f.head.next = unsafe.Pointer(f.tail)
249-
f.tail.prev = unsafe.Pointer(f.head)
250-
f.lrup = sync.Pool{New: func() any {
251-
b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)}
252-
runtime.SetFinalizer(b, func(b *lrBatch) {
253-
if len(b.m) >= 0 {
254-
f.mu.Lock()
255-
f.llTailBatch(b)
256-
f.mu.Unlock()
257-
}
258-
})
259-
return b
260-
}}
261239
return f
262240
}
263241

264242
type flatten struct {
265-
flights map[string]*adapterEntry
266-
cache map[string]*flatentry
267-
head *flatentry
268-
tail *flatentry
269-
lrup sync.Pool
270-
mark int64
271-
size int64
272-
limit int64
273-
mu sync.RWMutex
274-
}
275-
276-
func (f *flatten) llAdd(e *flatentry) {
277-
e.mark = f.mark
278-
e.prev = f.tail.prev
279-
e.next = unsafe.Pointer(f.tail)
280-
f.tail.prev = unsafe.Pointer(e)
281-
(*flatentry)(e.prev).next = unsafe.Pointer(e)
282-
}
283-
284-
func (f *flatten) llDel(e *flatentry) {
285-
(*flatentry)(e.prev).next = e.next
286-
(*flatentry)(e.next).prev = e.prev
287-
e.mark = -1
288-
}
289-
290-
func (f *flatten) llTail(e *flatentry) {
291-
f.llDel(e)
292-
f.llAdd(e)
293-
}
294-
295-
func (f *flatten) llTailBatch(b *lrBatch) {
296-
for e := range b.m {
297-
if e.mark == f.mark {
298-
f.llTail(e)
299-
}
300-
}
301-
clear(b.m)
302-
}
303-
304-
func (f *flatten) remove(e *flatentry) {
305-
f.size -= e.size
306-
f.llDel(e)
307-
delete(f.cache, e.key)
243+
flights *cache.DoubleMap[*adapterEntry]
244+
cache *cache.LRUDoubleMap[[]byte]
245+
close int32
308246
}
309247

310248
func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
311-
f.mu.RLock()
312-
e := f.cache[key]
313-
f.mu.RUnlock()
314-
ts := now.UnixMilli()
315-
if v, _ := e.find(cmd, ts); v != nil {
316-
batch := f.lrup.Get().(*lrBatch)
317-
batch.m[e] = struct{}{}
318-
if len(batch.m) >= lrBatchSize {
319-
f.mu.Lock()
320-
f.llTailBatch(batch)
321-
f.mu.Unlock()
322-
}
323-
f.lrup.Put(batch)
324-
var ret RedisMessage
325-
_ = ret.CacheUnmarshalView(v)
326-
return ret, nil
327-
}
328-
fk := key + cmd
329-
f.mu.RLock()
330-
af := f.flights[fk]
331-
f.mu.RUnlock()
332-
if af != nil {
333-
return RedisMessage{}, af
249+
if atomic.LoadInt32(&f.close) == 1 {
250+
return RedisMessage{}, nil
334251
}
335-
f.mu.Lock()
336-
e = f.cache[key]
337-
v, expired := e.find(cmd, ts)
338-
if v != nil {
339-
f.llTail(e)
340-
f.mu.Unlock()
252+
ts := now.UnixMilli()
253+
e, ok := f.cache.Find(key, cmd, ts)
254+
if ok {
341255
var ret RedisMessage
342-
_ = ret.CacheUnmarshalView(v)
256+
_ = ret.CacheUnmarshalView(e)
343257
return ret, nil
344258
}
345-
defer f.mu.Unlock()
346-
if expired {
347-
f.remove(e)
348-
}
349-
if af = f.flights[fk]; af != nil {
259+
xat := ts + ttl.Milliseconds()
260+
af, ok := f.flights.FindOrInsert(key, cmd, func() *adapterEntry {
261+
return &adapterEntry{ch: make(chan struct{}), xat: xat}
262+
})
263+
if ok {
350264
return RedisMessage{}, af
351265
}
352-
if f.flights != nil {
353-
f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()}
354-
}
355266
return RedisMessage{}, nil
356267
}
357268

358269
func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) {
359-
fk := key + cmd
360-
f.mu.RLock()
361-
af := f.flights[fk]
362-
f.mu.RUnlock()
363-
if af != nil {
270+
af, ok := f.flights.Find(key, cmd)
271+
if ok {
364272
sxat = val.getExpireAt()
365273
if af.xat < sxat || sxat == 0 {
366274
sxat = af.xat
367275
val.setExpireAt(sxat)
368276
}
369277
bs := val.CacheMarshal(nil)
370-
fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize) + 64} // 64 for 2 map entries
371-
f.mu.Lock()
372-
if f.flights != nil {
373-
delete(f.flights, fk)
374-
f.size += fe.size
375-
for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); {
376-
e := (*flatentry)(ep)
377-
f.remove(e)
378-
ep = e.next
379-
}
380-
e := f.cache[key]
381-
if e != nil && e.cmd == cmd {
382-
f.size -= e.size
383-
f.llDel(e)
384-
e = nil
385-
}
386-
if e == nil {
387-
fe.key = key
388-
f.cache[key] = fe
389-
f.llAdd(fe)
390-
} else {
391-
e.insert(fe)
392-
}
393-
}
394-
f.mu.Unlock()
278+
f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, bs)
279+
f.flights.Delete(key, cmd)
395280
af.setVal(val)
396281
}
397282
return sxat
398283
}
399284

400285
func (f *flatten) Cancel(key, cmd string, err error) {
401-
fk := key + cmd
402-
f.mu.Lock()
403-
defer f.mu.Unlock()
404-
if af := f.flights[fk]; af != nil {
405-
delete(f.flights, fk)
286+
if af, ok := f.flights.Find(key, cmd); ok {
287+
f.flights.Delete(key, cmd)
406288
af.setErr(err)
407289
}
408290
}
409291

410292
func (f *flatten) Delete(keys []RedisMessage) {
411-
f.mu.Lock()
412-
defer f.mu.Unlock()
413293
if keys == nil {
414-
f.cache = make(map[string]*flatentry, len(f.cache))
415-
f.head.next = unsafe.Pointer(f.tail)
416-
f.tail.prev = unsafe.Pointer(f.head)
417-
f.mark++
418-
f.size = 0
294+
f.cache.DeleteAll()
419295
} else {
420296
for _, k := range keys {
421-
if e := f.cache[k.string]; e != nil {
422-
f.remove(e)
423-
}
297+
f.cache.Delete(k.string)
424298
}
425299
}
426300
}
427301

428302
func (f *flatten) Close(err error) {
429-
f.mu.Lock()
430-
flights := f.flights
431-
f.flights = nil
432-
f.cache = nil
433-
f.tail = nil
434-
f.head = nil
435-
f.mark++
436-
f.mu.Unlock()
437-
for _, entry := range flights {
303+
atomic.StoreInt32(&f.close, 1)
304+
f.flights.Iterate(func(entry *adapterEntry) {
438305
entry.setErr(err)
439-
}
306+
})
440307
}

internal/cache/chain.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package cache
2+
3+
type node[V any] struct {
4+
key string
5+
next *node[V]
6+
val V
7+
}
8+
type chain[V any] struct {
9+
node[V]
10+
}
11+
12+
func (h *chain[V]) find(key string) (val V, ok bool) {
13+
if h.node.key == key {
14+
return h.node.val, true
15+
}
16+
for curr := h.node.next; curr != nil; curr = curr.next {
17+
if curr.key == key {
18+
return curr.val, true
19+
}
20+
}
21+
return val, ok
22+
}
23+
24+
func (h *chain[V]) insert(key string, val V) {
25+
if h.node.key == "" {
26+
h.node.key = key
27+
h.node.val = val
28+
} else if h.node.key == key {
29+
h.node.val = val
30+
} else {
31+
n := &node[V]{key: key, val: val}
32+
n.next = h.node.next
33+
h.node.next = n
34+
}
35+
}
36+
37+
func (h *chain[V]) empty() bool {
38+
return h.node.next == nil && h.node.key == ""
39+
}
40+
41+
func (h *chain[V]) delete(key string) bool {
42+
var zero V
43+
if h.node.key == key {
44+
h.node.key = ""
45+
h.node.val = zero
46+
return h.node.next == nil
47+
}
48+
49+
if h.node.next == nil {
50+
return h.node.key == ""
51+
}
52+
53+
if h.node.next.key == key {
54+
h.node.next.key = ""
55+
h.node.next.val = zero
56+
h.node.next, h.node.next.next = h.node.next.next, nil
57+
return h.empty()
58+
}
59+
60+
prev := h.node.next
61+
curr := h.node.next.next
62+
for curr != nil {
63+
if curr.key == key {
64+
curr.key = ""
65+
curr.val = zero
66+
prev.next, curr.next = curr.next, nil
67+
break
68+
}
69+
prev, curr = curr, curr.next
70+
}
71+
return h.empty()
72+
}

internal/cache/chain_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package cache
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestChain(t *testing.T) {
8+
h := chain[int]{}
9+
if h.empty() != true {
10+
t.Fatal("chain is not empty")
11+
}
12+
if _, ok := h.find("any"); ok {
13+
t.Fatal("value is found")
14+
}
15+
if empty := h.delete("any"); !empty {
16+
t.Fatal("not empty")
17+
}
18+
h.insert("1", 1)
19+
h.insert("2", 2)
20+
h.insert("3", 3)
21+
if v, ok := h.find("1"); !ok || v != 1 {
22+
t.Fatal("value is not found")
23+
}
24+
if v, ok := h.find("2"); !ok || v != 2 {
25+
t.Fatal("value is not found")
26+
}
27+
if v, ok := h.find("3"); !ok || v != 3 {
28+
t.Fatal("value is not found")
29+
}
30+
if empty := h.delete("1"); empty {
31+
t.Fatal("empty")
32+
}
33+
if _, ok := h.find("1"); ok {
34+
t.Fatal("value is found")
35+
}
36+
if v, ok := h.find("2"); !ok || v != 2 {
37+
t.Fatal("value is not found")
38+
}
39+
if v, ok := h.find("3"); !ok || v != 3 {
40+
t.Fatal("value is not found")
41+
}
42+
if empty := h.delete("2"); empty {
43+
t.Fatal("empty")
44+
}
45+
if _, ok := h.find("2"); ok {
46+
t.Fatal("value is found")
47+
}
48+
if v, ok := h.find("3"); !ok || v != 3 {
49+
t.Fatal("value is not found")
50+
}
51+
h.insert("4", 4)
52+
if v, ok := h.find("3"); !ok || v != 3 {
53+
t.Fatal("value is not found")
54+
}
55+
if v, ok := h.find("4"); !ok || v != 4 {
56+
t.Fatal("value is not found")
57+
}
58+
if empty := h.delete("3"); empty {
59+
t.Fatal("empty")
60+
}
61+
if empty := h.delete("4"); !empty {
62+
t.Fatal("not empty")
63+
}
64+
}

0 commit comments

Comments
 (0)