Skip to content

Commit cc471ea

Browse files
authored
refactor: refactor message handling and encoding in queue system (#138)
- Remove `Encode` method call in `BenchmarkQueue` function - Add `TaskMessage` interface extending `QueuedMessage` with `Payload` method - Rename `Payload` field to `Body` in `Message` struct - Add `Payload` method to `Message` struct to return `Body` - Replace `Data` field and `Encode` method in `Message` struct with `Bytes` method using JSON marshalling - Update `NewMessage` to use `Body` instead of `Payload` - Add documentation for `Encode` method in `job` package - Update `TestMessageEncodeDecode` to use `Payload()` method - Replace `Encode` method call with direct handling of `task` in `Queue` methods - Change logger from `Errorf` to `Fatalf` in `Queue` error handling - Add type assertion and handling for `job.Message` in `Queue`'s `run` method - Import `job` package in `ring.go` - Add `Data` struct with `Payload` field and `Bytes` method in `ring.go` - Update `Ring`'s `Run` method to use `Data` struct for task handling Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 71e206e commit cc471ea

File tree

7 files changed

+59
-25
lines changed

7 files changed

+59
-25
lines changed

benchmark_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ func BenchmarkQueue(b *testing.B) {
7878
m := job.NewMessage(&mockMessage{
7979
message: "foo",
8080
})
81-
m.Encode()
8281

8382
for n := 0; n < b.N; n++ {
8483
if err := q.queue(&m); err != nil {

core/worker.go

+5
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,8 @@ type Worker interface {
3030
type QueuedMessage interface {
3131
Bytes() []byte
3232
}
33+
34+
type TaskMessage interface {
35+
QueuedMessage
36+
Payload() []byte
37+
}

job/job.go

+30-10
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type Message struct {
2121
Timeout time.Duration `json:"timeout" msgpack:"timeout"`
2222

2323
// Payload is the payload data of the task.
24-
Payload []byte `json:"body" msgpack:"body"`
24+
Body []byte `json:"body" msgpack:"body"`
2525

2626
// RetryCount set count of retry
2727
// default is 0, no retry.
@@ -48,19 +48,30 @@ type Message struct {
4848

4949
// Jitter eases contention by randomizing backoff steps
5050
Jitter bool `json:"jitter" msgpack:"jitter"`
51+
}
5152

52-
// Data to save Unsafe cast
53-
Data []byte
53+
// Payload returns the payload data of the Message.
54+
// It returns the byte slice of the payload.
55+
//
56+
// Returns:
57+
// - A byte slice containing the payload data.
58+
func (m *Message) Payload() []byte {
59+
return m.Body
5460
}
5561

56-
// Bytes get internal data
62+
// Bytes returns the byte slice of the Message struct.
63+
// If the marshalling process encounters an error, the function will panic.
64+
// It returns the marshalled byte slice.
65+
//
66+
// Returns:
67+
// - A byte slice containing the msgpack-encoded data.
5768
func (m *Message) Bytes() []byte {
58-
return m.Data
59-
}
69+
b, err := json.Marshal(m)
70+
if err != nil {
71+
panic(err)
72+
}
6073

61-
// Encode for encoding the structure
62-
func (m *Message) Encode() {
63-
m.Data = Encode(m)
74+
return b
6475
}
6576

6677
// NewMessage create new message
@@ -74,7 +85,7 @@ func NewMessage(m core.QueuedMessage, opts ...AllowOption) Message {
7485
RetryMin: o.retryMin,
7586
RetryMax: o.retryMax,
7687
Timeout: o.timeout,
77-
Payload: m.Bytes(),
88+
Body: m.Bytes(),
7889
}
7990
}
8091

@@ -92,6 +103,15 @@ func NewTask(task TaskFunc, opts ...AllowOption) Message {
92103
}
93104
}
94105

106+
// Encode takes a Message struct and marshals it into a byte slice using msgpack.
107+
// If the marshalling process encounters an error, the function will panic.
108+
// It returns the marshalled byte slice.
109+
//
110+
// Parameters:
111+
// - m: A pointer to the Message struct to be encoded.
112+
//
113+
// Returns:
114+
// - A byte slice containing the msgpack-encoded data.
95115
func Encode(m *Message) []byte {
96116
b, err := json.Marshal(m)
97117
if err != nil {

job/job_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@ func TestMessageEncodeDecode(t *testing.T) {
3030
},
3131
)
3232

33-
m.Encode()
3433
out := Decode(m.Bytes())
3534

3635
assert.Equal(t, int64(100), out.RetryCount)
3736
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
3837
assert.Equal(t, 3*time.Millisecond, out.Timeout)
39-
assert.Equal(t, "foo", string(out.Payload))
38+
assert.Equal(t, "foo", string(out.Payload()))
4039
assert.Equal(t, 200*time.Millisecond, out.RetryMin)
4140
assert.Equal(t, 20*time.Second, out.RetryMax)
4241
assert.Equal(t, 4.0, out.RetryFactor)

queue.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ func (q *Queue) Wait() {
127127
// Queue to queue single job with binary
128128
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error {
129129
data := job.NewMessage(message, opts...)
130-
data.Encode()
131130

132131
return q.queue(&data)
133132
}
@@ -160,7 +159,7 @@ func (q *Queue) work(task core.QueuedMessage) {
160159
q.metric.DecBusyWorker()
161160
e := recover()
162161
if e != nil {
163-
q.logger.Errorf("panic error: %v", e)
162+
q.logger.Fatalf("panic error: %v", e)
164163
}
165164
q.schedule()
166165

@@ -182,13 +181,12 @@ func (q *Queue) work(task core.QueuedMessage) {
182181
}
183182

184183
func (q *Queue) run(task core.QueuedMessage) error {
185-
data := task.(*job.Message)
186-
if data.Task == nil {
187-
data = job.Decode(task.Bytes())
188-
data.Data = data.Payload
184+
switch t := task.(type) {
185+
case *job.Message:
186+
return q.handle(t)
187+
default:
188+
return errors.New("invalid task type")
189189
}
190-
191-
return q.handle(data)
192190
}
193191

194192
func (q *Queue) handle(m *job.Message) error {

queue_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
8080
func TestHandleTimeout(t *testing.T) {
8181
m := &job.Message{
8282
Timeout: 100 * time.Millisecond,
83-
Payload: []byte("foo"),
83+
Body: []byte("foo"),
8484
}
8585
w := NewRing(
8686
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
@@ -112,7 +112,7 @@ func TestHandleTimeout(t *testing.T) {
112112
func TestJobComplete(t *testing.T) {
113113
m := &job.Message{
114114
Timeout: 100 * time.Millisecond,
115-
Payload: []byte("foo"),
115+
Body: []byte("foo"),
116116
}
117117
w := NewRing(
118118
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
@@ -132,7 +132,7 @@ func TestJobComplete(t *testing.T) {
132132

133133
m = &job.Message{
134134
Timeout: 250 * time.Millisecond,
135-
Payload: []byte("foo"),
135+
Body: []byte("foo"),
136136
}
137137

138138
w = NewRing(

ring.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync/atomic"
77

88
"github.com/golang-queue/queue/core"
9+
"github.com/golang-queue/queue/job"
910
)
1011

1112
var _ core.Worker = (*Ring)(nil)
@@ -25,9 +26,21 @@ type Ring struct {
2526
stopFlag int32
2627
}
2728

29+
type Data struct {
30+
Payload []byte `json:"payload"`
31+
}
32+
33+
func (d *Data) Bytes() []byte {
34+
return d.Payload
35+
}
36+
2837
// Run to execute new task
2938
func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error {
30-
return s.runFunc(ctx, task)
39+
v, _ := task.(*job.Message)
40+
data := &Data{
41+
Payload: v.Body,
42+
}
43+
return s.runFunc(ctx, data)
3144
}
3245

3346
// Shutdown the worker

0 commit comments

Comments
 (0)