Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit e33a841

Browse files
committed
Removed the prometheus/prometheus go library
It has been replaced with a lighter version based on buf.build
1 parent 3743b65 commit e33a841

File tree

11 files changed

+418
-1937
lines changed

11 files changed

+418
-1937
lines changed

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ K6_PROMETHEUS_PASSWORD=bar \
3232
./k6 run script.js -o output-prometheus-remote
3333
```
3434

35-
Note: Prometheus remote client relies on a snappy library for serialization which could panic on [encode operations](https://github.com/golang/snappy/blob/544b4180ac705b7605231d4a4550a1acb22a19fe/encode.go#L22).
36-
3735
### On sample rate
3836

3937
k6 processes its outputs once per second and that is also a default flush period in this extension. The number of k6 builtin metrics is 26 and they are collected at the rate of 50ms. In practice it means that there will be around 1000-1500 samples on average per each flush period in case of raw mapping. If custom metrics are configured, that estimate will have to be adjusted.

go.mod

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,36 @@
11
module github.com/grafana/xk6-output-prometheus-remote
22

3-
go 1.17
3+
go 1.18
44

55
require (
6-
github.com/golang/protobuf v1.5.2
6+
github.com/gogo/protobuf v1.3.2
77
github.com/golang/snappy v0.0.4
88
github.com/kubernetes/helm v2.17.0+incompatible
9-
github.com/prometheus/common v0.32.1
10-
github.com/prometheus/prometheus v1.8.2-0.20211005150130-f29caccc4255
119
github.com/sirupsen/logrus v1.8.1
1210
github.com/stretchr/testify v1.8.0
11+
go.buf.build/grpc/go/prometheus/prometheus v1.4.3
1312
go.k6.io/k6 v0.40.0
1413
gopkg.in/guregu/null.v3 v3.5.0
1514
)
1615

1716
require (
18-
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
19-
github.com/aws/aws-sdk-go v1.40.37 // indirect
20-
github.com/beorn7/perks v1.0.1 // indirect
21-
github.com/cespare/xxhash/v2 v2.1.2 // indirect
22-
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect
2317
github.com/davecgh/go-spew v1.1.1 // indirect
24-
github.com/dennwc/varint v1.0.0 // indirect
25-
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021 // indirect
2618
github.com/fatih/color v1.13.0 // indirect
2719
github.com/ghodss/yaml v1.0.0 // indirect
28-
github.com/go-kit/log v0.1.0 // indirect
29-
github.com/go-logfmt/logfmt v0.5.1 // indirect
30-
github.com/gogo/protobuf v1.3.2 // indirect
31-
github.com/jmespath/go-jmespath v0.4.0 // indirect
32-
github.com/jpillora/backoff v1.0.0 // indirect
20+
github.com/google/go-cmp v0.5.6 // indirect
21+
github.com/kr/text v0.2.0 // indirect
3322
github.com/mailru/easyjson v0.7.7 // indirect
3423
github.com/mattn/go-colorable v0.1.12 // indirect
3524
github.com/mattn/go-isatty v0.0.14 // indirect
36-
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
37-
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
38-
github.com/opentracing-contrib/go-stdlib v1.0.0 // indirect
39-
github.com/opentracing/opentracing-go v1.2.0 // indirect
4025
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
41-
github.com/pkg/errors v0.9.1 // indirect
4226
github.com/pmezard/go-difflib v1.0.0 // indirect
43-
github.com/prometheus/client_golang v1.11.0 // indirect
44-
github.com/prometheus/client_model v0.2.0 // indirect
45-
github.com/prometheus/common/sigv4 v0.1.0 // indirect
46-
github.com/prometheus/procfs v0.6.0 // indirect
4727
github.com/spf13/afero v1.3.4 // indirect
48-
go.uber.org/atomic v1.9.0 // indirect
49-
golang.org/x/net v0.0.0-20220630215102-69896b714898 // indirect
50-
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
28+
go.buf.build/grpc/go/gogo/protobuf v1.4.9 // indirect
5129
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
5230
golang.org/x/text v0.3.7 // indirect
5331
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
54-
google.golang.org/appengine v1.6.7 // indirect
55-
google.golang.org/protobuf v1.27.1 // indirect
32+
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83 // indirect
33+
google.golang.org/protobuf v1.28.1 // indirect
5634
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
5735
gopkg.in/yaml.v2 v2.4.0 // indirect
5836
gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum

Lines changed: 7 additions & 1775 deletions
Large diffs are not rendered by default.

pkg/remote/client.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package remote
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/tls"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"net/url"
11+
"time"
12+
13+
"github.com/gogo/protobuf/proto"
14+
"github.com/golang/snappy"
15+
prompb "go.buf.build/grpc/go/prometheus/prometheus"
16+
)
17+
18+
type HTTPConfig struct {
19+
Timeout time.Duration
20+
TLSConfig *tls.Config
21+
BasicAuth *BasicAuth
22+
Headers http.Header
23+
}
24+
25+
type BasicAuth struct {
26+
Username, Password string
27+
}
28+
29+
// WriteClient is a client implementation of the Prometheus remote write protocol.
30+
// It follows the specs defined by the official design document:
31+
// https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM
32+
type WriteClient struct {
33+
hc *http.Client
34+
url *url.URL
35+
cfg *HTTPConfig
36+
}
37+
38+
func NewWriteClient(endpoint string, cfg *HTTPConfig) (*WriteClient, error) {
39+
if cfg == nil {
40+
cfg = &HTTPConfig{}
41+
}
42+
u, err := url.Parse(endpoint)
43+
if err != nil {
44+
return nil, err
45+
}
46+
wc := &WriteClient{
47+
hc: &http.Client{
48+
Timeout: cfg.Timeout,
49+
},
50+
url: u,
51+
cfg: cfg,
52+
}
53+
if cfg.TLSConfig != nil {
54+
wc.hc.Transport = &http.Transport{
55+
TLSClientConfig: cfg.TLSConfig,
56+
}
57+
}
58+
return wc, nil
59+
}
60+
61+
// Store sends a batch of samples to the HTTP endpoint,
62+
// the request is the proto marshaled and encoded.
63+
func (c *WriteClient) Store(ctx context.Context, series []*prompb.TimeSeries) error {
64+
b, err := newWriteRequestBody(series)
65+
if err != nil {
66+
return err
67+
}
68+
req, err := http.NewRequestWithContext(
69+
ctx, http.MethodPost, c.url.String(), bytes.NewReader(b))
70+
if err != nil {
71+
return fmt.Errorf("create new HTTP request failed: %w", err)
72+
}
73+
if c.cfg.BasicAuth != nil {
74+
req.SetBasicAuth(c.cfg.BasicAuth.Username, c.cfg.BasicAuth.Password)
75+
}
76+
77+
if len(c.cfg.Headers) > 0 {
78+
req.Header = c.cfg.Headers.Clone()
79+
}
80+
81+
req.Header.Set("User-Agent", "k6-prometheus-rw-output")
82+
83+
// They are mostly defined by the specs
84+
req.Header.Set("Content-Encoding", "snappy")
85+
req.Header.Set("Content-Type", "application/x-protobuf")
86+
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
87+
88+
resp, err := c.hc.Do(req)
89+
if err != nil {
90+
return fmt.Errorf("HTTP POST request failed: %w", err)
91+
}
92+
defer resp.Body.Close()
93+
io.Copy(io.Discard, resp.Body) //nolint:errcheck
94+
95+
if resp.StatusCode != http.StatusNoContent {
96+
return fmt.Errorf("got status code: %s instead expected: 204 No Content", resp.Status)
97+
}
98+
return nil
99+
}
100+
101+
func newWriteRequestBody(series []*prompb.TimeSeries) ([]byte, error) {
102+
b, err := proto.Marshal(&prompb.WriteRequest{
103+
Timeseries: series,
104+
})
105+
if err != nil {
106+
return nil, fmt.Errorf("encoding series as protobuf write request failed: %w", err)
107+
}
108+
if snappy.MaxEncodedLen(len(b)) < 0 {
109+
return nil, fmt.Errorf("the protobuf message is too large to be handled by Snappy encoder; "+
110+
"size: %d, limit: %d", len(b), 0xffffffff)
111+
}
112+
return snappy.Encode(nil, b), nil
113+
}

pkg/remote/client_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package remote
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http"
7+
"net/http/httptest"
8+
"net/url"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
prompb "go.buf.build/grpc/go/prometheus/prometheus"
15+
)
16+
17+
func TestNewWrtiteClient(t *testing.T) {
18+
t.Parallel()
19+
t.Run("DefaultConfig", func(t *testing.T) {
20+
t.Parallel()
21+
wc, err := NewWriteClient("http://example.com/api/v1/write", nil)
22+
require.NoError(t, err)
23+
require.NotNil(t, wc)
24+
assert.Equal(t, wc.cfg, &HTTPConfig{})
25+
})
26+
27+
t.Run("CustomConfig", func(t *testing.T) {
28+
t.Parallel()
29+
hc := &HTTPConfig{Timeout: time.Second}
30+
wc, err := NewWriteClient("http://example.com/api/v1/write", hc)
31+
require.NoError(t, err)
32+
require.NotNil(t, wc)
33+
assert.Equal(t, wc.cfg, hc)
34+
})
35+
36+
t.Run("InvalidURL", func(t *testing.T) {
37+
t.Parallel()
38+
wc, err := NewWriteClient("fake://bad url", nil)
39+
require.Error(t, err)
40+
assert.Nil(t, wc)
41+
})
42+
}
43+
44+
func TestClientStore(t *testing.T) {
45+
t.Parallel()
46+
h := func(rw http.ResponseWriter, r *http.Request) {
47+
assert.Equal(t, r.Header.Get("Content-Encoding"), "snappy")
48+
assert.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")
49+
assert.Equal(t, r.Header.Get("User-Agent"), "k6-prometheus-rw-output")
50+
assert.Equal(t, r.Header.Get("X-Prometheus-Remote-Write-Version"), "0.1.0")
51+
assert.NotEmpty(t, r.Header.Get("Content-Length"))
52+
53+
b, err := io.ReadAll(r.Body)
54+
assert.NoError(t, err)
55+
assert.NotEmpty(t, len(b))
56+
57+
rw.WriteHeader(http.StatusNoContent)
58+
}
59+
ts := httptest.NewServer(http.HandlerFunc(h))
60+
defer ts.Close()
61+
62+
u, err := url.Parse(ts.URL)
63+
require.NoError(t, err)
64+
65+
c := &WriteClient{
66+
hc: ts.Client(),
67+
url: u,
68+
cfg: &HTTPConfig{},
69+
}
70+
data := &prompb.TimeSeries{
71+
Labels: []*prompb.Label{
72+
{
73+
Name: "label1",
74+
Value: "label1-val",
75+
},
76+
},
77+
Samples: []*prompb.Sample{
78+
{
79+
Value: 8.5,
80+
Timestamp: time.Now().UnixMilli(),
81+
},
82+
},
83+
}
84+
err = c.Store(context.Background(), []*prompb.TimeSeries{data})
85+
assert.NoError(t, err)
86+
}
87+
88+
func TestClientStoreHTTPError(t *testing.T) {
89+
t.Parallel()
90+
h := func(w http.ResponseWriter, r *http.Request) {
91+
http.Error(w, "bad bad", http.StatusUnauthorized)
92+
}
93+
ts := httptest.NewServer(http.HandlerFunc(h))
94+
defer ts.Close()
95+
96+
u, err := url.Parse(ts.URL)
97+
require.NoError(t, err)
98+
99+
c := &WriteClient{
100+
hc: ts.Client(),
101+
url: u,
102+
cfg: &HTTPConfig{},
103+
}
104+
assert.Error(t, c.Store(context.Background(), nil))
105+
}
106+
107+
func TestClientStoreHTTPBasic(t *testing.T) {
108+
t.Parallel()
109+
h := func(w http.ResponseWriter, r *http.Request) {
110+
u, pwd, ok := r.BasicAuth()
111+
require.True(t, ok)
112+
assert.Equal(t, "usertest", u)
113+
assert.Equal(t, "pwdtest", pwd)
114+
}
115+
ts := httptest.NewServer(http.HandlerFunc(h))
116+
defer ts.Close()
117+
118+
u, err := url.Parse(ts.URL)
119+
require.NoError(t, err)
120+
121+
c := &WriteClient{
122+
hc: ts.Client(),
123+
url: u,
124+
cfg: &HTTPConfig{
125+
BasicAuth: &BasicAuth{
126+
Username: "usertest",
127+
Password: "pwdtest",
128+
},
129+
},
130+
}
131+
assert.Error(t, c.Store(context.Background(), nil))
132+
}
133+
134+
func TestClientStoreHeaders(t *testing.T) {
135+
t.Parallel()
136+
h := func(w http.ResponseWriter, r *http.Request) {
137+
assert.Equal(t, r.Header.Get("X-Prometheus-Remote-Write-Version"), "0.1.0")
138+
assert.Equal(t, r.Header.Get("X-MY-CUSTOM-HEADER"), "fake")
139+
}
140+
ts := httptest.NewServer(http.HandlerFunc(h))
141+
defer ts.Close()
142+
143+
u, err := url.Parse(ts.URL)
144+
require.NoError(t, err)
145+
146+
c := &WriteClient{
147+
hc: ts.Client(),
148+
url: u,
149+
cfg: &HTTPConfig{
150+
Headers: http.Header(map[string][]string{
151+
"X-MY-CUSTOM-HEADER": {"fake"},
152+
// If the same key, of a mandatory protocol's header
153+
// is provided, it will be overwritten.
154+
"X-Prometheus-Remote-Write-Version": {"fake"},
155+
}),
156+
},
157+
}
158+
assert.Error(t, c.Store(context.Background(), nil))
159+
}
160+
161+
func TestNewWriteRequestBody(t *testing.T) {
162+
ts := []*prompb.TimeSeries{
163+
{
164+
Labels: []*prompb.Label{{Name: "label1", Value: "val1"}},
165+
Samples: []*prompb.Sample{{Value: 10.1, Timestamp: time.Unix(1, 0).Unix()}},
166+
},
167+
}
168+
b, err := newWriteRequestBody(ts)
169+
require.NoError(t, err)
170+
require.NotEmpty(t, string(b))
171+
assert.Contains(t, string(b), `label1`)
172+
}

0 commit comments

Comments
 (0)