Skip to content

increased test coverage #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
RPCHandlers: make(map[string]gbus.MessageHandler),
Serializer: builder.serializer,
DLX: builder.dlx,
DefaultPolicies: make([]gbus.MessagePolicy, 0),
DefaultPolicies: builder.defaultPolicies,
DbPingTimeout: 3}

gb.Confirm = builder.confirm
Expand Down
20 changes: 10 additions & 10 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type DefaultBus struct {
Confirm bool
healthChan chan error
backpreasure bool
rabbitFailure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
Expand Down Expand Up @@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error {
//start monitoring on amqp related errors
go b.monitorAMQPErrors()
//start consuming messags from service queue

b.amqpConnected = true
return nil
}

Expand Down Expand Up @@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
go func() {
err := w.Start()
if err != nil {
log.WithError(err)
}
}()

err := w.Start()
if err != nil {
log.WithError(err).Error("failed to start worker")
}

workers = append(workers, w)
}
Expand All @@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
err := worker.Stop()
if err != nil {
b.log().WithError(err).Error("could not stop worker")
return err
}
}
b.Outgoing.shutdown()
Expand Down Expand Up @@ -359,7 +359,7 @@ func (b *DefaultBus) GetHealth() HealthCard {
return HealthCard{
DbConnected: dbConnected,
RabbitBackPressure: b.backpreasure,
RabbitConnected: !b.rabbitFailure,
RabbitConnected: b.amqpConnected,
}
}

Expand Down Expand Up @@ -577,7 +577,7 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
b.backpreasure = blocked.Active
case amqpErr := <-b.amqpErrors:
b.rabbitFailure = true
b.amqpConnected = false
b.log().WithField("amqp_error", amqpErr).Error("amqp error")
if b.healthChan != nil {
b.healthChan <- amqpErr
Expand Down
3 changes: 2 additions & 1 deletion gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type worker struct {
func (worker *worker) Start() error {

worker.log().Info("starting worker")
worker.stop = make(chan bool)
worker.channel.NotifyClose(worker.amqpErrors)

var (
Expand All @@ -62,7 +63,7 @@ func (worker *worker) Start() error {
}
worker.messages = messages
worker.rpcMessages = rpcmsgs
worker.stop = make(chan bool)

go worker.consumeMessages()

return nil
Expand Down
41 changes: 32 additions & 9 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tests
import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -209,28 +211,34 @@ func TestRPC(t *testing.T) {
}

func TestDeadlettering(t *testing.T) {

var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true)
deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true)
proceed := make(chan bool)
handler := func(tx *sql.Tx, poision amqp.Delivery) error {
proceed <- true

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
waitgroup.Done()
return nil
}

deadletterSvc.HandleDeadletter(handler)
faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
return errors.New("fail")
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
service1.HandleMessage(Command1{}, faultyHandler)

deadletterSvc.Start()
defer deadletterSvc.Shutdown()
service1.Start()
defer service1.Shutdown()

e := service1.Send(context.Background(), testSvc1, poision)
if e != nil {
log.Printf("send error: %v", e)
}
service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))

<-proceed
waitgroup.Wait()
}

func TestRegistrationAfterBusStarts(t *testing.T) {
Expand Down Expand Up @@ -323,6 +331,21 @@ func TestSendingPanic(t *testing.T) {
}
}

func TestHealthCheck(t *testing.T) {
svc1 := createNamedBusForTest(testSvc1)
err := svc1.Start()
if err != nil {
t.Error(err.Error())
}
defer svc1.Shutdown()
health := svc1.GetHealth()

fmt.Printf("%v", health)
if !health.DbConnected || !health.RabbitConnected || health.RabbitBackPressure {
t.Error("bus expected to be healthy but failed health check")
}
}

func noopTraceContext() context.Context {
return context.Background()
// tracer := opentracing.NoopTracer{}
Expand Down
5 changes: 4 additions & 1 deletion tests/consts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package tests

import (
"time"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
"github.com/wework/grabbit/gbus/policy"
Expand All @@ -26,7 +28,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu
busBuilder := builder.
New().
Bus(connStr).
WithPolicies(&policy.Durable{}).
WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}).
WorkerNum(3, 1).
WithConfirms()

if txnl {
Expand Down