Skip to content

Commit a0677b9

Browse files
authored
refactor: refactor worker and task metrics to use int64 types (#136)
- Change `BusyWorkers` type from `uint64` to `int64` - Add `CompletedTasks` method to `Metric` interface and implementation - Update `IncBusyWorker` and `DecBusyWorker` to use `atomic.AddInt64` - Modify test assertions to use `uint64` and `int64` types - Change `defaultWorkerCount` and related functions from `int` to `int64` - Update `NewPool` and related functions to use `int64` for size and worker count - Add `CompletedTasks` method to `Queue` and update related methods to use `uint64` and `int64` types Signed-off-by: appleboy <[email protected]>
1 parent fd95db8 commit a0677b9

8 files changed

+50
-34
lines changed

metric.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,25 @@ import "sync/atomic"
66
type Metric interface {
77
IncBusyWorker()
88
DecBusyWorker()
9-
BusyWorkers() uint64
9+
BusyWorkers() int64
1010
SuccessTasks() uint64
1111
FailureTasks() uint64
1212
SubmittedTasks() uint64
13+
CompletedTasks() uint64
1314
IncSuccessTask()
1415
IncFailureTask()
1516
IncSubmittedTask()
17+
IncCompletedTask()
1618
}
1719

1820
var _ Metric = (*metric)(nil)
1921

2022
type metric struct {
21-
busyWorkers uint64
23+
busyWorkers int64
2224
successTasks uint64
2325
failureTasks uint64
2426
submittedTasks uint64
27+
completedTasks uint64
2528
}
2629

2730
// NewMetric for default metric structure
@@ -30,15 +33,15 @@ func NewMetric() Metric {
3033
}
3134

3235
func (m *metric) IncBusyWorker() {
33-
atomic.AddUint64(&m.busyWorkers, 1)
36+
atomic.AddInt64(&m.busyWorkers, 1)
3437
}
3538

3639
func (m *metric) DecBusyWorker() {
37-
atomic.AddUint64(&m.busyWorkers, ^uint64(0))
40+
atomic.AddInt64(&m.busyWorkers, ^int64(0))
3841
}
3942

40-
func (m *metric) BusyWorkers() uint64 {
41-
return atomic.LoadUint64(&m.busyWorkers)
43+
func (m *metric) BusyWorkers() int64 {
44+
return atomic.LoadInt64(&m.busyWorkers)
4245
}
4346

4447
func (m *metric) IncSuccessTask() {
@@ -53,6 +56,10 @@ func (m *metric) IncSubmittedTask() {
5356
atomic.AddUint64(&m.submittedTasks, 1)
5457
}
5558

59+
func (m *metric) IncCompletedTask() {
60+
atomic.AddUint64(&m.completedTasks, 1)
61+
}
62+
5663
func (m *metric) SuccessTasks() uint64 {
5764
return atomic.LoadUint64(&m.successTasks)
5865
}
@@ -64,3 +71,7 @@ func (m *metric) FailureTasks() uint64 {
6471
func (m *metric) SubmittedTasks() uint64 {
6572
return atomic.LoadUint64(&m.submittedTasks)
6673
}
74+
75+
func (m *metric) CompletedTasks() uint64 {
76+
return atomic.LoadUint64(&m.completedTasks)
77+
}

metric_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func TestMetricData(t *testing.T) {
4444
}))
4545
q.Start()
4646
time.Sleep(50 * time.Millisecond)
47-
assert.Equal(t, 4, q.SubmittedTasks())
48-
assert.Equal(t, 2, q.SuccessTasks())
49-
assert.Equal(t, 2, q.FailureTasks())
47+
assert.Equal(t, uint64(4), q.SubmittedTasks())
48+
assert.Equal(t, uint64(2), q.SuccessTasks())
49+
assert.Equal(t, uint64(2), q.FailureTasks())
5050
q.Release()
5151
}

options.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
var (
1111
defaultCapacity = 0
12-
defaultWorkerCount = runtime.NumCPU()
12+
defaultWorkerCount = int64(runtime.NumCPU())
1313
defaultNewLogger = NewLogger()
1414
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
1515
defaultMetric = NewMetric()
@@ -29,7 +29,7 @@ func (f OptionFunc) apply(option *Options) {
2929
}
3030

3131
// WithWorkerCount set worker count
32-
func WithWorkerCount(num int) Option {
32+
func WithWorkerCount(num int64) Option {
3333
return OptionFunc(func(q *Options) {
3434
if num <= 0 {
3535
num = defaultWorkerCount
@@ -82,7 +82,7 @@ func WithAfterFn(afterFn func()) Option {
8282

8383
// Options for custom args in Queue
8484
type Options struct {
85-
workerCount int
85+
workerCount int64
8686
logger Logger
8787
queueSize int
8888
worker core.Worker

pool.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package queue
22

33
// NewPool initializes a new pool
4-
func NewPool(size int, opts ...Option) *Queue {
4+
func NewPool(size int64, opts ...Option) *Queue {
55
o := []Option{
66
WithWorkerCount(size),
77
WithWorker(NewRing(opts...)),

pool_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func TestNewPoolWithQueueTask(t *testing.T) {
11-
totalN := 5
11+
totalN := int64(5)
1212
taskN := 100
1313
rets := make(chan struct{}, taskN)
1414

@@ -26,13 +26,13 @@ func TestNewPoolWithQueueTask(t *testing.T) {
2626

2727
// shutdown all, and now running worker is 0
2828
p.Release()
29-
assert.Equal(t, 0, p.BusyWorkers())
29+
assert.Equal(t, int64(0), p.BusyWorkers())
3030
}
3131

3232
func TestPoolNumber(t *testing.T) {
3333
p := NewPool(0)
3434
p.Start()
3535
// shutdown all, and now running worker is 0
3636
p.Release()
37-
assert.Equal(t, 0, p.BusyWorkers())
37+
assert.Equal(t, int64(0), p.BusyWorkers())
3838
}

queue.go

+15-10
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type (
2222
sync.Mutex
2323
metric *metric
2424
logger Logger
25-
workerCount int
25+
workerCount int64
2626
routineGroup *routineGroup
2727
quit chan struct{}
2828
ready chan struct{}
@@ -95,23 +95,28 @@ func (q *Queue) Release() {
9595
}
9696

9797
// BusyWorkers returns the numbers of workers in the running process.
98-
func (q *Queue) BusyWorkers() int {
99-
return int(q.metric.BusyWorkers())
98+
func (q *Queue) BusyWorkers() int64 {
99+
return q.metric.BusyWorkers()
100100
}
101101

102102
// BusyWorkers returns the numbers of success tasks.
103-
func (q *Queue) SuccessTasks() int {
104-
return int(q.metric.SuccessTasks())
103+
func (q *Queue) SuccessTasks() uint64 {
104+
return q.metric.SuccessTasks()
105105
}
106106

107107
// BusyWorkers returns the numbers of failure tasks.
108-
func (q *Queue) FailureTasks() int {
109-
return int(q.metric.FailureTasks())
108+
func (q *Queue) FailureTasks() uint64 {
109+
return q.metric.FailureTasks()
110110
}
111111

112112
// BusyWorkers returns the numbers of submitted tasks.
113-
func (q *Queue) SubmittedTasks() int {
114-
return int(q.metric.SubmittedTasks())
113+
func (q *Queue) SubmittedTasks() uint64 {
114+
return q.metric.SubmittedTasks()
115+
}
116+
117+
// CompletedTasks returns the numbers of completed tasks.
118+
func (q *Queue) CompletedTasks() uint64 {
119+
return q.metric.CompletedTasks()
115120
}
116121

117122
// Wait all process
@@ -269,7 +274,7 @@ func (q *Queue) handle(m *job.Message) error {
269274
}
270275

271276
// UpdateWorkerCount to update worker number dynamically.
272-
func (q *Queue) UpdateWorkerCount(num int) {
277+
func (q *Queue) UpdateWorkerCount(num int64) {
273278
q.Lock()
274279
q.workerCount = num
275280
q.Unlock()

queue_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestNewQueueWithZeroWorker(t *testing.T) {
4848

4949
q.Start()
5050
time.Sleep(50 * time.Millisecond)
51-
assert.Equal(t, 0, q.BusyWorkers())
51+
assert.Equal(t, int64(0), q.BusyWorkers())
5252
q.Release()
5353
}
5454

@@ -74,7 +74,7 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
7474

7575
q.Start()
7676
q.Release()
77-
assert.Equal(t, 0, q.BusyWorkers())
77+
assert.Equal(t, int64(0), q.BusyWorkers())
7878
}
7979

8080
func TestHandleTimeout(t *testing.T) {

ring_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
142142
assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)}))
143143
q.Start()
144144
time.Sleep(10 * time.Millisecond)
145-
assert.Equal(t, 2, q.BusyWorkers())
145+
assert.Equal(t, int64(2), q.BusyWorkers())
146146
q.Release()
147147
}
148148

@@ -231,10 +231,10 @@ func TestIncreaseWorkerCount(t *testing.T) {
231231

232232
q.Start()
233233
time.Sleep(100 * time.Millisecond)
234-
assert.Equal(t, 5, q.BusyWorkers())
234+
assert.Equal(t, int64(5), q.BusyWorkers())
235235
q.UpdateWorkerCount(10)
236236
time.Sleep(100 * time.Millisecond)
237-
assert.Equal(t, 10, q.BusyWorkers())
237+
assert.Equal(t, int64(10), q.BusyWorkers())
238238
q.Release()
239239
}
240240

@@ -261,12 +261,12 @@ func TestDecreaseWorkerCount(t *testing.T) {
261261

262262
q.Start()
263263
time.Sleep(20 * time.Millisecond)
264-
assert.Equal(t, 5, q.BusyWorkers())
264+
assert.Equal(t, int64(5), q.BusyWorkers())
265265
q.UpdateWorkerCount(3)
266266
time.Sleep(100 * time.Millisecond)
267-
assert.Equal(t, 3, q.BusyWorkers())
267+
assert.Equal(t, int64(3), q.BusyWorkers())
268268
time.Sleep(100 * time.Millisecond)
269-
assert.Equal(t, 2, q.BusyWorkers())
269+
assert.Equal(t, int64(2), q.BusyWorkers())
270270
q.Release()
271271
}
272272

0 commit comments

Comments
 (0)