diff --git a/gbus/abstractions.go b/gbus/abstractions.go index eb47e30..ee8bfbc 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -106,9 +106,6 @@ type HandlerRegister interface { HandleEvent(exchange, topic string, event Message, handler MessageHandler) error } -//MessageHandler signature for all command handlers -type MessageHandler func(invocation Invocation, message *BusMessage) error - //Saga is the base interface for all Sagas. type Saga interface { //StartedBy returns the messages that when received should create a new saga instance diff --git a/gbus/bus.go b/gbus/bus.go index cf4dc26..c4e34cb 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "github.com/wework/grabbit/gbus/metrics" "runtime/debug" "sync" "time" @@ -670,7 +671,6 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply } func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Message, handler MessageHandler) error { - b.HandlersLock.Lock() defer b.HandlersLock.Unlock() @@ -678,6 +678,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag b.Serializer.Register(msg) } + metrics.AddHandlerMetrics(handler.Name()) registration := NewRegistration(exchange, routingKey, msg, handler) b.Registrations = append(b.Registrations, registration) for _, worker := range b.workers { diff --git a/gbus/message_handler.go b/gbus/message_handler.go new file mode 100644 index 0000000..776927b --- /dev/null +++ b/gbus/message_handler.go @@ -0,0 +1,17 @@ +package gbus + +import ( + "reflect" + "runtime" + "strings" +) + +//MessageHandler signature for all command handlers +type MessageHandler func(invocation Invocation, message *BusMessage) error + +func (mg MessageHandler) Name() string { + funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name() + splits := strings.Split(funName, ".") + fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1) + return fn +} diff --git a/gbus/metrics/handler_metrics.go b/gbus/metrics/handler_metrics.go new file mode 100644 index 0000000..4311796 --- /dev/null +++ b/gbus/metrics/handler_metrics.go @@ -0,0 +1,128 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_model/go" + "github.com/sirupsen/logrus" + "sync" +) + +var ( + handlerMetricsByHandlerName = &sync.Map{} +) + +const ( + failure = "failure" + success = "success" + handlerResult = "result" + handlers = "handlers" + grabbitPrefix = "grabbit" +) + +type HandlerMetrics struct { + result *prometheus.CounterVec + latency prometheus.Summary +} + +func AddHandlerMetrics(handlerName string) { + handlerMetrics := newHandlerMetrics(handlerName) + _, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics) + + if !exists { + prometheus.MustRegister(handlerMetrics.latency, handlerMetrics.result) + } +} + +func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error { + handlerMetrics := GetHandlerMetrics(handlerName) + defer func() { + if p := recover(); p != nil { + if handlerMetrics != nil { + handlerMetrics.result.WithLabelValues(failure).Inc() + } + + panic(p) + } + }() + + if handlerMetrics == nil { + logger.WithField("handler", handlerName).Warn("Running with metrics - couldn't find metrics for the given handler") + return handleMessage() + } + + err := trackTime(handleMessage, handlerMetrics.latency) + + if err != nil { + handlerMetrics.result.WithLabelValues(failure).Inc() + } else { + handlerMetrics.result.WithLabelValues(success).Inc() + } + + return err +} + +func GetHandlerMetrics(handlerName string) *HandlerMetrics { + entry, ok := handlerMetricsByHandlerName.Load(handlerName) + if ok { + return entry.(*HandlerMetrics) + } + + return nil +} + +func newHandlerMetrics(handlerName string) *HandlerMetrics { + return &HandlerMetrics{ + result: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: handlers, + Name: fmt.Sprintf("%s_result", handlerName), + Help: fmt.Sprintf("The %s's result", handlerName), + }, + []string{handlerResult}), + latency: prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: grabbitPrefix, + Subsystem: handlers, + Name: fmt.Sprintf("%s_latency", handlerName), + Help: fmt.Sprintf("The %s's latency", handlerName), + }), + } +} + +func trackTime(functionToTrack func() error, observer prometheus.Observer) error { + timer := prometheus.NewTimer(observer) + defer timer.ObserveDuration() + + return functionToTrack() +} + +func (hm *HandlerMetrics) GetSuccessCount() (float64, error) { + return hm.getLabeledCounterValue(success) +} + +func (hm *HandlerMetrics) GetFailureCount() (float64, error) { + return hm.getLabeledCounterValue(failure) +} + +func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) { + m := &io_prometheus_client.Metric{} + err := hm.latency.Write(m) + if err != nil { + return nil, err + } + + return m.GetSummary().SampleCount, nil +} + +func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) { + m := &io_prometheus_client.Metric{} + err := hm.result.WithLabelValues(label).Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} diff --git a/gbus/metrics/message_metrics.go b/gbus/metrics/message_metrics.go new file mode 100644 index 0000000..8bf0dfb --- /dev/null +++ b/gbus/metrics/message_metrics.go @@ -0,0 +1,35 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_model/go" +) + +var ( + rejectedMessages = newRejectedMessagesCounter() +) + +func ReportRejectedMessage() { + rejectedMessages.Inc() +} + +func GetRejectedMessagesValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := rejectedMessages.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +func newRejectedMessagesCounter() prometheus.Counter { + return promauto.NewCounter(prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: "messages", + Name: "rejected_messages", + Help: "counting the rejected messages", + }) +} \ No newline at end of file diff --git a/gbus/saga/def.go b/gbus/saga/def.go index e390817..1248312 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -1,9 +1,8 @@ package saga import ( + "github.com/wework/grabbit/gbus/metrics" "reflect" - "runtime" - "strings" "sync" "github.com/wework/grabbit/gbus" @@ -49,22 +48,14 @@ func (sd *Def) getHandledMessages() []string { } func (sd *Def) addMsgToHandlerMapping(exchange, routingKey string, message gbus.Message, handler gbus.MessageHandler) { - - fn := getFunNameFromHandler(handler) - + handlerName := handler.Name() + metrics.AddHandlerMetrics(handlerName) msgToFunc := &MsgToFuncPair{ Filter: gbus.NewMessageFilter(exchange, routingKey, message), - SagaFuncName: fn} + SagaFuncName: handlerName} sd.msgToFunc = append(sd.msgToFunc, msgToFunc) } -func getFunNameFromHandler(handler gbus.MessageHandler) string { - funName := runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name() - splits := strings.Split(funName, ".") - fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1) - return fn -} - func (sd *Def) newInstance() *Instance { instance := NewInstance(sd.sagaType, sd.msgToFunc) return sd.configureSaga(instance) diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 9513c40..cf90c6e 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus/metrics" "reflect" "time" @@ -45,12 +46,21 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati invocation.Log().WithFields(logrus.Fields{ "method_name": methodName, "saga_id": si.ID, }).Info("invoking method on saga") - returns := method.Call(params) - val := returns[0] - if !val.IsNil() { - return val.Interface().(error) + err := metrics.RunHandlerWithMetric(func() error { + returns := method.Call(params) + + val := returns[0] + if !val.IsNil() { + return val.Interface().(error) + } + return nil + }, methodName, invocation.Log()) + + if err != nil { + return err } + invocation.Log().WithFields(logrus.Fields{ "method_name": methodName, "saga_id": si.ID, }).Info("saga instance invoked") diff --git a/gbus/saga/instance_test.go b/gbus/saga/instance_test.go index 4d828dd..f5bc56a 100644 --- a/gbus/saga/instance_test.go +++ b/gbus/saga/instance_test.go @@ -21,10 +21,10 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) { exchange, routingKey := "", "kong" invocationStub := &sagaInvocation{} - failName := getFunNameFromHandler(s.Fail) + failName := gbus.MessageHandler(s.Fail).Name() failFilter := gbus.NewMessageFilter(exchange, routingKey, m1) - passName := getFunNameFromHandler(s.Pass) + passName := gbus.MessageHandler(s.Pass).Name() passFilter := gbus.NewMessageFilter(exchange, routingKey, m2) //map the filter to correct saga function name diff --git a/gbus/worker.go b/gbus/worker.go index 2ca5683..f3513ad 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -5,9 +5,8 @@ import ( "database/sql" "errors" "fmt" + "github.com/wework/grabbit/gbus/metrics" "math/rand" - "reflect" - "runtime" "runtime/debug" "sync" "time" @@ -322,6 +321,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { _ = worker.ack(delivery) } else { _ = worker.reject(false, delivery) + metrics.ReportRejectedMessage() } } @@ -363,7 +363,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan var hspan opentracing.Span var hsctx context.Context for _, handler := range handlers { - hspan, hsctx = opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) + hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name()) ctx := &defaultInvocationContext{ invocingSvc: delivery.ReplyTo, @@ -378,8 +378,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan MaxRetryCount: MaxRetryCount, }, } - ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name())) - handlerErr = handler(ctx, message) + ctx.SetLogger(worker.log().WithField("handler", handler.Name())) + handlerErr = metrics.RunHandlerWithMetric(func() error { + return handler(ctx, message) + }, handler.Name(), worker.log()) if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) break diff --git a/go.mod b/go.mod index 450e785..ddd8cf2 100644 --- a/go.mod +++ b/go.mod @@ -3,27 +3,42 @@ module github.com/wework/grabbit require ( github.com/DataDog/zstd v1.4.0 // indirect github.com/Rican7/retry v0.1.0 - github.com/Shopify/sarama v1.22.1 // indirect + github.com/Shopify/sarama v1.23.0 // indirect github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 + github.com/eapache/go-resiliency v1.2.0 // indirect + github.com/go-kit/kit v0.9.0 // indirect github.com/go-sql-driver/mysql v1.4.1 - github.com/golang/protobuf v1.3.1 + github.com/gogo/protobuf v1.2.1 // indirect + github.com/golang/protobuf v1.3.2 + github.com/jcmturner/gofork v1.0.0 // indirect + github.com/kisielk/errcheck v1.2.0 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/kr/pty v1.1.8 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.0.5+incompatible // indirect + github.com/pkg/errors v0.8.1 // indirect + github.com/prometheus/client_golang v1.0.0 + github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 + github.com/prometheus/common v0.6.0 // indirect + github.com/prometheus/procfs v0.0.3 // indirect + github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect github.com/rs/xid v1.2.1 github.com/sirupsen/logrus v1.4.2 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 - golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect - golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed // indirect - golang.org/x/text v0.3.2 // indirect - google.golang.org/appengine v1.6.0 // indirect + github.com/stretchr/objx v0.2.0 // indirect + golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect + golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect + golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect + golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d // indirect + google.golang.org/appengine v1.6.1 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index f96bd08..a8e86ca 100644 --- a/go.sum +++ b/go.sum @@ -5,10 +5,18 @@ github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= github.com/Shopify/sarama v1.22.1 h1:exyEsKLGyCsDiqpV5Lr4slFi8ev2KiM3cP1KZ6vnCQ0= github.com/Shopify/sarama v1.22.1/go.mod h1:FRzlvRpMFO/639zY1SDxUxkqH97Y0ndM5CbGj6oG3As= +github.com/Shopify/sarama v1.23.0 h1:slvlbm7bxyp7sKQbUwha5BQdZTqurhRoI+zbKorVigQ= +github.com/Shopify/sarama v1.23.0/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 h1:QuKWm+/gc4/EuT8SCBAn1qcTh576rg0KoLfi7a0ArMM= github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15/go.mod h1:NBrM4f6cInyw9KSBFONNXzpvPQ/WGige7ON42RICbWM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -16,31 +24,60 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -53,49 +90,105 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= +github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= +github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 h1:eUm8ma4+yPknhXtkYlWh3tMkE6gBjXZToDned9s2gbQ= +github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/gokrb5.v7 v7.3.0 h1:0709Jtq/6QXEuWRfAm260XqlpcwL1vxtO1tUE2qK8Z4= +gopkg.in/jcmturner/gokrb5.v7 v7.3.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4= gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/tests/bus_test.go b/tests/bus_test.go index eb21c5b..5fd2680 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "github.com/wework/grabbit/gbus/metrics" "reflect" "sync" "testing" @@ -144,6 +145,10 @@ func TestSubscribingOnTopic(t *testing.T) { <-proceed } +var ( + handlerRetryProceed = make(chan bool) + attempts = 0 +) func TestHandlerRetry(t *testing.T) { c1 := Command1{} @@ -153,34 +158,45 @@ func TestHandlerRetry(t *testing.T) { bus := createBusForTest() - proceed := make(chan bool) cmdHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { return invocation.Reply(noopTraceContext(), reply) } - attempts := 0 - replyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { - if attempts == 0 { - attempts++ - return fmt.Errorf("expecting retry on errors") - } else if attempts == 1 { - attempts++ - panic("expecting retry on panics") - } else { - proceed <- true - } - return nil - } - bus.HandleMessage(c1, cmdHandler) - bus.HandleMessage(r1, replyHandler) + bus.HandleMessage(r1, handleRetry) bus.Start() defer bus.Shutdown() bus.Send(noopTraceContext(), testSvc1, cmd) - <-proceed + <-handlerRetryProceed + hm := metrics.GetHandlerMetrics("handleRetry") + if hm == nil { + t.Error("Metrics for handleRetry should be initiated") + } + f, _ := hm.GetFailureCount() + s, _ := hm.GetSuccessCount() + + if f != 2 { + t.Errorf("Failure count should be 2 but was %f", f) + } + if s != 1 { + t.Errorf("Success count should be 1 but was %f", s) + } +} + +func handleRetry(invocation gbus.Invocation, message *gbus.BusMessage) error { + if attempts == 0 { + attempts++ + return fmt.Errorf("expecting retry on errors") + } else if attempts == 1 { + attempts++ + panic("expecting retry on panics") + } else { + handlerRetryProceed <- true + } + return nil } func TestRPC(t *testing.T) { @@ -239,6 +255,10 @@ func TestDeadlettering(t *testing.T) { service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) waitgroup.Wait() + count, _ := metrics.GetRejectedMessagesValue() + if count != 1 { + t.Error("Should have one rejected message") + } } func TestRegistrationAfterBusStarts(t *testing.T) { diff --git a/tests/handler_metrics_test.go b/tests/handler_metrics_test.go new file mode 100644 index 0000000..dcfcc75 --- /dev/null +++ b/tests/handler_metrics_test.go @@ -0,0 +1,137 @@ +package tests + +import ( + "errors" + "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus/metrics" + "testing" +) + +var ( + logger logrus.FieldLogger + runningTries = 5 +) + +func TestAddHandlerMetrics(t *testing.T) { + name := "handler1" + metrics.AddHandlerMetrics(name) + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Error("Failed to create handler metrics") + } + + metrics.AddHandlerMetrics(name) + hm1 := metrics.GetHandlerMetrics(name) + + if hm1 == nil { + t.Error("Failed to create handler metrics") + } + + if hm1 != hm { + t.Error("Created two handlers with the same name") + } + + differentName := "handler2" + metrics.AddHandlerMetrics(differentName) + hm2 := metrics.GetHandlerMetrics(differentName) + + if hm2 == nil { + t.Error("Failed to create handler metrics") + } + + if hm2 == hm { + t.Error("Failed to create a different handler metrics") + } +} + +func TestRunHandlerWithMetric_FailureCounter(t *testing.T) { + logger = logrus.WithField("testCase", "TestRunHandlerWithMetric_FailureCounter") + name := "failure" + metrics.AddHandlerMetrics(name) + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Errorf("Couldn't find handler with the name %s", name) + } + failure := func() error { + return errors.New("error in running handler") + } + + for i := 1; i < runningTries; i++ { + err := metrics.RunHandlerWithMetric(failure, name, logger) + + if err == nil { + t.Error("Failed handler run should return an error") + } + + count, err := hm.GetFailureCount() + + if err != nil { + t.Errorf("Failed to get counter value: %e", err) + } + if count != float64(i) { + t.Errorf("Expected to get %f as the value of the failure counter, but got %f", float64(i), count) + } + } +} + +func TestRunHandlerWithMetric_SuccessCounter(t *testing.T) { + logger = logrus.WithField("testCase", "TestRunHandlerWithMetric_SuccessCounter") + name := "success" + metrics.AddHandlerMetrics(name) + success := func() error { + return nil + } + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Errorf("Couldn't find handler with the name %s", name) + } + + for i := 1; i < runningTries; i++ { + err := metrics.RunHandlerWithMetric(success, name, logger) + + if err != nil { + t.Error("Successful handler run shouldn't return an error") + } + + count, err := hm.GetSuccessCount() + + if err != nil { + t.Errorf("Failed to get counter value: %e", err) + } + if count != float64(i) { + t.Errorf("Expected to get %f as the value of the success counter, but got %f", float64(i), count) + } + } +} + +func TestRunHandlerWithMetric_Latency(t *testing.T) { + logger = logrus.WithField("testCase", "TestRunHandlerWithMetric_ExceededRetriesCounter") + name := "latency" + metrics.AddHandlerMetrics(name) + success := func() error { + return nil + } + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Errorf("Couldn't find handler with the name %s", name) + } + + for i := 1; i < runningTries; i++ { + _ = metrics.RunHandlerWithMetric(success, name, logger) + sc, err := hm.GetLatencySampleCount() + + if err != nil { + t.Errorf("Failed to get latency value: %e", err) + } + if sc == nil { + t.Errorf("Expected latency sample count not be nil") + } + if *sc != uint64(i) { + t.Errorf("Expected to get %d as the value of the latency sample count, but got %d", uint64(i), *sc) + } + } +}