This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 6c3f96a

Use metrics.TimeSeries directly
Replace the internal time series data model with the new one defined by k6.
1 parent 2595b16 · commit 6c3f96a

File tree: 4 files changed, +66 -117 lines changed
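For context, the k6-native model this commit adopts pairs a metric with an immutable tag set in a comparable struct, which is what lets the diffs below key maps on it directly. The following standalone sketch is not part of the commit and assumes a k6 version that ships metrics.TimeSeries; it only illustrates how a sample is built under the new model:

```go
package main

import (
	"fmt"
	"time"

	"go.k6.io/k6/metrics"
)

// Sketch (not part of the commit): under the k6-native model, a TimeSeries
// pairs a *Metric with a *TagSet, and a Sample embeds a TimeSeries instead
// of carrying Metric and Tags as separate fields.
func main() {
	registry := metrics.NewRegistry()
	metric := registry.MustNewMetric("http_reqs", metrics.Counter)
	tags := registry.RootTagSet().With("scenario", "default")

	sample := metrics.Sample{
		TimeSeries: metrics.TimeSeries{Metric: metric, Tags: tags},
		Time:       time.Now(),
		Value:      1,
	}

	// Metric is promoted through the embedded TimeSeries field.
	fmt.Println(sample.Metric.Name, sample.Value)
}
```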

pkg/remotewrite/prometheus.go

Lines changed: 13 additions & 8 deletions
@@ -4,21 +4,26 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/mstoykov/atlas"
 	prompb "go.buf.build/grpc/go/prometheus/prometheus"
 	"go.k6.io/k6/metrics"
 )
 
-func MapTagSet(t *metrics.SampleTags) []*prompb.Label {
-	tags := t.CloneTags()
-
-	labels := make([]*prompb.Label, 0, len(tags))
-	for k, v := range tags {
-		labels = append(labels, &prompb.Label{Name: k, Value: v})
+func MapTagSet(t *metrics.TagSet) []*prompb.Label {
+	n := (*atlas.Node)(t)
+	if n.Len() < 1 {
+		return nil
+	}
+	labels := make([]*prompb.Label, 0, n.Len())
+	for !n.IsRoot() {
+		prev, key, value := n.Data()
+		labels = append(labels, &prompb.Label{Name: key, Value: value})
+		n = prev
 	}
 	return labels
 }
 
-func MapSeries(ts TimeSeries) prompb.TimeSeries {
+func MapSeries(ts metrics.TimeSeries) prompb.TimeSeries {
 	return prompb.TimeSeries{
 		Labels: append(MapTagSet(ts.Tags), &prompb.Label{
 			Name: "__name__",
@@ -27,7 +32,7 @@ func MapSeries(ts TimeSeries) prompb.TimeSeries {
 	}
 }
 
-func MapTrend(series TimeSeries, t time.Time, sink *trendSink) []*prompb.TimeSeries {
+func MapTrend(series metrics.TimeSeries, t time.Time, sink *trendSink) []*prompb.TimeSeries {
 	// Prometheus metric system does not support Trend so this mapping will
 	// store a counter for the number of reported values and gauges to keep
 	// track of aggregated values. Also store a sum of the values to allow
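A usage note on the new MapTagSet above: it casts the *metrics.TagSet to an *atlas.Node and walks the chain from the leaf toward the root, emitting one prompb.Label per tag, so label order follows that traversal rather than insertion order. The hypothetical test below is not part of the commit; it only sketches how the function is called with the registry-based TagSet API used in the test files further down:

```go
package remotewrite

import (
	"testing"

	"go.k6.io/k6/metrics"
)

// Hypothetical sketch, not part of this commit: exercise MapTagSet with a
// TagSet built through the registry, the same way the updated tests do.
func TestMapTagSetSketch(t *testing.T) {
	registry := metrics.NewRegistry()
	tags := registry.RootTagSet().
		With("scenario", "default").
		With("status", "200")

	// One label per tag is expected; ordering is whatever the leaf-to-root
	// walk produces.
	labels := MapTagSet(tags)
	if len(labels) != 2 {
		t.Fatalf("expected 2 labels, got %d", len(labels))
	}
}
```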

pkg/remotewrite/prometheus_test.go

Lines changed: 10 additions & 6 deletions
@@ -55,11 +55,12 @@ func assertTimeSeriesEqual(t *testing.T, expected *prompb.TimeSeries, actual *pr
 
 // sortTimeSeries sorts an array of TimeSeries by name
 func sortTimeSeries(ts []*prompb.TimeSeries) []*prompb.TimeSeries {
-	sorted := make([]*prompb.TimeSeries, 0, len(ts))
+	sorted := make([]*prompb.TimeSeries, len(ts))
 	copy(sorted, ts)
 	sort.Slice(sorted, func(i int, j int) bool {
 		return getTimeSeriesName(sorted[i]) < getTimeSeriesName(sorted[j])
 	})
+
 	return sorted
 }
 
@@ -83,6 +84,7 @@ func TestMapTrend(t *testing.T) {
 	t.Parallel()
 
 	now := time.Now()
+	r := metrics.NewRegistry()
 
 	testCases := []struct {
 		sample metrics.Sample
@@ -91,11 +93,13 @@
 	}{
 		{
 			sample: metrics.Sample{
-				Metric: &metrics.Metric{
-					Name: "test",
-					Type: metrics.Trend,
+				TimeSeries: metrics.TimeSeries{
+					Metric: &metrics.Metric{
+						Name: "test",
+						Type: metrics.Trend,
+					},
+					Tags: r.RootTagSet().With("tagk1", "tagv1"),
 				},
-				Tags: metrics.NewSampleTags(map[string]string{"tagk1": "tagv1"}),
 				Value: 1.0,
 				Time: now,
 			},
@@ -116,7 +120,7 @@
 		st := &trendSink{}
 		st.Add(tc.sample)
 
-		ts := MapTrend(TimeSeries{tc.sample.Metric, tc.sample.Tags}, tc.sample.Time, st)
+		ts := MapTrend(tc.sample.TimeSeries, tc.sample.Time, st)
 		assertTimeSeriesMatch(t, tc.expected, ts)
 	}
 }
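One behavioral detail in the sortTimeSeries change above: Go's copy transfers at most len(dst) elements, so the previous make([]*prompb.TimeSeries, 0, len(ts)) left sorted empty despite its capacity, and allocating with the full length makes the copy effective. A minimal, self-contained illustration of that copy rule (not taken from the diff):

```go
package main

import "fmt"

// Illustration of why the sortTimeSeries fix matters: copy transfers at
// most len(dst) elements, so a zero-length destination receives nothing
// even when its capacity is large enough.
func main() {
	src := []int{1, 2, 3}

	empty := make([]int, 0, len(src)) // length 0, capacity 3
	fmt.Println(copy(empty, src))     // prints 0: nothing copied

	full := make([]int, len(src)) // length 3
	fmt.Println(copy(full, src))  // prints 3: all elements copied
}
```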

pkg/remotewrite/remotewrite.go

Lines changed: 13 additions & 55 deletions
@@ -3,8 +3,6 @@ package remotewrite
 import (
 	"context"
 	"fmt"
-	"sort"
-	"strings"
 	"time"
 
 	"github.com/grafana/xk6-output-prometheus-remote/pkg/remote"
@@ -24,7 +22,7 @@ type Output struct {
 	config          Config
 	logger          logrus.FieldLogger
 	periodicFlusher *output.PeriodicFlusher
-	tsdb            map[string]*seriesWithMeasure
+	tsdb            map[metrics.TimeSeries]*seriesWithMeasure
 
 	// TODO: copy the prometheus/remote.WriteClient interface and depend on it
 	client *remote.WriteClient
@@ -45,14 +43,14 @@ func New(params output.Params) (*Output, error) {
 
 	wc, err := remote.NewWriteClient(config.URL.String, clientConfig)
 	if err != nil {
-		return nil, fmt.Errorf("failed to initialize the client Prometheus remote write client: %w", err)
+		return nil, fmt.Errorf("failed to initialize the Prometheus remote write client: %w", err)
 	}
 
 	return &Output{
 		client: wc,
 		config: config,
 		logger: logger,
-		tsdb:   make(map[string]*seriesWithMeasure),
+		tsdb:   make(map[metrics.TimeSeries]*seriesWithMeasure),
 	}, nil
 }
 
@@ -131,33 +129,29 @@ func (o *Output) convertToPbSeries(samplesContainers []metrics.SampleContainer)
 	// so we need to aggregate all the samples in the same time bucket.
 	// More context can be found in the issue
 	// https://github.com/grafana/xk6-output-prometheus-remote/issues/11
-	seen := make(map[string]struct{})
+	seen := make(map[metrics.TimeSeries]struct{})
 
 	for _, samplesContainer := range samplesContainers {
 		samples := samplesContainer.GetSamples()
 
 		for _, sample := range samples {
 			truncTime := sample.Time.Truncate(time.Millisecond)
-			timeSeriesKey := timeSeriesKey(sample.Metric, sample.Tags)
-			swm, ok := o.tsdb[timeSeriesKey]
+			swm, ok := o.tsdb[sample.TimeSeries]
 			if !ok {
 				swm = &seriesWithMeasure{
-					TimeSeries: TimeSeries{
-						Metric: sample.Metric,
-						Tags:   sample.Tags,
-					},
-					Measure: sinkByType(sample.Metric.Type),
-					Latest:  truncTime,
+					TimeSeries: sample.TimeSeries,
+					Measure:    sinkByType(sample.Metric.Type),
+					Latest:     truncTime,
 				}
-				o.tsdb[timeSeriesKey] = swm
-				seen[timeSeriesKey] = struct{}{}
+				o.tsdb[sample.TimeSeries] = swm
+				seen[sample.TimeSeries] = struct{}{}
 			} else {
 				// save as a seen item only when the samples have a time greater than
 				// the previous saved, otherwise some implementations
 				// could see it as a duplicate and generate warnings (e.g. Mimir)
 				if truncTime.After(swm.Latest) {
 					swm.Latest = truncTime
-					seen[timeSeriesKey] = struct{}{}
+					seen[sample.TimeSeries] = struct{}{}
 				}
 
 				// If current == previous:
@@ -191,6 +185,7 @@ func (o *Output) convertToPbSeries(samplesContainers []metrics.SampleContainer)
 }
 
 type seriesWithMeasure struct {
+	metrics.TimeSeries
 	Measure metrics.Sink
 
 	// Latest tracks the latest time
@@ -200,17 +195,13 @@ type seriesWithMeasure struct {
 	// in a method in struct
 	Latest time.Time
 
-	// TimeSeries will be replaced with the native k6 version
-	// when it will be available.
-	TimeSeries
-
 	// TODO: maybe add some caching for the mapping?
 }
 
 func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
 	var newts []*prompb.TimeSeries
 
-	mapMonoSeries := func(s TimeSeries, t time.Time) prompb.TimeSeries {
+	mapMonoSeries := func(s metrics.TimeSeries, t time.Time) prompb.TimeSeries {
 		return prompb.TimeSeries{
 			Labels: append(MapTagSet(swm.Tags), &prompb.Label{
 				Name: "__name__",
@@ -266,36 +257,3 @@ func sinkByType(mt metrics.MetricType) metrics.Sink {
 	}
 	return sink
 }
-
-// the code below will be removed
-// when TimeSeries will be a native k6's concept.
-
-type TimeSeries struct {
-	Metric *metrics.Metric
-	Tags   *metrics.SampleTags
-}
-
-var bytesep = []byte{0xff}
-
-func timeSeriesKey(m *metrics.Metric, sampleTags *metrics.SampleTags) string {
-	if sampleTags.IsEmpty() {
-		return m.Name
-	}
-
-	tmap := sampleTags.CloneTags()
-	keys := make([]string, 0, len(tmap))
-	for k := range tmap {
-		keys = append(keys, k)
-	}
-	sort.Strings(keys)
-
-	var b strings.Builder
-	b.WriteString(m.Name)
-	for i := 0; i < len(keys); i++ {
-		b.Write(bytesep)
-		b.WriteString(keys[i])
-		b.Write(bytesep)
-		b.WriteString(tmap[keys[i]])
-	}
-	return b.String()
-}
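With timeSeriesKey gone, series identity comes straight from the map key: metrics.TimeSeries is comparable (two pointer fields), and the atlas-backed TagSet is designed so that identical tag paths built from the same registry resolve to the same node, so samples from the same series compare equal without any string building. A rough standalone sketch of the resulting deduplication (hypothetical, mirroring convertToPbSeries above, assuming a k6 version with metrics.TimeSeries):

```go
package main

import (
	"fmt"

	"go.k6.io/k6/metrics"
)

// Rough sketch (not from the diff): samples sharing a Metric and TagSet
// collapse onto the same metrics.TimeSeries map key, which is the behavior
// that replaces the removed timeSeriesKey string concatenation.
func main() {
	registry := metrics.NewRegistry()
	metric := registry.MustNewMetric("metric1", metrics.Counter)
	tags := registry.RootTagSet().With("tagk1", "tagv1")

	samples := []metrics.Sample{
		{TimeSeries: metrics.TimeSeries{Metric: metric, Tags: tags}, Value: 3},
		{TimeSeries: metrics.TimeSeries{Metric: metric, Tags: tags}, Value: 4},
	}

	seen := make(map[metrics.TimeSeries]struct{})
	for _, s := range samples {
		seen[s.TimeSeries] = struct{}{}
	}
	fmt.Println(len(seen)) // prints 1: both samples belong to one series
}
```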

pkg/remotewrite/remotewrite_test.go

Lines changed: 30 additions & 48 deletions
@@ -1,7 +1,6 @@
 package remotewrite
 
 import (
-	"strconv"
 	"testing"
 	"time"
 
@@ -28,31 +27,37 @@ func TestOutputConvertToPbSeries(t *testing.T) {
 
 	registry := metrics.NewRegistry()
 	metric1 := registry.MustNewMetric("metric1", metrics.Counter)
-	tagset := metrics.NewSampleTags(map[string]string{"tagk1": "tagv1"})
+	tagset := registry.RootTagSet().With("tagk1", "tagv1")
 
 	samples := []metrics.SampleContainer{
 		metrics.Sample{
-			Metric: metric1,
-			Tags:   tagset,
-			Time:   time.Date(2022, time.September, 1, 0, 0, 0, 0, time.UTC),
-			Value:  3,
+			TimeSeries: metrics.TimeSeries{
+				Metric: metric1,
+				Tags:   tagset,
+			},
+			Time:  time.Date(2022, time.September, 1, 0, 0, 0, 0, time.UTC),
+			Value: 3,
 		},
 		metrics.Sample{
-			Metric: metric1,
-			Tags:   tagset,
-			Time:   time.Date(2022, time.August, 31, 0, 0, 0, 0, time.UTC),
-			Value:  4,
+			TimeSeries: metrics.TimeSeries{
+				Metric: metric1,
+				Tags:   tagset,
+			},
+			Time:  time.Date(2022, time.August, 31, 0, 0, 0, 0, time.UTC),
+			Value: 4,
 		},
 		metrics.Sample{
-			Metric: registry.MustNewMetric("metric2", metrics.Counter),
-			Tags:   tagset,
-			Time:   time.Date(2022, time.September, 1, 0, 0, 0, 0, time.UTC),
-			Value:  2,
+			TimeSeries: metrics.TimeSeries{
+				Metric: registry.MustNewMetric("metric2", metrics.Counter),
+				Tags:   tagset,
+			},
+			Time:  time.Date(2022, time.September, 1, 0, 0, 0, 0, time.UTC),
+			Value: 2,
 		},
 	}
 
 	o := Output{
-		tsdb: make(map[string]*seriesWithMeasure),
+		tsdb: make(map[metrics.TimeSeries]*seriesWithMeasure),
 	}
 
 	pbseries := o.convertToPbSeries(samples)
@@ -89,11 +94,11 @@ func TestOutputConvertToPbSeries_WithPreviousState(t *testing.T) {
 
 	registry := metrics.NewRegistry()
 	metric1 := registry.MustNewMetric("metric1", metrics.Counter)
-	tagset := metrics.NewSampleTags(map[string]string{"tagk1": "tagv1"})
+	tagset := registry.RootTagSet().With("tagk1", "tagv1")
 	t0 := time.Date(2022, time.September, 1, 0, 0, 0, 0, time.UTC).Add(10 * time.Millisecond)
 
 	swm := &seriesWithMeasure{
-		TimeSeries: TimeSeries{
+		TimeSeries: metrics.TimeSeries{
 			Metric: metric1,
 			Tags:   tagset,
 		},
@@ -103,8 +108,8 @@ func TestOutputConvertToPbSeries_WithPreviousState(t *testing.T) {
 	}
 
 	o := Output{
-		tsdb: map[string]*seriesWithMeasure{
-			timeSeriesKey(metric1, tagset): swm,
+		tsdb: map[metrics.TimeSeries]*seriesWithMeasure{
+			swm.TimeSeries: swm,
 		},
 	}
 
@@ -142,10 +147,12 @@ func TestOutputConvertToPbSeries_WithPreviousState(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			pbseries := o.convertToPbSeries([]metrics.SampleContainer{
 				metrics.Sample{
-					Metric: metric1,
-					Tags:   tagset,
-					Value:  1,
-					Time:   tc.time,
+					TimeSeries: metrics.TimeSeries{
+						Metric: metric1,
+						Tags:   tagset,
+					},
+					Value: 1,
+					Time:  tc.time,
 				},
 			})
 			require.Len(t, o.tsdb, 1)
@@ -155,28 +162,3 @@ func TestOutputConvertToPbSeries_WithPreviousState(t *testing.T) {
 		})
 	}
 }
-
-func TestTimeSeriesKey(t *testing.T) {
-	t.Parallel()
-
-	registry := metrics.NewRegistry()
-	metric1 := registry.MustNewMetric("metric1", metrics.Counter)
-
-	tagsmap := make(map[string]string)
-	for i := 0; i < 8; i++ {
-		is := strconv.Itoa(i)
-		tagsmap["tagk"+is] = "tagv" + is
-	}
-	tagset := metrics.NewSampleTags(tagsmap)
-
-	key := timeSeriesKey(metric1, tagset)
-
-	expected := "metric1"
-	sbytesep := string(bytesep)
-	for i := 0; i < 8; i++ {
-		is := strconv.Itoa(i)
-		expected += sbytesep + "tagk" + is + sbytesep + "tagv" + is
-	}
-
-	assert.Equal(t, expected, key)
-}
