Skip to content

Commit f617e04

Browse files
danielwitzGuy Baron
authored and
Guy Baron
committed
add handler metrics to bus and saga (#101)
* add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric
1 parent 4ab2be5 commit f617e04

13 files changed

+497
-51
lines changed

gbus/abstractions.go

-3
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,6 @@ type HandlerRegister interface {
106106
HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
107107
}
108108

109-
//MessageHandler signature for all command handlers
110-
type MessageHandler func(invocation Invocation, message *BusMessage) error
111-
112109
//Saga is the base interface for all Sagas.
113110
type Saga interface {
114111
//StartedBy returns the messages that when received should create a new saga instance

gbus/bus.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
"github.com/wework/grabbit/gbus/metrics"
89
"runtime/debug"
910
"sync"
1011
"time"
@@ -670,14 +671,14 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
670671
}
671672

672673
func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Message, handler MessageHandler) error {
673-
674674
b.HandlersLock.Lock()
675675
defer b.HandlersLock.Unlock()
676676

677677
if msg != nil {
678678
b.Serializer.Register(msg)
679679
}
680680

681+
metrics.AddHandlerMetrics(handler.Name())
681682
registration := NewRegistration(exchange, routingKey, msg, handler)
682683
b.Registrations = append(b.Registrations, registration)
683684
for _, worker := range b.workers {

gbus/message_handler.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package gbus
2+
3+
import (
4+
"reflect"
5+
"runtime"
6+
"strings"
7+
)
8+
9+
//MessageHandler signature for all command handlers
10+
type MessageHandler func(invocation Invocation, message *BusMessage) error
11+
12+
func (mg MessageHandler) Name() string {
13+
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
14+
splits := strings.Split(funName, ".")
15+
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
16+
return fn
17+
}

gbus/metrics/handler_metrics.go

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package metrics
2+
3+
import (
4+
"fmt"
5+
"github.com/prometheus/client_golang/prometheus"
6+
"github.com/prometheus/client_model/go"
7+
"github.com/sirupsen/logrus"
8+
"sync"
9+
)
10+
11+
var (
12+
handlerMetricsByHandlerName = &sync.Map{}
13+
)
14+
15+
const (
16+
failure = "failure"
17+
success = "success"
18+
handlerResult = "result"
19+
handlers = "handlers"
20+
grabbitPrefix = "grabbit"
21+
)
22+
23+
type HandlerMetrics struct {
24+
result *prometheus.CounterVec
25+
latency prometheus.Summary
26+
}
27+
28+
func AddHandlerMetrics(handlerName string) {
29+
handlerMetrics := newHandlerMetrics(handlerName)
30+
_, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics)
31+
32+
if !exists {
33+
prometheus.MustRegister(handlerMetrics.latency, handlerMetrics.result)
34+
}
35+
}
36+
37+
func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error {
38+
handlerMetrics := GetHandlerMetrics(handlerName)
39+
defer func() {
40+
if p := recover(); p != nil {
41+
if handlerMetrics != nil {
42+
handlerMetrics.result.WithLabelValues(failure).Inc()
43+
}
44+
45+
panic(p)
46+
}
47+
}()
48+
49+
if handlerMetrics == nil {
50+
logger.WithField("handler", handlerName).Warn("Running with metrics - couldn't find metrics for the given handler")
51+
return handleMessage()
52+
}
53+
54+
err := trackTime(handleMessage, handlerMetrics.latency)
55+
56+
if err != nil {
57+
handlerMetrics.result.WithLabelValues(failure).Inc()
58+
} else {
59+
handlerMetrics.result.WithLabelValues(success).Inc()
60+
}
61+
62+
return err
63+
}
64+
65+
func GetHandlerMetrics(handlerName string) *HandlerMetrics {
66+
entry, ok := handlerMetricsByHandlerName.Load(handlerName)
67+
if ok {
68+
return entry.(*HandlerMetrics)
69+
}
70+
71+
return nil
72+
}
73+
74+
func newHandlerMetrics(handlerName string) *HandlerMetrics {
75+
return &HandlerMetrics{
76+
result: prometheus.NewCounterVec(
77+
prometheus.CounterOpts{
78+
Namespace: grabbitPrefix,
79+
Subsystem: handlers,
80+
Name: fmt.Sprintf("%s_result", handlerName),
81+
Help: fmt.Sprintf("The %s's result", handlerName),
82+
},
83+
[]string{handlerResult}),
84+
latency: prometheus.NewSummary(
85+
prometheus.SummaryOpts{
86+
Namespace: grabbitPrefix,
87+
Subsystem: handlers,
88+
Name: fmt.Sprintf("%s_latency", handlerName),
89+
Help: fmt.Sprintf("The %s's latency", handlerName),
90+
}),
91+
}
92+
}
93+
94+
func trackTime(functionToTrack func() error, observer prometheus.Observer) error {
95+
timer := prometheus.NewTimer(observer)
96+
defer timer.ObserveDuration()
97+
98+
return functionToTrack()
99+
}
100+
101+
func (hm *HandlerMetrics) GetSuccessCount() (float64, error) {
102+
return hm.getLabeledCounterValue(success)
103+
}
104+
105+
func (hm *HandlerMetrics) GetFailureCount() (float64, error) {
106+
return hm.getLabeledCounterValue(failure)
107+
}
108+
109+
func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) {
110+
m := &io_prometheus_client.Metric{}
111+
err := hm.latency.Write(m)
112+
if err != nil {
113+
return nil, err
114+
}
115+
116+
return m.GetSummary().SampleCount, nil
117+
}
118+
119+
func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) {
120+
m := &io_prometheus_client.Metric{}
121+
err := hm.result.WithLabelValues(label).Write(m)
122+
123+
if err != nil {
124+
return 0, err
125+
}
126+
127+
return m.GetCounter().GetValue(), nil
128+
}

gbus/metrics/message_metrics.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/prometheus/client_golang/prometheus/promauto"
6+
"github.com/prometheus/client_model/go"
7+
)
8+
9+
var (
10+
rejectedMessages = newRejectedMessagesCounter()
11+
)
12+
13+
func ReportRejectedMessage() {
14+
rejectedMessages.Inc()
15+
}
16+
17+
func GetRejectedMessagesValue() (float64, error) {
18+
m := &io_prometheus_client.Metric{}
19+
err := rejectedMessages.Write(m)
20+
21+
if err != nil {
22+
return 0, err
23+
}
24+
25+
return m.GetCounter().GetValue(), nil
26+
}
27+
28+
func newRejectedMessagesCounter() prometheus.Counter {
29+
return promauto.NewCounter(prometheus.CounterOpts{
30+
Namespace: grabbitPrefix,
31+
Subsystem: "messages",
32+
Name: "rejected_messages",
33+
Help: "counting the rejected messages",
34+
})
35+
}

gbus/saga/def.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package saga
22

33
import (
4+
"github.com/wework/grabbit/gbus/metrics"
45
"reflect"
5-
"runtime"
6-
"strings"
76
"sync"
87

98
"github.com/wework/grabbit/gbus"
@@ -49,22 +48,14 @@ func (sd *Def) getHandledMessages() []string {
4948
}
5049

5150
func (sd *Def) addMsgToHandlerMapping(exchange, routingKey string, message gbus.Message, handler gbus.MessageHandler) {
52-
53-
fn := getFunNameFromHandler(handler)
54-
51+
handlerName := handler.Name()
52+
metrics.AddHandlerMetrics(handlerName)
5553
msgToFunc := &MsgToFuncPair{
5654
Filter: gbus.NewMessageFilter(exchange, routingKey, message),
57-
SagaFuncName: fn}
55+
SagaFuncName: handlerName}
5856
sd.msgToFunc = append(sd.msgToFunc, msgToFunc)
5957
}
6058

61-
func getFunNameFromHandler(handler gbus.MessageHandler) string {
62-
funName := runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()
63-
splits := strings.Split(funName, ".")
64-
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
65-
return fn
66-
}
67-
6859
func (sd *Def) newInstance() *Instance {
6960
instance := NewInstance(sd.sagaType, sd.msgToFunc)
7061
return sd.configureSaga(instance)

gbus/saga/instance.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"database/sql"
55
"fmt"
66
"github.com/sirupsen/logrus"
7+
"github.com/wework/grabbit/gbus/metrics"
78
"reflect"
89
"time"
910

@@ -45,12 +46,21 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati
4546
invocation.Log().WithFields(logrus.Fields{
4647
"method_name": methodName, "saga_id": si.ID,
4748
}).Info("invoking method on saga")
48-
returns := method.Call(params)
4949

50-
val := returns[0]
51-
if !val.IsNil() {
52-
return val.Interface().(error)
50+
err := metrics.RunHandlerWithMetric(func() error {
51+
returns := method.Call(params)
52+
53+
val := returns[0]
54+
if !val.IsNil() {
55+
return val.Interface().(error)
56+
}
57+
return nil
58+
}, methodName, invocation.Log())
59+
60+
if err != nil {
61+
return err
5362
}
63+
5464
invocation.Log().WithFields(logrus.Fields{
5565
"method_name": methodName, "saga_id": si.ID,
5666
}).Info("saga instance invoked")

gbus/saga/instance_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) {
2121
exchange, routingKey := "", "kong"
2222
invocationStub := &sagaInvocation{}
2323

24-
failName := getFunNameFromHandler(s.Fail)
24+
failName := gbus.MessageHandler(s.Fail).Name()
2525
failFilter := gbus.NewMessageFilter(exchange, routingKey, m1)
2626

27-
passName := getFunNameFromHandler(s.Pass)
27+
passName := gbus.MessageHandler(s.Pass).Name()
2828
passFilter := gbus.NewMessageFilter(exchange, routingKey, m2)
2929

3030
//map the filter to correct saga function name

gbus/worker.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
"github.com/wework/grabbit/gbus/metrics"
89
"math/rand"
9-
"reflect"
10-
"runtime"
1110
"runtime/debug"
1211
"sync"
1312
"time"
@@ -322,6 +321,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
322321
_ = worker.ack(delivery)
323322
} else {
324323
_ = worker.reject(false, delivery)
324+
metrics.ReportRejectedMessage()
325325
}
326326
}
327327

@@ -363,7 +363,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
363363
var hspan opentracing.Span
364364
var hsctx context.Context
365365
for _, handler := range handlers {
366-
hspan, hsctx = opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name())
366+
hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name())
367367

368368
ctx := &defaultInvocationContext{
369369
invocingSvc: delivery.ReplyTo,
@@ -378,8 +378,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
378378
MaxRetryCount: MaxRetryCount,
379379
},
380380
}
381-
ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()))
382-
handlerErr = handler(ctx, message)
381+
ctx.SetLogger(worker.log().WithField("handler", handler.Name()))
382+
handlerErr = metrics.RunHandlerWithMetric(func() error {
383+
return handler(ctx, message)
384+
}, handler.Name(), worker.log())
383385
if handlerErr != nil {
384386
hspan.LogFields(slog.Error(handlerErr))
385387
break

go.mod

+21-6
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,42 @@ module github.com/wework/grabbit
33
require (
44
github.com/DataDog/zstd v1.4.0 // indirect
55
github.com/Rican7/retry v0.1.0
6-
github.com/Shopify/sarama v1.22.1 // indirect
6+
github.com/Shopify/sarama v1.23.0 // indirect
77
github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect
88
github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15
9+
github.com/eapache/go-resiliency v1.2.0 // indirect
10+
github.com/go-kit/kit v0.9.0 // indirect
911
github.com/go-sql-driver/mysql v1.4.1
10-
github.com/golang/protobuf v1.3.1
12+
github.com/gogo/protobuf v1.2.1 // indirect
13+
github.com/golang/protobuf v1.3.2
14+
github.com/jcmturner/gofork v1.0.0 // indirect
15+
github.com/kisielk/errcheck v1.2.0 // indirect
1116
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
1217
github.com/kr/pretty v0.1.0 // indirect
18+
github.com/kr/pty v1.1.8 // indirect
1319
github.com/linkedin/goavro v2.1.0+incompatible
1420
github.com/onsi/ginkgo v1.8.0 // indirect
1521
github.com/onsi/gomega v1.5.0 // indirect
1622
github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620
1723
github.com/opentracing/opentracing-go v1.1.0
1824
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
25+
github.com/pkg/errors v0.8.1 // indirect
26+
github.com/prometheus/client_golang v1.0.0
27+
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
28+
github.com/prometheus/common v0.6.0 // indirect
29+
github.com/prometheus/procfs v0.0.3 // indirect
30+
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect
1931
github.com/rs/xid v1.2.1
2032
github.com/sirupsen/logrus v1.4.2
2133
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
22-
golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect
23-
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed // indirect
24-
golang.org/x/text v0.3.2 // indirect
25-
google.golang.org/appengine v1.6.0 // indirect
34+
github.com/stretchr/objx v0.2.0 // indirect
35+
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
36+
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
37+
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
38+
golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d // indirect
39+
google.golang.org/appengine v1.6.1 // indirect
2640
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
41+
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
2742
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
2843
gopkg.in/yaml.v2 v2.2.2 // indirect
2944
)

0 commit comments

Comments
 (0)