Skip to content

Commit c576dc3

Browse files
author
Guy Baron
authored
Merge pull request #68 from wework/v1.x
rollup v1.x to master
2 parents c6204e8 + b07357f commit c576dc3

File tree

5 files changed

+99
-14
lines changed

5 files changed

+99
-14
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ Planned:
2222

2323
1) Deduplication of inbound messages
2424

25+
## Stable relase
26+
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
2527

2628
## Supported transactional resources
2729
1) MySql > 8.0 (InnoDB)
@@ -107,7 +109,7 @@ gb.HandleEvent("name of exchange", "name of topic", SomeEvent{}, eventHandler)
107109
Start the bus
108110
```Go
109111
gb.Start()
110-
defer gb.Shutsown()
112+
defer gb.Shutdown()
111113
```
112114

113115
Send a command

gbus/saga/glue.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
106106
imsm.lock.Lock()
107107
defer imsm.lock.Unlock()
108108
msgName := message.PayloadFQN
109-
exchange, routingKey := invocation.Routing()
110109

111110
defs := imsm.msgToDefMap[strings.ToLower(msgName)]
112111

@@ -125,7 +124,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
125124
imsm.log().
126125
WithFields(log.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
127126
Info("created new saga")
128-
if invkErr := newInstance.invoke(exchange, routingKey, invocation, message); invkErr != nil {
127+
if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil {
129128
imsm.log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
130129
return invkErr
131130
}
@@ -156,7 +155,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
156155
return e
157156
}
158157

159-
if invkErr := instance.invoke(exchange, routingKey, invocation, message); invkErr != nil {
158+
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
160159
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
161160
return invkErr
162161
}
@@ -178,7 +177,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
178177

179178
for _, instance := range instances {
180179

181-
if invkErr := instance.invoke(exchange, routingKey, invocation, message); invkErr != nil {
180+
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
182181
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
183182
return invkErr
184183
}
@@ -193,6 +192,19 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
193192
return nil
194193
}
195194

195+
func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
196+
sginv := &sagaInvocation{
197+
decoratedBus: invocation.Bus(),
198+
decoratedInvocation: invocation,
199+
inboundMsg: message,
200+
sagaID: instance.ID,
201+
ctx: invocation.Ctx(),
202+
invokingService: imsm.svcName}
203+
204+
exchange, routingKey := invocation.Routing()
205+
return instance.invoke(exchange, routingKey, sginv, message)
206+
}
207+
196208
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessage *gbus.BusMessage) error {
197209

198210
_, timedOut := lastMessage.Payload.(gbus.SagaTimeoutMessage)

gbus/saga/instance.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,12 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati
3030
}
3131

3232
valueOfMessage := reflect.ValueOf(message)
33-
sginv := &sagaInvocation{
34-
decoratedBus: invocation.Bus(),
35-
decoratedInvocation: invocation,
36-
inboundMsg: message,
37-
sagaID: si.ID,
38-
ctx: invocation.Ctx(),
39-
}
33+
4034
reflectedVal := reflect.ValueOf(si.UnderlyingInstance)
4135

4236
for _, methodName := range methodsToInvoke {
4337
params := make([]reflect.Value, 0)
44-
params = append(params, reflect.ValueOf(sginv), valueOfMessage)
38+
params = append(params, reflect.ValueOf(invocation), valueOfMessage)
4539
method := reflectedVal.MethodByName(methodName)
4640
log.Printf(" invoking method %v on saga instance %v", methodName, si.ID)
4741
returns := method.Call(params)

gbus/saga/invocation.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,26 @@ type sagaInvocation struct {
1414
inboundMsg *gbus.BusMessage
1515
sagaID string
1616
ctx context.Context
17+
invokingService string
1718
}
1819

1920
func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool) {
2021

2122
message.CorrelationID = si.inboundMsg.ID
23+
message.SagaID = si.sagaID
2224

2325
if !isEvent {
2426
//support saga-to-saga communication
2527
if si.inboundMsg.SagaID != "" {
28+
message.SagaCorrelationID = si.inboundMsg.SagaID
29+
}
30+
//if the saga is potentially invoking itself then set the SagaCorrelationID to reflect that
31+
//https://github.com/wework/grabbit/issues/64
32+
_, targetService := si.decoratedInvocation.Routing()
33+
if targetService == si.invokingService {
2634
message.SagaCorrelationID = message.SagaID
2735
}
2836

29-
message.SagaID = si.sagaID
3037
}
3138

3239
}

tests/saga_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package tests
22

33
import (
4+
"context"
45
"log"
6+
"reflect"
57
"testing"
68
"time"
79

@@ -224,6 +226,37 @@ func TestSagaTimeout(t *testing.T) {
224226
<-proceed
225227
}
226228

229+
func TestSagaSelfMessaging(t *testing.T) {
230+
proceed := make(chan bool)
231+
b := createNamedBusForTest(testSvc1)
232+
233+
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
234+
235+
_, ok := message.Payload.(*Event1)
236+
if !ok {
237+
t.Errorf("handler invoced with wrong message type\r\nexpeted:%v\r\nactual:%v", reflect.TypeOf(Command1{}), reflect.TypeOf(message.Payload))
238+
}
239+
proceed <- true
240+
241+
return nil
242+
}
243+
244+
err := b.HandleEvent("test_exchange", "test_topic", Event1{}, handler)
245+
if err != nil {
246+
t.Errorf("Registering handler returned false, expected true with error: %s", err.Error())
247+
}
248+
249+
b.RegisterSaga(&SelfSendingSaga{})
250+
251+
b.Start()
252+
defer b.Shutdown()
253+
254+
b.Send(context.TODO(), testSvc1, gbus.NewBusMessage(Command1{}))
255+
256+
<-proceed
257+
258+
}
259+
227260
/*Test Sagas*/
228261

229262
type SagaA struct {
@@ -370,3 +403,40 @@ func (s *TimingOutSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMes
370403
Data: "TimingOutSaga.Timeout",
371404
}))
372405
}
406+
407+
type SelfSendingSaga struct {
408+
}
409+
410+
func (*SelfSendingSaga) StartedBy() []gbus.Message {
411+
starters := make([]gbus.Message, 0)
412+
return append(starters, Command1{})
413+
}
414+
415+
func (s *SelfSendingSaga) IsComplete() bool {
416+
return false
417+
}
418+
419+
func (s *SelfSendingSaga) New() gbus.Saga {
420+
return &SelfSendingSaga{}
421+
}
422+
423+
func (s *SelfSendingSaga) RegisterAllHandlers(register gbus.HandlerRegister) {
424+
register.HandleMessage(Command1{}, s.HandleCommand1)
425+
register.HandleMessage(Command2{}, s.HandleCommand2)
426+
register.HandleMessage(Reply2{}, s.HandleReply2)
427+
}
428+
429+
func (s *SelfSendingSaga) HandleCommand1(invocation gbus.Invocation, message *gbus.BusMessage) error {
430+
cmd2 := gbus.NewBusMessage(Command2{})
431+
return invocation.Bus().Send(invocation.Ctx(), testSvc1, cmd2)
432+
}
433+
434+
func (s *SelfSendingSaga) HandleCommand2(invocation gbus.Invocation, message *gbus.BusMessage) error {
435+
reply := gbus.NewBusMessage(Reply2{})
436+
return invocation.Reply(invocation.Ctx(), reply)
437+
}
438+
439+
func (s *SelfSendingSaga) HandleReply2(invocation gbus.Invocation, message *gbus.BusMessage) error {
440+
evt1 := gbus.NewBusMessage(Event1{})
441+
return invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "test_topic", evt1)
442+
}

0 commit comments

Comments
 (0)