-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemitter.go
139 lines (121 loc) · 2.22 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package emitter
import (
"context"
"sync"
"time"
)
// Emitter periodically pulls messages from an outbox, hands them to a
// handler, and deletes the ones the handler reports as processed.
type Emitter struct {
	// Polling configuration.
	interval  time.Duration // period of the ticker started by Run
	batchSize int           // limit passed to each GetOutboxMsg call

	// Collaborators.
	outbox  OutboxSource // source of pending messages; also used to delete processed ones
	handler Handler      // processes a batch and returns the record IDs to delete

	// Logging. Either logger may be nil, in which case the corresponding
	// output is dropped (errorLogger falls back to logger first).
	logger      Logger
	errorLogger Logger

	// Lifecycle state.
	ctx        context.Context // passed to every outbox and handler call
	wg         *sync.WaitGroup // non-nil between Run and Stop; tracks the polling goroutine
	stopSignal chan any        // closed by Stop to ask the polling goroutine to exit
}
// NewEmitter builds an Emitter that reads messages from outbox and passes
// them to handler. ctx is stored on the emitter and forwarded to every outbox
// and handler call.
//
// Options are applied in the order given. After they have run, a zero or
// negative interval is replaced by 30 seconds and a zero batch size is
// replaced by 10. The returned emitter is idle until Run is called.
func NewEmitter(ctx context.Context, outbox OutboxSource, handler HandlerFunc, opts ...Option) *Emitter {
	e := &Emitter{
		ctx:     ctx,
		outbox:  outbox,
		handler: handler,
	}
	for _, opt := range opts {
		opt(e)
	}

	// time.NewTicker panics on a non-positive duration, so a negative
	// interval must fall back to the default too, not only the zero value.
	if e.interval <= 0 {
		e.interval = 30 * time.Second
	}
	if e.batchSize == 0 {
		e.batchSize = 10
	}
	return e
}
// Run starts the emitter.
//
// It launches a single background goroutine that calls _Proc once per
// interval tick. The goroutine exits when Stop is called or when the context
// stored on the emitter is cancelled. Calling Run on an emitter that has
// already been started (and not yet stopped) is a no-op.
//
// After a context cancellation the goroutine is gone but the emitter still
// counts as started; call Stop before calling Run again. Run and Stop do no
// locking, so callers must not invoke them concurrently.
func (e *Emitter) Run() {
	if e.wg != nil {
		return
	}
	e.withLogger(func(logger Logger) {
		logger.Printf("Emitter is running ...")
	})

	e.wg = &sync.WaitGroup{}
	e.wg.Add(1)
	e.stopSignal = make(chan any)

	// Snapshot what the goroutine waits on so it never depends on fields
	// that Stop resets to nil.
	wg, stop := e.wg, e.stopSignal

	// A nil channel is never ready in a select, so an emitter built with a
	// nil context simply never takes the cancellation branch.
	var ctxDone <-chan struct{}
	if e.ctx != nil {
		ctxDone = e.ctx.Done()
	}

	go func() {
		defer wg.Done()
		ticker := time.NewTicker(e.interval)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ctxDone:
				// The context is passed to every outbox/handler call; once
				// it is cancelled further polling can only fail, so exit
				// instead of ticking forever.
				return
			case <-ticker.C:
				e._Proc()
			}
		}
	}()
}
// _Proc processes outbox batches back to back until a batch handles no
// messages or the stop signal has been closed. The stop signal is checked
// before each batch, never in the middle of one.
func (e *Emitter) _Proc() {
	// Start positive so the first batch is always attempted.
	handled := 1
	for handled > 0 {
		select {
		case <-e.stopSignal:
			return
		default:
			// No stop request pending; go on to the next batch.
		}
		handled = e._ProcOneBatch(e.ctx)
	}
}
// _ProcOneBatch fetches up to batchSize messages from the outbox, passes them
// to the handler, and deletes the records the handler reports back.
//
// It returns the number of records deleted. It returns 0 when the outbox is
// empty, when the handler reports no record IDs, or when any step fails;
// failures are reported through the error logger (falling back to the
// regular logger when no error logger is configured) and are not returned.
func (e *Emitter) _ProcOneBatch(ctx context.Context) int {
	msgs, err := e.outbox.GetOutboxMsg(ctx, e.batchSize)
	if err != nil {
		e.withErrorLogger(func(logger Logger) {
			logger.Printf("failed to GetOutboxMsg: %v", err)
		})
		return 0
	}
	if len(msgs) == 0 {
		return 0
	}

	recIDs, err := e.handler.Process(ctx, msgs...)
	if err != nil {
		e.withErrorLogger(func(logger Logger) {
			logger.Printf("failed to process OutboxMsgs: %v", err)
		})
		return 0
	}
	// Nothing was acknowledged, so there is nothing to delete. The result
	// would be 0 either way; skipping avoids a delete call with no IDs.
	if len(recIDs) == 0 {
		return 0
	}

	if err = e.outbox.DeleteOutboxMsg(ctx, recIDs...); err != nil {
		e.withErrorLogger(func(logger Logger) {
			logger.Printf("failed to DeleteOutboxMsg: %v", err)
		})
		return 0
	}
	return len(recIDs)
}
// Stop shuts the emitter down: it closes the stop signal, waits for the
// polling goroutine to finish, and clears the lifecycle state so that Run can
// be called again. Calling Stop on an emitter that is not running does
// nothing.
func (e *Emitter) Stop() {
	running := e.wg
	if running == nil {
		return
	}

	close(e.stopSignal)
	running.Wait()

	// Reset only after the goroutine has exited.
	e.stopSignal, e.wg = nil, nil
}
// withLogger invokes do with the emitter's logger. When no logger is
// configured, do is not called.
func (e *Emitter) withLogger(do func(Logger)) {
	if e.logger == nil {
		return
	}
	do(e.logger)
}
// withErrorLogger invokes do with the emitter's error logger. When no error
// logger is configured it defers to withLogger, so do receives the regular
// logger if there is one and is skipped otherwise.
func (e *Emitter) withErrorLogger(do func(Logger)) {
	if e.errorLogger == nil {
		e.withLogger(do)
		return
	}
	do(e.errorLogger)
}