Skip to content

Commit 27efe91

Browse files
committed
refactor: refactor core message handling and update job timeout tests
- Change `core.QueuedMessage` to `core.TaskMessage` in `runFunc` definition and `WithRunFunc` function - Update `runFunc` initialization in `newOptions` to use `core.TaskMessage` - Modify `TestJobReachTimeout` to use `job.AllowOption` with `Timeout` - Modify `TestCancelJobAfterShutdown` to use `job.AllowOption` with `Timeout` Signed-off-by: appleboy <[email protected]>
1 parent 8c33923 commit 27efe91

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

options.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Option func(*options)
3030
// AMQP 0-9-1 Model Explained
3131
// ref: https://www.rabbitmq.com/tutorials/amqp-concepts.html
3232
type options struct {
33-
runFunc func(context.Context, core.QueuedMessage) error
33+
runFunc func(context.Context, core.TaskMessage) error
3434
logger queue.Logger
3535
addr string
3636
queue string
@@ -102,7 +102,7 @@ func WithQueue(val string) Option {
102102
}
103103

104104
// WithRunFunc setup the run func of queue
105-
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
105+
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
106106
return func(w *options) {
107107
w.runFunc = fn
108108
}
@@ -125,7 +125,7 @@ func newOptions(opts ...Option) options {
125125
routingKey: "test-key",
126126
logger: queue.NewLogger(),
127127
autoAck: false,
128-
runFunc: func(context.Context, core.QueuedMessage) error {
128+
runFunc: func(context.Context, core.TaskMessage) error {
129129
return nil
130130
},
131131
}

rabbitmq_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ func TestJobReachTimeout(t *testing.T) {
125125
assert.NoError(t, err)
126126
q.Start()
127127
time.Sleep(50 * time.Millisecond)
128-
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
128+
assert.NoError(t, q.Queue(m, job.AllowOption{
129+
Timeout: job.Time(20 * time.Millisecond),
130+
}))
129131
time.Sleep(100 * time.Millisecond)
130132
q.Shutdown()
131133
q.Wait()
@@ -162,7 +164,9 @@ func TestCancelJobAfterShutdown(t *testing.T) {
162164
assert.NoError(t, err)
163165
q.Start()
164166
time.Sleep(50 * time.Millisecond)
165-
assert.NoError(t, q.Queue(m, job.WithTimeout(150*time.Millisecond)))
167+
assert.NoError(t, q.Queue(m, job.AllowOption{
168+
Timeout: job.Time(150 * time.Millisecond),
169+
}))
166170
time.Sleep(100 * time.Millisecond)
167171
q.Shutdown()
168172
q.Wait()

0 commit comments

Comments
 (0)