Skip to content

Commit 35822ea

Browse files
committed
refactor: move the flatten map to internal/cache
Signed-off-by: Rueian <[email protected]>
1 parent 26ba6aa commit 35822ea

File tree

7 files changed

+757
-204
lines changed

7 files changed

+757
-204
lines changed

cache.go

+28-204
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package rueidis
22

33
import (
44
"context"
5-
"runtime"
65
"sync"
6+
"sync/atomic"
77
"time"
8-
"unsafe"
8+
9+
"github.com/redis/rueidis/internal/cache"
910
)
1011

1112
// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
@@ -191,250 +192,73 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
191192
}
192193
}
193194

194-
type flatentry struct {
195-
ovfl *flatentry
196-
next unsafe.Pointer
197-
prev unsafe.Pointer
198-
cmd string
199-
key string
200-
val []byte
201-
ttl int64
202-
size int64
203-
mark int64
204-
mu sync.RWMutex
205-
}
206-
207-
func (f *flatentry) insert(e *flatentry) {
208-
f.size += e.size
209-
f.ttl = e.ttl
210-
f.mu.Lock()
211-
e.ovfl = f.ovfl
212-
f.ovfl = e
213-
f.mu.Unlock()
214-
}
215-
216-
func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) {
217-
if f != nil && ts >= f.ttl {
218-
return nil, true
219-
}
220-
for next := f; next != nil; {
221-
if cmd == next.cmd {
222-
return next.val, false
223-
}
224-
next.mu.RLock()
225-
ovfl := next.ovfl
226-
next.mu.RUnlock()
227-
next = ovfl
228-
}
229-
return nil, false
230-
}
231-
232-
const lrBatchSize = 64
233-
const flattEntrySize = unsafe.Sizeof(flatentry{})
234-
235-
type lrBatch struct {
236-
m map[*flatentry]struct{}
237-
}
238-
239195
func NewFlattenCache(limit int) CacheStore {
240-
f := &flatten{
241-
flights: make(map[string]*adapterEntry),
242-
cache: make(map[string]*flatentry),
243-
head: &flatentry{},
244-
tail: &flatentry{},
245-
size: 0,
246-
limit: int64(limit),
196+
return &flatten{
197+
flights: cache.NewDoubleMap[*adapterEntry](64),
198+
cache: cache.NewLRUDoubleMap[[]byte](64, int64(limit)),
247199
}
248-
f.head.next = unsafe.Pointer(f.tail)
249-
f.tail.prev = unsafe.Pointer(f.head)
250-
f.lrup = sync.Pool{New: func() any {
251-
b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)}
252-
runtime.SetFinalizer(b, func(b *lrBatch) {
253-
if len(b.m) >= 0 {
254-
f.mu.Lock()
255-
f.llTailBatch(b)
256-
f.mu.Unlock()
257-
}
258-
})
259-
return b
260-
}}
261-
return f
262200
}
263201

264202
type flatten struct {
265-
flights map[string]*adapterEntry
266-
cache map[string]*flatentry
267-
head *flatentry
268-
tail *flatentry
269-
lrup sync.Pool
270-
mark int64
271-
size int64
272-
limit int64
273-
mu sync.RWMutex
274-
}
275-
276-
func (f *flatten) llAdd(e *flatentry) {
277-
e.mark = f.mark
278-
e.prev = f.tail.prev
279-
e.next = unsafe.Pointer(f.tail)
280-
f.tail.prev = unsafe.Pointer(e)
281-
(*flatentry)(e.prev).next = unsafe.Pointer(e)
282-
}
283-
284-
func (f *flatten) llDel(e *flatentry) {
285-
(*flatentry)(e.prev).next = e.next
286-
(*flatentry)(e.next).prev = e.prev
287-
e.mark = -1
288-
}
289-
290-
func (f *flatten) llTail(e *flatentry) {
291-
f.llDel(e)
292-
f.llAdd(e)
293-
}
294-
295-
func (f *flatten) llTailBatch(b *lrBatch) {
296-
for e := range b.m {
297-
if e.mark == f.mark {
298-
f.llTail(e)
299-
}
300-
}
301-
clear(b.m)
302-
}
303-
304-
func (f *flatten) remove(e *flatentry) {
305-
f.size -= e.size
306-
f.llDel(e)
307-
delete(f.cache, e.key)
203+
flights *cache.DoubleMap[*adapterEntry]
204+
cache *cache.LRUDoubleMap[[]byte]
205+
close int32
308206
}
309207

310208
func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
311-
f.mu.RLock()
312-
e := f.cache[key]
313-
f.mu.RUnlock()
314-
ts := now.UnixMilli()
315-
if v, _ := e.find(cmd, ts); v != nil {
316-
batch := f.lrup.Get().(*lrBatch)
317-
batch.m[e] = struct{}{}
318-
if len(batch.m) >= lrBatchSize {
319-
f.mu.Lock()
320-
f.llTailBatch(batch)
321-
f.mu.Unlock()
322-
}
323-
f.lrup.Put(batch)
324-
var ret RedisMessage
325-
_ = ret.CacheUnmarshalView(v)
326-
return ret, nil
209+
if atomic.LoadInt32(&f.close) == 1 {
210+
return RedisMessage{}, nil
327211
}
328-
fk := key + cmd
329-
f.mu.RLock()
330-
af := f.flights[fk]
331-
f.mu.RUnlock()
332-
if af != nil {
333-
return RedisMessage{}, af
334-
}
335-
f.mu.Lock()
336-
e = f.cache[key]
337-
v, expired := e.find(cmd, ts)
338-
if v != nil {
339-
f.llTail(e)
340-
f.mu.Unlock()
212+
ts := now.UnixMilli()
213+
if e, ok := f.cache.Find(key, cmd, ts); ok {
341214
var ret RedisMessage
342-
_ = ret.CacheUnmarshalView(v)
215+
_ = ret.CacheUnmarshalView(e)
343216
return ret, nil
344217
}
345-
defer f.mu.Unlock()
346-
if expired {
347-
f.remove(e)
348-
}
349-
if af = f.flights[fk]; af != nil {
218+
xat := ts + ttl.Milliseconds()
219+
if af, ok := f.flights.FindOrInsert(key, cmd, func() *adapterEntry {
220+
return &adapterEntry{ch: make(chan struct{}), xat: xat}
221+
}); ok {
350222
return RedisMessage{}, af
351223
}
352-
if f.flights != nil {
353-
f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()}
354-
}
355224
return RedisMessage{}, nil
356225
}
357226

358227
func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) {
359-
fk := key + cmd
360-
f.mu.RLock()
361-
af := f.flights[fk]
362-
f.mu.RUnlock()
363-
if af != nil {
228+
if af, ok := f.flights.Find(key, cmd); ok {
364229
sxat = val.getExpireAt()
365230
if af.xat < sxat || sxat == 0 {
366231
sxat = af.xat
367232
val.setExpireAt(sxat)
368233
}
369234
bs := val.CacheMarshal(nil)
370-
fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize) + 64} // 64 for 2 map entries
371-
f.mu.Lock()
372-
if f.flights != nil {
373-
delete(f.flights, fk)
374-
f.size += fe.size
375-
for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); {
376-
e := (*flatentry)(ep)
377-
f.remove(e)
378-
ep = e.next
379-
}
380-
e := f.cache[key]
381-
if e != nil && e.cmd == cmd {
382-
f.size -= e.size
383-
f.llDel(e)
384-
e = nil
385-
}
386-
if e == nil {
387-
fe.key = key
388-
f.cache[key] = fe
389-
f.llAdd(fe)
390-
} else {
391-
e.insert(fe)
392-
}
393-
}
394-
f.mu.Unlock()
235+
f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, bs)
236+
f.flights.Delete(key, cmd)
395237
af.setVal(val)
396238
}
397239
return sxat
398240
}
399241

400242
func (f *flatten) Cancel(key, cmd string, err error) {
401-
fk := key + cmd
402-
f.mu.Lock()
403-
defer f.mu.Unlock()
404-
if af := f.flights[fk]; af != nil {
405-
delete(f.flights, fk)
243+
if af, ok := f.flights.Find(key, cmd); ok {
244+
f.flights.Delete(key, cmd)
406245
af.setErr(err)
407246
}
408247
}
409248

410249
func (f *flatten) Delete(keys []RedisMessage) {
411-
f.mu.Lock()
412-
defer f.mu.Unlock()
413250
if keys == nil {
414-
f.cache = make(map[string]*flatentry, len(f.cache))
415-
f.head.next = unsafe.Pointer(f.tail)
416-
f.tail.prev = unsafe.Pointer(f.head)
417-
f.mark++
418-
f.size = 0
251+
f.cache.DeleteAll()
419252
} else {
420253
for _, k := range keys {
421-
if e := f.cache[k.string]; e != nil {
422-
f.remove(e)
423-
}
254+
f.cache.Delete(k.string)
424255
}
425256
}
426257
}
427258

428259
func (f *flatten) Close(err error) {
429-
f.mu.Lock()
430-
flights := f.flights
431-
f.flights = nil
432-
f.cache = nil
433-
f.tail = nil
434-
f.head = nil
435-
f.mark++
436-
f.mu.Unlock()
437-
for _, entry := range flights {
260+
atomic.StoreInt32(&f.close, 1)
261+
f.flights.Iterate(func(entry *adapterEntry) {
438262
entry.setErr(err)
439-
}
263+
})
440264
}

internal/cache/chain.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package cache
2+
3+
type node[V any] struct {
4+
key string
5+
next *node[V]
6+
val V
7+
}
8+
type chain[V any] struct {
9+
node[V]
10+
}
11+
12+
func (h *chain[V]) find(key string) (val V, ok bool) {
13+
if h.node.key == key {
14+
return h.node.val, true
15+
}
16+
for curr := h.node.next; curr != nil; curr = curr.next {
17+
if curr.key == key {
18+
return curr.val, true
19+
}
20+
}
21+
return val, ok
22+
}
23+
24+
func (h *chain[V]) insert(key string, val V) {
25+
if h.node.key == "" {
26+
h.node.key = key
27+
h.node.val = val
28+
} else if h.node.key == key {
29+
h.node.val = val
30+
} else {
31+
n := &node[V]{key: key, val: val}
32+
n.next = h.node.next
33+
h.node.next = n
34+
}
35+
}
36+
37+
func (h *chain[V]) empty() bool {
38+
return h.node.next == nil && h.node.key == ""
39+
}
40+
41+
func (h *chain[V]) delete(key string) bool {
42+
var zero V
43+
if h.node.key == key {
44+
h.node.key = ""
45+
h.node.val = zero
46+
return h.node.next == nil
47+
}
48+
49+
if h.node.next == nil {
50+
return h.node.key == ""
51+
}
52+
53+
if h.node.next.key == key {
54+
h.node.next.key = ""
55+
h.node.next.val = zero
56+
h.node.next, h.node.next.next = h.node.next.next, nil
57+
return h.empty()
58+
}
59+
60+
prev := h.node.next
61+
curr := h.node.next.next
62+
for curr != nil {
63+
if curr.key == key {
64+
curr.key = ""
65+
curr.val = zero
66+
prev.next, curr.next = curr.next, nil
67+
break
68+
}
69+
prev, curr = curr, curr.next
70+
}
71+
return h.empty()
72+
}

0 commit comments

Comments
 (0)