Skip to content

Commit 2b882be

Browse files
committed
chore: refactor message handling and update dependencies
- Change function parameter type from `core.QueuedMessage` to `core.TaskMessage` in multiple files - Update Go version in `go.mod` from 1.20 to 1.22 - Update `github.com/golang-queue/queue` dependency to version `v0.3.0` - Add `Payload` method to `mockMessage` struct in `nsq_test.go` - Modify test assertions to use `job.AllowOption` for timeout settings in `nsq_test.go` Signed-off-by: appleboy <[email protected]>
1 parent e581c0e commit 2b882be

File tree

7 files changed

+38
-24
lines changed

7 files changed

+38
-24
lines changed

_example/producer-consumer/consumer/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func main() {
3636
nsq.WithAddr("127.0.0.1:4150"),
3737
nsq.WithTopic("example"),
3838
nsq.WithChannel("foobar"),
39-
nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
39+
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
4040
var v *job
4141
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
4242
return err

_example/worker/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func main() {
3535
nsq.WithChannel("foobar"),
3636
// concurrent job number
3737
nsq.WithMaxInFlight(10),
38-
nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
38+
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
3939
var v *job
4040
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
4141
return err

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
module github.com/golang-queue/nsq
22

3-
go 1.20
3+
go 1.22
44

55
require (
6-
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98
6+
github.com/golang-queue/queue v0.3.0
77
github.com/nsqio/go-nsq v1.1.0
88
github.com/stretchr/testify v1.10.0
99
go.uber.org/goleak v1.3.0
1010
)
1111

1212
require (
1313
github.com/davecgh/go-spew v1.1.1 // indirect
14-
github.com/goccy/go-json v0.10.0 // indirect
1514
github.com/golang/snappy v0.0.4 // indirect
15+
github.com/jpillora/backoff v1.0.0 // indirect
1616
github.com/pmezard/go-difflib v1.0.0 // indirect
1717
gopkg.in/yaml.v3 v3.0.1 // indirect
1818
)

go.sum

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1+
github.com/appleboy/com v0.2.1 h1:dHAHauX3eYDuheAahI83HIGFxpi0SEb2ZAu9EZ9hbUM=
2+
github.com/appleboy/com v0.2.1/go.mod h1:kByEI3/vzI5GM1+O5QdBHLsXaOsmFsJcOpCSgASi4sg=
13
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
24
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
4-
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
5-
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg=
6-
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
7-
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
5+
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
6+
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
87
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
98
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
109
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
10+
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
11+
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
1112
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
13+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
1214
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
15+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
1316
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
1417
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
1518
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -18,7 +21,10 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
1821
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
1922
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
2023
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
24+
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
25+
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
2126
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2227
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
28+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2329
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
2430
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

nsq.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (w *Worker) startConsumer() (err error) {
101101
}
102102

103103
// Run start the worker
104-
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
104+
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
105105
return w.opts.runFunc(ctx, task)
106106
}
107107

@@ -129,7 +129,7 @@ func (w *Worker) Shutdown() error {
129129
}
130130

131131
// Queue send notification to queue
132-
func (w *Worker) Queue(job core.QueuedMessage) error {
132+
func (w *Worker) Queue(job core.TaskMessage) error {
133133
if atomic.LoadInt32(&w.stopFlag) == 1 {
134134
return queue.ErrQueueShutdown
135135
}
@@ -138,7 +138,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
138138
}
139139

140140
// Request fetch new task from queue
141-
func (w *Worker) Request() (core.QueuedMessage, error) {
141+
func (w *Worker) Request() (core.TaskMessage, error) {
142142
if err := w.startConsumer(); err != nil {
143143
return nil, err
144144
}

nsq_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ func (m mockMessage) Bytes() []byte {
3131
return []byte(m.Message)
3232
}
3333

34+
func (m mockMessage) Payload() []byte {
35+
return []byte(m.Message)
36+
}
37+
3438
func TestNSQDefaultFlow(t *testing.T) {
3539
m := &mockMessage{
3640
Message: "foo",
@@ -80,7 +84,7 @@ func TestNSQCustomFuncAndWait(t *testing.T) {
8084
WithAddr(host+":4150"),
8185
WithTopic("test3"),
8286
WithMaxInFlight(10),
83-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
87+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
8488
time.Sleep(500 * time.Millisecond)
8589
return nil
8690
}),
@@ -131,7 +135,7 @@ func TestJobReachTimeout(t *testing.T) {
131135
WithAddr(host+":4150"),
132136
WithTopic("timeout"),
133137
WithMaxInFlight(2),
134-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
138+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
135139
for {
136140
select {
137141
case <-ctx.Done():
@@ -155,7 +159,9 @@ func TestJobReachTimeout(t *testing.T) {
155159
assert.NoError(t, err)
156160
q.Start()
157161
time.Sleep(400 * time.Millisecond)
158-
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
162+
assert.NoError(t, q.Queue(m, job.AllowOption{
163+
Timeout: job.Time(20 * time.Millisecond),
164+
}))
159165
time.Sleep(2 * time.Second)
160166
q.Release()
161167
}
@@ -168,7 +174,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
168174
WithAddr(host+":4150"),
169175
WithTopic("cancel"),
170176
WithLogger(queue.NewLogger()),
171-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
177+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
172178
for {
173179
select {
174180
case <-ctx.Done():
@@ -192,7 +198,9 @@ func TestCancelJobAfterShutdown(t *testing.T) {
192198
assert.NoError(t, err)
193199
q.Start()
194200
time.Sleep(400 * time.Millisecond)
195-
assert.NoError(t, q.Queue(m, job.WithTimeout(3*time.Second)))
201+
assert.NoError(t, q.Queue(m, job.AllowOption{
202+
Timeout: job.Time(3 * time.Second),
203+
}))
196204
time.Sleep(2 * time.Second)
197205
q.Release()
198206
}
@@ -205,7 +213,7 @@ func TestGoroutineLeak(t *testing.T) {
205213
WithAddr(host+":4150"),
206214
WithTopic("GoroutineLeak"),
207215
WithLogger(queue.NewEmptyLogger()),
208-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
216+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
209217
for {
210218
select {
211219
case <-ctx.Done():
@@ -249,7 +257,7 @@ func TestGoroutinePanic(t *testing.T) {
249257
w := NewWorker(
250258
WithAddr(host+":4150"),
251259
WithTopic("GoroutinePanic"),
252-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
260+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
253261
panic("missing something")
254262
}),
255263
)
@@ -275,7 +283,7 @@ func TestNSQStatsinQueue(t *testing.T) {
275283
w := NewWorker(
276284
WithAddr(host+":4150"),
277285
WithTopic("nsq_stats"),
278-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
286+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
279287
log.Println("get message")
280288
return nil
281289
}),

options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Options struct {
2727
addr string
2828
topic string
2929
channel string
30-
runFunc func(context.Context, core.QueuedMessage) error
30+
runFunc func(context.Context, core.TaskMessage) error
3131
logger queue.Logger
3232
logLevel nsq.LogLevel
3333
}
@@ -54,7 +54,7 @@ func WithChannel(channel string) Option {
5454
}
5555

5656
// WithRunFunc setup the run func of queue
57-
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
57+
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
5858
return OptionFunc(func(o *Options) {
5959
o.runFunc = fn
6060
})
@@ -90,7 +90,7 @@ func newOptions(opts ...Option) Options {
9090

9191
logger: queue.NewLogger(),
9292
logLevel: nsq.LogLevelInfo,
93-
runFunc: func(context.Context, core.QueuedMessage) error {
93+
runFunc: func(context.Context, core.TaskMessage) error {
9494
return nil
9595
},
9696
}

0 commit comments

Comments
 (0)