This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 5ac1293
Merge pull request #23 from grafana/local-synk: "Not sink the global metrics"
2 parents: 78df114 + 70a9df8

File tree (3 files changed: 110 additions, 72 deletions)

pkg/remotewrite/metrics.go
pkg/remotewrite/prometheus.go
pkg/remotewrite/prometheus_test.go

pkg/remotewrite/metrics.go (44 additions, 19 deletions)

```diff
@@ -13,37 +13,62 @@ import (
 
 // metricsStorage is an in-memory gather point for metrics
 type metricsStorage struct {
-	m map[string]metrics.Sample
+	m map[string]*metrics.Metric
 }
 
 func newMetricsStorage() *metricsStorage {
 	return &metricsStorage{
-		m: make(map[string]metrics.Sample),
+		m: make(map[string]*metrics.Metric),
 	}
 }
 
 // update modifies metricsStorage and returns updated sample
 // so that the stored metric and the returned metric hold the same value
-func (ms *metricsStorage) update(sample metrics.Sample, add func(current, s metrics.Sample) metrics.Sample) metrics.Sample {
-	if current, ok := ms.m[sample.Metric.Name]; ok {
-		if add == nil {
-			current.Metric.Sink.Add(sample)
-		} else {
-			current = add(current, sample)
+func (ms *metricsStorage) update(sample metrics.Sample, add func(*metrics.Metric, metrics.Sample)) *metrics.Metric {
+	m, ok := ms.m[sample.Metric.Name]
+	if !ok {
+		var sink metrics.Sink
+		switch sample.Metric.Type {
+		case metrics.Counter:
+			sink = &metrics.CounterSink{}
+		case metrics.Gauge:
+			sink = &metrics.GaugeSink{}
+		case metrics.Trend:
+			sink = &metrics.TrendSink{}
+		case metrics.Rate:
+			sink = &metrics.RateSink{}
+		default:
+			panic("the Metric Type is not supported")
 		}
-		current.Time = sample.Time // to avoid duplicates in timestamps
-		// Sometimes remote write endpoint throws an error about duplicates even if the values
-		// sent were different. By current observations, this is a hard to repeat case and
-		// potentially a bug.
-		// Related: https://github.com/prometheus/prometheus/issues/9210
-
-		ms.m[current.Metric.Name] = current
-		return current
+
+		m = &metrics.Metric{
+			Name:     sample.Metric.Name,
+			Type:     sample.Metric.Type,
+			Contains: sample.Metric.Contains,
+			Sink:     sink,
+		}
+
+		ms.m[m.Name] = m
+	}
+
+	// TODO: https://github.com/grafana/xk6-output-prometheus-remote/issues/11
+	//
+	// Sometimes remote write endpoint throws an error about duplicates even if the values
+	// sent were different. By current observations, this is a hard to repeat case and
+	// potentially a bug.
+	// Related: https://github.com/prometheus/prometheus/issues/9210
+
+	// TODO: Trend is the unique type that benefits from this logic.
+	// so this logic can be removed just creating
+	// a new implementation in this extension
+	// for TrendSink and its Add method.
+	if add == nil {
+		m.Sink.Add(sample)
 	} else {
-		sample.Metric.Sink.Add(sample)
-		ms.m[sample.Metric.Name] = sample
-		return sample
+		add(m, sample)
 	}
+
+	return m
 }
 
 // transform k6 sample into TimeSeries for remote-write
```
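With this change, metricsStorage keeps one locally aggregated *metrics.Metric per name and creates a fresh sink for it based on the sample's type, while the add callback now mutates that stored metric instead of returning a copied Sample. A minimal sketch of how a mapper might call the new update, assuming the go.k6.io/k6/metrics API used in the diff (exampleUpdate itself is hypothetical and not part of the commit):

```go
package remotewrite

import "go.k6.io/k6/metrics"

// exampleUpdate shows the two call shapes of the new update signature:
// Trend samples go through the extension's trendAdd callback, everything
// else falls back to the stored metric's own Sink.Add.
func exampleUpdate(ms *metricsStorage, sample metrics.Sample) *metrics.Metric {
	if sample.Metric.Type == metrics.Trend {
		// trendAdd mutates the locally stored TrendSink in place.
		return ms.update(sample, trendAdd)
	}
	// nil callback: update calls m.Sink.Add(sample) on the local sink.
	return ms.update(sample, nil)
}
```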

pkg/remotewrite/prometheus.go (16 additions, 18 deletions)

```diff
@@ -13,8 +13,8 @@ import (
 type PrometheusMapping struct{}
 
 func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-	sample = ms.update(sample, nil)
-	aggr := sample.Metric.Sink.Format(0)
+	metric := ms.update(sample, nil)
+	aggr := metric.Sink.Format(0)
 
 	return []prompb.TimeSeries{
 		{
@@ -33,9 +33,6 @@ func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sampl
 }
 
 func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-	sample = ms.update(sample, nil)
-	aggr := sample.Metric.Sink.Format(0)
-
 	return []prompb.TimeSeries{
 		{
 			Labels: append(labels, prompb.Label{
@@ -44,7 +41,9 @@ func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample,
 			}),
 			Samples: []prompb.Sample{
 				{
-					Value:     aggr["value"],
+					// Gauge is just the latest value
+					// so we can skip the sink using directly the value from the sample.
+					Value:     sample.Value,
 					Timestamp: timestamp.FromTime(sample.Time),
 				},
 			},
@@ -53,8 +52,8 @@ func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample,
 }
 
 func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-	sample = ms.update(sample, nil)
-	aggr := sample.Metric.Sink.Format(0)
+	metric := ms.update(sample, nil)
+	aggr := metric.Sink.Format(0)
 
 	return []prompb.TimeSeries{
 		{
@@ -73,9 +72,13 @@ func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample,
 }
 
 func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-	sample = ms.update(sample, trendAdd)
+	metric := ms.update(sample, trendAdd)
+
+	// Prometheus metric system does not support Trend so this mapping will store gauges
+	// to keep track of key values.
+	// TODO: when Prometheus implements support for sparse histograms, re-visit this implementation
 
-	s := sample.Metric.Sink.(*metrics.TrendSink)
+	s := metric.Sink.(*metrics.TrendSink)
 	aggr := map[string]float64{
 		"min":   s.Min,
 		"max":   s.Max,
@@ -85,10 +88,6 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample,
 		"p(95)": p(s, 0.95),
 	}
 
-	// Prometheus metric system does not support Trend so this mapping will store gauges
-	// to keep track of key values.
-	// TODO: when Prometheus implements support for sparse histograms, re-visit this implementation
-
 	return []prompb.TimeSeries{
 		{
 			Labels: append(labels, prompb.Label{
@@ -169,8 +168,8 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample,
 // and are a partial copy-paste from k6/metrics.
 // TODO: re-write & refactor this once metrics refactoring progresses in k6.
 
-func trendAdd(current, s metrics.Sample) metrics.Sample {
-	t := current.Metric.Sink.(*metrics.TrendSink)
+func trendAdd(current *metrics.Metric, s metrics.Sample) {
+	t := current.Sink.(*metrics.TrendSink)
 
 	// insert into sorted array instead of sorting anew on each addition
 	index := sort.Search(len(t.Values), func(i int) bool {
@@ -197,8 +196,7 @@ func trendAdd(current, s metrics.Sample) metrics.Sample {
 		t.Med = t.Values[t.Count/2]
 	}
 
-	current.Metric.Sink = t
-	return current
+	current.Sink = t
 }
 
 func p(t *metrics.TrendSink, pct float64) float64 {
```
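The point of keeping trendAdd in the extension is incremental maintenance of the TrendSink: new values are inserted into the Values slice via sort.Search instead of re-sorting on every flush, and Min, Max, Sum, Avg and Med are updated on the spot. The comparator body is not visible in these hunks, so the following is only a sketch of the general technique the comment describes, with an assumed ascending-order comparison; insertSorted is an illustrative name, not code from this commit:

```go
package remotewrite

import "sort"

// insertSorted keeps an ascending slice ordered by finding the first element
// greater than v and shifting the tail one position right before placing v.
func insertSorted(values []float64, v float64) []float64 {
	i := sort.Search(len(values), func(j int) bool { return values[j] > v })
	values = append(values, 0)     // grow the slice by one slot
	copy(values[i+1:], values[i:]) // shift everything from index i onwards
	values[i] = v
	return values
}
```

With the slice kept ordered this way, the running median in trendAdd becomes a positional read (Values[Count/2] for odd counts) rather than a sort inside Sink.Format.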

pkg/remotewrite/prometheus_test.go (50 additions, 35 deletions)

```diff
@@ -1,11 +1,10 @@
 package remotewrite
 
 import (
+	"math/rand"
 	"testing"
 	"time"
 
-	"math/rand"
-
 	"github.com/stretchr/testify/assert"
 	"go.k6.io/k6/metrics"
 )
@@ -15,81 +14,97 @@ func TestTrendAdd(t *testing.T) {
 	t.Parallel()
 
 	testCases := []struct {
-		current, s metrics.Sample
+		current  *metrics.Metric
+		s        metrics.Sample
+		expected metrics.TrendSink
 	}{
 		{
-			current: metrics.Sample{Metric: &metrics.Metric{
+			current: &metrics.Metric{
 				Sink: &metrics.TrendSink{},
-			}},
+			},
 			s: metrics.Sample{Value: 2},
+			expected: metrics.TrendSink{
+				Values: []float64{2},
+				Count:  1,
+				Min:    2,
+				Max:    2,
+				Sum:    2,
+				Avg:    2,
+				Med:    2,
+			},
 		},
 		{
-			current: metrics.Sample{Metric: &metrics.Metric{
+			current: &metrics.Metric{
 				Sink: &metrics.TrendSink{
 					Values: []float64{8, 3, 1, 7, 4, 2},
 					Count:  6,
-					Min: 1, Max: 8,
-					Sum: 25, Avg: (8 + 3 + 1 + 7 + 4 + 2) / 6,
-					Med: (3 + 4) / 2,
+					Min:    1,
+					Max:    8,
+					Sum:    25,
 				},
-			}},
+			},
 			s: metrics.Sample{Value: 12.3},
+			expected: metrics.TrendSink{
+				Values: []float64{8, 3, 1, 7, 4, 2, 12.3},
+				Count:  7,
+				Min:    1,
+				Max:    12.3,
+				Sum:    37.3,
+				Avg:    37.3 / 7,
+				Med:    7,
+			},
 		},
 	}
 
 	for _, testCase := range testCases {
 		// trendAdd should result in the same values as Sink.Add
 
-		s := trendAdd(testCase.current, testCase.s)
-		sink := s.Metric.Sink.(*metrics.TrendSink)
+		trendAdd(testCase.current, testCase.s)
+		sink := testCase.current.Sink.(*metrics.TrendSink)
 
-		testCase.current.Metric.Sink.Add(testCase.s)
-		expected := testCase.current.Metric.Sink.(*metrics.TrendSink)
-
-		assert.Equal(t, expected.Count, sink.Count)
-		assert.Equal(t, expected.Min, sink.Min)
-		assert.Equal(t, expected.Max, sink.Max)
-		assert.Equal(t, expected.Sum, sink.Sum)
-		assert.Equal(t, expected.Avg, sink.Avg)
-		assert.EqualValues(t, expected.Values, sink.Values)
+		assert.Equal(t, testCase.expected.Count, sink.Count)
+		assert.Equal(t, testCase.expected.Min, sink.Min)
+		assert.Equal(t, testCase.expected.Max, sink.Max)
+		assert.Equal(t, testCase.expected.Sum, sink.Sum)
+		assert.Equal(t, testCase.expected.Avg, sink.Avg)
+		assert.Equal(t, testCase.expected.Med, sink.Med)
+		assert.Equal(t, testCase.expected.Values, sink.Values)
 	}
 }
 
 func BenchmarkTrendAdd(b *testing.B) {
-	benchF := []func(b *testing.B, start metrics.Sample){
-		func(b *testing.B, s metrics.Sample) {
+	benchF := []func(b *testing.B, start metrics.Metric){
+		func(b *testing.B, m metrics.Metric) {
 			b.ResetTimer()
 			rand.Seed(time.Now().Unix())
 
 			for i := 0; i < b.N; i++ {
-				s = trendAdd(s, metrics.Sample{Value: rand.Float64() * 1000})
-				sink := s.Metric.Sink.(*metrics.TrendSink)
+				trendAdd(&m, metrics.Sample{Value: rand.Float64() * 1000})
+				sink := m.Sink.(*metrics.TrendSink)
				p(sink, 0.90)
 				p(sink, 0.95)
 			}
 		},
-		func(b *testing.B, start metrics.Sample) {
+		func(b *testing.B, start metrics.Metric) {
 			b.ResetTimer()
 			rand.Seed(time.Now().Unix())
 
 			for i := 0; i < b.N; i++ {
-				start.Metric.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000})
-				start.Metric.Sink.Format(0)
+				start.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000})
+				start.Sink.Format(0)
 			}
 		},
 	}
 
-	s := metrics.Sample{
-		Metric: &metrics.Metric{
-			Type: metrics.Trend,
-			Sink: &metrics.TrendSink{},
-		},
+	start := metrics.Metric{
+		Type: metrics.Trend,
+		Sink: &metrics.TrendSink{},
 	}
 
 	b.Run("trendAdd", func(b *testing.B) {
-		benchF[0](b, s)
+		benchF[0](b, start)
 	})
 	b.Run("TrendSink.Add", func(b *testing.B) {
-		benchF[1](b, s)
+		benchF[1](b, start)
 	})
 }
```
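The hard-coded expectations in the second case are easy to misread: the seeded Values slice is intentionally left unsorted, the new sample 12.3 is appended at the end because it exceeds every seeded value, and the median is then read positionally at index Count/2 as in the trendAdd hunk above. A small worked check of that arithmetic, purely illustrative and not part of the commit:

```go
package remotewrite

import "testing"

// TestTrendAddWorkedExample is a hypothetical companion test spelling out
// where the expected numbers of the second TestTrendAdd case come from.
func TestTrendAddWorkedExample(t *testing.T) {
	values := []float64{8, 3, 1, 7, 4, 2, 12.3} // seed slice with 12.3 appended
	count := len(values)                        // Count = 7

	// Odd count: the median is read at index Count/2 = 3, which is 7 here
	// because the seed slice was never sorted.
	if med := values[count/2]; med != 7 {
		t.Fatalf("median = %v, want 7", med)
	}

	// Sum grows from 25 to 37.3, so Avg is 37.3 / 7; Min stays 1 and Max
	// becomes the new largest value, 12.3.
	if sum := 25 + 12.3; sum != 37.3 {
		t.Fatalf("sum = %v, want 37.3", sum)
	}
}
```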
