Skip to content

Commit 47a74e2

Browse files
authored
Add dynamic interval cache splitter (#6592)
* add dynamic interval cache splitter Signed-off-by: Ahmed Hassan <[email protected]> * use split interval from stats to generate cache key Signed-off-by: Ahmed Hassan <[email protected]> * rerun tests Signed-off-by: Ahmed Hassan <[email protected]> --------- Signed-off-by: Ahmed Hassan <[email protected]>
1 parent 64a3b83 commit 47a74e2

File tree

3 files changed

+35
-18
lines changed

3 files changed

+35
-18
lines changed

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ func Middlewares(
124124
}
125125
return false
126126
}
127-
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer)
127+
128+
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, splitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer)
128129
if err != nil {
129130
return nil, nil, err
130131
}

pkg/querier/tripperware/queryrange/results_cache.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cortexproject/cortex/pkg/cortexpb"
3030
"github.com/cortexproject/cortex/pkg/querier"
3131
"github.com/cortexproject/cortex/pkg/querier/partialdata"
32+
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
3233
"github.com/cortexproject/cortex/pkg/querier/tripperware"
3334
"github.com/cortexproject/cortex/pkg/tenant"
3435
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -141,15 +142,21 @@ func (PrometheusResponseExtractor) ResponseWithoutStats(resp tripperware.Respons
141142
// CacheSplitter generates cache keys. This is a useful interface for downstream
142143
// consumers who wish to implement their own strategies.
143144
type CacheSplitter interface {
144-
GenerateCacheKey(userID string, r tripperware.Request) string
145+
GenerateCacheKey(ctx context.Context, userID string, r tripperware.Request) string
145146
}
146147

147-
// constSplitter is a utility for using a constant split interval when determining cache keys
148-
type constSplitter time.Duration
148+
// splitter is a utility for using split interval when determining cache keys
149+
type splitter time.Duration
149150

150151
// GenerateCacheKey generates a cache key based on the userID, Request and interval.
151-
func (t constSplitter) GenerateCacheKey(userID string, r tripperware.Request) string {
152-
currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond)
152+
func (t splitter) GenerateCacheKey(ctx context.Context, userID string, r tripperware.Request) string {
153+
stats := querier_stats.FromContext(ctx)
154+
interval := stats.LoadSplitInterval()
155+
if interval == 0 {
156+
interval = time.Duration(t)
157+
}
158+
159+
currentInterval := r.GetStart() / int64(interval/time.Millisecond)
153160
return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
154161
}
155162

@@ -232,8 +239,12 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar
232239
return s.next.Do(ctx, r)
233240
}
234241

242+
key := s.splitter.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r)
243+
if err != nil {
244+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
245+
}
246+
235247
var (
236-
key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r)
237248
extents []tripperware.Extent
238249
response tripperware.Response
239250
)

pkg/querier/tripperware/queryrange/results_cache_test.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func TestStatsCacheQuerySamples(t *testing.T) {
308308
rcm, _, err := NewResultsCacheMiddleware(
309309
log.NewNopLogger(),
310310
cfg,
311-
constSplitter(day),
311+
splitter(day),
312312
mockLimits{},
313313
PrometheusCodec,
314314
PrometheusResponseExtractor{},
@@ -1258,7 +1258,7 @@ func TestResultsCache(t *testing.T) {
12581258
rcm, _, err := NewResultsCacheMiddleware(
12591259
log.NewNopLogger(),
12601260
cfg,
1261-
constSplitter(day),
1261+
splitter(day),
12621262
mockLimits{},
12631263
PrometheusCodec,
12641264
PrometheusResponseExtractor{},
@@ -1299,7 +1299,7 @@ func TestResultsCacheRecent(t *testing.T) {
12991299
rcm, _, err := NewResultsCacheMiddleware(
13001300
log.NewNopLogger(),
13011301
cfg,
1302-
constSplitter(day),
1302+
splitter(day),
13031303
mockLimits{maxCacheFreshness: 10 * time.Minute},
13041304
PrometheusCodec,
13051305
PrometheusResponseExtractor{},
@@ -1364,7 +1364,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
13641364
rcm, _, err := NewResultsCacheMiddleware(
13651365
log.NewNopLogger(),
13661366
cfg,
1367-
constSplitter(day),
1367+
splitter(day),
13681368
fakeLimits,
13691369
PrometheusCodec,
13701370
PrometheusResponseExtractor{},
@@ -1381,7 +1381,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
13811381
req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3))
13821382

13831383
// fill cache
1384-
key := constSplitter(day).GenerateCacheKey("1", req)
1384+
key := splitter(day).GenerateCacheKey(ctx, "1", req)
13851385
rc.(*resultsCache).put(ctx, key, []tripperware.Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))})
13861386

13871387
resp, err := rc.Do(ctx, req)
@@ -1401,7 +1401,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
14011401
rm, _, err := NewResultsCacheMiddleware(
14021402
log.NewNopLogger(),
14031403
cfg,
1404-
constSplitter(day),
1404+
splitter(day),
14051405
mockLimits{},
14061406
PrometheusCodec,
14071407
PrometheusResponseExtractor{},
@@ -1438,7 +1438,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
14381438
require.False(t, hit)
14391439
}
14401440

1441-
func TestConstSplitter_generateCacheKey(t *testing.T) {
1441+
func TestSplitter_generateCacheKey(t *testing.T) {
14421442
t.Parallel()
14431443

14441444
tests := []struct {
@@ -1460,7 +1460,10 @@ func TestConstSplitter_generateCacheKey(t *testing.T) {
14601460
tt := tt
14611461
t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) {
14621462
t.Parallel()
1463-
if got := constSplitter(tt.interval).GenerateCacheKey("fake", tt.r); got != tt.want {
1463+
ctx := user.InjectOrgID(context.Background(), "1")
1464+
got := splitter(tt.interval).GenerateCacheKey(ctx, "fake", tt.r)
1465+
1466+
if got != tt.want {
14641467
t.Errorf("generateKey() = %v, want %v", got, tt.want)
14651468
}
14661469
})
@@ -1513,7 +1516,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
15131516
rcm, _, err := NewResultsCacheMiddleware(
15141517
log.NewNopLogger(),
15151518
cfg,
1516-
constSplitter(day),
1519+
splitter(day),
15171520
mockLimits{maxCacheFreshness: 10 * time.Minute},
15181521
PrometheusCodec,
15191522
PrometheusResponseExtractor{},
@@ -1545,7 +1548,7 @@ func TestResultsCacheFillCompatibility(t *testing.T) {
15451548
rcm, _, err := NewResultsCacheMiddleware(
15461549
log.NewNopLogger(),
15471550
cfg,
1548-
constSplitter(day),
1551+
splitter(day),
15491552
mockLimits{maxCacheFreshness: 10 * time.Minute},
15501553
PrometheusCodec,
15511554
PrometheusResponseExtractor{},
@@ -1563,7 +1566,9 @@ func TestResultsCacheFillCompatibility(t *testing.T) {
15631566
// Check cache and make sure we write response in old format even though the response is new format.
15641567
tenantIDs, err := tenant.TenantIDs(ctx)
15651568
require.NoError(t, err)
1566-
cacheKey := cache.HashKey(constSplitter(day).GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), parsedRequest))
1569+
key := splitter(day).GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), parsedRequest)
1570+
1571+
cacheKey := cache.HashKey(key)
15671572
found, bufs, _ := c.Fetch(ctx, []string{cacheKey})
15681573
require.Equal(t, []string{cacheKey}, found)
15691574
require.Len(t, bufs, 1)

0 commit comments

Comments
 (0)