Skip to content

Commit 3ad874e

Browse files
committedMar 16, 2019
Simplify queuedMessage and handler implementation
1 parent 262932c commit 3ad874e

File tree

4 files changed

+36
-54
lines changed

4 files changed

+36
-54
lines changed
 

‎context_helper.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package subee
22

33
import (
44
"context"
5+
"time"
56
)
67

7-
type loggerContextKey struct{}
8+
type (
9+
loggerContextKey struct{}
10+
enqueuedAtContextKey struct{}
11+
)
812

913
// GetLogger return Logger implementation set in the context.
1014
func GetLogger(ctx context.Context) Logger {
@@ -14,3 +18,11 @@ func GetLogger(ctx context.Context) Logger {
1418
func setLogger(ctx context.Context, l Logger) context.Context {
1519
return context.WithValue(ctx, loggerContextKey{}, l)
1620
}
21+
22+
func getEnqueuedAt(ctx context.Context) time.Time {
23+
return ctx.Value(enqueuedAtContextKey{}).(time.Time)
24+
}
25+
26+
func setEnqueuedAt(ctx context.Context, t time.Time) context.Context {
27+
return context.WithValue(ctx, enqueuedAtContextKey{}, t)
28+
}

‎process.go

+16-19
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,8 @@ func (p *processImpl) startConsumingProcess(ctx context.Context) error {
4242
consumer := chainConsumerInterceptors(p.Consumer, p.ConsumerInterceptors...)
4343

4444
err := p.subscribe(ctx, func(in Message) {
45-
msg := &singleMessage{
46-
Ctx: p.createConsumingContext(),
47-
Msg: in,
48-
EnqueuedAt: time.Now(),
49-
}
50-
p.handleMessage(msg, func(in queuedMessage) error {
51-
m := in.(*singleMessage)
52-
return errors.WithStack(consumer.Consume(m.Ctx, m.Msg))
45+
p.handleMessage(p.createConsumingContext(), &singleMessage{Message: in}, func(ctx context.Context) error {
46+
return errors.WithStack(consumer.Consume(ctx, in))
5347
})
5448
})
5549
return errors.WithStack(err)
@@ -73,9 +67,9 @@ func (p *processImpl) startBatchConsumingProcess(ctx context.Context) error {
7367
defer p.Logger.Print("Finish batch consuming process")
7468

7569
for m := range outCh {
76-
p.handleMessage(m, func(in queuedMessage) error {
77-
m := in.(*multiMessages)
78-
return errors.WithStack(batchConsumer.BatchConsume(m.Ctx, m.Msgs))
70+
msgs := m.Msgs
71+
p.handleMessage(m.Ctx, m, func(ctx context.Context) error {
72+
return errors.WithStack(batchConsumer.BatchConsume(ctx, msgs))
7973
})
8074
}
8175
}()
@@ -93,10 +87,11 @@ func (p *processImpl) createConsumingContext() context.Context {
9387
ctx := context.Background()
9488
ctx = p.StatsHandler.TagProcess(ctx, &BeginTag{})
9589
ctx = p.StatsHandler.TagProcess(ctx, &EnqueueTag{})
90+
ctx = setEnqueuedAt(ctx, time.Now().UTC())
9691
return ctx
9792
}
9893

99-
func (p *processImpl) handleMessage(m queuedMessage, handle func(queuedMessage) error) {
94+
func (p *processImpl) handleMessage(ctx context.Context, m queuedMessage, handle func(context.Context) error) {
10095
p.wg.Add(1)
10196
go func() {
10297
defer p.wg.Done()
@@ -107,16 +102,18 @@ func (p *processImpl) handleMessage(m queuedMessage, handle func(queuedMessage)
107102
m.Ack()
108103
}
109104

110-
p.StatsHandler.HandleProcess(m.Context(), &Dequeue{
111-
BeginTime: m.GetEnqueuedAt(),
105+
enqueuedAt := getEnqueuedAt(ctx)
106+
107+
p.StatsHandler.HandleProcess(ctx, &Dequeue{
108+
BeginTime: enqueuedAt,
112109
EndTime: time.Now(),
113110
})
114111

115112
beginTime := time.Now()
116113

117-
m.SetContext(p.StatsHandler.TagProcess(m.Context(), &ConsumeBeginTag{}))
114+
ctx = p.StatsHandler.TagProcess(ctx, &ConsumeBeginTag{})
118115

119-
err := handle(m)
116+
err := handle(ctx)
120117

121118
if !p.AckImmediately {
122119
if err != nil {
@@ -126,15 +123,15 @@ func (p *processImpl) handleMessage(m queuedMessage, handle func(queuedMessage)
126123
}
127124
}
128125

129-
p.StatsHandler.HandleProcess(m.Context(), &ConsumeEnd{
126+
p.StatsHandler.HandleProcess(ctx, &ConsumeEnd{
130127
BeginTime: beginTime,
131128
EndTime: time.Now(),
132129
Error: err,
133130
})
134131

135-
p.StatsHandler.HandleProcess(m.Context(), &End{
132+
p.StatsHandler.HandleProcess(ctx, &End{
136133
MsgCount: m.Count(),
137-
BeginTime: m.GetEnqueuedAt(),
134+
BeginTime: enqueuedAt,
138135
EndTime: time.Now(),
139136
})
140137
}()

‎queue.go

+7-30
Original file line numberDiff line numberDiff line change
@@ -8,33 +8,17 @@ import (
88
type queuedMessage interface {
99
Acknowledger
1010
Count() int
11-
Context() context.Context
12-
SetContext(ctx context.Context)
13-
GetEnqueuedAt() time.Time
1411
}
1512

1613
type singleMessage struct {
17-
Ctx context.Context
18-
Msg Message
19-
EnqueuedAt time.Time
14+
Message
2015
}
2116

22-
func (s *singleMessage) Ack() { s.Msg.Ack() }
23-
24-
func (s *singleMessage) Nack() { s.Msg.Nack() }
25-
26-
func (s *singleMessage) Context() context.Context { return s.Ctx }
27-
28-
func (s *singleMessage) SetContext(ctx context.Context) { s.Ctx = ctx }
29-
3017
func (s *singleMessage) Count() int { return 1 }
3118

32-
func (s *singleMessage) GetEnqueuedAt() time.Time { return s.EnqueuedAt }
33-
3419
type multiMessages struct {
35-
Ctx context.Context
36-
Msgs []Message
37-
EnqueuedAt time.Time
20+
Ctx context.Context
21+
Msgs []Message
3822
}
3923

4024
func (m *multiMessages) Ack() {
@@ -51,22 +35,16 @@ func (m *multiMessages) Nack() {
5135

5236
func (m *multiMessages) Count() int { return len(m.Msgs) }
5337

54-
func (m *multiMessages) Context() context.Context { return m.Ctx }
55-
56-
func (m *multiMessages) SetContext(ctx context.Context) { m.Ctx = ctx }
57-
58-
func (m *multiMessages) GetEnqueuedAt() time.Time { return m.EnqueuedAt }
59-
6038
func createBufferedQueue(
6139
createCtx func() context.Context,
6240
chunkSize int,
6341
flushInterval time.Duration,
6442
) (
6543
chan<- Message,
66-
<-chan queuedMessage,
44+
<-chan *multiMessages,
6745
) {
6846
inCh := make(chan Message, chunkSize)
69-
outCh := make(chan queuedMessage)
47+
outCh := make(chan *multiMessages)
7048

7149
go func() {
7250
defer close(outCh)
@@ -76,9 +54,8 @@ func createBufferedQueue(
7654

7755
if len(msgs) > 0 {
7856
outCh <- &multiMessages{
79-
Ctx: createCtx(),
80-
Msgs: msgs,
81-
EnqueuedAt: time.Now(),
57+
Ctx: createCtx(),
58+
Msgs: msgs,
8259
}
8360
}
8461

‎queue_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ func TestCreateBufferedQueue(t *testing.T) {
4141
if got, want := out.Count(), n; got != want {
4242
t.Errorf("Item[%d] has %d messages, want %d", i, got, want)
4343
}
44-
45-
if m, ok := out.(*multiMessages); !ok {
46-
t.Errorf("Item[%d] is %T type, want *subee.multiMessages type", i, m)
47-
}
4844
}
4945

5046
_, ok := <-outCh

0 commit comments

Comments
 (0)
Please sign in to comment.