Skip to content

Commit 59b4e99

Browse files
committed
Add v1 test porting to v2
Signed-off-by: SungJin1212 <[email protected]>
1 parent a417e1f commit 59b4e99

File tree

10 files changed

+3845
-710
lines changed

10 files changed

+3845
-710
lines changed

integration/e2e/util.go

+38
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,44 @@ func GenerateSeriesV2WithSamples(
311311
}
312312
}
313313

314+
func GenerateSeriesWithSamplesV2(
315+
name string,
316+
startTime time.Time,
317+
scrapeInterval time.Duration,
318+
startValue int,
319+
numSamples int,
320+
additionalLabels ...prompb.Label,
321+
) (symbols []string, series cortexpbv2.TimeSeries) {
322+
tsMillis := TimeToMilliseconds(startTime)
323+
durMillis := scrapeInterval.Milliseconds()
324+
325+
st := writev2.NewSymbolTable()
326+
st.Symbolize("__name__")
327+
st.Symbolize(name)
328+
329+
lbls := labels.Labels{{Name: labels.MetricName, Value: name}}
330+
for _, label := range additionalLabels {
331+
st.Symbolize(label.Name)
332+
st.Symbolize(label.Value)
333+
lbls = append(lbls, labels.Label{Name: label.Name, Value: label.Value})
334+
}
335+
336+
startTMillis := tsMillis
337+
samples := make([]cortexpbv2.Sample, numSamples)
338+
for i := 0; i < numSamples; i++ {
339+
samples[i] = cortexpbv2.Sample{
340+
Timestamp: startTMillis,
341+
Value: float64(i + startValue),
342+
}
343+
startTMillis += durMillis
344+
}
345+
346+
return st.Symbols(), cortexpbv2.TimeSeries{
347+
LabelsRefs: cortexpbv2.GetLabelsRefsFromLabels(st.Symbols(), lbls),
348+
Samples: samples,
349+
}
350+
}
351+
314352
func GenerateSeriesWithSamples(
315353
name string,
316354
startTime time.Time,

integration/query_fuzz_test.go

+160
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"testing"
1717
"time"
1818

19+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
1920
"github.com/cortexproject/promqlsmith"
2021
"github.com/google/go-cmp/cmp"
2122
"github.com/google/go-cmp/cmp/cmpopts"
@@ -52,6 +53,165 @@ func init() {
5253
}
5354
}
5455

56+
func TestRemoteWriteV1AndV2QueryResultFuzz(t *testing.T) {
57+
s, err := e2e.NewScenario(networkName)
58+
require.NoError(t, err)
59+
defer s.Close()
60+
61+
// Start dependencies.
62+
consul1 := e2edb.NewConsulWithName("consul1")
63+
consul2 := e2edb.NewConsulWithName("consul2")
64+
require.NoError(t, s.StartAndWaitReady(consul1, consul2))
65+
66+
flags := mergeFlags(
67+
AlertmanagerLocalFlags(),
68+
map[string]string{
69+
"-store.engine": blocksStorageEngine,
70+
"-blocks-storage.backend": "filesystem",
71+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
72+
"-blocks-storage.tsdb.block-ranges-period": "2h",
73+
"-blocks-storage.tsdb.ship-interval": "1h",
74+
"-blocks-storage.bucket-store.sync-interval": "15m",
75+
"-blocks-storage.tsdb.retention-period": "2h",
76+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
77+
"-querier.query-store-for-labels-enabled": "true",
78+
// Ingester.
79+
"-ring.store": "consul",
80+
// Distributor.
81+
"-distributor.replication-factor": "1",
82+
// Store-gateway.
83+
"-store-gateway.sharding-enabled": "false",
84+
// alert manager
85+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
86+
},
87+
)
88+
89+
// make alert manager config dir
90+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
91+
92+
path1 := path.Join(s.SharedDir(), "cortex-1")
93+
path2 := path.Join(s.SharedDir(), "cortex-2")
94+
95+
flags1 := mergeFlags(flags, map[string]string{
96+
"-blocks-storage.filesystem.dir": path1,
97+
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
98+
})
99+
flags2 := mergeFlags(flags, map[string]string{
100+
"-blocks-storage.filesystem.dir": path2,
101+
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
102+
})
103+
// Start Cortex replicas.
104+
cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "")
105+
cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "")
106+
require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))
107+
108+
// Wait until Cortex replicas have updated the ring state.
109+
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
110+
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
111+
112+
c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
113+
require.NoError(t, err)
114+
c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
115+
require.NoError(t, err)
116+
117+
now := time.Now()
118+
// Push some series to Cortex.
119+
start := now.Add(-time.Minute * 60)
120+
scrapeInterval := 30 * time.Second
121+
122+
numSeries := 10
123+
numSamples := 120
124+
serieses := make([]prompb.TimeSeries, numSeries)
125+
seriesesV2 := make([]cortexpbv2.TimeSeries, numSeries)
126+
lbls := make([]labels.Labels, numSeries)
127+
128+
// make v1 series
129+
for i := 0; i < numSeries; i++ {
130+
series := e2e.GenerateSeriesWithSamples("test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
131+
serieses[i] = series
132+
133+
builder := labels.NewBuilder(labels.EmptyLabels())
134+
for _, lbl := range series.Labels {
135+
builder.Set(lbl.Name, lbl.Value)
136+
}
137+
lbls[i] = builder.Labels()
138+
}
139+
// make v2 series
140+
for i := 0; i < numSeries; i++ {
141+
series := e2e.GenerateSeriesWithSamplesV2("test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
142+
seriesesV2[i] = series
143+
}
144+
145+
res, err := c1.Push(serieses)
146+
require.NoError(t, err)
147+
require.Equal(t, 200, res.StatusCode)
148+
149+
res, err = c2.PushV2(seriesesV2)
150+
require.NoError(t, err)
151+
require.Equal(t, 200, res.StatusCode)
152+
153+
waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, now)
154+
155+
rnd := rand.New(rand.NewSource(now.Unix()))
156+
opts := []promqlsmith.Option{
157+
promqlsmith.WithEnableOffset(true),
158+
promqlsmith.WithEnableAtModifier(true),
159+
promqlsmith.WithEnabledFunctions(enabledFunctions),
160+
}
161+
ps := promqlsmith.New(rnd, lbls, opts...)
162+
163+
type testCase struct {
164+
query string
165+
res1, res2 model.Value
166+
err1, err2 error
167+
}
168+
169+
queryStart := now.Add(-time.Minute * 50)
170+
queryEnd := now.Add(-time.Minute * 10)
171+
cases := make([]*testCase, 0, 500)
172+
testRun := 500
173+
var (
174+
expr parser.Expr
175+
query string
176+
)
177+
for i := 0; i < testRun; i++ {
178+
for {
179+
expr = ps.WalkRangeQuery()
180+
query = expr.Pretty(0)
181+
// timestamp is a known function that break with disable chunk trimming.
182+
if isValidQuery(expr, 5) && !strings.Contains(query, "timestamp") {
183+
break
184+
}
185+
}
186+
res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
187+
res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
188+
cases = append(cases, &testCase{
189+
query: query,
190+
res1: res1,
191+
res2: res2,
192+
err1: err1,
193+
err2: err2,
194+
})
195+
}
196+
197+
failures := 0
198+
for i, tc := range cases {
199+
qt := "range query"
200+
if tc.err1 != nil || tc.err2 != nil {
201+
if !cmp.Equal(tc.err1, tc.err2) {
202+
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
203+
failures++
204+
}
205+
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
206+
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
207+
failures++
208+
}
209+
}
210+
if failures > 0 {
211+
require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
212+
}
213+
}
214+
55215
func TestDisableChunkTrimmingFuzz(t *testing.T) {
56216
s, err := e2e.NewScenario(networkName)
57217
require.NoError(t, err)

pkg/cortexpbv2/compatv2.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,27 @@ package cortexpbv2
22

33
import (
44
"github.com/prometheus/prometheus/model/labels"
5+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
56

67
"github.com/cortexproject/cortex/pkg/cortexpb"
78
)
89

910
// ToWriteRequestV2 converts matched slices of Labels, Samples, and Histograms into a WriteRequest proto.
10-
func ToWriteRequestV2(lbls []labels.Labels, symbols []string, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum) *WriteRequest {
11+
func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, additionalSymbols ...string) *WriteRequest {
12+
st := writev2.NewSymbolTable()
13+
for _, lbl := range lbls {
14+
lbl.Range(func(l labels.Label) {
15+
st.Symbolize(l.Name)
16+
st.Symbolize(l.Value)
17+
})
18+
}
19+
20+
for _, s := range additionalSymbols {
21+
st.Symbolize(s)
22+
}
23+
24+
symbols := st.Symbols()
25+
1126
req := &WriteRequest{
1227
Symbols: symbols,
1328
Source: source,

pkg/distributor/distributor.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -768,9 +768,18 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
768768
d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc()
769769
}
770770

771-
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
772-
if err != nil {
773-
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
771+
metadataAppend := false
772+
for _, ts := range timeseries {
773+
if ts.Metadata.Type != cortexpbv2.METRIC_TYPE_UNSPECIFIED {
774+
metadataAppend = true
775+
break
776+
}
777+
}
778+
if metadataAppend {
779+
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
780+
if err != nil {
781+
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
782+
}
774783
}
775784
}
776785

@@ -926,7 +935,7 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
926935
// Return a 429 here to tell the client it is going too fast.
927936
// Client may discard the data or slow down and re-send.
928937
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
929-
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), totalSamples)
938+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, validatedMetadatas)
930939
}
931940

932941
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.

0 commit comments

Comments
 (0)