Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ adhere to [Semantic Versioning](http://semver.org/spec/v2.0.0.html) starting v1.
- Remove dependency: github.com/pkg/errors (#443)
- Add public Cache.RemainingCost() method
- Add support for uint keys
- Implement public Cache.Iter() method (#475)

**Fixed**

Expand Down
35 changes: 22 additions & 13 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,14 @@ const (

// Item is a full representation of what's stored in the cache for each key-value pair.
type Item[V any] struct {
flag itemFlag
Key uint64
Conflict uint64
Value V
Cost int64
Expiration time.Time
wait chan struct{}
flag itemFlag
Key uint64
OriginalKey any
Conflict uint64
Value V
Cost int64
Expiration time.Time
wait chan struct{}
}

// NewCache returns a new Cache instance and any configuration errors, if any.
Expand Down Expand Up @@ -338,12 +339,13 @@ func (c *Cache[K, V]) SetWithTTL(key K, value V, cost int64, ttl time.Duration)

keyHash, conflictHash := c.keyToHash(key)
i := &Item[V]{
flag: itemNew,
Key: keyHash,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
flag: itemNew,
Key: keyHash,
OriginalKey: key,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
}
// cost is eventually updated. The expiration must also be immediately updated
// to prevent items from being prematurely removed from the map.
Expand Down Expand Up @@ -414,6 +416,13 @@ func (c *Cache[K, V]) GetTTL(key K) (time.Duration, bool) {
return time.Until(expiration), true
}

// Iter iterates the elements of the Map, passing them to the callback.
// It guarantees that any key in the Map will be visited only once.
// The set of keys visited by Iter is non-deterministic.
func (c *Cache[K, V]) Iter(cb func(k any, v V) (stop bool)) {
c.storedItems.Iter(cb)
}

// Close stops all goroutines and closes all channels.
func (c *Cache[K, V]) Close() {
if c == nil || c.isClosed.Load() {
Expand Down
120 changes: 82 additions & 38 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,23 +276,25 @@ func TestCacheProcessItems(t *testing.T) {

key, conflict = z.KeyToHash(1)
c.setBuf <- &Item[int]{
flag: itemNew,
Key: key,
Conflict: conflict,
Value: 1,
Cost: 0,
flag: itemNew,
Key: key,
OriginalKey: 1,
Conflict: conflict,
Value: 1,
Cost: 0,
}
time.Sleep(wait)
require.True(t, c.cachePolicy.Has(1))
require.Equal(t, int64(1), c.cachePolicy.Cost(1))

key, conflict = z.KeyToHash(1)
c.setBuf <- &Item[int]{
flag: itemUpdate,
Key: key,
Conflict: conflict,
Value: 2,
Cost: 0,
flag: itemUpdate,
Key: key,
OriginalKey: 1,
Conflict: conflict,
Value: 2,
Cost: 0,
}
time.Sleep(wait)
require.Equal(t, int64(2), c.cachePolicy.Cost(1))
Expand All @@ -312,35 +314,39 @@ func TestCacheProcessItems(t *testing.T) {

key, conflict = z.KeyToHash(2)
c.setBuf <- &Item[int]{
flag: itemNew,
Key: key,
Conflict: conflict,
Value: 2,
Cost: 3,
flag: itemNew,
Key: key,
OriginalKey: 2,
Conflict: conflict,
Value: 2,
Cost: 3,
}
key, conflict = z.KeyToHash(3)
c.setBuf <- &Item[int]{
flag: itemNew,
Key: key,
Conflict: conflict,
Value: 3,
Cost: 3,
flag: itemNew,
Key: key,
OriginalKey: 3,
Conflict: conflict,
Value: 3,
Cost: 3,
}
key, conflict = z.KeyToHash(4)
c.setBuf <- &Item[int]{
flag: itemNew,
Key: key,
Conflict: conflict,
Value: 3,
Cost: 3,
flag: itemNew,
Key: key,
OriginalKey: 4,
Conflict: conflict,
Value: 3,
Cost: 3,
}
key, conflict = z.KeyToHash(5)
c.setBuf <- &Item[int]{
flag: itemNew,
Key: key,
Conflict: conflict,
Value: 3,
Cost: 5,
flag: itemNew,
Key: key,
OriginalKey: 5,
Conflict: conflict,
Value: 3,
Cost: 5,
}
time.Sleep(wait)
m.Lock()
Expand All @@ -366,9 +372,10 @@ func TestCacheGet(t *testing.T) {

key, conflict := z.KeyToHash(1)
i := Item[int]{
Key: key,
Conflict: conflict,
Value: 1,
Key: key,
OriginalKey: 1,
Conflict: conflict,
Value: 1,
}
c.storedItems.Set(&i)
val, ok := c.Get(1)
Expand All @@ -388,6 +395,42 @@ func TestCacheGet(t *testing.T) {
require.Zero(t, val)
}

func TestCacheSetIter(t *testing.T) {
c, err := NewCache(&Config[string, int]{
NumCounters: 100,
MaxCost: 10,
BufferItems: 64,
IgnoreInternalCost: true,
Metrics: true,
})
require.NoError(t, err)

expectedValues := map[string]int{
"a": 1,
"b": 2,
"c": 3,
"d": 4,
}
for k, v := range expectedValues {
key, conflict := z.KeyToHash(k)
i := Item[int]{
Key: key,
OriginalKey: k,
Conflict: conflict,
Value: v,
}
c.storedItems.Set(&i)
}

resultMap := make(map[string]int)
c.Iter(func(k any, v int) (stop bool) {
resultMap[k.(string)] = v
return false
})

require.Equal(t, expectedValues, resultMap)
}

// retrySet calls SetWithTTL until the item is accepted by the cache.
func retrySet(t *testing.T, c *Cache[int, int], key, value int, cost int64, ttl time.Duration) {
for {
Expand Down Expand Up @@ -427,11 +470,12 @@ func TestCacheSet(t *testing.T) {
for i := 0; i < setBufSize; i++ {
key, conflict := z.KeyToHash(1)
c.setBuf <- &Item[int]{
flag: itemUpdate,
Key: key,
Conflict: conflict,
Value: 1,
Cost: 1,
flag: itemUpdate,
Key: key,
OriginalKey: 1,
Conflict: conflict,
Value: 1,
Cost: 1,
}
}
require.False(t, c.Set(2, 2, 1))
Expand Down
55 changes: 43 additions & 12 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ type updateFn[V any] func(cur, prev V) bool

// TODO: Do we need this to be a separate struct from Item?
type storeItem[V any] struct {
key uint64
conflict uint64
value V
expiration time.Time
key uint64
originalKey any
conflict uint64
value V
expiration time.Time
}

// store is the interface fulfilled by all hash map implementations in this
Expand Down Expand Up @@ -45,6 +46,10 @@ type store[V any] interface {
// Clear clears all contents of the store.
Clear(onEvict func(item *Item[V]))
SetShouldUpdateFn(f updateFn[V])
// Iter iterates the elements of the Map, passing them to the callback.
// It guarantees that any key in the Map will be visited only once.
// The set of keys visited by Iter is non-deterministic.
Iter(cb func(k any, v V) (stop bool))
}

// newStore returns the default store implementation.
Expand Down Expand Up @@ -76,6 +81,29 @@ func (m *shardedMap[V]) SetShouldUpdateFn(f updateFn[V]) {
}
}

// Iter iterates the elements of the Map, passing them to the callback.
// It guarantees that any key in the Map will be visited only once.
// The set of keys visited by Iter is non-deterministic.
func (sm *shardedMap[V]) Iter(cb func(k any, v V) (stop bool)) {
for _, shard := range sm.shards {
stopped := func() bool {
shard.RLock()
defer shard.RUnlock()

for _, v := range shard.data {
if stop := cb(v.originalKey, v.value); stop {
return true
}
}
return false
}()

if stopped {
break
}
}
}

func (sm *shardedMap[V]) Get(key, conflict uint64) (V, bool) {
return sm.shards[key%numShards].get(key, conflict)
}
Expand Down Expand Up @@ -184,10 +212,11 @@ func (m *lockedMap[V]) Set(i *Item[V]) {
}

m.data[i.Key] = storeItem[V]{
key: i.Key,
conflict: i.Conflict,
value: i.Value,
expiration: i.Expiration,
key: i.Key,
originalKey: i.OriginalKey,
conflict: i.Conflict,
value: i.Value,
expiration: i.Expiration,
}
}

Expand Down Expand Up @@ -226,10 +255,11 @@ func (m *lockedMap[V]) Update(newItem *Item[V]) (V, bool) {

m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration)
m.data[newItem.Key] = storeItem[V]{
key: newItem.Key,
conflict: newItem.Conflict,
value: newItem.Value,
expiration: newItem.Expiration,
key: newItem.Key,
originalKey: newItem.OriginalKey,
conflict: newItem.Conflict,
value: newItem.Value,
expiration: newItem.Expiration,
}

return item.value, true
Expand All @@ -244,6 +274,7 @@ func (m *lockedMap[V]) Clear(onEvict func(item *Item[V])) {
i.Key = si.key
i.Conflict = si.conflict
i.Value = si.value
i.OriginalKey = si.originalKey
onEvict(i)
}
}
Expand Down
Loading