Skip to content

Commit aa37482

Browse files
authored
refactor: refactor QueuedMessage to TaskMessage across the codebase (#139)
- Rename `QueuedMessage` to `TaskMessage` throughout the codebase - Change method `Bytes()` to `Payload()` for `TaskMessage` - Update comments to reflect the new `TaskMessage` interface - Add a new mock file `mock_task_message.go` for the `TaskMessage` interface - Rename `mock_message.go` to `mock_queued_message.go` - Update test cases to use `TaskMessage` instead of `QueuedMessage` - Remove unused import `github.com/golang-queue/queue/job` from `ring.go` - Comment out the `Data` struct and its methods in `ring.go` Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent cc471ea commit aa37482

13 files changed

+152
-77
lines changed

README.md

+8-8
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ func main() {
130130
rets := make(chan string, taskN)
131131

132132
// initial queue pool
133-
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
133+
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
134134
v, ok := m.(*job)
135135
if !ok {
136-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
136+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
137137
return err
138138
}
139139
}
@@ -207,10 +207,10 @@ func main() {
207207
nsq.WithChannel("foobar"),
208208
// concurrent job number
209209
nsq.WithMaxInFlight(10),
210-
nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
210+
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
211211
v, ok := m.(*job)
212212
if !ok {
213-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
213+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
214214
return err
215215
}
216216
}
@@ -286,10 +286,10 @@ func main() {
286286
nats.WithAddr("127.0.0.1:4222"),
287287
nats.WithSubj("example"),
288288
nats.WithQueue("foobar"),
289-
nats.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
289+
nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
290290
v, ok := m.(*job)
291291
if !ok {
292-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
292+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
293293
return err
294294
}
295295
}
@@ -370,10 +370,10 @@ func main() {
370370
w := redisdb.NewWorker(
371371
redisdb.WithAddr("127.0.0.1:6379"),
372372
redisdb.WithChannel("foobar"),
373-
redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
373+
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
374374
v, ok := m.(*job)
375375
if !ok {
376-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
376+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
377377
return err
378378
}
379379
}

_example/example02/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ func main() {
3030
rets := make(chan string, taskN)
3131

3232
// initial queue pool
33-
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
33+
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
3434
v, ok := m.(*job)
3535
if !ok {
36-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
36+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
3737
return err
3838
}
3939
}

benchmark_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212
var count = 1
1313

1414
type testqueue interface {
15-
Queue(task core.QueuedMessage) error
16-
Request() (core.QueuedMessage, error)
15+
Queue(task core.TaskMessage) error
16+
Request() (core.TaskMessage, error)
1717
}
1818

1919
func testQueue(b *testing.B, pool testqueue) {
@@ -94,7 +94,7 @@ func BenchmarkQueue(b *testing.B) {
9494
// Payload: []byte(`{"timeout":3600000000000}`),
9595
// }
9696
// w := NewRing(
97-
// WithFn(func(ctx context.Context, m core.QueuedMessage) error {
97+
// WithFn(func(ctx context.Context, m core.TaskMessage) error {
9898
// return nil
9999
// }),
100100
// )
@@ -119,7 +119,7 @@ func BenchmarkRingWithTask(b *testing.B) {
119119
},
120120
}
121121
w := NewRing(
122-
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
122+
WithFn(func(ctx context.Context, m core.TaskMessage) error {
123123
return nil
124124
}),
125125
)

core/worker.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,24 @@ import (
44
"context"
55
)
66

7-
// Worker represents a worker that processes queued messages.
8-
// It provides methods to run the worker, shut it down, queue messages, and request messages from the queue.
7+
// Worker represents an interface for a worker that processes tasks.
8+
// It provides methods to run tasks, shut down the worker, queue tasks, and request tasks from the queue.
99
type Worker interface {
1010
// Run starts the worker and processes the given task in the provided context.
1111
// It returns an error if the task cannot be processed.
12-
Run(ctx context.Context, task QueuedMessage) error
12+
Run(ctx context.Context, task TaskMessage) error
1313

1414
// Shutdown stops the worker and performs any necessary cleanup.
1515
// It returns an error if the shutdown process fails.
1616
Shutdown() error
1717

1818
// Queue adds a task to the worker's queue.
1919
// It returns an error if the task cannot be added to the queue.
20-
Queue(task QueuedMessage) error
20+
Queue(task TaskMessage) error
2121

2222
// Request retrieves a task from the worker's queue.
2323
// It returns the queued message and an error if the retrieval fails.
24-
Request() (QueuedMessage, error)
24+
Request() (TaskMessage, error)
2525
}
2626

2727
// QueuedMessage represents an interface for a message that can be queued.
@@ -31,6 +31,8 @@ type QueuedMessage interface {
3131
Bytes() []byte
3232
}
3333

34+
// TaskMessage represents an interface for a task message that can be queued.
35+
// It embeds the QueuedMessage interface and adds a method to retrieve the payload of the message.
3436
type TaskMessage interface {
3537
QueuedMessage
3638
Payload() []byte

metric_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313

1414
func TestMetricData(t *testing.T) {
1515
w := NewRing(
16-
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
17-
switch string(m.Bytes()) {
16+
WithFn(func(ctx context.Context, m core.TaskMessage) error {
17+
switch string(m.Payload()) {
1818
case "foo1":
1919
panic("missing something")
2020
case "foo2":

mocks/mock_message.go mocks/mock_queued_message.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mocks/mock_task_message.go

+67
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mocks/mock_worker.go

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mocks/mocks.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package mocks
22

33
//go:generate mockgen -package=mocks -destination=mock_worker.go github.com/golang-queue/queue/core Worker
4-
//go:generate mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage
4+
//go:generate mockgen -package=mocks -destination=mock_queued_message.go github.com/golang-queue/queue/core QueuedMessage
5+
//go:generate mockgen -package=mocks -destination=mock_task_message.go github.com/golang-queue/queue/core TaskMessage

options.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ var (
1111
defaultCapacity = 0
1212
defaultWorkerCount = int64(runtime.NumCPU())
1313
defaultNewLogger = NewLogger()
14-
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
14+
defaultFn = func(context.Context, core.TaskMessage) error { return nil }
1515
defaultMetric = NewMetric()
1616
)
1717

@@ -67,7 +67,7 @@ func WithWorker(w core.Worker) Option {
6767
}
6868

6969
// WithFn set custom job function
70-
func WithFn(fn func(context.Context, core.QueuedMessage) error) Option {
70+
func WithFn(fn func(context.Context, core.TaskMessage) error) Option {
7171
return OptionFunc(func(q *Options) {
7272
q.fn = fn
7373
})
@@ -86,7 +86,7 @@ type Options struct {
8686
logger Logger
8787
queueSize int
8888
worker core.Worker
89-
fn func(context.Context, core.QueuedMessage) error
89+
fn func(context.Context, core.TaskMessage) error
9090
afterFn func()
9191
metric Metric
9292
}

queue_test.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ func (m mockMessage) Bytes() []byte {
2828
return bytesconv.StrToBytes(m.message)
2929
}
3030

31+
func (m mockMessage) Payload() []byte {
32+
return bytesconv.StrToBytes(m.message)
33+
}
34+
3135
func TestNewQueueWithZeroWorker(t *testing.T) {
3236
controller := gomock.NewController(t)
3337
defer controller.Finish()
@@ -61,8 +65,9 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
6165
assert.Nil(t, q)
6266

6367
w := mocks.NewMockWorker(controller)
64-
m := mocks.NewMockQueuedMessage(controller)
68+
m := mocks.NewMockTaskMessage(controller)
6569
m.EXPECT().Bytes().Return([]byte("test")).AnyTimes()
70+
m.EXPECT().Payload().Return([]byte("test")).AnyTimes()
6671
w.EXPECT().Shutdown().Return(nil)
6772
w.EXPECT().Request().Return(m, nil).AnyTimes()
6873
w.EXPECT().Run(context.Background(), m).Return(nil).AnyTimes()
@@ -83,7 +88,7 @@ func TestHandleTimeout(t *testing.T) {
8388
Body: []byte("foo"),
8489
}
8590
w := NewRing(
86-
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
91+
WithFn(func(ctx context.Context, m core.TaskMessage) error {
8792
time.Sleep(200 * time.Millisecond)
8893
return nil
8994
}),
@@ -115,7 +120,7 @@ func TestJobComplete(t *testing.T) {
115120
Body: []byte("foo"),
116121
}
117122
w := NewRing(
118-
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
123+
WithFn(func(ctx context.Context, m core.TaskMessage) error {
119124
return errors.New("job completed")
120125
}),
121126
)
@@ -136,7 +141,7 @@ func TestJobComplete(t *testing.T) {
136141
}
137142

138143
w = NewRing(
139-
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
144+
WithFn(func(ctx context.Context, m core.TaskMessage) error {
140145
time.Sleep(200 * time.Millisecond)
141146
return errors.New("job completed")
142147
}),
@@ -196,11 +201,11 @@ func TestMockWorkerAndMessage(t *testing.T) {
196201
controller := gomock.NewController(t)
197202
defer controller.Finish()
198203

199-
m := mocks.NewMockQueuedMessage(controller)
204+
m := mocks.NewMockTaskMessage(controller)
200205

201206
w := mocks.NewMockWorker(controller)
202207
w.EXPECT().Shutdown().Return(nil)
203-
w.EXPECT().Request().DoAndReturn(func() (core.QueuedMessage, error) {
208+
w.EXPECT().Request().DoAndReturn(func() (core.TaskMessage, error) {
204209
return m, errors.New("nil")
205210
})
206211

0 commit comments

Comments
 (0)