
Commit d08f93b

Authored Oct 9, 2024
Add multi-level chunk cache (#6249)
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent e6e9fea commit d08f93b

9 files changed: +580 -44 lines changed


CHANGELOG.md

+1
@@ -7,6 +7,7 @@
 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
 * [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
+* [FEATURE] Chunk Cache: Support multi-level cache and add metrics. #6249
 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

docs/blocks-storage/querier.md

+19 -2
@@ -806,8 +806,10 @@ blocks_storage:
         [max_backfill_items: <int> | default = 10000]

     chunks_cache:
-      # Backend for chunks cache, if not empty. Supported values: memcached,
-      # redis, inmemory, and '' (disable).
+      # The chunks cache backend type. A single cache backend or multiple cache
+      # backends can be provided. Supported values for a single cache: memcached,
+      # redis, inmemory, and '' (disable). Supported values for a multi-level
+      # cache: a comma-separated list of (inmemory, memcached, redis).
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
       [backend: <string> | default = ""]

@@ -1018,6 +1020,21 @@ blocks_storage:
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
       [failure_percent: <float> | default = 0.05]

+      multilevel:
+        # The maximum number of concurrent asynchronous operations that can
+        # occur when backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
+        [max_async_concurrency: <int> | default = 3]
+
+        # The maximum number of enqueued asynchronous operations allowed when
+        # backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
+        [max_async_buffer_size: <int> | default = 10000]
+
+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
       # Size of each subrange that bucket object is split into for better
       # caching.
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size

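For illustration, a two-level chunk cache with an in-memory first level backed by memcached could be configured as below. This is a minimal sketch: the memcached address is a placeholder, and the `multilevel` values shown are just the defaults.

```yaml
blocks_storage:
  bucket_store:
    chunks_cache:
      # Backends are listed highest level first: on a hit in a lower level,
      # items are asynchronously backfilled into the level above it.
      backend: "inmemory,memcached"
      memcached:
        addresses: "dns+memcached.example.svc:11211" # placeholder
      multilevel:
        max_async_concurrency: 3
        max_async_buffer_size: 10000
        max_backfill_items: 10000
```
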
docs/blocks-storage/store-gateway.md

+19 -2
@@ -903,8 +903,10 @@ blocks_storage:
         [max_backfill_items: <int> | default = 10000]

     chunks_cache:
-      # Backend for chunks cache, if not empty. Supported values: memcached,
-      # redis, inmemory, and '' (disable).
+      # The chunks cache backend type. A single cache backend or multiple cache
+      # backends can be provided. Supported values for a single cache: memcached,
+      # redis, inmemory, and '' (disable). Supported values for a multi-level
+      # cache: a comma-separated list of (inmemory, memcached, redis).
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
       [backend: <string> | default = ""]

@@ -1115,6 +1117,21 @@ blocks_storage:
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
       [failure_percent: <float> | default = 0.05]

+      multilevel:
+        # The maximum number of concurrent asynchronous operations that can
+        # occur when backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
+        [max_async_concurrency: <int> | default = 3]
+
+        # The maximum number of enqueued asynchronous operations allowed when
+        # backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
+        [max_async_buffer_size: <int> | default = 10000]
+
+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
       # Size of each subrange that bucket object is split into for better
       # caching.
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size

docs/configuration/config-file-reference.md

+19 -2
@@ -1339,8 +1339,10 @@ bucket_store:
      [max_backfill_items: <int> | default = 10000]

  chunks_cache:
-    # Backend for chunks cache, if not empty. Supported values: memcached,
-    # redis, inmemory, and '' (disable).
+    # The chunks cache backend type. A single cache backend or multiple cache
+    # backends can be provided. Supported values for a single cache: memcached,
+    # redis, inmemory, and '' (disable). Supported values for a multi-level
+    # cache: a comma-separated list of (inmemory, memcached, redis).
    # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
    [backend: <string> | default = ""]

@@ -1549,6 +1551,21 @@ bucket_store:
    # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
    [failure_percent: <float> | default = 0.05]

+    multilevel:
+      # The maximum number of concurrent asynchronous operations that can occur
+      # when backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
+      [max_async_concurrency: <int> | default = 3]
+
+      # The maximum number of enqueued asynchronous operations allowed when
+      # backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # The maximum number of items to backfill per asynchronous operation.
+      # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
+      [max_backfill_items: <int> | default = 10000]
+
    # Size of each subrange that bucket object is split into for better caching.
    # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
    [subrange_size: <int> | default = 16000]

integration/querier_test.go

+28 -2
@@ -97,6 +97,12 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 			chunkCacheBackend:  tsdb.CacheBackendRedis,
 			bucketIndexEnabled: true,
 		},
+		"blocks sharding disabled, in-memory chunk cache": {
+			blocksShardingStrategy: "",
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
 		"blocks default sharding, in-memory chunk cache": {
 			blocksShardingStrategy: "default",
 			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
@@ -110,6 +116,25 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 			chunkCacheBackend:  tsdb.CacheBackendInMemory,
 			bucketIndexEnabled: true,
 		},
+		"block sharding disabled, multi-level chunk cache": {
+			blocksShardingStrategy: "",
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+		"block default sharding, multi-level chunk cache": {
+			blocksShardingStrategy: "default",
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+		"block shuffle sharding, multi-level chunk cache": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
 	}

 	for testName, testCfg := range tests {
@@ -154,9 +179,10 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 			if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendRedis) {
 				flags["-blocks-storage.bucket-store.index-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
 			}
-			if testCfg.chunkCacheBackend == tsdb.CacheBackendMemcached {
+			if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendMemcached) {
 				flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
-			} else if testCfg.chunkCacheBackend == tsdb.CacheBackendRedis {
+			}
+			if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) {
 				flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
 			}

pkg/storage/tsdb/caching_bucket.go

+82 -36
@@ -21,12 +21,16 @@ import (
 	"github.com/thanos-io/thanos/pkg/cacheutil"
 	"github.com/thanos-io/thanos/pkg/model"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
+
+	"github.com/cortexproject/cortex/pkg/util"
 )

 var (
+	supportedChunkCacheBackends    = []string{CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis}
 	supportedMetadataCacheBackends = []string{CacheBackendMemcached, CacheBackendRedis}

 	errUnsupportedChunkCacheBackend = errors.New("unsupported chunk cache backend")
+	errDuplicatedChunkCacheBackend  = errors.New("duplicated chunk cache backend")
 )

 const (
@@ -56,23 +60,52 @@ func (cfg *MetadataCacheBackend) Validate() error {
 }

 type ChunkCacheBackend struct {
-	Backend   string                   `yaml:"backend"`
-	InMemory  InMemoryChunkCacheConfig `yaml:"inmemory"`
-	Memcached MemcachedClientConfig    `yaml:"memcached"`
-	Redis     RedisClientConfig        `yaml:"redis"`
+	Backend    string                     `yaml:"backend"`
+	InMemory   InMemoryChunkCacheConfig   `yaml:"inmemory"`
+	Memcached  MemcachedClientConfig      `yaml:"memcached"`
+	Redis      RedisClientConfig          `yaml:"redis"`
+	MultiLevel MultiLevelChunkCacheConfig `yaml:"multilevel"`
 }

 // Validate the config.
 func (cfg *ChunkCacheBackend) Validate() error {
-	switch cfg.Backend {
-	case CacheBackendMemcached:
-		return cfg.Memcached.Validate()
-	case CacheBackendRedis:
-		return cfg.Redis.Validate()
-	case CacheBackendInMemory, "":
-	default:
-		return errUnsupportedChunkCacheBackend
+	if cfg.Backend == "" {
+		return nil
+	}
+
+	splitBackends := strings.Split(cfg.Backend, ",")
+	configuredBackends := map[string]struct{}{}
+
+	if len(splitBackends) > 1 {
+		if err := cfg.MultiLevel.Validate(); err != nil {
+			return err
+		}
 	}
+
+	for _, backend := range splitBackends {
+		if !util.StringsContain(supportedChunkCacheBackends, backend) {
+			return errUnsupportedChunkCacheBackend
+		}
+
+		if _, ok := configuredBackends[backend]; ok {
+			return errDuplicatedChunkCacheBackend
+		}
+
+		switch backend {
+		case CacheBackendMemcached:
+			if err := cfg.Memcached.Validate(); err != nil {
+				return err
+			}
+		case CacheBackendRedis:
+			if err := cfg.Redis.Validate(); err != nil {
+				return err
+			}
+		case CacheBackendInMemory:
+		}
+
+		configuredBackends[backend] = struct{}{}
+	}
+
 	return nil
 }

@@ -86,16 +119,22 @@ type ChunksCacheConfig struct {
 }

 func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
-	f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s, %s, and '' (disable).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory))
+	f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The chunks cache backend type. A single cache backend or multiple cache backends can be provided. "+
+		"Supported values for a single cache: %s, %s, %s, and '' (disable). "+
+		"Supported values for a multi-level cache: a comma-separated list of (%s).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedChunkCacheBackends, ", ")))

 	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
 	cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
 	cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
+	cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")

 	f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
 	f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
 	f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks.")
 	f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.")
+
+	// In the multi-level chunk cache, the backfill TTL follows the subrange TTL.
+	cfg.ChunkCacheBackend.MultiLevel.BackFillTTL = cfg.SubrangeTTL
 }

 func (cfg *ChunksCacheConfig) Validate() error {
@@ -232,34 +271,41 @@ func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
 }

 func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
-	switch cacheBackend.Backend {
-	case "":
+	if cacheBackend.Backend == "" {
 		// No caching.
 		return nil, nil
-	case CacheBackendInMemory:
-		inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
-		}
-		return inMemoryCache, nil
-	case CacheBackendMemcached:
-		var client cacheutil.MemcachedClient
-		client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to create memcached client")
-		}
-		return cache.NewMemcachedCache(cacheName, logger, client, reg), nil
+	}

-	case CacheBackendRedis:
-		redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to create redis client")
+	splitBackends := strings.Split(cacheBackend.Backend, ",")
+	var (
+		caches []cache.Cache
+	)
+
+	for _, backend := range splitBackends {
+		switch backend {
+		case CacheBackendInMemory:
+			inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
+			}
+			caches = append(caches, inMemoryCache)
+		case CacheBackendMemcached:
+			var client cacheutil.MemcachedClient
+			client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create memcached client")
+			}
+			caches = append(caches, cache.NewMemcachedCache(cacheName, logger, client, reg))
+		case CacheBackendRedis:
+			redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create redis client")
+			}
+			caches = append(caches, cache.NewRedisCache(cacheName, logger, redisCache, reg))
 		}
-		return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil
-
-	default:
-		return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend)
 	}
+
+	return newMultiLevelChunkCache(cacheName, cacheBackend.MultiLevel, reg, caches...), nil
 }

 type Matchers struct {

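As a quick illustration of the validation logic above, the sketch below (written as if it lived in package tsdb, in the style of the unit tests later in this commit) shows how `Validate()` reacts to a few backend strings. Note that `MultiLevel` must be populated whenever more than one backend is listed, because `MultiLevel.Validate()` runs first in that case.

```go
package tsdb

import "fmt"

// Illustrative only: expected Validate() results for a few backend strings.
func ExampleChunkCacheBackendValidation() {
	ml := MultiLevelChunkCacheConfig{MaxAsyncConcurrency: 1, MaxAsyncBufferSize: 1, MaxBackfillItems: 1}

	// Single backend: no multi-level validation is performed.
	single := ChunkCacheBackend{Backend: "inmemory"}
	fmt.Println(single.Validate()) // <nil>

	// Repeated backend in the list.
	duplicated := ChunkCacheBackend{Backend: "inmemory,inmemory", MultiLevel: ml}
	fmt.Println(duplicated.Validate()) // duplicated chunk cache backend

	// Unknown backend in the list.
	unsupported := ChunkCacheBackend{Backend: "inmemory,etcd", MultiLevel: ml}
	fmt.Println(unsupported.Validate()) // unsupported chunk cache backend
}
```
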
pkg/storage/tsdb/caching_bucket_test.go

+70
@@ -49,6 +49,76 @@ func Test_ChunkCacheBackendValidation(t *testing.T) {
 			},
 			expectedErr: errUnsupportedChunkCacheBackend,
 		},
+		"valid multi chunk cache type": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s,%s", CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				Redis: RedisClientConfig{
+					Addresses: "localhost:6379",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: nil,
+		},
+		"duplicate multi chunk cache type": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendInMemory),
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: errDuplicatedChunkCacheBackend,
+		},
+		"invalid max async concurrency": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 0,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: errInvalidMaxAsyncConcurrency,
+		},
+		"invalid max async buffer size": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  0,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: errInvalidMaxAsyncBufferSize,
+		},
+		"invalid max backfill items": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    0,
+				},
+			},
+			expectedErr: errInvalidMaxBackfillItems,
+		},
 	}

 	for name, tc := range tests {

pkg/storage/tsdb/multilevel_chunk_cache.go

+149

@@ -0,0 +1,149 @@
+package tsdb
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/thanos-io/thanos/pkg/cache"
+	"github.com/thanos-io/thanos/pkg/cacheutil"
+)
+
+type multiLevelChunkCache struct {
+	name   string
+	caches []cache.Cache
+
+	backfillProcessor    *cacheutil.AsyncOperationProcessor
+	fetchLatency         *prometheus.HistogramVec
+	backFillLatency      *prometheus.HistogramVec
+	storeDroppedItems    prometheus.Counter
+	backfillDroppedItems prometheus.Counter
+	maxBackfillItems     int
+	backfillTTL          time.Duration
+}
+
+type MultiLevelChunkCacheConfig struct {
+	MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
+	MaxAsyncBufferSize  int `yaml:"max_async_buffer_size"`
+	MaxBackfillItems    int `yaml:"max_backfill_items"`
+
+	BackFillTTL time.Duration `yaml:"-"`
+}
+
+func (cfg *MultiLevelChunkCacheConfig) Validate() error {
+	if cfg.MaxAsyncBufferSize <= 0 {
+		return errInvalidMaxAsyncBufferSize
+	}
+	if cfg.MaxAsyncConcurrency <= 0 {
+		return errInvalidMaxAsyncConcurrency
+	}
+	if cfg.MaxBackfillItems <= 0 {
+		return errInvalidMaxBackfillItems
+	}
+	return nil
+}
+
+func (cfg *MultiLevelChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+	f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 3, "The maximum number of concurrent asynchronous operations that can occur when backfilling cache items.")
+	f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
+	f.IntVar(&cfg.MaxBackfillItems, prefix+"max-backfill-items", 10000, "The maximum number of items to backfill per asynchronous operation.")
+}
+
+// newMultiLevelChunkCache returns the cache unchanged when only one level is
+// configured; otherwise it wraps the levels with fetch/backfill metrics.
+func newMultiLevelChunkCache(name string, cfg MultiLevelChunkCacheConfig, reg prometheus.Registerer, c ...cache.Cache) cache.Cache {
+	if len(c) == 1 {
+		return c[0]
+	}
+
+	return &multiLevelChunkCache{
+		name:              name,
+		caches:            c,
+		backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
+		fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "cortex_store_multilevel_chunks_cache_fetch_duration_seconds",
+			Help:    "Histogram to track latency to fetch items from multi level chunk cache",
+			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+		}, nil),
+		backFillLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "cortex_store_multilevel_chunks_cache_backfill_duration_seconds",
+			Help:    "Histogram to track latency to backfill items from multi level chunk cache",
+			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+		}, nil),
+		storeDroppedItems: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_store_multilevel_chunks_cache_store_dropped_items_total",
+			Help: "Total number of items dropped due to async buffer full when storing multilevel cache",
+		}),
+		backfillDroppedItems: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_store_multilevel_chunks_cache_backfill_dropped_items_total",
+			Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache",
+		}),
+		maxBackfillItems: cfg.MaxBackfillItems,
+		backfillTTL:      cfg.BackFillTTL,
+	}
+}
+
+// Store writes data to every configured cache level asynchronously.
+func (m *multiLevelChunkCache) Store(data map[string][]byte, ttl time.Duration) {
+	for _, c := range m.caches {
+		if err := m.backfillProcessor.EnqueueAsync(func() {
+			c.Store(data, ttl)
+		}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+			m.storeDroppedItems.Inc()
+		}
+	}
+}
+
+// Fetch queries the levels in order and, once done, asynchronously backfills
+// items found at lower levels into the level above them.
+func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues())
+	defer timer.ObserveDuration()
+
+	hits := map[string][]byte{}
+	backfillItems := make([]map[string][]byte, len(m.caches)-1)
+
+	for i, c := range m.caches {
+		if i < len(m.caches)-1 {
+			backfillItems[i] = map[string][]byte{}
+		}
+		if ctx.Err() != nil {
+			return nil
+		}
+		if data := c.Fetch(ctx, keys); len(data) > 0 {
+			for k, d := range data {
+				hits[k] = d
+			}
+
+			if i > 0 && len(hits) > 0 {
+				backfillItems[i-1] = hits
+			}
+
+			if len(hits) == len(keys) {
+				// fetch done
+				break
+			}
+		}
+	}
+
+	defer func() {
+		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues())
+		defer backFillTimer.ObserveDuration()
+
+		for i, values := range backfillItems {
+			if len(values) == 0 {
+				continue
+			}
+
+			if err := m.backfillProcessor.EnqueueAsync(func() {
+				m.caches[i].Store(values, m.backfillTTL)
+			}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+				m.backfillDroppedItems.Inc()
+			}
+		}
+	}()
+
+	return hits
+}
+
+func (m *multiLevelChunkCache) Name() string {
+	return m.name
+}
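One design choice worth noting: `newMultiLevelChunkCache` returns the underlying cache unchanged when only one level is configured, so single-backend setups skip the wrapper and its metrics entirely. A minimal sketch of that behavior, reusing the `mockChunkCache` helper from the test file below:

```go
package tsdb

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative only: demonstrates the single-level short circuit.
func ExampleSingleLevelShortCircuit() {
	cfg := MultiLevelChunkCacheConfig{MaxAsyncConcurrency: 1, MaxAsyncBufferSize: 1, MaxBackfillItems: 1}

	// One level: the wrapped cache itself is returned, no extra indirection.
	single := newMultiLevelChunkCache("chunk-cache", cfg, prometheus.NewRegistry(), newMockChunkCache("m1", nil))
	fmt.Println(single.Name()) // m1

	// Two levels: a *multiLevelChunkCache exposing fetch/backfill metrics.
	multi := newMultiLevelChunkCache("chunk-cache", cfg, prometheus.NewRegistry(),
		newMockChunkCache("m1", nil), newMockChunkCache("m2", nil))
	fmt.Println(multi.Name()) // chunk-cache
}
```
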

pkg/storage/tsdb/multilevel_chunk_cache_test.go

+193

@@ -0,0 +1,193 @@
+package tsdb
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+)
+
+func Test_MultiLevelChunkCacheStore(t *testing.T) {
+	ttl := time.Hour * 24
+	cfg := MultiLevelChunkCacheConfig{
+		MaxAsyncConcurrency: 10,
+		MaxAsyncBufferSize:  100000,
+		MaxBackfillItems:    10000,
+		BackFillTTL:         ttl,
+	}
+
+	data := map[string][]byte{
+		"key1": []byte("value1"),
+		"key2": []byte("value2"),
+		"key3": []byte("value3"),
+	}
+
+	testCases := map[string]struct {
+		m1InitData     map[string][]byte
+		m2InitData     map[string][]byte
+		expectedM1Data map[string][]byte
+		expectedM2Data map[string][]byte
+		storeData      map[string][]byte
+	}{
+		"should store data to both caches": {
+			m1InitData:     nil,
+			m2InitData:     nil,
+			expectedM1Data: data,
+			expectedM2Data: data,
+			storeData:      data,
+		},
+		"should store data to m1 cache": {
+			m1InitData:     nil,
+			m2InitData:     data,
+			expectedM1Data: data,
+			expectedM2Data: data,
+			storeData:      data,
+		},
+		"should store data to m2 cache": {
+			m1InitData:     data,
+			m2InitData:     nil,
+			expectedM1Data: data,
+			expectedM2Data: data,
+			storeData:      data,
+		},
+	}
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			m1 := newMockChunkCache("m1", tc.m1InitData)
+			m2 := newMockChunkCache("m2", tc.m2InitData)
+			reg := prometheus.NewRegistry()
+			c := newMultiLevelChunkCache("chunk-cache", cfg, reg, m1, m2)
+			c.Store(tc.storeData, ttl)
+
+			mlc := c.(*multiLevelChunkCache)
+			// Wait until async operations finish.
+			mlc.backfillProcessor.Stop()
+
+			require.Equal(t, tc.expectedM1Data, m1.data)
+			require.Equal(t, tc.expectedM2Data, m2.data)
+		})
+	}
+}
+
+func Test_MultiLevelChunkCacheFetch(t *testing.T) {
+	cfg := MultiLevelChunkCacheConfig{
+		MaxAsyncConcurrency: 10,
+		MaxAsyncBufferSize:  100000,
+		MaxBackfillItems:    10000,
+		BackFillTTL:         time.Hour * 24,
+	}
+
+	testCases := map[string]struct {
+		m1ExistingData      map[string][]byte
+		m2ExistingData      map[string][]byte
+		expectedM1Data      map[string][]byte
+		expectedM2Data      map[string][]byte
+		expectedFetchedData map[string][]byte
+		fetchKeys           []string
+	}{
+		"fetched data should be the union of m1 and m2, and 'key2' and 'key3' should be backfilled to m1": {
+			m1ExistingData: map[string][]byte{
+				"key1": []byte("value1"),
+			},
+			m2ExistingData: map[string][]byte{
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			expectedM1Data: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			expectedM2Data: map[string][]byte{
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			expectedFetchedData: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			fetchKeys: []string{"key1", "key2", "key3"},
+		},
+		"should not fetch data that does not exist in either cache": {
+			m1ExistingData: map[string][]byte{
+				"key1": []byte("value1"),
+			},
+			m2ExistingData: map[string][]byte{
+				"key2": []byte("value2"),
+			},
+			expectedM1Data: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+			},
+			expectedM2Data: map[string][]byte{
+				"key2": []byte("value2"),
+			},
+			expectedFetchedData: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+			},
+			fetchKeys: []string{"key1", "key2", "key3"},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			m1 := newMockChunkCache("m1", tc.m1ExistingData)
+			m2 := newMockChunkCache("m2", tc.m2ExistingData)
+			reg := prometheus.NewRegistry()
+			c := newMultiLevelChunkCache("chunk-cache", cfg, reg, m1, m2)
+			fetchData := c.Fetch(context.Background(), tc.fetchKeys)
+
+			mlc := c.(*multiLevelChunkCache)
+			// Wait until async operations finish.
+			mlc.backfillProcessor.Stop()
+
+			require.Equal(t, tc.expectedM1Data, m1.data)
+			require.Equal(t, tc.expectedM2Data, m2.data)
+			require.Equal(t, tc.expectedFetchedData, fetchData)
+		})
+	}
+}
+
+type mockChunkCache struct {
+	mu   sync.Mutex
+	name string
+	data map[string][]byte
+}
+
+func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache {
+	if data == nil {
+		data = make(map[string][]byte)
+	}
+
+	return &mockChunkCache{
+		name: name,
+		data: data,
+	}
+}
+
+// Store replaces the mock's contents; locking keeps it safe against the
+// asynchronous backfill goroutines.
+func (m *mockChunkCache) Store(data map[string][]byte, _ time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.data = data
+}
+
+func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	h := map[string][]byte{}
+
+	for _, k := range keys {
+		if _, ok := m.data[k]; ok {
+			h[k] = m.data[k]
+		}
+	}
+
+	return h
+}
+
+func (m *mockChunkCache) Name() string {
+	return m.name
+}
