Skip to content

Commit 665d2d1

Browse files
author
Guy Baron
authored
Merge pull request #71 from wework/coverage
increased test coverage
2 parents bbbeac7 + a3069de commit 665d2d1

File tree

5 files changed

+49
-22
lines changed

5 files changed

+49
-22
lines changed

gbus/builder/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
4949
RPCHandlers: make(map[string]gbus.MessageHandler),
5050
Serializer: builder.serializer,
5151
DLX: builder.dlx,
52-
DefaultPolicies: make([]gbus.MessagePolicy, 0),
52+
DefaultPolicies: builder.defaultPolicies,
5353
DbPingTimeout: 3}
5454

5555
gb.Confirm = builder.confirm

gbus/bus.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ type DefaultBus struct {
5757
Confirm bool
5858
healthChan chan error
5959
backpreasure bool
60-
rabbitFailure bool
6160
DbPingTimeout time.Duration
61+
amqpConnected bool
6262
}
6363

6464
var (
@@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error {
257257
//start monitoring on amqp related errors
258258
go b.monitorAMQPErrors()
259259
//start consuming messags from service queue
260-
260+
b.amqpConnected = true
261261
return nil
262262
}
263263

@@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
293293
serializer: b.Serializer,
294294
b: b,
295295
amqpErrors: b.amqpErrors}
296-
go func() {
297-
err := w.Start()
298-
if err != nil {
299-
log.WithError(err)
300-
}
301-
}()
296+
297+
err := w.Start()
298+
if err != nil {
299+
log.WithError(err).Error("failed to start worker")
300+
}
302301

303302
workers = append(workers, w)
304303
}
@@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
321320
err := worker.Stop()
322321
if err != nil {
323322
b.log().WithError(err).Error("could not stop worker")
323+
return err
324324
}
325325
}
326326
b.Outgoing.shutdown()
@@ -359,7 +359,7 @@ func (b *DefaultBus) GetHealth() HealthCard {
359359
return HealthCard{
360360
DbConnected: dbConnected,
361361
RabbitBackPressure: b.backpreasure,
362-
RabbitConnected: !b.rabbitFailure,
362+
RabbitConnected: b.amqpConnected,
363363
}
364364
}
365365

@@ -577,7 +577,7 @@ func (b *DefaultBus) monitorAMQPErrors() {
577577
}
578578
b.backpreasure = blocked.Active
579579
case amqpErr := <-b.amqpErrors:
580-
b.rabbitFailure = true
580+
b.amqpConnected = false
581581
b.log().WithField("amqp_error", amqpErr).Error("amqp error")
582582
if b.healthChan != nil {
583583
b.healthChan <- amqpErr

gbus/worker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type worker struct {
4747
func (worker *worker) Start() error {
4848

4949
worker.log().Info("starting worker")
50+
worker.stop = make(chan bool)
5051
worker.channel.NotifyClose(worker.amqpErrors)
5152

5253
var (
@@ -62,7 +63,7 @@ func (worker *worker) Start() error {
6263
}
6364
worker.messages = messages
6465
worker.rpcMessages = rpcmsgs
65-
worker.stop = make(chan bool)
66+
6667
go worker.consumeMessages()
6768

6869
return nil

tests/bus_test.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package tests
33
import (
44
"context"
55
"database/sql"
6+
"errors"
67
"fmt"
78
"reflect"
9+
"sync"
810
"testing"
911
"time"
1012

@@ -209,28 +211,34 @@ func TestRPC(t *testing.T) {
209211
}
210212

211213
func TestDeadlettering(t *testing.T) {
214+
215+
var waitgroup sync.WaitGroup
216+
waitgroup.Add(2)
212217
poision := gbus.NewBusMessage(PoisionMessage{})
213218
service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true)
214219
deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true)
215-
proceed := make(chan bool)
216-
handler := func(tx *sql.Tx, poision amqp.Delivery) error {
217-
proceed <- true
220+
221+
deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
222+
waitgroup.Done()
218223
return nil
219224
}
220225

221-
deadletterSvc.HandleDeadletter(handler)
226+
faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
227+
return errors.New("fail")
228+
}
229+
230+
deadletterSvc.HandleDeadletter(deadMessageHandler)
231+
service1.HandleMessage(Command1{}, faultyHandler)
222232

223233
deadletterSvc.Start()
224234
defer deadletterSvc.Shutdown()
225235
service1.Start()
226236
defer service1.Shutdown()
227237

228-
e := service1.Send(context.Background(), testSvc1, poision)
229-
if e != nil {
230-
log.Printf("send error: %v", e)
231-
}
238+
service1.Send(context.Background(), testSvc1, poision)
239+
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))
232240

233-
<-proceed
241+
waitgroup.Wait()
234242
}
235243

236244
func TestRegistrationAfterBusStarts(t *testing.T) {
@@ -323,6 +331,21 @@ func TestSendingPanic(t *testing.T) {
323331
}
324332
}
325333

334+
func TestHealthCheck(t *testing.T) {
335+
svc1 := createNamedBusForTest(testSvc1)
336+
err := svc1.Start()
337+
if err != nil {
338+
t.Error(err.Error())
339+
}
340+
defer svc1.Shutdown()
341+
health := svc1.GetHealth()
342+
343+
fmt.Printf("%v", health)
344+
if !health.DbConnected || !health.RabbitConnected || health.RabbitBackPressure {
345+
t.Error("bus expected to be healthy but failed health check")
346+
}
347+
}
348+
326349
func noopTraceContext() context.Context {
327350
return context.Background()
328351
// tracer := opentracing.NoopTracer{}

tests/consts.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tests
22

33
import (
4+
"time"
5+
46
"github.com/wework/grabbit/gbus"
57
"github.com/wework/grabbit/gbus/builder"
68
"github.com/wework/grabbit/gbus/policy"
@@ -26,7 +28,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu
2628
busBuilder := builder.
2729
New().
2830
Bus(connStr).
29-
WithPolicies(&policy.Durable{}).
31+
WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}).
32+
WorkerNum(3, 1).
3033
WithConfirms()
3134

3235
if txnl {

0 commit comments

Comments
 (0)