
Commit 275a5bf

Disable chunk trimming in ingester (#6270)
1 parent: 3596ce6
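
For context: this change makes the ingester's chunk-streaming query path pass `storage.SelectHints` with `DisableTrimming: true`, so TSDB returns whole chunks overlapping the requested range rather than re-slicing them to the exact `[from, through]` bounds (see `pkg/ingester/ingester.go` below). Query results should not change, because the querier's PromQL evaluation already bounds samples by time; the new integration fuzz test checks this by comparing a build containing this change against the earlier v1.18.0 image.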

File tree

CHANGELOG.md
integration/query_fuzz_test.go
pkg/ingester/ingester.go

3 files changed: +166 −2 lines changed

CHANGELOG.md (+1)

@@ -11,6 +11,7 @@
 * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
 * [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
 * [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
+* [ENHANCEMENT] Ingester: Disable chunk trimming. #6270
 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

integration/query_fuzz_test.go (+159 −1)

@@ -5,6 +5,7 @@ package integration

 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"os"
 	"path"

@@ -35,6 +36,164 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/log"
 )

+func TestDisableChunkTrimmingFuzz(t *testing.T) {
+	noneChunkTrimmingImage := "quay.io/cortexproject/cortex:v1.18.0"
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul1 := e2edb.NewConsulWithName("consul1")
+	consul2 := e2edb.NewConsulWithName("consul2")
+	require.NoError(t, s.StartAndWaitReady(consul1, consul2))
+
+	flags1 := mergeFlags(
+		AlertmanagerLocalFlags(),
+		map[string]string{
+			"-store.engine":                                     blocksStorageEngine,
+			"-blocks-storage.backend":                           "filesystem",
+			"-blocks-storage.tsdb.head-compaction-interval":     "4m",
+			"-blocks-storage.tsdb.block-ranges-period":          "2h",
+			"-blocks-storage.tsdb.ship-interval":                "1h",
+			"-blocks-storage.bucket-store.sync-interval":        "15m",
+			"-blocks-storage.tsdb.retention-period":             "2h",
+			"-blocks-storage.bucket-store.index-cache.backend":  tsdb.IndexCacheBackendInMemory,
+			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+			"-querier.query-store-for-labels-enabled":           "true",
+			// Ingester.
+			"-ring.store":      "consul",
+			"-consul.hostname": consul1.NetworkHTTPEndpoint(),
+			// Distributor.
+			"-distributor.replication-factor": "1",
+			// Store-gateway.
+			"-store-gateway.sharding-enabled": "false",
+			// alert manager
+			"-alertmanager.web.external-url": "http://localhost/alertmanager",
+		},
+	)
+	flags2 := mergeFlags(
+		AlertmanagerLocalFlags(),
+		map[string]string{
+			"-store.engine":                                     blocksStorageEngine,
+			"-blocks-storage.backend":                           "filesystem",
+			"-blocks-storage.tsdb.head-compaction-interval":     "4m",
+			"-blocks-storage.tsdb.block-ranges-period":          "2h",
+			"-blocks-storage.tsdb.ship-interval":                "1h",
+			"-blocks-storage.bucket-store.sync-interval":        "15m",
+			"-blocks-storage.tsdb.retention-period":             "2h",
+			"-blocks-storage.bucket-store.index-cache.backend":  tsdb.IndexCacheBackendInMemory,
+			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+			"-querier.query-store-for-labels-enabled":           "true",
+			// Ingester.
+			"-ring.store":      "consul",
+			"-consul.hostname": consul2.NetworkHTTPEndpoint(),
+			// Distributor.
+			"-distributor.replication-factor": "1",
+			// Store-gateway.
+			"-store-gateway.sharding-enabled": "false",
+			// alert manager
+			"-alertmanager.web.external-url": "http://localhost/alertmanager",
+		},
+	)
+	// make alert manager config dir
+	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
+
+	path1 := path.Join(s.SharedDir(), "cortex-1")
+	path2 := path.Join(s.SharedDir(), "cortex-2")
+
+	flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1})
+	flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2})
+	// Start Cortex replicas.
+	cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "")
+	cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, noneChunkTrimmingImage)
+	require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))
+
+	// Wait until Cortex replicas have updated the ring state.
+	require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
+	require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
+
+	c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+	c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+
+	now := time.Now()
+	// Push some series to Cortex.
+	start := now.Add(-time.Minute * 120)
+	scrapeInterval := 30 * time.Second
+
+	numSeries := 10
+	numSamples := 240
+	serieses := make([]prompb.TimeSeries, numSeries)
+	lbls := make([]labels.Labels, numSeries)
+	for i := 0; i < numSeries; i++ {
+		series := e2e.GenerateSeriesWithSamples(fmt.Sprintf("test_series_%d", i), start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "foo", Value: "bar"})
+		serieses[i] = series
+
+		builder := labels.NewBuilder(labels.EmptyLabels())
+		for _, lbl := range series.Labels {
+			builder.Set(lbl.Name, lbl.Value)
+		}
+		lbls[i] = builder.Labels()
+	}
+
+	res, err := c1.Push(serieses)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	res, err = c2.Push(serieses)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	rnd := rand.New(rand.NewSource(now.Unix()))
+	opts := []promqlsmith.Option{
+		promqlsmith.WithEnableOffset(true),
+		promqlsmith.WithEnableAtModifier(true),
+	}
+	ps := promqlsmith.New(rnd, lbls, opts...)
+
+	type testCase struct {
+		query      string
+		res1, res2 model.Value
+		err1, err2 error
+	}
+
+	queryStart := time.Now().Add(-time.Minute * 40)
+	queryEnd := time.Now().Add(-time.Minute * 20)
+	cases := make([]*testCase, 0, 200)
+	testRun := 500
+	for i := 0; i < testRun; i++ {
+		expr := ps.WalkRangeQuery()
+		query := expr.Pretty(0)
+		res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
+		res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
+		cases = append(cases, &testCase{
+			query: query,
+			res1:  res1,
+			res2:  res2,
+			err1:  err1,
+			err2:  err2,
+		})
+	}
+
+	failures := 0
+	for i, tc := range cases {
+		qt := "range query"
+		if tc.err1 != nil || tc.err2 != nil {
+			if !cmp.Equal(tc.err1, tc.err2) {
+				t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
+				failures++
+			}
+		} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
+			t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
+			failures++
+		}
+	}
+	if failures > 0 {
+		require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
+	}
+}
+
 func TestVerticalShardingFuzz(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)

@@ -159,7 +318,6 @@ func TestVerticalShardingFuzz(t *testing.T) {
 		instantQuery bool
 	}

-	now = time.Now()
 	cases := make([]*testCase, 0, 200)
 	for i := 0; i < 100; i++ {
 		expr := ps.WalkInstantQuery()
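
Note that the `comparer` option passed to `cmp.Equal` above is defined elsewhere in query_fuzz_test.go and is not part of this diff. As a much-simplified, hypothetical stand-in to illustrate its role (the real comparer is more thorough, e.g. it likely tolerates small floating-point differences between sample values):

package example

import (
	"github.com/google/go-cmp/cmp"
	"github.com/prometheus/common/model"
)

// comparer reports two query results as equal when their types and
// rendered forms match. Purely illustrative; not the test's comparer.
var comparer = cmp.Comparer(func(x, y model.Value) bool {
	return x.Type() == y.Type() && x.String() == y.String()
})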

pkg/ingester/ingester.go (+6 −1)

@@ -1982,8 +1982,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
 	if err != nil {
 		return 0, 0, 0, err
 	}
+	hints := &storage.SelectHints{
+		Start:           from,
+		End:             through,
+		DisableTrimming: true,
+	}
 	// It's not required to return sorted series because series are sorted by the Cortex querier.
-	ss := q.Select(ctx, false, nil, matchers...)
+	ss := q.Select(ctx, false, hints, matchers...)
 	c()
 	if ss.Err() != nil {
 		return 0, 0, 0, ss.Err()
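
The effect of the new hint on consumers: with `DisableTrimming` set, `Select` may return series whose chunks extend past `[from, through]`, and it becomes the reader's job to bound samples by time. The Cortex querier already does this during PromQL evaluation, which is presumably why the ingester can skip cutting and re-encoding chunks here. A minimal sketch of a consumer that trims at read time (assuming a Prometheus `storage.Querier` and millisecond timestamps; this is not code from this commit):

package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// queryUntrimmed selects with trimming disabled and drops out-of-range
// samples itself. Illustrative only; names and bounds are assumptions.
func queryUntrimmed(ctx context.Context, q storage.Querier, mint, maxt int64, matchers ...*labels.Matcher) ([]float64, error) {
	hints := &storage.SelectHints{Start: mint, End: maxt, DisableTrimming: true}
	ss := q.Select(ctx, false, hints, matchers...)
	var vals []float64
	for ss.Next() {
		it := ss.At().Iterator(nil)
		for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
			if vt != chunkenc.ValFloat {
				continue // this sketch ignores histogram samples
			}
			t, v := it.At()
			if t >= mint && t <= maxt { // trim at read time instead of in TSDB
				vals = append(vals, v)
			}
		}
		if it.Err() != nil {
			return nil, it.Err()
		}
	}
	return vals, ss.Err()
}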
