Skip to content

Commit ab2a6ea

Browse files
committed
Support remote write v2
Signed-off-by: SungJin1212 <[email protected]>
1 parent 41d171d commit ab2a6ea

30 files changed

+10652
-435
lines changed

.github/workflows/test-build-deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ jobs:
144144
- integration_querier
145145
- integration_ruler
146146
- integration_query_fuzz
147+
- integration_remote_write_v2
147148
steps:
148149
- name: Upgrade golang
149150
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ run:
4949
- integration_querier
5050
- integration_ruler
5151
- integration_query_fuzz
52+
- integration_remote_write_v2

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [FEATURE] Distributor/Ingester: Support remote write 2.0. It includes proto, samples, and (native) histograms ingestion. #6292
1213
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
1314
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
1415
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))
8585

8686
# Manually declared dependencies And what goes into each exe
8787
pkg/cortexpb/cortex.pb.go: pkg/cortexpb/cortex.proto
88+
pkg/cortexpbv2/cortexv2.pb.go: pkg/cortexpbv2/cortexv2.proto
8889
pkg/ingester/client/ingester.pb.go: pkg/ingester/client/ingester.proto
8990
pkg/distributor/distributorpb/distributor.pb.go: pkg/distributor/distributorpb/distributor.proto
9091
pkg/ingester/wal.pb.go: pkg/ingester/wal.proto

integration/e2e/util.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ import (
1818
"github.com/prometheus/prometheus/model/histogram"
1919
"github.com/prometheus/prometheus/model/labels"
2020
"github.com/prometheus/prometheus/prompb"
21+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2122
"github.com/prometheus/prometheus/storage"
2223
"github.com/prometheus/prometheus/tsdb"
2324
"github.com/prometheus/prometheus/tsdb/tsdbutil"
2425
"github.com/thanos-io/thanos/pkg/block/metadata"
2526
"github.com/thanos-io/thanos/pkg/runutil"
2627

28+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
2729
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2830
)
2931

@@ -149,6 +151,46 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
149151
return
150152
}
151153

154+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries) {
155+
tsMillis := TimeToMilliseconds(ts)
156+
157+
labelRefs := []uint32{1, 2}
158+
symbols = []string{"", "__name__", name}
159+
160+
for i, label := range additionalLabels {
161+
symbols = append(symbols, label.Name, label.Value)
162+
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
163+
}
164+
165+
// Generate the expected vector when querying it
166+
metric := model.Metric{}
167+
metric[labels.MetricName] = model.LabelValue(name)
168+
for _, lbl := range additionalLabels {
169+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
170+
}
171+
172+
var (
173+
h *histogram.Histogram
174+
fh *histogram.FloatHistogram
175+
ph cortexpbv2.Histogram
176+
)
177+
if floatHistogram {
178+
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
179+
ph = cortexpbv2.FloatHistogramToHistogramProto(tsMillis, fh)
180+
} else {
181+
h = tsdbutil.GenerateTestHistogram(int(i))
182+
ph = cortexpbv2.HistogramToHistogramProto(tsMillis, h)
183+
}
184+
185+
// Generate the series
186+
series = append(series, cortexpbv2.TimeSeries{
187+
LabelsRefs: labelRefs,
188+
Histograms: []cortexpbv2.Histogram{ph},
189+
})
190+
191+
return
192+
}
193+
152194
func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
153195
tsMillis := TimeToMilliseconds(ts)
154196

@@ -188,6 +230,87 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
188230
return
189231
}
190232

233+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries, vector model.Vector) {
234+
tsMillis := TimeToMilliseconds(ts)
235+
value := rand.Float64()
236+
237+
st := writev2.NewSymbolTable()
238+
st.Symbolize("__name__")
239+
st.Symbolize(name)
240+
241+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
242+
243+
for _, label := range additionalLabels {
244+
st.Symbolize(label.Name)
245+
st.Symbolize(label.Value)
246+
lbs = append(lbs, labels.Label{
247+
Name: label.Name,
248+
Value: label.Value,
249+
})
250+
}
251+
symbols = st.Symbols()
252+
labelRefs := cortexpbv2.GetLabelsRefsFromLabels(symbols, lbs)
253+
// Generate the series
254+
series = append(series, cortexpbv2.TimeSeries{
255+
LabelsRefs: labelRefs,
256+
Samples: []cortexpbv2.Sample{
257+
{Value: value, Timestamp: tsMillis},
258+
},
259+
Metadata: cortexpbv2.Metadata{
260+
Type: cortexpbv2.METRIC_TYPE_GAUGE,
261+
},
262+
})
263+
264+
// Generate the expected vector when querying it
265+
metric := model.Metric{}
266+
metric[labels.MetricName] = model.LabelValue(name)
267+
for _, lbl := range additionalLabels {
268+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
269+
}
270+
271+
vector = append(vector, &model.Sample{
272+
Metric: metric,
273+
Value: model.SampleValue(value),
274+
Timestamp: model.Time(tsMillis),
275+
})
276+
277+
return
278+
}
279+
280+
func GenerateSeriesV2WithSamples(
281+
name string,
282+
startTime time.Time,
283+
scrapeInterval time.Duration,
284+
startValue int, numSamples int,
285+
additionalLabels ...prompb.Label,
286+
) (symbols []string, series cortexpbv2.TimeSeries) {
287+
tsMillis := TimeToMilliseconds(startTime)
288+
durMillis := scrapeInterval.Milliseconds()
289+
290+
symbols = []string{"", "__name__", name}
291+
labelRefs := []uint32{1, 2}
292+
293+
for i, label := range additionalLabels {
294+
symbols = append(symbols, label.Name, label.Value)
295+
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
296+
}
297+
298+
startTMillis := tsMillis
299+
samples := make([]cortexpbv2.Sample, numSamples)
300+
for i := 0; i < numSamples; i++ {
301+
samples[i] = cortexpbv2.Sample{
302+
Timestamp: startTMillis,
303+
Value: float64(i + startValue),
304+
}
305+
startTMillis += durMillis
306+
}
307+
308+
return symbols, cortexpbv2.TimeSeries{
309+
LabelsRefs: labelRefs,
310+
Samples: samples,
311+
}
312+
}
313+
191314
func GenerateSeriesWithSamples(
192315
name string,
193316
startTime time.Time,

integration/e2ecortex/client.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import (
2525
"github.com/prometheus/prometheus/prompb"
2626
"github.com/prometheus/prometheus/storage"
2727
"github.com/prometheus/prometheus/storage/remote"
28-
yaml "gopkg.in/yaml.v3"
29-
3028
"go.opentelemetry.io/collector/pdata/pcommon"
3129
"go.opentelemetry.io/collector/pdata/pmetric"
3230
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
31+
yaml "gopkg.in/yaml.v3"
3332

33+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
3434
"github.com/cortexproject/cortex/pkg/ruler"
3535
"github.com/cortexproject/cortex/pkg/util/backoff"
3636
)
@@ -113,6 +113,39 @@ func NewPromQueryClient(address string) (*Client, error) {
113113
return c, nil
114114
}
115115

116+
// PushV2 the input timeseries to the remote endpoint
117+
func (c *Client) PushV2(symbols []string, timeseries []cortexpbv2.TimeSeries) (*http.Response, error) {
118+
// Create write request
119+
data, err := proto.Marshal(&cortexpbv2.WriteRequest{Symbols: symbols, Timeseries: timeseries})
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
// Create HTTP request
125+
compressed := snappy.Encode(nil, data)
126+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
127+
if err != nil {
128+
return nil, err
129+
}
130+
131+
req.Header.Add("Content-Encoding", "snappy")
132+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
133+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
134+
req.Header.Set("X-Scope-OrgID", c.orgID)
135+
136+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
137+
defer cancel()
138+
139+
// Execute HTTP request
140+
res, err := c.httpClient.Do(req.WithContext(ctx))
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
defer res.Body.Close()
146+
return res, nil
147+
}
148+
116149
// Push the input timeseries to the remote endpoint
117150
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
118151
// Create write request
@@ -334,6 +367,11 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
334367
return value, err
335368
}
336369

370+
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
371+
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
372+
return metadata, err
373+
}
374+
337375
// QueryExemplars runs an exemplars query
338376
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
339377
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

0 commit comments

Comments
 (0)