Commit 3267515
Cache Expanded Postings on ingesters (#6296)
* Implementing Expanded Postings Cache
* Small nit
* Refactoring the cache so we don't need to call expire on every request
* Update total cache size when updating the item
* Fix fuzz test after changing the flag name
* Remove max item config + create a new test case with only the head cache enabled
* Document `enabled` as the first field on the config
* Fix race on chunks multilevel cache + optimize to avoid refetching already found keys (#6312)
  * Create a test to show the race on the multilevel cache
  * Fix the race problem
  * Only fetch keys that were not found on the previous cache
* Improve doc
* Create new cortex_ingester_expanded_postings_non_cacheable_queries metric

Signed-off-by: alanprot <[email protected]>
1 parent 7548068 commit 3267515

13 files changed: +1302 −9 lines

Diff for: CHANGELOG.md

+1 line

@@ -26,6 +26,7 @@
 * [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
 * [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
 * [ENHANCEMENT] Upgrade build image and Go version to 1.23.2. #6261 #6262
+* [ENHANCEMENT] Ingester: Introduce a new experimental feature for caching expanded postings on the ingester. #6296
 * [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
 * [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

Diff for: docs/blocks-storage/querier.md

+34 lines

@@ -1544,4 +1544,38 @@ blocks_storage:
     # [EXPERIMENTAL] True to enable native histogram.
     # CLI flag: -blocks-storage.tsdb.enable-native-histograms
     [enable_native_histograms: <boolean> | default = false]
+
+  # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when
+  # querying blocks. Caching can be configured separately for the head and
+  # compacted blocks.
+  expanded_postings_cache:
+    # If enabled, ingesters will cache expanded postings for the head block.
+    # Only queries with an equal matcher for metric __name__ are cached.
+    head:
+      # Whether the postings cache is enabled or not
+      # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled
+      [enabled: <boolean> | default = false]
+
+      # Max bytes for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes
+      [max_bytes: <int> | default = 10485760]
+
+      # TTL for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
+      [ttl: <duration> | default = 10m]
+
+    # If enabled, ingesters will cache expanded postings for the compacted
+    # blocks. The cache is shared between all blocks.
+    blocks:
+      # Whether the postings cache is enabled or not
+      # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled
+      [enabled: <boolean> | default = false]
+
+      # Max bytes for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes
+      [max_bytes: <int> | default = 10485760]
+
+      # TTL for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
+      [ttl: <duration> | default = 10m]
 ```
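For readers who want to try the feature, here is a minimal configuration sketch. It is not part of this commit: the keys and defaults come from the documentation added above, and nesting `expanded_postings_cache` directly under `blocks_storage` is inferred from the `-blocks-storage.expanded_postings_cache.*` CLI flag prefix.

```yaml
# Hypothetical snippet (not from the commit): enable the experimental expanded
# postings cache on ingesters for both the head block and compacted blocks,
# keeping the documented defaults for max_bytes (10 MiB) and ttl (10m).
blocks_storage:
  expanded_postings_cache:
    head:
      enabled: true
    blocks:
      enabled: true
```

The flag form of the same settings, `-blocks-storage.expanded_postings_cache.head.enabled=true` and `-blocks-storage.expanded_postings_cache.block.enabled=true`, is what the fuzz test later in this commit passes to the second Cortex replica.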

Diff for: docs/blocks-storage/store-gateway.md

+34 lines

@@ -1635,4 +1635,38 @@ blocks_storage:
     # [EXPERIMENTAL] True to enable native histogram.
     # CLI flag: -blocks-storage.tsdb.enable-native-histograms
     [enable_native_histograms: <boolean> | default = false]
+
+  # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when
+  # querying blocks. Caching can be configured separately for the head and
+  # compacted blocks.
+  expanded_postings_cache:
+    # If enabled, ingesters will cache expanded postings for the head block.
+    # Only queries with an equal matcher for metric __name__ are cached.
+    head:
+      # Whether the postings cache is enabled or not
+      # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled
+      [enabled: <boolean> | default = false]
+
+      # Max bytes for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes
+      [max_bytes: <int> | default = 10485760]
+
+      # TTL for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
+      [ttl: <duration> | default = 10m]
+
+    # If enabled, ingesters will cache expanded postings for the compacted
+    # blocks. The cache is shared between all blocks.
+    blocks:
+      # Whether the postings cache is enabled or not
+      # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled
+      [enabled: <boolean> | default = false]
+
+      # Max bytes for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes
+      [max_bytes: <int> | default = 10485760]
+
+      # TTL for postings cache
+      # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
+      [ttl: <duration> | default = 10m]
 ```

Diff for: docs/configuration/config-file-reference.md

+34 lines

@@ -2081,6 +2081,40 @@ tsdb:
   # [EXPERIMENTAL] True to enable native histogram.
   # CLI flag: -blocks-storage.tsdb.enable-native-histograms
   [enable_native_histograms: <boolean> | default = false]
+
+# [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when
+# querying blocks. Caching can be configured separately for the head and
+# compacted blocks.
+expanded_postings_cache:
+  # If enabled, ingesters will cache expanded postings for the head block.
+  # Only queries with an equal matcher for metric __name__ are cached.
+  head:
+    # Whether the postings cache is enabled or not
+    # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled
+    [enabled: <boolean> | default = false]
+
+    # Max bytes for postings cache
+    # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes
+    [max_bytes: <int> | default = 10485760]
+
+    # TTL for postings cache
+    # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
+    [ttl: <duration> | default = 10m]
+
+  # If enabled, ingesters will cache expanded postings for the compacted
+  # blocks. The cache is shared between all blocks.
+  blocks:
+    # Whether the postings cache is enabled or not
+    # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled
+    [enabled: <boolean> | default = false]
+
+    # Max bytes for postings cache
+    # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes
+    [max_bytes: <int> | default = 10485760]
+
+    # TTL for postings cache
+    # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
+    [ttl: <duration> | default = 10m]
 ```
 
 ### `compactor_config`
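The head and block caches can also be enabled and tuned independently. The sketch below is likewise hypothetical and only uses keys documented above; the `max_bytes` and `ttl` values are illustrative, not recommendations from this commit.

```yaml
# Hypothetical tuning example (illustrative values only).
blocks_storage:
  expanded_postings_cache:
    head:
      enabled: true
      max_bytes: 52428800  # 50 MiB instead of the default 10485760 (10 MiB)
      ttl: 5m              # default is 10m
      # Per the documentation above, only queries with an equal matcher on
      # __name__ are cached for the head block.
    blocks:
      enabled: false       # leave the shared compacted-blocks cache disabled
```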

Diff for: integration/query_fuzz_test.go

+197 lines

@@ -205,6 +205,203 @@ func TestDisableChunkTrimmingFuzz(t *testing.T) {
     }
 }
 
+func TestExpandedPostingsCacheFuzz(t *testing.T) {
+    stableCortexImage := "quay.io/cortexproject/cortex:v1.18.0"
+    s, err := e2e.NewScenario(networkName)
+    require.NoError(t, err)
+    defer s.Close()
+
+    // Start dependencies.
+    consul1 := e2edb.NewConsulWithName("consul1")
+    consul2 := e2edb.NewConsulWithName("consul2")
+    require.NoError(t, s.StartAndWaitReady(consul1, consul2))
+
+    flags1 := mergeFlags(
+        AlertmanagerLocalFlags(),
+        map[string]string{
+            "-store.engine":                                     blocksStorageEngine,
+            "-blocks-storage.backend":                           "filesystem",
+            "-blocks-storage.tsdb.head-compaction-interval":     "4m",
+            "-blocks-storage.tsdb.block-ranges-period":          "2h",
+            "-blocks-storage.tsdb.ship-interval":                "1h",
+            "-blocks-storage.bucket-store.sync-interval":        "15m",
+            "-blocks-storage.tsdb.retention-period":             "2h",
+            "-blocks-storage.bucket-store.index-cache.backend":  tsdb.IndexCacheBackendInMemory,
+            "-blocks-storage.bucket-store.bucket-index.enabled": "true",
+            "-querier.query-store-for-labels-enabled":           "true",
+            // Ingester.
+            "-ring.store":      "consul",
+            "-consul.hostname": consul1.NetworkHTTPEndpoint(),
+            // Distributor.
+            "-distributor.replication-factor": "1",
+            // Store-gateway.
+            "-store-gateway.sharding-enabled": "false",
+            // Alertmanager.
+            "-alertmanager.web.external-url": "http://localhost/alertmanager",
+        },
+    )
+    flags2 := mergeFlags(
+        AlertmanagerLocalFlags(),
+        map[string]string{
+            "-store.engine":                                     blocksStorageEngine,
+            "-blocks-storage.backend":                           "filesystem",
+            "-blocks-storage.tsdb.head-compaction-interval":     "4m",
+            "-blocks-storage.tsdb.block-ranges-period":          "2h",
+            "-blocks-storage.tsdb.ship-interval":                "1h",
+            "-blocks-storage.bucket-store.sync-interval":        "15m",
+            "-blocks-storage.tsdb.retention-period":             "2h",
+            "-blocks-storage.bucket-store.index-cache.backend":  tsdb.IndexCacheBackendInMemory,
+            "-blocks-storage.bucket-store.bucket-index.enabled": "true",
+            "-querier.query-store-for-labels-enabled":           "true",
+            "-blocks-storage.expanded_postings_cache.head.enabled":  "true",
+            "-blocks-storage.expanded_postings_cache.block.enabled": "true",
+            // Ingester.
+            "-ring.store":      "consul",
+            "-consul.hostname": consul2.NetworkHTTPEndpoint(),
+            // Distributor.
+            "-distributor.replication-factor": "1",
+            // Store-gateway.
+            "-store-gateway.sharding-enabled": "false",
+            // Alertmanager.
+            "-alertmanager.web.external-url": "http://localhost/alertmanager",
+        },
+    )
+    // Make the Alertmanager config dir.
+    require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
+
+    path1 := path.Join(s.SharedDir(), "cortex-1")
+    path2 := path.Join(s.SharedDir(), "cortex-2")
+
+    flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1})
+    flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2})
+    // Start Cortex replicas.
+    cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, stableCortexImage)
+    cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "")
+    require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))
+
+    // Wait until Cortex replicas have updated the ring state.
+    require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
+    require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
+
+    var clients []*e2ecortex.Client
+    c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
+    require.NoError(t, err)
+    c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
+    require.NoError(t, err)
+
+    clients = append(clients, c1, c2)
+
+    now := time.Now()
+    // Push some series to Cortex.
+    start := now.Add(-24 * time.Hour)
+    scrapeInterval := 30 * time.Second
+
+    numSeries := 10
+    numberOfLabelsPerSeries := 5
+    numSamples := 10
+    ss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries)
+    lbls := make([]labels.Labels, numSeries*numberOfLabelsPerSeries)
+
+    for i := 0; i < numSeries; i++ {
+        for j := 0; j < numberOfLabelsPerSeries; j++ {
+            series := e2e.GenerateSeriesWithSamples(
+                fmt.Sprintf("test_series_%d", i),
+                start,
+                scrapeInterval,
+                i*numSamples,
+                numSamples,
+                prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
+            )
+            ss[i*numberOfLabelsPerSeries+j] = series
+
+            builder := labels.NewBuilder(labels.EmptyLabels())
+            for _, lbl := range series.Labels {
+                builder.Set(lbl.Name, lbl.Value)
+            }
+            lbls[i*numberOfLabelsPerSeries+j] = builder.Labels()
+        }
+    }
+
+    rnd := rand.New(rand.NewSource(now.Unix()))
+    opts := []promqlsmith.Option{
+        promqlsmith.WithEnableOffset(true),
+        promqlsmith.WithEnableAtModifier(true),
+    }
+    ps := promqlsmith.New(rnd, lbls, opts...)
+
+    // Create the queries with the original labels.
+    testRun := 100
+    queries := make([]string, testRun)
+    for i := 0; i < testRun; i++ {
+        expr := ps.WalkRangeQuery()
+        queries[i] = expr.Pretty(0)
+    }
+
+    // Let's run multiple iterations and create new series every iteration.
+    for k := 0; k < 5; k++ {
+
+        nss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries)
+        for i := 0; i < numSeries; i++ {
+            for j := 0; j < numberOfLabelsPerSeries; j++ {
+                nss[i*numberOfLabelsPerSeries+j] = e2e.GenerateSeriesWithSamples(
+                    fmt.Sprintf("test_series_%d", i),
+                    start.Add(scrapeInterval*time.Duration(numSamples*j)),
+                    scrapeInterval,
+                    i*numSamples,
+                    numSamples,
+                    prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
+                    prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)},
+                )
+            }
+        }
+
+        for _, client := range clients {
+            res, err := client.Push(nss)
+            require.NoError(t, err)
+            require.Equal(t, 200, res.StatusCode)
+        }
+
+        type testCase struct {
+            query      string
+            res1, res2 model.Value
+            err1, err2 error
+        }
+
+        queryStart := time.Now().Add(-time.Hour * 24)
+        queryEnd := time.Now()
+        cases := make([]*testCase, 0, 200)
+
+        for _, query := range queries {
+            res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
+            res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
+            cases = append(cases, &testCase{
+                query: query,
+                res1:  res1,
+                res2:  res2,
+                err1:  err1,
+                err2:  err2,
+            })
+        }
+
+        failures := 0
+        for i, tc := range cases {
+            qt := "range query"
+            if tc.err1 != nil || tc.err2 != nil {
+                if !cmp.Equal(tc.err1, tc.err2) {
+                    t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
+                    failures++
+                }
+            } else if !cmp.Equal(tc.res1, tc.res2, comparer) {
+                t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
+                failures++
+            }
+        }
+        if failures > 0 {
+            require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
+        }
+    }
+}
+
 func TestVerticalShardingFuzz(t *testing.T) {
     s, err := e2e.NewScenario(networkName)
     require.NoError(t, err)
