@@ -2,10 +2,11 @@ package rueidis
2
2
3
3
import (
4
4
"context"
5
- "runtime"
6
5
"sync"
6
+ "sync/atomic"
7
7
"time"
8
- "unsafe"
8
+
9
+ "github.com/redis/rueidis/internal/cache"
9
10
)
10
11
11
12
// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
@@ -191,250 +192,77 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
191
192
}
192
193
}
193
194
194
- type flatentry struct {
195
- ovfl * flatentry
196
- next unsafe.Pointer
197
- prev unsafe.Pointer
198
- cmd string
199
- key string
200
- val []byte
201
- ttl int64
202
- size int64
203
- mark int64
204
- mu sync.RWMutex
205
- }
206
-
207
- func (f * flatentry ) insert (e * flatentry ) {
208
- f .size += e .size
209
- f .ttl = e .ttl
210
- f .mu .Lock ()
211
- e .ovfl = f .ovfl
212
- f .ovfl = e
213
- f .mu .Unlock ()
214
- }
215
-
216
- func (f * flatentry ) find (cmd string , ts int64 ) ([]byte , bool ) {
217
- if f != nil && ts >= f .ttl {
218
- return nil , true
219
- }
220
- for next := f ; next != nil ; {
221
- if cmd == next .cmd {
222
- return next .val , false
223
- }
224
- next .mu .RLock ()
225
- ovfl := next .ovfl
226
- next .mu .RUnlock ()
227
- next = ovfl
228
- }
229
- return nil , false
230
- }
231
-
232
- const lrBatchSize = 64
233
- const flattEntrySize = unsafe .Sizeof (flatentry {})
234
-
235
- type lrBatch struct {
236
- m map [* flatentry ]struct {}
237
- }
238
-
239
195
func NewFlattenCache (limit int ) CacheStore {
240
196
f := & flatten {
241
- flights : make (map [string ]* adapterEntry ),
242
- cache : make (map [string ]* flatentry ),
243
- head : & flatentry {},
244
- tail : & flatentry {},
245
- size : 0 ,
246
- limit : int64 (limit ),
197
+ flights : cache.NewDoubleMap [* adapterEntry ](64 ),
198
+ cache : cache .NewLRUDoubleMap [[]byte ](64 , int64 (limit )),
247
199
}
248
- f .head .next = unsafe .Pointer (f .tail )
249
- f .tail .prev = unsafe .Pointer (f .head )
250
- f .lrup = sync.Pool {New : func () any {
251
- b := & lrBatch {m : make (map [* flatentry ]struct {}, lrBatchSize )}
252
- runtime .SetFinalizer (b , func (b * lrBatch ) {
253
- if len (b .m ) >= 0 {
254
- f .mu .Lock ()
255
- f .llTailBatch (b )
256
- f .mu .Unlock ()
257
- }
258
- })
259
- return b
260
- }}
261
200
return f
262
201
}
263
202
264
203
type flatten struct {
265
- flights map [string ]* adapterEntry
266
- cache map [string ]* flatentry
267
- head * flatentry
268
- tail * flatentry
269
- lrup sync.Pool
270
- mark int64
271
- size int64
272
- limit int64
273
- mu sync.RWMutex
274
- }
275
-
276
- func (f * flatten ) llAdd (e * flatentry ) {
277
- e .mark = f .mark
278
- e .prev = f .tail .prev
279
- e .next = unsafe .Pointer (f .tail )
280
- f .tail .prev = unsafe .Pointer (e )
281
- (* flatentry )(e .prev ).next = unsafe .Pointer (e )
282
- }
283
-
284
- func (f * flatten ) llDel (e * flatentry ) {
285
- (* flatentry )(e .prev ).next = e .next
286
- (* flatentry )(e .next ).prev = e .prev
287
- e .mark = - 1
288
- }
289
-
290
- func (f * flatten ) llTail (e * flatentry ) {
291
- f .llDel (e )
292
- f .llAdd (e )
293
- }
294
-
295
- func (f * flatten ) llTailBatch (b * lrBatch ) {
296
- for e := range b .m {
297
- if e .mark == f .mark {
298
- f .llTail (e )
299
- }
300
- }
301
- clear (b .m )
302
- }
303
-
304
- func (f * flatten ) remove (e * flatentry ) {
305
- f .size -= e .size
306
- f .llDel (e )
307
- delete (f .cache , e .key )
204
+ flights * cache.DoubleMap [* adapterEntry ]
205
+ cache * cache.LRUDoubleMap [[]byte ]
206
+ close int32
308
207
}
309
208
310
209
func (f * flatten ) Flight (key , cmd string , ttl time.Duration , now time.Time ) (RedisMessage , CacheEntry ) {
311
- f .mu .RLock ()
312
- e := f .cache [key ]
313
- f .mu .RUnlock ()
314
- ts := now .UnixMilli ()
315
- if v , _ := e .find (cmd , ts ); v != nil {
316
- batch := f .lrup .Get ().(* lrBatch )
317
- batch .m [e ] = struct {}{}
318
- if len (batch .m ) >= lrBatchSize {
319
- f .mu .Lock ()
320
- f .llTailBatch (batch )
321
- f .mu .Unlock ()
322
- }
323
- f .lrup .Put (batch )
324
- var ret RedisMessage
325
- _ = ret .CacheUnmarshalView (v )
326
- return ret , nil
210
+ if atomic .LoadInt32 (& f .close ) == 1 {
211
+ return RedisMessage {}, nil
327
212
}
328
- fk := key + cmd
329
- f .mu .RLock ()
330
- af := f .flights [fk ]
331
- f .mu .RUnlock ()
332
- if af != nil {
333
- return RedisMessage {}, af
334
- }
335
- f .mu .Lock ()
336
- e = f .cache [key ]
337
- v , expired := e .find (cmd , ts )
338
- if v != nil {
339
- f .llTail (e )
340
- f .mu .Unlock ()
213
+ ts := now .UnixMilli ()
214
+ e , ok := f .cache .Find (key , cmd , ts )
215
+ if ok {
341
216
var ret RedisMessage
342
- _ = ret .CacheUnmarshalView (v )
217
+ _ = ret .CacheUnmarshalView (e )
343
218
return ret , nil
344
219
}
345
- defer f . mu . Unlock ()
346
- if expired {
347
- f . remove ( e )
348
- }
349
- if af = f . flights [ fk ]; af != nil {
220
+ xat := ts + ttl . Milliseconds ()
221
+ af , ok := f . flights . FindOrInsert ( key , cmd , func () * adapterEntry {
222
+ return & adapterEntry { ch : make ( chan struct {}), xat : xat }
223
+ })
224
+ if ok {
350
225
return RedisMessage {}, af
351
226
}
352
- if f .flights != nil {
353
- f .flights [fk ] = & adapterEntry {ch : make (chan struct {}), xat : ts + ttl .Milliseconds ()}
354
- }
355
227
return RedisMessage {}, nil
356
228
}
357
229
358
230
func (f * flatten ) Update (key , cmd string , val RedisMessage ) (sxat int64 ) {
359
- fk := key + cmd
360
- f .mu .RLock ()
361
- af := f .flights [fk ]
362
- f .mu .RUnlock ()
363
- if af != nil {
231
+ af , ok := f .flights .Find (key , cmd )
232
+ if ok {
364
233
sxat = val .getExpireAt ()
365
234
if af .xat < sxat || sxat == 0 {
366
235
sxat = af .xat
367
236
val .setExpireAt (sxat )
368
237
}
369
238
bs := val .CacheMarshal (nil )
370
- fe := & flatentry {cmd : cmd , val : bs , ttl : sxat , size : int64 (len (bs )+ len (key )+ len (cmd )) + int64 (flattEntrySize ) + 64 } // 64 for 2 map entries
371
- f .mu .Lock ()
372
- if f .flights != nil {
373
- delete (f .flights , fk )
374
- f .size += fe .size
375
- for ep := f .head .next ; f .size > f .limit && ep != unsafe .Pointer (f .tail ); {
376
- e := (* flatentry )(ep )
377
- f .remove (e )
378
- ep = e .next
379
- }
380
- e := f .cache [key ]
381
- if e != nil && e .cmd == cmd {
382
- f .size -= e .size
383
- f .llDel (e )
384
- e = nil
385
- }
386
- if e == nil {
387
- fe .key = key
388
- f .cache [key ] = fe
389
- f .llAdd (fe )
390
- } else {
391
- e .insert (fe )
392
- }
393
- }
394
- f .mu .Unlock ()
239
+ f .cache .Insert (key , cmd , int64 (len (bs )+ len (key )+ len (cmd ))+ int64 (cache .LRUEntrySize )+ 64 , sxat , bs )
240
+ f .flights .Delete (key , cmd )
395
241
af .setVal (val )
396
242
}
397
243
return sxat
398
244
}
399
245
400
246
func (f * flatten ) Cancel (key , cmd string , err error ) {
401
- fk := key + cmd
402
- f .mu .Lock ()
403
- defer f .mu .Unlock ()
404
- if af := f .flights [fk ]; af != nil {
405
- delete (f .flights , fk )
247
+ if af , ok := f .flights .Find (key , cmd ); ok {
248
+ f .flights .Delete (key , cmd )
406
249
af .setErr (err )
407
250
}
408
251
}
409
252
410
253
func (f * flatten ) Delete (keys []RedisMessage ) {
411
- f .mu .Lock ()
412
- defer f .mu .Unlock ()
413
254
if keys == nil {
414
- f .cache = make (map [string ]* flatentry , len (f .cache ))
415
- f .head .next = unsafe .Pointer (f .tail )
416
- f .tail .prev = unsafe .Pointer (f .head )
417
- f .mark ++
418
- f .size = 0
255
+ f .cache .DeleteAll ()
419
256
} else {
420
257
for _ , k := range keys {
421
- if e := f .cache [k .string ]; e != nil {
422
- f .remove (e )
423
- }
258
+ f .cache .Delete (k .string )
424
259
}
425
260
}
426
261
}
427
262
428
263
func (f * flatten ) Close (err error ) {
429
- f .mu .Lock ()
430
- flights := f .flights
431
- f .flights = nil
432
- f .cache = nil
433
- f .tail = nil
434
- f .head = nil
435
- f .mark ++
436
- f .mu .Unlock ()
437
- for _ , entry := range flights {
264
+ atomic .StoreInt32 (& f .close , 1 )
265
+ f .flights .Iterate (func (entry * adapterEntry ) {
438
266
entry .setErr (err )
439
- }
267
+ })
440
268
}
0 commit comments