Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 591f19a

Browse files
committed
refactor; unit tests; update README
1 parent 41e41ed commit 591f19a

File tree

8 files changed

+245
-73
lines changed

8 files changed

+245
-73
lines changed

README.md

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,37 @@
11
# xk6-output-prometheus-remote
22

3-
K6 extension for Prometheus remote-write output. **WIP!**
3+
> ### ⚠️ Beta version
4+
5+
k6 extension for Prometheus remote-write output.
46

57
*Distinguish from [Prometheus remote write **client** extension](https://github.com/grafana/xk6-client-prometheus-remote) :)*
68

7-
According to [Prometheus API Stability Guarantees](https://prometheus.io/docs/prometheus/latest/stability/) remote write is an **experimental feature**, thus it is unstable and is subject to change as of now. There are many options for remote-write compatible agents, the official list can be found [here](https://prometheus.io/docs/operating/integrations/). The exact details of how metrics will be processed or stored depends on the underlying agent used.
9+
According to [Prometheus API Stability Guarantees](https://prometheus.io/docs/prometheus/latest/stability/) remote write is an **experimental feature**, thus it is unstable and is subject to change. There are many options for remote-write compatible agents, the official list can be found [here](https://prometheus.io/docs/operating/integrations/). The exact details of how metrics will be processed or stored depends on the underlying agent used.
810

911
Key points to know:
1012

1113
- remote write format does not contain explicit definition of any metric types while metadata definition is still in flux and can have different implementation depending on the remote-write compatible agent
1214
- remote read is a separate interface and it is much less defined. For example, remote read may not work without precise queries; see [here](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations) and [here](https://github.com/timescale/promscale/issues/64) for details
1315
- some remote-write compatible agents may support additional formats for remote write, like JSON, but it is not part of official Prometheus remote write specification and therefore absent here
14-
- exemplars are not yet officially supported by format, see [here](https://github.com/prometheus/prometheus/issues/9317)
1516

1617
### Usage
1718

1819
To build k6 binary with the Prometheus remote write output extension use:
19-
2020
```
21-
xk6 build --with github.com/grafana/xk6-output-prometheus-remote=.
21+
xk6 build --with github.com/grafana/xk6-output-prometheus-remote@latest
2222
```
2323

2424
Then run new k6 binary with:
25-
2625
```
2726
K6_PROMETHEUS_REMOTE_URL=http://localhost:9090/api/v1/write ./k6 run script.js -o output-prometheus-remote
2827
```
2928

3029
Add TLS and HTTP basic authentication:
31-
3230
```
3331
K6_PROMETHEUS_REMOTE_URL=https://localhost:9090/api/v1/write K6_PROMETHEUS_INSECURE_SKIP_TLS_VERIFY=false K6_CA_CERT_FILE=example/tls.crt K6_PROMETHEUS_USER=foo K6_PROMETHEUS_PASSWORD=bar ./k6 run script.js -o output-prometheus-remote
3432
```
3533

36-
Different remote storage agents are supported with mapping option. The default is Prometheus itself but there is a simpler raw mapping that can be used as a starting point for other storages:
37-
34+
Different remote storage agents are supported with mapping option. The default is Prometheus itself but there is a simpler raw mapping that can be used as a starting point for other remote agents:
3835
```
3936
K6_PROMETHEUS_MAPPING=raw K6_PROMETHEUS_REMOTE_URL=http://localhost:9090/api/v1/write ./k6 run script.js -o output-prometheus-remote
4037
```
@@ -43,25 +40,12 @@ Note: Prometheus remote client relies on a snappy library for serialization whic
4340

4441
### On sample rate
4542

46-
K6 processes its outputs once per second and that is also a default flush period in this extension. The number of K6 builtin metrics is 26 as of now and they are collected at the rate of 50ms. In practice it means that there will be around 1000-1500 samples on average per each flush period in case of raw mapping. If custom metrics are configured, that estimate will have to be adjusted.
43+
k6 processes its outputs once per second and that is also a default flush period in this extension. The number of k6 builtin metrics is 26 and they are collected at the rate of 50ms. In practice it means that there will be around 1000-1500 samples on average per each flush period in case of raw mapping. If custom metrics are configured, that estimate will have to be adjusted.
4744

48-
Depending on exact Prometheus setup, it may be necessary to configure Prometheus and / or remote-write agent to handle the load. For example, see [`queue_config` parameter](https://prometheus.io/docs/practices/remote_write/) of Prometheus.
45+
Depending on exact setup, it may be necessary to configure Prometheus and / or remote-write agent to handle the load. For example, see [`queue_config` parameter](https://prometheus.io/docs/practices/remote_write/) of Prometheus.
46+
47+
If remote endpoint responds too slowly or the k6 test run generates too many metrics, extension may start discarding samples in order to continue to adhere to the flush period.
4948

5049
### Prometheus as remote-write agent
5150

5251
To enable remote write in Prometheus 2.x use `--enable-feature=remote-write-receiver` option. See docker-compose samples in `example/`. Options for remote write storage can be found [here](https://prometheus.io/docs/operating/integrations/).
53-
54-
### Next steps
55-
56-
*Note:* this list is meant to keep up to date with current roadmap and status of the extension and is very dynamic.
57-
58-
- [X] support of raw format of Prometheus remote write specification
59-
- [X] support of Prometheus as remote-write agent
60-
- [X] mapping of k6 metrics to Prometheus metrics
61-
- [X] Counter
62-
- [X] Gauge
63-
- [X] Rate
64-
- [X] Trend (draft)
65-
- [X] ability to switch Prometheus as remote agent on as a configurable option
66-
- [ ] decide on submetrics processing
67-
- [ ] investigate: the processing loop may need an overhaul with long running tests and cloud setups and / or look into more complex aggregations

pkg/remotewrite/config_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
)
1717

1818
func TestApply(t *testing.T) {
19+
t.Parallel()
20+
1921
fullConfig := Config{
2022
Url: null.StringFrom("some-url"),
2123
InsecureSkipTLSVerify: null.BoolFrom(false),
@@ -48,6 +50,8 @@ func TestApply(t *testing.T) {
4850
}
4951

5052
func TestConfigParseArg(t *testing.T) {
53+
t.Parallel()
54+
5155
c, err := ParseArg("url=http://prometheus.remote:3412/write")
5256
assert.Nil(t, err)
5357
assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.Url)

pkg/remotewrite/labels.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"go.k6.io/k6/stats"
66
)
77

8-
func tagsToPrometheusLabels(tags *stats.SampleTags, config Config) ([]prompb.Label, error) {
8+
func tagsToLabels(tags *stats.SampleTags, config Config) ([]prompb.Label, error) {
99
if !config.KeepTags.Bool {
1010
return []prompb.Label{}, nil
1111
}
@@ -22,11 +22,6 @@ func tagsToPrometheusLabels(tags *stats.SampleTags, config Config) ([]prompb.Lab
2222
continue
2323
}
2424

25-
// TODO add checks:
26-
// - reserved underscore
27-
// - sorting
28-
// - duplicates?
29-
3025
labelPairs = append(labelPairs, prompb.Label{
3126
Name: name,
3227
Value: value,
@@ -37,7 +32,3 @@ func tagsToPrometheusLabels(tags *stats.SampleTags, config Config) ([]prompb.Lab
3732

3833
return labelPairs[:len(labelPairs):len(labelPairs)], nil
3934
}
40-
41-
// func (l labels) Len() int { return len(l) }
42-
// func (l labels) Less(i, j int) bool { return l[i].Name < l[j].Name }
43-
// func (l labels) Swap(i, j int) { l[i], l[j] = l[j], l[i] }

pkg/remotewrite/labels_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package remotewrite
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/prometheus/prometheus/prompb"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"go.k6.io/k6/stats"
11+
"gopkg.in/guregu/null.v3"
12+
)
13+
14+
func TestTagsToLabels(t *testing.T) {
15+
t.Parallel()
16+
17+
testCases := map[string]struct {
18+
tags *stats.SampleTags
19+
config Config
20+
labels []prompb.Label
21+
}{
22+
"empty-tags": {
23+
tags: &stats.SampleTags{},
24+
config: Config{
25+
KeepTags: null.BoolFrom(true),
26+
KeepNameTag: null.BoolFrom(false),
27+
},
28+
labels: []prompb.Label{},
29+
},
30+
"name-tag-discard": {
31+
tags: stats.NewSampleTags(map[string]string{"foo": "bar", "name": "nnn"}),
32+
config: Config{
33+
KeepTags: null.BoolFrom(true),
34+
KeepNameTag: null.BoolFrom(false),
35+
},
36+
labels: []prompb.Label{
37+
{Name: "foo", Value: "bar"},
38+
},
39+
},
40+
"name-tag-keep": {
41+
tags: stats.NewSampleTags(map[string]string{"foo": "bar", "name": "nnn"}),
42+
config: Config{
43+
KeepTags: null.BoolFrom(true),
44+
KeepNameTag: null.BoolFrom(true),
45+
},
46+
labels: []prompb.Label{
47+
{Name: "foo", Value: "bar"},
48+
{Name: "name", Value: "nnn"},
49+
},
50+
},
51+
"discard-tags": {
52+
tags: stats.NewSampleTags(map[string]string{"foo": "bar", "name": "nnn"}),
53+
config: Config{
54+
KeepTags: null.BoolFrom(false),
55+
},
56+
labels: []prompb.Label{},
57+
},
58+
}
59+
60+
for name, testCase := range testCases {
61+
testCase := testCase
62+
t.Run(name, func(t *testing.T) {
63+
labels, err := tagsToLabels(testCase.tags, testCase.config)
64+
require.NoError(t, err)
65+
66+
assert.Equal(t, len(testCase.labels), len(labels))
67+
68+
for i := range testCase.labels {
69+
var found bool
70+
71+
// order is not guaranteed ATM
72+
for j := range labels {
73+
if labels[j].Name == testCase.labels[i].Name {
74+
assert.Equal(t, testCase.labels[i].Value, labels[j].Value)
75+
found = true
76+
break
77+
}
78+
79+
}
80+
if !found {
81+
assert.Fail(t, fmt.Sprintf("Not found label %s: \n"+
82+
"expected: %v\n"+
83+
"actual : %v", testCase.labels[i].Name, testCase.labels, labels))
84+
}
85+
}
86+
})
87+
}
88+
}

pkg/remotewrite/metrics.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
// Note: k6 Registry is not used here since Output is getting
1212
// samples only from k6 engine, hence we assume they are already vetted.
13+
14+
// metricsStorage is an in-memory gather point for metrics
1315
type metricsStorage struct {
1416
m map[string]stats.Sample
1517
}
@@ -21,7 +23,7 @@ func newMetricsStorage() *metricsStorage {
2123
}
2224

2325
// update modifies metricsStorage and returns updated sample
24-
// so that they hold the same value for the given metric
26+
// so that the stored metric and the returned metric hold the same value
2527
func (ms *metricsStorage) update(sample stats.Sample, add func(current, s stats.Sample) stats.Sample) stats.Sample {
2628
if current, ok := ms.m[sample.Metric.Name]; ok {
2729
if add == nil {

pkg/remotewrite/prometheus.go

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -73,39 +73,7 @@ func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample stats.Sample, la
7373
}
7474

7575
func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample stats.Sample, labels []prompb.Label) []prompb.TimeSeries {
76-
sample = ms.update(sample, func(current, s stats.Sample) stats.Sample {
77-
// this is an attempt to optimize what happens in TrendSink;
78-
// partial copy-paste from k6/stats
79-
t := current.Metric.Sink.(*stats.TrendSink)
80-
81-
index := sort.Search(len(t.Values), func(i int) bool {
82-
return t.Values[i] > s.Value
83-
})
84-
t.Values = append(t.Values, 0)
85-
copy(t.Values[index+1:], t.Values[index:])
86-
t.Values[index] = s.Value
87-
88-
t.Count += 1
89-
t.Sum += s.Value
90-
t.Avg = t.Sum / float64(t.Count)
91-
92-
if s.Value > t.Max {
93-
t.Max = s.Value
94-
}
95-
if s.Value < t.Min || t.Count == 1 {
96-
t.Min = s.Value
97-
}
98-
99-
// The median of an even number of values is the average of the middle two.
100-
if (t.Count & 0x01) == 0 {
101-
t.Med = (t.Values[(t.Count/2)-1] + t.Values[(t.Count/2)]) / 2
102-
} else {
103-
t.Med = t.Values[t.Count/2]
104-
}
105-
106-
current.Metric.Sink = t
107-
return current
108-
})
76+
sample = ms.update(sample, trendAdd)
10977

11078
s := sample.Metric.Sink.(*stats.TrendSink)
11179
aggr := map[string]float64{
@@ -197,7 +165,42 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample stats.Sample, l
197165
}
198166
}
199167

200-
// Note: this is a copy-paste from k6/stats/sink.go but without additional call to Calc
168+
// The following functions are an attempt to add ad-hoc optimization to TrendSink,
169+
// and are a partial copy-paste from k6/stats.
170+
// TODO: re-write & refactor this once metrics refactoring progresses in k6.
171+
172+
func trendAdd(current, s stats.Sample) stats.Sample {
173+
t := current.Metric.Sink.(*stats.TrendSink)
174+
175+
// insert into sorted array instead of sorting anew on each addition
176+
index := sort.Search(len(t.Values), func(i int) bool {
177+
return t.Values[i] > s.Value
178+
})
179+
t.Values = append(t.Values, 0)
180+
copy(t.Values[index+1:], t.Values[index:])
181+
t.Values[index] = s.Value
182+
183+
t.Count += 1
184+
t.Sum += s.Value
185+
t.Avg = t.Sum / float64(t.Count)
186+
187+
if s.Value > t.Max {
188+
t.Max = s.Value
189+
}
190+
if s.Value < t.Min || t.Count == 1 {
191+
t.Min = s.Value
192+
}
193+
194+
if (t.Count & 0x01) == 0 {
195+
t.Med = (t.Values[(t.Count/2)-1] + t.Values[(t.Count/2)]) / 2
196+
} else {
197+
t.Med = t.Values[t.Count/2]
198+
}
199+
200+
current.Metric.Sink = t
201+
return current
202+
}
203+
201204
func p(t *stats.TrendSink, pct float64) float64 {
202205
switch t.Count {
203206
case 0:

0 commit comments

Comments
 (0)