Skip to content

Commit c3ad317

Browse files
authored
chore(job): optimized memory allocation (#96)
1 parent 77960ac commit c3ad317

8 files changed

+107
-50
lines changed

consumer_test.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestJobReachTimeout(t *testing.T) {
104104
WithWorkerCount(2),
105105
)
106106
assert.NoError(t, err)
107-
assert.NoError(t, q.Queue(m, job.WithTimeout(30*time.Millisecond)))
107+
assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(30 * time.Millisecond)}))
108108
q.Start()
109109
time.Sleep(50 * time.Millisecond)
110110
q.Release()
@@ -138,8 +138,8 @@ func TestCancelJobAfterShutdown(t *testing.T) {
138138
WithWorkerCount(2),
139139
)
140140
assert.NoError(t, err)
141-
assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond)))
142-
assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond)))
141+
assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)}))
142+
assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)}))
143143
q.Start()
144144
time.Sleep(10 * time.Millisecond)
145145
assert.Equal(t, 2, int(q.metric.busyWorkers))
@@ -367,8 +367,10 @@ func TestRetryCountWithNewMessage(t *testing.T) {
367367

368368
assert.NoError(t, q.Queue(
369369
m,
370-
job.WithRetryCount(3),
371-
job.WithRetryDelay(50*time.Millisecond),
370+
job.AllowOption{
371+
RetryCount: job.Int64(3),
372+
RetryDelay: job.Time(50 * time.Millisecond),
373+
},
372374
))
373375
assert.Len(t, messages, 0)
374376
q.Start()
@@ -403,8 +405,10 @@ func TestRetryCountWithNewTask(t *testing.T) {
403405
messages <- "foobar"
404406
return nil
405407
},
406-
job.WithRetryCount(3),
407-
job.WithRetryDelay(50*time.Millisecond),
408+
job.AllowOption{
409+
RetryCount: job.Int64(3),
410+
RetryDelay: job.Time(50 * time.Millisecond),
411+
},
408412
))
409413
assert.Len(t, messages, 0)
410414
q.Start()
@@ -437,8 +441,10 @@ func TestCancelRetryCountWithNewTask(t *testing.T) {
437441
messages <- "foobar"
438442
return nil
439443
},
440-
job.WithRetryCount(3),
441-
job.WithRetryDelay(100*time.Millisecond),
444+
job.AllowOption{
445+
RetryCount: job.Int64(3),
446+
RetryDelay: job.Time(100 * time.Millisecond),
447+
},
442448
))
443449
assert.Len(t, messages, 0)
444450
q.Start()
@@ -478,8 +484,10 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) {
478484

479485
assert.NoError(t, q.Queue(
480486
m,
481-
job.WithRetryCount(3),
482-
job.WithRetryDelay(100*time.Millisecond),
487+
job.AllowOption{
488+
RetryCount: job.Int64(3),
489+
RetryDelay: job.Time(100 * time.Millisecond),
490+
},
483491
))
484492
assert.Len(t, messages, 0)
485493
q.Start()

job/benchmark_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package job
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
func BenchmarkNewTask(b *testing.B) {
10+
for i := 0; i < b.N; i++ {
11+
NewTask(func(context.Context) error {
12+
return nil
13+
},
14+
AllowOption{
15+
RetryCount: Int64(100),
16+
RetryDelay: Time(30 * time.Millisecond),
17+
Timeout: Time(3 * time.Millisecond),
18+
},
19+
)
20+
}
21+
}
22+
23+
func BenchmarkNewOption(b *testing.B) {
24+
for i := 0; i < b.N; i++ {
25+
_ = NewOptions(
26+
AllowOption{
27+
RetryCount: Int64(100),
28+
RetryDelay: Time(30 * time.Millisecond),
29+
Timeout: Time(3 * time.Millisecond),
30+
},
31+
)
32+
}
33+
}

job/job.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (m *Message) Encode() []byte {
4747
return b
4848
}
4949

50-
func NewMessage(m core.QueuedMessage, opts ...Option) *Message {
50+
func NewMessage(m core.QueuedMessage, opts ...AllowOption) *Message {
5151
o := NewOptions(opts...)
5252

5353
return &Message{
@@ -58,7 +58,7 @@ func NewMessage(m core.QueuedMessage, opts ...Option) *Message {
5858
}
5959
}
6060

61-
func NewTask(task TaskFunc, opts ...Option) *Message {
61+
func NewTask(task TaskFunc, opts ...AllowOption) *Message {
6262
o := NewOptions(opts...)
6363

6464
return &Message{

job/job_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package job

job/option.go

+25-34
Original file line numberDiff line numberDiff line change
@@ -8,54 +8,45 @@ type Options struct {
88
timeout time.Duration
99
}
1010

11-
// An Option configures a mutex.
12-
type Option interface {
13-
apply(*Options)
14-
}
15-
16-
// OptionFunc is a function that configures a job.
17-
type OptionFunc func(*Options)
18-
19-
// apply calls f(option)
20-
func (f OptionFunc) apply(option *Options) {
21-
f(option)
22-
}
23-
24-
func newDefaultOptions() *Options {
25-
return &Options{
11+
func newDefaultOptions() Options {
12+
return Options{
2613
retryCount: 0,
2714
retryDelay: 100 * time.Millisecond,
2815
timeout: 60 * time.Minute,
2916
}
3017
}
3118

19+
type AllowOption struct {
20+
RetryCount *int64
21+
RetryDelay *time.Duration
22+
Timeout *time.Duration
23+
}
24+
3225
// NewOptions with custom parameter
33-
func NewOptions(opts ...Option) *Options {
26+
func NewOptions(opts ...AllowOption) Options {
3427
o := newDefaultOptions()
3528

36-
// Loop through each option
37-
for _, opt := range opts {
38-
// Call the option giving the instantiated
39-
opt.apply(o)
29+
if len(opts) != 0 {
30+
if opts[0].RetryCount != nil && *opts[0].RetryCount != o.retryCount {
31+
o.retryCount = *opts[0].RetryCount
32+
}
33+
34+
if opts[0].RetryDelay != nil && *opts[0].RetryDelay != o.retryDelay {
35+
o.retryDelay = *opts[0].RetryDelay
36+
}
37+
38+
if opts[0].Timeout != nil && *opts[0].Timeout != o.timeout {
39+
o.timeout = *opts[0].Timeout
40+
}
4041
}
4142

4243
return o
4344
}
4445

45-
func WithRetryCount(count int64) Option {
46-
return OptionFunc(func(o *Options) {
47-
o.retryCount = count
48-
})
49-
}
50-
51-
func WithRetryDelay(t time.Duration) Option {
52-
return OptionFunc(func(o *Options) {
53-
o.retryDelay = t
54-
})
46+
func Int64(val int64) *int64 {
47+
return &val
5548
}
5649

57-
func WithTimeout(t time.Duration) Option {
58-
return OptionFunc(func(o *Options) {
59-
o.timeout = t
60-
})
50+
func Time(v time.Duration) *time.Duration {
51+
return &v
6152
}

job/option_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package job
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestOptions(t *testing.T) {
11+
o := NewOptions(
12+
AllowOption{
13+
RetryCount: Int64(100),
14+
RetryDelay: Time(30 * time.Millisecond),
15+
Timeout: Time(3 * time.Millisecond),
16+
},
17+
)
18+
19+
assert.Equal(t, int64(100), o.retryCount)
20+
assert.Equal(t, 30*time.Millisecond, o.retryDelay)
21+
assert.Equal(t, 3*time.Millisecond, o.timeout)
22+
}

queue.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (q *Queue) Wait() {
114114
}
115115

116116
// Queue to queue all job
117-
func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error {
117+
func (q *Queue) Queue(m core.QueuedMessage, opts ...job.AllowOption) error {
118118
if atomic.LoadInt32(&q.stopFlag) == 1 {
119119
return ErrQueueShutdown
120120
}
@@ -133,7 +133,7 @@ func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error {
133133
}
134134

135135
// QueueTask to queue job task
136-
func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.Option) error {
136+
func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error {
137137
if atomic.LoadInt32(&q.stopFlag) == 1 {
138138
return ErrQueueShutdown
139139
}

queue_example_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ func ExampleNewPool_queueTaskTimeout() {
7575

7676
rets <- idx
7777
return nil
78-
}, job.WithTimeout(100*time.Millisecond)); err != nil {
78+
}, job.AllowOption{
79+
Timeout: job.Time(100 * time.Millisecond),
80+
}); err != nil {
7981
log.Println(err)
8082
}
8183
}

0 commit comments

Comments
 (0)