Skip to content

Commit 6936b12

Browse files
committed
agent/grpc: add metrics to grpc handler
1 parent 31ad3d5 commit 6936b12

File tree

3 files changed

+141
-4
lines changed

3 files changed

+141
-4
lines changed

pkg/agent/protocol/grpc/handler.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"time"
1111

12+
"github.com/grafana/xk6-disruptor/pkg/agent/protocol"
1213
"google.golang.org/grpc"
1314
"google.golang.org/grpc/codes"
1415
"google.golang.org/grpc/metadata"
@@ -24,10 +25,11 @@ func clientStreamDescForProxy() *grpc.StreamDesc {
2425
}
2526

2627
// NewHandler returns a StreamHandler that attempts to proxy all requests that are not registered in the server.
27-
func NewHandler(disruption Disruption, forwardConn *grpc.ClientConn) grpc.StreamHandler {
28+
func NewHandler(disruption Disruption, forwardConn *grpc.ClientConn, metrics *protocol.MetricMap) grpc.StreamHandler {
2829
handler := &handler{
2930
disruption: disruption,
3031
forwardConn: forwardConn,
32+
metrics: metrics,
3133
}
3234

3335
// return the handler function
@@ -37,6 +39,7 @@ func NewHandler(disruption Disruption, forwardConn *grpc.ClientConn) grpc.Stream
3739
type handler struct {
3840
disruption Disruption
3941
forwardConn *grpc.ClientConn
42+
metrics *protocol.MetricMap
4043
}
4144

4245
// contains verifies if a list of strings contains the given string
@@ -52,6 +55,8 @@ func contains(list []string, target string) bool {
5255
// handles requests from the client. If selected for error injection, returns an error,
5356
// otherwise, forwards to the server transparently
5457
func (h *handler) streamHandler(_ interface{}, serverStream grpc.ServerStream) error {
58+
h.metrics.Inc(protocol.MetricRequests)
59+
5560
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
5661
if !ok {
5762
return status.Errorf(codes.Internal, "ServerTransportStream not exists in context")
@@ -61,18 +66,23 @@ func (h *handler) streamHandler(_ interface{}, serverStream grpc.ServerStream) e
6166
excluded := contains(h.disruption.Excluded, serviceName)
6267
if !excluded {
6368
if h.disruption.ErrorRate > 0 && rand.Float32() <= h.disruption.ErrorRate {
69+
h.metrics.Inc(protocol.MetricRequestsFaulted)
6470
return h.injectError(serverStream)
6571
}
6672

6773
// add delay
6874
if h.disruption.AverageDelay > 0 {
75+
h.metrics.Inc(protocol.MetricRequestsFaulted)
76+
6977
delay := int64(h.disruption.AverageDelay)
7078
if h.disruption.DelayVariation > 0 {
7179
variation := int64(h.disruption.DelayVariation)
7280
delay = delay + variation - 2*rand.Int63n(variation)
7381
}
7482
time.Sleep(time.Duration(delay))
7583
}
84+
} else {
85+
h.metrics.Inc(protocol.MetricRequestsExcluded)
7686
}
7787

7888
return h.transparentForward(serverStream)

pkg/agent/protocol/grpc/proxy.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type proxy struct {
4444
config ProxyConfig
4545
disruption Disruption
4646
srv *grpc.Server
47+
metrics *protocol.MetricMap
4748
}
4849

4950
// NewProxy return a new Proxy
@@ -79,6 +80,7 @@ func NewProxy(c ProxyConfig, d Disruption) (protocol.Proxy, error) {
7980
return &proxy{
8081
disruption: d,
8182
config: c,
83+
metrics: &protocol.MetricMap{},
8284
}, nil
8385
}
8486

@@ -93,7 +95,7 @@ func (p *proxy) Start() error {
9395
if err != nil {
9496
return fmt.Errorf("error dialing %s: %w", p.config.UpstreamAddress, err)
9597
}
96-
handler := NewHandler(p.disruption, conn)
98+
handler := NewHandler(p.disruption, conn, p.metrics)
9799

98100
p.srv = grpc.NewServer(
99101
grpc.UnknownServiceHandler(handler),
@@ -123,7 +125,7 @@ func (p *proxy) Stop() error {
123125
// Metrics returns runtime metrics for the proxy.
124126
// TODO: Add metrics.
125127
func (p *proxy) Metrics() map[string]uint {
126-
return nil
128+
return p.metrics.Map()
127129
}
128130

129131
// Force stops the proxy without waiting for connections to drain

pkg/agent/protocol/grpc/proxy_test.go

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"path/filepath"
99
"testing"
1010

11+
"github.com/google/go-cmp/cmp"
1112
"github.com/google/uuid"
13+
"github.com/grafana/xk6-disruptor/pkg/agent/protocol"
1214
"github.com/grafana/xk6-disruptor/pkg/testutils/grpc/ping"
1315
"google.golang.org/grpc"
1416
"google.golang.org/grpc/codes"
@@ -187,6 +189,7 @@ func Test_ProxyHandler(t *testing.T) {
187189
expectStatus codes.Code
188190
}
189191

192+
// TODO: Add test for excluded endpoints
190193
testCases := []TestCase{
191194
{
192195
title: "default proxy",
@@ -207,7 +210,7 @@ func Test_ProxyHandler(t *testing.T) {
207210
expectStatus: codes.OK,
208211
},
209212
{
210-
title: "error injection ",
213+
title: "error injection",
211214
disruption: Disruption{
212215
AverageDelay: 0,
213216
DelayVariation: 0,
@@ -331,3 +334,125 @@ func Test_ProxyHandler(t *testing.T) {
331334
})
332335
}
333336
}
337+
338+
func Test_ProxyMetrics(t *testing.T) {
339+
t.Parallel()
340+
341+
type TestCase struct {
342+
title string
343+
disruption Disruption
344+
expectedMetrics map[string]uint
345+
}
346+
347+
// TODO: Add test for excluded endpoints
348+
testCases := []TestCase{
349+
{
350+
title: "passthrough",
351+
disruption: Disruption{
352+
AverageDelay: 0,
353+
DelayVariation: 0,
354+
ErrorRate: 0.0,
355+
StatusCode: 0,
356+
StatusMessage: "",
357+
},
358+
expectedMetrics: map[string]uint{
359+
protocol.MetricRequests: 1,
360+
},
361+
},
362+
{
363+
title: "error injection",
364+
disruption: Disruption{
365+
AverageDelay: 0,
366+
DelayVariation: 0,
367+
ErrorRate: 1.0,
368+
StatusCode: int32(codes.Internal),
369+
StatusMessage: "Internal server error",
370+
},
371+
expectedMetrics: map[string]uint{
372+
protocol.MetricRequests: 1,
373+
protocol.MetricRequestsFaulted: 1,
374+
},
375+
},
376+
}
377+
378+
for _, tc := range testCases {
379+
tc := tc
380+
381+
t.Run(tc.title, func(t *testing.T) {
382+
t.Parallel()
383+
384+
// start test server in a random unix socket
385+
serverSocket := filepath.Join(os.TempDir(), uuid.New().String())
386+
l, err := net.Listen("unix", serverSocket)
387+
if err != nil {
388+
t.Errorf("error starting test server in unix:%s: %v", serverSocket, err)
389+
return
390+
}
391+
392+
srv := grpc.NewServer()
393+
ping.RegisterPingServiceServer(srv, ping.NewPingServer())
394+
go func() {
395+
if serr := srv.Serve(l); err != nil {
396+
t.Logf("error in the server: %v", serr)
397+
}
398+
}()
399+
400+
// start proxy in a random unix socket
401+
proxySocket := filepath.Join(os.TempDir(), uuid.New().String())
402+
config := ProxyConfig{
403+
Network: "unix",
404+
ListenAddress: proxySocket,
405+
UpstreamAddress: fmt.Sprintf("unix:%s", serverSocket),
406+
}
407+
408+
proxy, err := NewProxy(config, tc.disruption)
409+
if err != nil {
410+
t.Errorf("error creating proxy: %v", err)
411+
return
412+
}
413+
414+
defer func() {
415+
_ = proxy.Stop()
416+
}()
417+
418+
go func() {
419+
if perr := proxy.Start(); perr != nil {
420+
t.Logf("error starting proxy: %v", perr)
421+
}
422+
}()
423+
424+
// connect client to proxy
425+
conn, err := grpc.DialContext(
426+
context.TODO(),
427+
fmt.Sprintf("unix:%s", proxySocket),
428+
grpc.WithInsecure(),
429+
)
430+
if err != nil {
431+
t.Fatal(err)
432+
}
433+
434+
defer func() {
435+
_ = conn.Close()
436+
}()
437+
438+
client := ping.NewPingServiceClient(conn)
439+
440+
var headers metadata.MD
441+
_, _ = client.Ping(
442+
context.TODO(),
443+
&ping.PingRequest{
444+
Error: 0,
445+
Message: "ping",
446+
},
447+
grpc.Header(&headers),
448+
grpc.WaitForReady(true),
449+
)
450+
451+
metrics := proxy.Metrics()
452+
453+
if diff := cmp.Diff(tc.expectedMetrics, metrics); diff != "" {
454+
t.Fatalf("expected metrics do not match returned:\n%s", diff)
455+
}
456+
})
457+
}
458+
}

0 commit comments

Comments
 (0)