-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbus.go
94 lines (78 loc) · 2.26 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package eventsource
import (
"context"
"errors"
"fmt"
"github.com/AhmadWaleed/eventsource/command"
)
// Constructor command implementing this interface
// can be used to construct new aggregate
type Constructor interface {
New() bool
}
// AggregateCommand confirms that the command must have aggregate id
// which can be helpful to replay aggreate events from event store
// to build aggregate current state.
type AggregateCommand interface {
AggregateID() string
}
// Command is an model for AggregateCommand used to
// avoid code boilerplate for commands implmenting AggregateCommand
type Command struct {
ID string
}
func (c Command) AggregateID() string {
return c.ID
}
type AggregateHandler interface {
command.Handler
}
func NewCommandBus(repo AggregateRootRepository, middlewares ...command.Middleware) command.CommandSender {
return &commandBus{
repo: repo,
middlewares: middlewares,
}
}
// commandBus default command bus which can be used to syncronously
// process aggregate commands. Its an implmentation of command.CommandSender inferface.
type commandBus struct {
repo AggregateRootRepository
middlewares []command.Middleware
}
// Send process aggregate command, publishes the relevant
// events and save the aggreate state/events into event store.
func (b *commandBus) Send(ctx context.Context, cmd interface{}) error {
for _, m := range b.middlewares {
if err := m.Before(ctx, cmd); err != nil {
return err
}
}
agrCmd, ok := cmd.(AggregateCommand)
if !ok {
return errors.New("command must be a AggregateCommand")
}
var aggregate AggregateRoot
if v, ok := agrCmd.(Constructor); ok && v.New() {
aggregate = b.repo.New().(AggregateRoot)
} else {
aggregateID := agrCmd.AggregateID()
v, err := b.repo.GetByID(ctx, aggregateID)
if err != nil {
return fmt.Errorf("unable to get aggregate by ID: %v", err)
}
aggregate = v
}
handler, ok := aggregate.(AggregateHandler)
if !ok {
return fmt.Errorf("%T do not implement CommandHandler", aggregate)
}
err := handler.Handle(ctx, agrCmd)
if err != nil {
return fmt.Errorf("could not apply command, %T, to aggregate, %T", cmd, aggregate)
}
err = b.repo.Save(ctx, aggregate)
if err != nil {
return fmt.Errorf("could not save aggregate %T %v", aggregate, err)
}
return nil
}