Skip to content

Commit 5d077c4

Browse files
author
Guy Baron
authored
Merge pull request #99 from wework/v1.x
merge v1.x into master
2 parents 5624310 + 4ab2be5 commit 5d077c4

File tree

7 files changed

+79
-31
lines changed

7 files changed

+79
-31
lines changed

README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
2020
4) Publisher confirms
2121
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
2222
6) Deadlettering
23+
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
2324

2425
Planned:
2526

2627
1) Deduplication of inbound messages
2728

2829
## Stable release
29-
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
30+
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
3031

3132
## Supported transactional resources
3233
1) MySql > 8.0 (InnoDB)
@@ -78,8 +79,8 @@ Register a command handler
7879
```Go
7980

8081

81-
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error
82-
cmd, ok := message.Payload.(SomeCommand)
82+
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error{
83+
cmd, ok := message.Payload.(*SomeCommand)
8384
if ok {
8485
fmt.Printf("handler invoked with message %v", cmd)
8586
return nil
@@ -96,7 +97,7 @@ Register an event handler
9697

9798

9899
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) {
99-
evt, ok := message.Payload.(SomeEvent)
100+
evt, ok := message.Payload.(*SomeEvent)
100101
if ok {
101102
fmt.Printf("handler invoked with event %v", evt)
102103
return nil

docs/LOGGING.md

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Logging
2+
3+
grabbit supports structured logging via the [logrus](https://github.com/sirupsen/logrus) logging package.
4+
The logger is accessible to message handlers via the past in invocation instance.
5+
6+
```go
7+
8+
func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
9+
invocation.Log().WithField("name", "rhinof").Info("handler invoked")
10+
return nil
11+
}
12+
13+
```
14+
15+
grabbit will create a default instance of logrus FieldLogger if no such logger is set when the bus is created.
16+
In order to set a custom logger when creating the bus you need to call the Builder.WithLogger method passing it
17+
a logrus.FieldLogger instance.
18+
19+
```go
20+
21+
22+
```

gbus/abstractions.go

+1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ type Invocation interface {
192192
Tx() *sql.Tx
193193
Ctx() context.Context
194194
Routing() (exchange, routingKey string)
195+
DeliveryInfo() DeliveryInfo
195196
}
196197

197198
//Serializer is the base interface for all message serializers

gbus/bus.go

+24-18
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ type DefaultBus struct {
2828
Outbox TxOutbox
2929
PrefetchCount uint
3030
AmqpConnStr string
31-
amqpConn *amqp.Connection
31+
ingressConn *amqp.Connection
32+
egressConn *amqp.Connection
3233
workers []*worker
33-
AMQPChannel *amqp.Channel
34-
outAMQPChannel *amqp.Channel
34+
ingressChannel *amqp.Channel
35+
egressChannel *amqp.Channel
3536
serviceQueue amqp.Queue
3637
rpcQueue amqp.Queue
3738
SvcName string
@@ -83,7 +84,7 @@ func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
8384
*/
8485
uid := xid.New().String()
8586
qName := b.SvcName + "_rpc_" + uid
86-
q, e := b.AMQPChannel.QueueDeclare(qName,
87+
q, e := b.ingressChannel.QueueDeclare(qName,
8788
false, /*durable*/
8889
true, /*autoDelete*/
8990
false, /*exclusive*/
@@ -97,7 +98,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
9798
var q amqp.Queue
9899

99100
if b.PurgeOnStartup {
100-
msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
101+
msgsPurged, purgeError := b.ingressChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
101102
if purgeError != nil {
102103
b.Log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue")
103104
return q, purgeError
@@ -108,7 +109,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
108109
if b.DLX != "" {
109110
args["x-dead-letter-exchange"] = b.DLX
110111
}
111-
q, e := b.AMQPChannel.QueueDeclare(qName,
112+
q, e := b.ingressChannel.QueueDeclare(qName,
112113
true, /*durable*/
113114
false, /*autoDelete*/
114115
false, /*exclusive*/
@@ -124,7 +125,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
124125
func (b *DefaultBus) bindServiceQueue() error {
125126

126127
if b.deadletterHandler != nil && b.DLX != "" {
127-
err := b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/
128+
err := b.ingressChannel.ExchangeDeclare(b.DLX, /*name*/
128129
"fanout", /*kind*/
129130
true, /*durable*/
130131
false, /*autoDelete*/
@@ -144,7 +145,7 @@ func (b *DefaultBus) bindServiceQueue() error {
144145
for _, subscription := range b.DelayedSubscriptions {
145146
topic := subscription[0]
146147
exchange := subscription[1]
147-
e := b.AMQPChannel.ExchangeDeclare(exchange, /*name*/
148+
e := b.ingressChannel.ExchangeDeclare(exchange, /*name*/
148149
"topic", /*kind*/
149150
true, /*durable*/
150151
false, /*autoDelete*/
@@ -178,27 +179,32 @@ func (b *DefaultBus) Start() error {
178179

179180
var e error
180181
//create amqo connection and channel
181-
if b.amqpConn, e = b.connect(MaxRetryCount); e != nil {
182+
if b.ingressConn, e = b.connect(MaxRetryCount); e != nil {
183+
return e
184+
}
185+
if b.egressConn, e = b.connect(MaxRetryCount); e != nil {
182186
return e
183187
}
184188

185-
if b.AMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
189+
if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
186190
return e
187191
}
188-
if b.outAMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
192+
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
189193
return e
190194
}
191195

192196
//register on failure notifications
193197
b.amqpErrors = make(chan *amqp.Error)
194198
b.amqpBlocks = make(chan amqp.Blocking)
195-
b.amqpConn.NotifyClose(b.amqpErrors)
196-
b.amqpConn.NotifyBlocked(b.amqpBlocks)
197-
b.outAMQPChannel.NotifyClose(b.amqpErrors)
199+
b.ingressConn.NotifyClose(b.amqpErrors)
200+
b.ingressConn.NotifyBlocked(b.amqpBlocks)
201+
b.egressConn.NotifyClose(b.amqpErrors)
202+
b.egressConn.NotifyBlocked(b.amqpBlocks)
203+
b.egressChannel.NotifyClose(b.amqpErrors)
198204
//TODO:Figure out what should be done
199205

200206
//init the outbox that sends the messages to the amqp transport and handles publisher confirms
201-
if e := b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil {
207+
if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil {
202208
return e
203209
}
204210
/*
@@ -208,7 +214,7 @@ func (b *DefaultBus) Start() error {
208214
if b.IsTxnl {
209215

210216
var amqpChan *amqp.Channel
211-
if amqpChan, e = b.createAMQPChannel(b.amqpConn); e != nil {
217+
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
212218
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
213219
return e
214220
}
@@ -269,7 +275,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
269275
workers := make([]*worker, 0)
270276
for i := uint(0); i < workerNum; i++ {
271277
//create a channel per worker as we can't share channels across go routines
272-
amqpChan, createChanErr := b.createAMQPChannel(b.amqpConn)
278+
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
273279
if createChanErr != nil {
274280
return nil, createChanErr
275281
}
@@ -681,7 +687,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
681687
}
682688

683689
func (b *DefaultBus) bindQueue(topic, exchange string) error {
684-
return b.AMQPChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
690+
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
685691
}
686692

687693
type rpcPolicy struct {

gbus/invocation.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,19 @@ var _ Messaging = &defaultInvocationContext{}
1313

1414
type defaultInvocationContext struct {
1515
*Glogged
16-
invocingSvc string
17-
bus *DefaultBus
18-
inboundMsg *BusMessage
19-
tx *sql.Tx
20-
ctx context.Context
21-
exchange string
22-
routingKey string
16+
invocingSvc string
17+
bus *DefaultBus
18+
inboundMsg *BusMessage
19+
tx *sql.Tx
20+
ctx context.Context
21+
exchange string
22+
routingKey string
23+
deliveryInfo DeliveryInfo
24+
}
25+
26+
type DeliveryInfo struct {
27+
Attempt uint
28+
MaxRetryCount uint
2329
}
2430

2531
func (dfi *defaultInvocationContext) Log() logrus.FieldLogger {
@@ -79,3 +85,7 @@ func (dfi *defaultInvocationContext) Ctx() context.Context {
7985
func (dfi *defaultInvocationContext) Routing() (exchange, routingKey string) {
8086
return dfi.exchange, dfi.routingKey
8187
}
88+
89+
func (dfi *defaultInvocationContext) DeliveryInfo() DeliveryInfo {
90+
return dfi.deliveryInfo
91+
}

gbus/saga/invocation.go

+4
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) {
8080
return si.decoratedInvocation.Routing()
8181
}
8282

83+
func (si *sagaInvocation) DeliveryInfo() gbus.DeliveryInfo {
84+
return si.decoratedInvocation.DeliveryInfo()
85+
}
86+
8387
//func (si *sagaInvocation) Log() logrus.FieldLogger {
8488
// return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID)
8589
//}

gbus/worker.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
330330
//this is the action that will get retried
331331
// each retry should run a new and separate transaction which should end with a commit or rollback
332332

333-
action := func(attempts uint) (actionErr error) {
333+
action := func(attempt uint) (actionErr error) {
334334
var tx *sql.Tx
335335
var txCreateErr error
336336
if worker.isTxnl {
@@ -343,7 +343,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
343343
}
344344

345345
worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers")
346-
worker.span.LogFields(slog.Uint64("attempt", uint64(attempts+1)))
346+
worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1)))
347347
defer func() {
348348
if p := recover(); p != nil {
349349
pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack())
@@ -373,6 +373,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
373373
ctx: hsctx,
374374
exchange: delivery.Exchange,
375375
routingKey: delivery.RoutingKey,
376+
deliveryInfo: DeliveryInfo{
377+
Attempt: attempt,
378+
MaxRetryCount: MaxRetryCount,
379+
},
376380
}
377381
ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()))
378382
handlerErr = handler(ctx, message)

0 commit comments

Comments
 (0)