Skip to content

Commit 0a658d6

Browse files
authored
LocalCache: Add cleanup method (#86)
* LocalCache: Add cleanup method * linter * Enhanced local cache gc * Update readme * Set error * tests * tests and minor fix * tests * fix tests
1 parent 0d92218 commit 0a658d6

File tree

8 files changed

+252
-24
lines changed

8 files changed

+252
-24
lines changed

pkg/metrics/register.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ var (
2323
)
2424

2525
func (t *taskMetrics) RegisterMetric(name string, help string, labels []string, handler MetricHandler) error {
26+
if handler == nil {
27+
panic("handler is mandatory")
28+
}
29+
2630
labels = append(labels, APPNameLabel)
2731
var metric prometheus.Collector
2832

pkg/zcache/combined_cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type combinedCache struct {
1919
remoteCache RemoteCache
2020
logger *zap.Logger
2121
isRemoteBestEffort bool
22-
metricsServer *metrics.TaskMetrics
22+
metricsServer metrics.TaskMetrics
2323
appName string
2424
}
2525

@@ -111,7 +111,7 @@ func (c *combinedCache) IsNotFoundError(err error) bool {
111111
}
112112

113113
func (c *combinedCache) SetupAndMonitorMetrics(appName string, metricsServer metrics.TaskMetrics, updateInterval time.Duration) []error {
114-
c.metricsServer = &metricsServer
114+
c.metricsServer = metricsServer
115115
c.appName = appName
116116

117117
errs := setupAndMonitorCacheMetrics(appName, metricsServer, c, updateInterval)

pkg/zcache/combined_cache_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package zcache
33
import (
44
"context"
55
"github.com/stretchr/testify/suite"
6+
"github.com/zondax/golem/pkg/metrics"
67
"go.uber.org/zap"
78
"os"
89
"testing"
@@ -22,20 +23,23 @@ type CombinedCacheTestSuite struct {
2223
cacheRemoteBrokenNotBestEffort CombinedCache
2324
cacheOkNotBestEffort CombinedCache
2425
cacheRemote RemoteCache
26+
ms metrics.TaskMetrics
2527
}
2628

2729
func (suite *CombinedCacheTestSuite) SetupSuite() {
2830
mr, err := miniredis.Run()
2931
suite.Require().NoError(err)
3032
suite.mr = mr
31-
33+
suite.ms = metrics.NewTaskMetrics("", "", "appname")
3234
logger, err := zap.NewDevelopment()
3335
suite.Require().NoError(err)
3436

3537
prefix := os.Getenv("PREFIX")
3638
suite.cacheRemoteBrokenBestEffort, err = NewCombinedCache(
3739
&CombinedConfig{
38-
Local: &LocalConfig{},
40+
Local: &LocalConfig{
41+
MetricServer: suite.ms,
42+
},
3943
Remote: &RemoteConfig{
4044
Addr: "0.0.0.0",
4145
},
@@ -46,7 +50,9 @@ func (suite *CombinedCacheTestSuite) SetupSuite() {
4650
suite.Nil(err)
4751

4852
suite.cacheOkNotBestEffort, err = NewCombinedCache(&CombinedConfig{
49-
Local: &LocalConfig{},
53+
Local: &LocalConfig{
54+
MetricServer: suite.ms,
55+
},
5056
Remote: &RemoteConfig{
5157
Addr: mr.Addr(),
5258
},
@@ -58,7 +64,9 @@ func (suite *CombinedCacheTestSuite) SetupSuite() {
5864

5965
suite.cacheRemoteBrokenNotBestEffort, err = NewCombinedCache(
6066
&CombinedConfig{
61-
Local: &LocalConfig{},
67+
Local: &LocalConfig{
68+
MetricServer: suite.ms,
69+
},
6270
Remote: &RemoteConfig{
6371
Addr: "0.0.0.0",
6472
},

pkg/zcache/config.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package zcache
33
import (
44
"github.com/allegro/bigcache/v3"
55
"github.com/go-redis/redis/v8"
6+
"github.com/zondax/golem/pkg/metrics"
67
"go.uber.org/zap"
78
"time"
89
)
@@ -26,8 +27,12 @@ type RemoteConfig struct {
2627
}
2728

2829
type LocalConfig struct {
29-
Prefix string
30-
Logger *zap.Logger
30+
Prefix string
31+
Logger *zap.Logger
32+
MetricServer metrics.TaskMetrics
33+
CleanupInterval time.Duration
34+
BatchSize int
35+
ThrottleTime time.Duration
3136
}
3237

3338
func (c *RemoteConfig) ToRedisConfig() *redis.Options {

pkg/zcache/local_cache.go

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,18 @@ import (
77
"fmt"
88
"github.com/allegro/bigcache/v3"
99
"github.com/zondax/golem/pkg/metrics"
10+
"github.com/zondax/golem/pkg/metrics/collectors"
1011
"go.uber.org/zap"
1112
"time"
1213
)
1314

1415
const (
15-
neverExpires = -1
16+
neverExpires = -1
17+
errorTypeLabel = "error_type"
18+
cleanupErrorMetricKey = "localCacheCleanupErrors"
19+
iterationErrorLabel = "iteration_error"
20+
unmarshalErrorLabel = "unmarshal_error"
21+
deletionErrorLabel = "deletion_error"
1622
)
1723

1824
type CacheItem struct {
@@ -44,11 +50,14 @@ type LocalCache interface {
4450
}
4551

4652
type localCache struct {
47-
client *bigcache.BigCache
48-
prefix string
49-
logger *zap.Logger
50-
metricsServer *metrics.TaskMetrics
51-
appName string
53+
client *bigcache.BigCache
54+
prefix string
55+
logger *zap.Logger
56+
metricsServer metrics.TaskMetrics
57+
appName string
58+
cleanupInterval time.Duration
59+
batchSize int
60+
throttleTime time.Duration
5261
}
5362

5463
func (c *localCache) Set(_ context.Context, key string, value interface{}, ttl time.Duration) error {
@@ -126,7 +135,7 @@ func (c *localCache) IsNotFoundError(err error) bool {
126135
}
127136

128137
func (c *localCache) SetupAndMonitorMetrics(appName string, metricsServer metrics.TaskMetrics, updateInterval time.Duration) []error {
129-
c.metricsServer = &metricsServer
138+
c.metricsServer = metricsServer
130139
c.appName = appName
131140

132141
errs := setupAndMonitorCacheMetrics(appName, metricsServer, c, updateInterval)
@@ -142,3 +151,61 @@ func (c *localCache) registerInternalCacheMetrics() []error {
142151

143152
return []error{}
144153
}
154+
155+
func (c *localCache) startCleanupProcess(interval time.Duration) {
156+
ticker := time.NewTicker(interval)
157+
go func() {
158+
for range ticker.C {
159+
c.cleanupExpiredKeys()
160+
}
161+
}()
162+
}
163+
164+
func (c *localCache) cleanupExpiredKeys() {
165+
iterator := c.client.Iterator()
166+
var keysToDelete []string
167+
168+
for iterator.SetNext() {
169+
entry, err := iterator.Value()
170+
if err != nil {
171+
_ = c.metricsServer.UpdateMetric(cleanupErrorMetricKey, 1, iterationErrorLabel)
172+
continue
173+
}
174+
175+
var cachedItem CacheItem
176+
if err = json.Unmarshal(entry.Value(), &cachedItem); err != nil {
177+
_ = c.metricsServer.UpdateMetric(cleanupErrorMetricKey, 1, unmarshalErrorLabel)
178+
continue
179+
}
180+
181+
if cachedItem.IsExpired() {
182+
keysToDelete = append(keysToDelete, entry.Key())
183+
}
184+
185+
if len(keysToDelete) >= c.batchSize {
186+
c.deleteKeysInBatch(keysToDelete)
187+
keysToDelete = keysToDelete[:0]
188+
time.Sleep(c.throttleTime)
189+
}
190+
}
191+
192+
if len(keysToDelete) > 0 {
193+
c.deleteKeysInBatch(keysToDelete)
194+
}
195+
}
196+
197+
func (c *localCache) deleteKeysInBatch(keys []string) {
198+
for _, key := range keys {
199+
if err := c.client.Delete(key); err != nil {
200+
if err = c.metricsServer.UpdateMetric(cleanupErrorMetricKey, 1, deletionErrorLabel); err != nil {
201+
c.logger.Error("Failed to update deletion error metric", zap.Error(err))
202+
}
203+
}
204+
}
205+
}
206+
207+
func (c *localCache) registerCleanupMetrics() {
208+
if err := c.metricsServer.RegisterMetric(cleanupErrorMetricKey, "Counts different types of errors occurred during cache cleanup process", []string{errorTypeLabel}, &collectors.Counter{}); err != nil {
209+
c.logger.Error("Failed to register cleanup metrics", zap.Error(err))
210+
}
211+
}

pkg/zcache/local_cache_test.go

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,21 @@ package zcache
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/allegro/bigcache/v3"
58
"github.com/stretchr/testify/suite"
9+
"github.com/zondax/golem/pkg/metrics"
610
"os"
711
"testing"
812
"time"
913
)
1014

15+
const (
16+
testValue = "testValue"
17+
expireKey = "expireKey"
18+
)
19+
1120
func TestLocalCacheTestSuite(t *testing.T) {
1221
suite.Run(t, new(LocalCacheTestSuite))
1322
}
@@ -21,7 +30,8 @@ func (suite *LocalCacheTestSuite) SetupSuite() {
2130
prefix := os.Getenv("PREFIX")
2231
var err error
2332
config := LocalConfig{
24-
Prefix: prefix,
33+
Prefix: prefix,
34+
MetricServer: metrics.NewTaskMetrics("", "", "appname"),
2535
}
2636
suite.cache, err = NewLocalCache(&config)
2737
suite.Nil(err)
@@ -30,7 +40,7 @@ func (suite *LocalCacheTestSuite) SetupSuite() {
3040
func (suite *LocalCacheTestSuite) TestSetAndGet() {
3141
ctx := context.Background()
3242
key := "testKey"
33-
value := "testValue"
43+
value := testValue
3444

3545
err := suite.cache.Set(ctx, key, value, 0)
3646
suite.NoError(err)
@@ -44,7 +54,7 @@ func (suite *LocalCacheTestSuite) TestSetAndGet() {
4454
func (suite *LocalCacheTestSuite) TestDelete() {
4555
ctx := context.Background()
4656
key := "testKey"
47-
value := "testValue"
57+
value := testValue
4858

4959
suite.NoError(suite.cache.Set(ctx, key, value, 0))
5060

@@ -56,17 +66,100 @@ func (suite *LocalCacheTestSuite) TestDelete() {
5666
}
5767

5868
func (suite *LocalCacheTestSuite) TestCacheItemExpiration() {
59-
item := NewCacheItem([]byte("testValue"), 1*time.Second)
69+
item := NewCacheItem([]byte(testValue), 1*time.Second)
6070
suite.False(item.IsExpired(), "CacheItem should not be expired right after creation")
6171
time.Sleep(2 * time.Second)
6272

6373
suite.True(item.IsExpired(), "CacheItem should be expired after its TTL")
6474
}
6575

6676
func (suite *LocalCacheTestSuite) TestCacheItemNeverExpires() {
67-
item := NewCacheItem([]byte("testValue"), -1)
77+
item := NewCacheItem([]byte(testValue), -1)
6878
suite.False(item.IsExpired(), "CacheItem with negative TTL should never expire")
6979
time.Sleep(2 * time.Second)
7080

7181
suite.False(item.IsExpired(), "CacheItem with negative TTL should never expire, even after some time")
7282
}
83+
84+
func (suite *LocalCacheTestSuite) TestCleanupProcess() {
85+
cleanupInterval := 1 * time.Second
86+
ttl := 10 * time.Millisecond
87+
88+
cache, err := NewLocalCache(&LocalConfig{
89+
Prefix: "test",
90+
CleanupInterval: cleanupInterval,
91+
MetricServer: metrics.NewTaskMetrics("", "", "appname")})
92+
suite.NoError(err)
93+
94+
ctx := context.Background()
95+
key := expireKey
96+
value := testValue
97+
98+
err = cache.Set(ctx, key, value, ttl)
99+
suite.NoError(err)
100+
101+
time.Sleep(2 * cleanupInterval)
102+
103+
var result string
104+
err = cache.Get(ctx, key, &result)
105+
106+
suite.True(errors.Is(err, bigcache.ErrEntryNotFound), "Expected 'key not found' error, but got a different error")
107+
}
108+
109+
func (suite *LocalCacheTestSuite) TestCleanupProcessBatchLogic() {
110+
cleanupInterval := 100 * time.Millisecond
111+
testBatchSize := 5
112+
113+
cache, err := NewLocalCache(&LocalConfig{
114+
Prefix: "testBatch",
115+
CleanupInterval: cleanupInterval,
116+
MetricServer: metrics.NewTaskMetrics("", "", "appname"),
117+
BatchSize: testBatchSize,
118+
})
119+
suite.NoError(err)
120+
121+
ctx := context.Background()
122+
123+
for i := 0; i < testBatchSize*2; i++ {
124+
key := fmt.Sprintf("key%d", i)
125+
value := fmt.Sprintf("value%d", i)
126+
err = cache.Set(ctx, key, value, 1*time.Millisecond)
127+
suite.NoError(err)
128+
}
129+
130+
time.Sleep(2 * time.Second)
131+
132+
for i := 0; i < testBatchSize*2; i++ {
133+
key := fmt.Sprintf("key%d", i)
134+
var result string
135+
err = cache.Get(ctx, key, &result)
136+
137+
suite.True(errors.Is(err, bigcache.ErrEntryNotFound), "Expected 'ErrEntryNotFound' for key: %s, but got a different error or no error: %s", key, err.Error())
138+
}
139+
}
140+
141+
func (suite *LocalCacheTestSuite) TestCleanupProcessItemDoesNotExpire() {
142+
cleanupInterval := 1 * time.Second
143+
144+
cache, err := NewLocalCache(&LocalConfig{
145+
Prefix: "test",
146+
CleanupInterval: cleanupInterval,
147+
MetricServer: metrics.NewTaskMetrics("", "", "appname"),
148+
})
149+
suite.NoError(err)
150+
151+
ctx := context.Background()
152+
key := "permanentKey"
153+
value := "thisValueShouldPersist"
154+
155+
err = cache.Set(ctx, key, value, neverExpires)
156+
suite.NoError(err)
157+
158+
time.Sleep(2 * cleanupInterval)
159+
160+
var result string
161+
err = cache.Get(ctx, key, &result)
162+
163+
suite.NoError(err, "Did not expect an error when retrieving a non-expiring item")
164+
suite.Equal(value, result, "The retrieved value should match the original value")
165+
}

0 commit comments

Comments
 (0)