Skip to content

Commit 46229bf

Browse files
committed
Support remote write v2
Signed-off-by: SungJin1212 <[email protected]>
1 parent 8a95aed commit 46229bf

29 files changed

+7193
-435
lines changed

Diff for: .github/workflows/test-build-deploy.yml

+1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ jobs:
144144
- integration_querier
145145
- integration_ruler
146146
- integration_query_fuzz
147+
- integration_remote_write_v2
147148
steps:
148149
- name: Upgrade golang
149150
uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0

Diff for: .golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ run:
4949
- integration_querier
5050
- integration_ruler
5151
- integration_query_fuzz
52+
- integration_remote_write_v2

Diff for: CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [FEATURE] Distributor/Ingester: Support remote write 2.0. It includes proto, samples, and (native) histograms ingestion. #6292
1213
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
1314
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
1415
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228

Diff for: Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))
8585

8686
# Manually declared dependencies And what goes into each exe
8787
pkg/cortexpb/cortex.pb.go: pkg/cortexpb/cortex.proto
88+
pkg/cortexpbv2/cortexv2.pb.go: pkg/cortexpbv2/cortexv2.proto
8889
pkg/ingester/client/ingester.pb.go: pkg/ingester/client/ingester.proto
8990
pkg/distributor/distributorpb/distributor.pb.go: pkg/distributor/distributorpb/distributor.proto
9091
pkg/ingester/wal.pb.go: pkg/ingester/wal.proto

Diff for: integration/e2e/util.go

+111
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/thanos-io/thanos/pkg/block/metadata"
2525
"github.com/thanos-io/thanos/pkg/runutil"
2626

27+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
2728
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2829
)
2930

@@ -149,6 +150,46 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
149150
return
150151
}
151152

153+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries) {
154+
tsMillis := TimeToMilliseconds(ts)
155+
156+
labelRefs := []uint32{1, 2}
157+
symbols = []string{"", "__name__", name}
158+
159+
for i, label := range additionalLabels {
160+
symbols = append(symbols, label.Name, label.Value)
161+
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
162+
}
163+
164+
// Generate the expected vector when querying it
165+
metric := model.Metric{}
166+
metric[labels.MetricName] = model.LabelValue(name)
167+
for _, lbl := range additionalLabels {
168+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
169+
}
170+
171+
var (
172+
h *histogram.Histogram
173+
fh *histogram.FloatHistogram
174+
ph cortexpbv2.Histogram
175+
)
176+
if floatHistogram {
177+
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
178+
ph = cortexpbv2.FloatHistogramToHistogramProto(tsMillis, fh)
179+
} else {
180+
h = tsdbutil.GenerateTestHistogram(int(i))
181+
ph = cortexpbv2.HistogramToHistogramProto(tsMillis, h)
182+
}
183+
184+
// Generate the series
185+
series = append(series, cortexpbv2.TimeSeries{
186+
LabelsRefs: labelRefs,
187+
Histograms: []cortexpbv2.Histogram{ph},
188+
})
189+
190+
return
191+
}
192+
152193
func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
153194
tsMillis := TimeToMilliseconds(ts)
154195

@@ -188,6 +229,76 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
188229
return
189230
}
190231

232+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries, vector model.Vector) {
233+
tsMillis := TimeToMilliseconds(ts)
234+
value := rand.Float64()
235+
236+
labelRefs := []uint32{1, 2}
237+
symbols = []string{"", "__name__", name}
238+
239+
for i, label := range additionalLabels {
240+
symbols = append(symbols, label.Name, label.Value)
241+
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
242+
}
243+
244+
// Generate the series
245+
series = append(series, cortexpbv2.TimeSeries{
246+
LabelsRefs: labelRefs,
247+
Samples: []cortexpbv2.Sample{
248+
{Value: value, Timestamp: tsMillis},
249+
},
250+
})
251+
252+
// Generate the expected vector when querying it
253+
metric := model.Metric{}
254+
metric[labels.MetricName] = model.LabelValue(name)
255+
for _, lbl := range additionalLabels {
256+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
257+
}
258+
259+
vector = append(vector, &model.Sample{
260+
Metric: metric,
261+
Value: model.SampleValue(value),
262+
Timestamp: model.Time(tsMillis),
263+
})
264+
265+
return
266+
}
267+
268+
func GenerateSeriesV2WithSamples(
269+
name string,
270+
startTime time.Time,
271+
scrapeInterval time.Duration,
272+
startValue int, numSamples int,
273+
additionalLabels ...prompb.Label,
274+
) (symbols []string, series cortexpbv2.TimeSeries) {
275+
tsMillis := TimeToMilliseconds(startTime)
276+
durMillis := scrapeInterval.Milliseconds()
277+
278+
symbols = []string{"", "__name__", name}
279+
labelRefs := []uint32{1, 2}
280+
281+
for i, label := range additionalLabels {
282+
symbols = append(symbols, label.Name, label.Value)
283+
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
284+
}
285+
286+
startTMillis := tsMillis
287+
samples := make([]cortexpbv2.Sample, numSamples)
288+
for i := 0; i < numSamples; i++ {
289+
samples[i] = cortexpbv2.Sample{
290+
Timestamp: startTMillis,
291+
Value: float64(i + startValue),
292+
}
293+
startTMillis += durMillis
294+
}
295+
296+
return symbols, cortexpbv2.TimeSeries{
297+
LabelsRefs: labelRefs,
298+
Samples: samples,
299+
}
300+
}
301+
191302
func GenerateSeriesWithSamples(
192303
name string,
193304
startTime time.Time,

Diff for: integration/e2ecortex/client.go

+35-2
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import (
2525
"github.com/prometheus/prometheus/prompb"
2626
"github.com/prometheus/prometheus/storage"
2727
"github.com/prometheus/prometheus/storage/remote"
28-
yaml "gopkg.in/yaml.v3"
29-
3028
"go.opentelemetry.io/collector/pdata/pcommon"
3129
"go.opentelemetry.io/collector/pdata/pmetric"
3230
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
31+
yaml "gopkg.in/yaml.v3"
3332

33+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
3434
"github.com/cortexproject/cortex/pkg/ruler"
3535
"github.com/cortexproject/cortex/pkg/util/backoff"
3636
)
@@ -113,6 +113,39 @@ func NewPromQueryClient(address string) (*Client, error) {
113113
return c, nil
114114
}
115115

116+
// PushV2 the input timeseries to the remote endpoint
117+
func (c *Client) PushV2(symbols []string, timeseries []cortexpbv2.TimeSeries) (*http.Response, error) {
118+
// Create write request
119+
data, err := proto.Marshal(&cortexpbv2.WriteRequest{Symbols: symbols, Timeseries: timeseries})
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
// Create HTTP request
125+
compressed := snappy.Encode(nil, data)
126+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
127+
if err != nil {
128+
return nil, err
129+
}
130+
131+
req.Header.Add("Content-Encoding", "snappy")
132+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
133+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
134+
req.Header.Set("X-Scope-OrgID", c.orgID)
135+
136+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
137+
defer cancel()
138+
139+
// Execute HTTP request
140+
res, err := c.httpClient.Do(req.WithContext(ctx))
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
defer res.Body.Close()
146+
return res, nil
147+
}
148+
116149
// Push the input timeseries to the remote endpoint
117150
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
118151
// Create write request

0 commit comments

Comments
 (0)