Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit a97d401

Browse files
committed
Not sink the global metric
The global metric is controlled by the k6's ingester output, so the sink operation on the global metric in the sample is already executed. Use a local metric in the metricsStorage for executing a local and dedicated sink in the extension. It fixes the data race where both the extension and the k6 ingester calls Add and sink for the same metrics.
1 parent a79a353 commit a97d401

File tree

3 files changed

+114
-72
lines changed

3 files changed

+114
-72
lines changed

pkg/remotewrite/metrics.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,37 +13,66 @@ import (
1313

1414
// metricsStorage is an in-memory gather point for metrics
1515
type metricsStorage struct {
16-
m map[string]metrics.Sample
16+
m map[string]*metrics.Metric
1717
}
1818

1919
func newMetricsStorage() *metricsStorage {
2020
return &metricsStorage{
21-
m: make(map[string]metrics.Sample),
21+
m: make(map[string]*metrics.Metric),
2222
}
2323
}
2424

2525
// update modifies metricsStorage and returns updated sample
2626
// so that the stored metric and the returned metric hold the same value
27-
func (ms *metricsStorage) update(sample metrics.Sample, add func(current, s metrics.Sample) metrics.Sample) metrics.Sample {
28-
if current, ok := ms.m[sample.Metric.Name]; ok {
29-
if add == nil {
30-
current.Metric.Sink.Add(sample)
31-
} else {
32-
current = add(current, sample)
27+
func (ms *metricsStorage) update(sample metrics.Sample, add func(*metrics.Metric, metrics.Sample)) *metrics.Metric {
28+
m, ok := ms.m[sample.Metric.Name]
29+
if !ok {
30+
var sink metrics.Sink
31+
switch sample.Metric.Type {
32+
case metrics.Counter:
33+
sink = &metrics.CounterSink{}
34+
case metrics.Gauge:
35+
sink = &metrics.GaugeSink{}
36+
case metrics.Trend:
37+
sink = &metrics.TrendSink{}
38+
case metrics.Rate:
39+
sink = &metrics.RateSink{}
40+
default:
41+
panic("the Metric Type is not supported")
3342
}
34-
current.Time = sample.Time // to avoid duplicates in timestamps
35-
// Sometimes remote write endpoint throws an error about duplicates even if the values
36-
// sent were different. By current observations, this is a hard to repeat case and
37-
// potentially a bug.
38-
// Related: https://github.com/prometheus/prometheus/issues/9210
39-
40-
ms.m[current.Metric.Name] = current
41-
return current
43+
44+
m = &metrics.Metric{
45+
Name: sample.Metric.Name,
46+
Type: sample.Metric.Type,
47+
Contains: sample.Metric.Contains,
48+
Sink: sink,
49+
}
50+
51+
ms.m[m.Name] = m
52+
}
53+
54+
// TODO: this is just avoiding duplicates with the previous
55+
// Implement a better and complete solution
56+
// maybe discard any timestamp < latest?
57+
//
58+
//current.Time = sample.Time // to avoid duplicates in timestamps
59+
60+
// Sometimes remote write endpoint throws an error about duplicates even if the values
61+
// sent were different. By current observations, this is a hard to repeat case and
62+
// potentially a bug.
63+
// Related: https://github.com/prometheus/prometheus/issues/9210
64+
65+
// TODO: Trend is the unique type that benefits from this logic.
66+
// so this logic can be removed just creating
67+
// a new implementation in this extension
68+
// for TrendSink and its Add method.
69+
if add == nil {
70+
m.Sink.Add(sample)
4271
} else {
43-
sample.Metric.Sink.Add(sample)
44-
ms.m[sample.Metric.Name] = sample
45-
return sample
72+
add(m, sample)
4673
}
74+
75+
return m
4776
}
4877

4978
// transform k6 sample into TimeSeries for remote-write

pkg/remotewrite/prometheus.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
type PrometheusMapping struct{}
1414

1515
func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
16-
sample = ms.update(sample, nil)
17-
aggr := sample.Metric.Sink.Format(0)
16+
metric := ms.update(sample, nil)
17+
aggr := metric.Sink.Format(0)
1818

1919
return []prompb.TimeSeries{
2020
{
@@ -33,9 +33,6 @@ func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sampl
3333
}
3434

3535
func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
36-
sample = ms.update(sample, nil)
37-
aggr := sample.Metric.Sink.Format(0)
38-
3936
return []prompb.TimeSeries{
4037
{
4138
Labels: append(labels, prompb.Label{
@@ -44,7 +41,9 @@ func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample,
4441
}),
4542
Samples: []prompb.Sample{
4643
{
47-
Value: aggr["value"],
44+
// Gauge is just the latest value
45+
// so we can skip the sink using directly the value from the sample.
46+
Value: sample.Value,
4847
Timestamp: timestamp.FromTime(sample.Time),
4948
},
5049
},
@@ -53,8 +52,8 @@ func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample,
5352
}
5453

5554
func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
56-
sample = ms.update(sample, nil)
57-
aggr := sample.Metric.Sink.Format(0)
55+
metric := ms.update(sample, nil)
56+
aggr := metric.Sink.Format(0)
5857

5958
return []prompb.TimeSeries{
6059
{
@@ -73,9 +72,13 @@ func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample,
7372
}
7473

7574
func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
76-
sample = ms.update(sample, trendAdd)
75+
metric := ms.update(sample, trendAdd)
76+
77+
// Prometheus metric system does not support Trend so this mapping will store gauges
78+
// to keep track of key values.
79+
// TODO: when Prometheus implements support for sparse histograms, re-visit this implementation
7780

78-
s := sample.Metric.Sink.(*metrics.TrendSink)
81+
s := metric.Sink.(*metrics.TrendSink)
7982
aggr := map[string]float64{
8083
"min": s.Min,
8184
"max": s.Max,
@@ -85,10 +88,6 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample,
8588
"p(95)": p(s, 0.95),
8689
}
8790

88-
// Prometheus metric system does not support Trend so this mapping will store gauges
89-
// to keep track of key values.
90-
// TODO: when Prometheus implements support for sparse histograms, re-visit this implementation
91-
9291
return []prompb.TimeSeries{
9392
{
9493
Labels: append(labels, prompb.Label{
@@ -169,8 +168,8 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample,
169168
// and are a partial copy-paste from k6/metrics.
170169
// TODO: re-write & refactor this once metrics refactoring progresses in k6.
171170

172-
func trendAdd(current, s metrics.Sample) metrics.Sample {
173-
t := current.Metric.Sink.(*metrics.TrendSink)
171+
func trendAdd(current *metrics.Metric, s metrics.Sample) {
172+
t := current.Sink.(*metrics.TrendSink)
174173

175174
// insert into sorted array instead of sorting anew on each addition
176175
index := sort.Search(len(t.Values), func(i int) bool {
@@ -197,8 +196,7 @@ func trendAdd(current, s metrics.Sample) metrics.Sample {
197196
t.Med = t.Values[t.Count/2]
198197
}
199198

200-
current.Metric.Sink = t
201-
return current
199+
current.Sink = t
202200
}
203201

204202
func p(t *metrics.TrendSink, pct float64) float64 {

pkg/remotewrite/prometheus_test.go

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package remotewrite
22

33
import (
4+
"math/rand"
45
"testing"
56
"time"
67

7-
"math/rand"
8-
98
"github.com/stretchr/testify/assert"
109
"go.k6.io/k6/metrics"
1110
)
@@ -15,81 +14,97 @@ func TestTrendAdd(t *testing.T) {
1514
t.Parallel()
1615

1716
testCases := []struct {
18-
current, s metrics.Sample
17+
current *metrics.Metric
18+
s metrics.Sample
19+
expected metrics.TrendSink
1920
}{
2021
{
21-
current: metrics.Sample{Metric: &metrics.Metric{
22+
current: &metrics.Metric{
2223
Sink: &metrics.TrendSink{},
23-
}},
24+
},
2425
s: metrics.Sample{Value: 2},
26+
expected: metrics.TrendSink{
27+
Values: []float64{2},
28+
Count: 1,
29+
Min: 2,
30+
Max: 2,
31+
Sum: 2,
32+
Avg: 2,
33+
Med: 2,
34+
},
2535
},
2636
{
27-
current: metrics.Sample{Metric: &metrics.Metric{
37+
current: &metrics.Metric{
2838
Sink: &metrics.TrendSink{
2939
Values: []float64{8, 3, 1, 7, 4, 2},
3040
Count: 6,
31-
Min: 1, Max: 8,
32-
Sum: 25, Avg: (8 + 3 + 1 + 7 + 4 + 2) / 6,
33-
Med: (3 + 4) / 2,
41+
Min: 1,
42+
Max: 8,
43+
Sum: 25,
3444
},
35-
}},
45+
},
3646
s: metrics.Sample{Value: 12.3},
47+
expected: metrics.TrendSink{
48+
Values: []float64{8, 3, 1, 7, 4, 2, 12.3},
49+
Count: 7,
50+
Min: 1,
51+
Max: 12.3,
52+
Sum: 37.3,
53+
Avg: 37.3 / 7,
54+
Med: 7,
55+
},
3756
},
3857
}
3958

4059
for _, testCase := range testCases {
4160
// trendAdd should result in the same values as Sink.Add
4261

43-
s := trendAdd(testCase.current, testCase.s)
44-
sink := s.Metric.Sink.(*metrics.TrendSink)
62+
trendAdd(testCase.current, testCase.s)
63+
sink := testCase.current.Sink.(*metrics.TrendSink)
4564

46-
testCase.current.Metric.Sink.Add(testCase.s)
47-
expected := testCase.current.Metric.Sink.(*metrics.TrendSink)
48-
49-
assert.Equal(t, expected.Count, sink.Count)
50-
assert.Equal(t, expected.Min, sink.Min)
51-
assert.Equal(t, expected.Max, sink.Max)
52-
assert.Equal(t, expected.Sum, sink.Sum)
53-
assert.Equal(t, expected.Avg, sink.Avg)
54-
assert.EqualValues(t, expected.Values, sink.Values)
65+
assert.Equal(t, testCase.expected.Count, sink.Count)
66+
assert.Equal(t, testCase.expected.Min, sink.Min)
67+
assert.Equal(t, testCase.expected.Max, sink.Max)
68+
assert.Equal(t, testCase.expected.Sum, sink.Sum)
69+
assert.Equal(t, testCase.expected.Avg, sink.Avg)
70+
assert.Equal(t, testCase.expected.Med, sink.Med)
71+
assert.Equal(t, testCase.expected.Values, sink.Values)
5572
}
5673
}
5774

5875
func BenchmarkTrendAdd(b *testing.B) {
59-
benchF := []func(b *testing.B, start metrics.Sample){
60-
func(b *testing.B, s metrics.Sample) {
76+
benchF := []func(b *testing.B, start metrics.Metric){
77+
func(b *testing.B, m metrics.Metric) {
6178
b.ResetTimer()
6279
rand.Seed(time.Now().Unix())
6380

6481
for i := 0; i < b.N; i++ {
65-
s = trendAdd(s, metrics.Sample{Value: rand.Float64() * 1000})
66-
sink := s.Metric.Sink.(*metrics.TrendSink)
82+
trendAdd(&m, metrics.Sample{Value: rand.Float64() * 1000})
83+
sink := m.Sink.(*metrics.TrendSink)
6784
p(sink, 0.90)
6885
p(sink, 0.95)
6986
}
7087
},
71-
func(b *testing.B, start metrics.Sample) {
88+
func(b *testing.B, start metrics.Metric) {
7289
b.ResetTimer()
7390
rand.Seed(time.Now().Unix())
7491

7592
for i := 0; i < b.N; i++ {
76-
start.Metric.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000})
77-
start.Metric.Sink.Format(0)
93+
start.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000})
94+
start.Sink.Format(0)
7895
}
7996
},
8097
}
8198

82-
s := metrics.Sample{
83-
Metric: &metrics.Metric{
84-
Type: metrics.Trend,
85-
Sink: &metrics.TrendSink{},
86-
},
99+
start := metrics.Metric{
100+
Type: metrics.Trend,
101+
Sink: &metrics.TrendSink{},
87102
}
88103

89104
b.Run("trendAdd", func(b *testing.B) {
90-
benchF[0](b, s)
105+
benchF[0](b, start)
91106
})
92107
b.Run("TrendSink.Add", func(b *testing.B) {
93-
benchF[1](b, s)
108+
benchF[1](b, start)
94109
})
95110
}

0 commit comments

Comments
 (0)