Skip to content

Commit a417e1f

Browse files
committed
Add push writen header to response
Signed-off-by: SungJin1212 <[email protected]>
1 parent ab2a6ea commit a417e1f

File tree

3 files changed

+94
-9
lines changed

3 files changed

+94
-9
lines changed

integration/remote_write_v2_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration
55

66
import (
77
"math/rand"
8+
"net/http"
89
"path"
910
"testing"
1011
"time"
@@ -82,6 +83,7 @@ func TestIngest(t *testing.T) {
8283
res, err := c.PushV2(symbols1, series)
8384
require.NoError(t, err)
8485
require.Equal(t, 200, res.StatusCode)
86+
testPushHeader(t, res.Header, "1", "0", "0")
8587

8688
// sample
8789
result, err := c.Query("test_series", now)
@@ -99,11 +101,13 @@ func TestIngest(t *testing.T) {
99101
res, err = c.PushV2(symbols2, histogramSeries)
100102
require.NoError(t, err)
101103
require.Equal(t, 200, res.StatusCode)
104+
testPushHeader(t, res.Header, "1", "1", "0")
102105

103106
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
104107
res, err = c.PushV2(symbols3, histogramFloatSeries)
105108
require.NoError(t, err)
106109
require.Equal(t, 200, res.StatusCode)
110+
testPushHeader(t, res.Header, "1", "1", "0")
107111

108112
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
109113
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
@@ -192,6 +196,7 @@ func TestExemplar(t *testing.T) {
192196
res, err := c.PushV2(req.Symbols, req.Timeseries)
193197
require.NoError(t, err)
194198
require.Equal(t, 200, res.StatusCode)
199+
testPushHeader(t, res.Header, "1", "0", "1")
195200

196201
start := time.Now().Add(-time.Minute)
197202
end := now.Add(time.Minute)
@@ -200,3 +205,9 @@ func TestExemplar(t *testing.T) {
200205
require.NoError(t, err)
201206
require.Equal(t, 1, len(exemplars))
202207
}
208+
209+
func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
210+
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
211+
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
212+
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
213+
}

pkg/distributor/distributor.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W
706706
return seriesKeys, validatedTimeseries, int64(validatedMetadata), int64(validatedFloatSamples), int64(validatedHistogramSamples), int64(validatedExemplars), firstPartialErr, nil
707707
}
708708

709-
func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteRequest, subRing ring.ReadRing, keys []uint32, validatedTimeseries []cortexpbv2.TimeSeries, userID string) error {
709+
func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteRequest, subRing ring.ReadRing, keys []uint32, validatedTimeseries []cortexpbv2.TimeSeries, userID string, stats *WriteStats) error {
710710
span, _ := opentracing.StartSpanFromContext(ctx, "doBatchV2")
711711
defer span.Finish()
712712

@@ -736,13 +736,13 @@ func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteReques
736736
timeseries = append(timeseries, validatedTimeseries[i])
737737
}
738738

739-
return d.sendV2(localCtx, req.Symbols, ingester, timeseries, req.Source)
739+
return d.sendV2(localCtx, req.Symbols, ingester, timeseries, req.Source, stats)
740740
}, func() {
741741
cancel()
742742
})
743743
}
744744

745-
func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester ring.InstanceDesc, timeseries []cortexpbv2.TimeSeries, source cortexpbv2.WriteRequest_SourceEnum) error {
745+
func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester ring.InstanceDesc, timeseries []cortexpbv2.TimeSeries, source cortexpbv2.WriteRequest_SourceEnum, stats *WriteStats) error {
746746
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
747747
if err != nil {
748748
return err
@@ -760,7 +760,7 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
760760
req.Timeseries = timeseries
761761
req.Source = source
762762

763-
_, err = c.PushV2(ctx, &req)
763+
resp, err := c.PushV2(ctx, &req)
764764

765765
if len(timeseries) > 0 {
766766
d.ingesterAppends.WithLabelValues(id, typeSamples).Inc()
@@ -774,6 +774,13 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
774774
}
775775
}
776776

777+
if resp != nil {
778+
// track stats
779+
stats.SetSamples(resp.Samples)
780+
stats.SetHistograms(resp.Histograms)
781+
stats.SetExemplars(resp.Exemplars)
782+
}
783+
777784
return err
778785
}
779786

@@ -885,7 +892,6 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
885892
}
886893

887894
if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
888-
fmt.Println("V2 d.ingestionRate.Rate()", d.ingestionRate.Rate())
889895
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
890896
return nil, errMaxSamplesPushRateLimitReached
891897
}
@@ -935,13 +941,20 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
935941

936942
keys := seriesKeys
937943

938-
err = d.doBatchV2(ctx, req, subRing, keys, validatedTimeseries, userID)
944+
s := WriteStats{}
945+
946+
err = d.doBatchV2(ctx, req, subRing, keys, validatedTimeseries, userID, &s)
939947
if err != nil {
940948
return nil, err
941949
}
942950

943-
// TODO(Sungjin1212) track stat
944-
return &cortexpbv2.WriteResponse{}, firstPartialErr
951+
resp := &cortexpbv2.WriteResponse{
952+
Samples: s.LoadSamples(),
953+
Histograms: s.LoadHistogram(),
954+
Exemplars: s.LoadExemplars(),
955+
}
956+
957+
return resp, firstPartialErr
945958
}
946959

947960
// Push implements client.IngesterServer
@@ -981,7 +994,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
981994
}
982995

983996
if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
984-
fmt.Println("V1 d.ingestionRate.Rate()", d.ingestionRate.Rate())
985997
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
986998
return nil, errMaxSamplesPushRateLimitReached
987999
}

pkg/distributor/stats.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package distributor
2+
3+
import (
4+
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.
5+
)
6+
7+
type WriteStats struct {
8+
// Samples represents X-Prometheus-Remote-Write-Written-Samples
9+
Samples int64
10+
// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
11+
Histograms int64
12+
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
13+
Exemplars int64
14+
}
15+
16+
func (w *WriteStats) SetSamples(samples int64) {
17+
if w == nil {
18+
return
19+
}
20+
21+
atomic.StoreInt64(&w.Samples, samples)
22+
}
23+
24+
func (w *WriteStats) SetHistograms(histograms int64) {
25+
if w == nil {
26+
return
27+
}
28+
29+
atomic.StoreInt64(&w.Histograms, histograms)
30+
}
31+
32+
func (w *WriteStats) SetExemplars(exemplars int64) {
33+
if w == nil {
34+
return
35+
}
36+
37+
atomic.StoreInt64(&w.Exemplars, exemplars)
38+
}
39+
40+
func (w *WriteStats) LoadSamples() int64 {
41+
if w == nil {
42+
return 0
43+
}
44+
45+
return atomic.LoadInt64(&w.Samples)
46+
}
47+
48+
func (w *WriteStats) LoadHistogram() int64 {
49+
if w == nil {
50+
return 0
51+
}
52+
53+
return atomic.LoadInt64(&w.Histograms)
54+
}
55+
56+
func (w *WriteStats) LoadExemplars() int64 {
57+
if w == nil {
58+
return 0
59+
}
60+
61+
return atomic.LoadInt64(&w.Exemplars)
62+
}

0 commit comments

Comments
 (0)