Skip to content

Commit bbdc746

Browse files
authored
chore(queue): support job timeout setting. (#17)
1 parent 9802634 commit bbdc746

File tree

6 files changed

+285
-23
lines changed

6 files changed

+285
-23
lines changed

logger.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ type Logger interface {
1616
Fatal(args ...interface{})
1717
}
1818

19-
func newLogger() Logger {
19+
// NewLogger for simple logger.
20+
func NewLogger() Logger {
2021
return defaultLogger{
2122
infoLogger: log.New(os.Stderr, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile),
2223
errorLogger: log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile),
@@ -53,3 +54,18 @@ func (l defaultLogger) Error(args ...interface{}) {
5354
func (l defaultLogger) Fatal(args ...interface{}) {
5455
l.fatalLogger.Println(fmt.Sprint(args...))
5556
}
57+
58+
// NewEmptyLogger for simple logger.
59+
func NewEmptyLogger() Logger {
60+
return emptyLogger{}
61+
}
62+
63+
// EmptyLogger no meesgae logger
64+
type emptyLogger struct{}
65+
66+
func (l emptyLogger) Infof(format string, args ...interface{}) {}
67+
func (l emptyLogger) Errorf(format string, args ...interface{}) {}
68+
func (l emptyLogger) Fatalf(format string, args ...interface{}) {}
69+
func (l emptyLogger) Info(args ...interface{}) {}
70+
func (l emptyLogger) Error(args ...interface{}) {}
71+
func (l emptyLogger) Fatal(args ...interface{}) {}

logger_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package queue
2+
3+
func ExampleNewEmptyLogger() {
4+
l := NewEmptyLogger()
5+
l.Info("test")
6+
l.Infof("test")
7+
l.Error("test")
8+
l.Errorf("test")
9+
l.Fatal("test")
10+
l.Fatalf("test")
11+
// Output:
12+
}

queue.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ import (
55
"runtime"
66
"sync"
77
"sync/atomic"
8+
"time"
89
)
910

11+
// ErrQueueShutdown close the queue.
12+
var ErrQueueShutdown = errors.New("queue has been closed")
13+
1014
type (
1115
// A Queue is a message queue.
1216
Queue struct {
@@ -17,9 +21,21 @@ type (
1721
worker Worker
1822
stopOnce sync.Once
1923
runningWorkers int32
24+
timeout time.Duration
25+
}
26+
27+
// Job with Timeout
28+
Job struct {
29+
Timeout time.Duration
30+
Body []byte
2031
}
2132
)
2233

34+
// Bytes get string body
35+
func (j Job) Bytes() []byte {
36+
return j.Body
37+
}
38+
2339
// Option for queue system
2440
type Option func(*Queue)
2541

@@ -53,7 +69,8 @@ func NewQueue(opts ...Option) (*Queue, error) {
5369
workerCount: runtime.NumCPU(),
5470
routineGroup: newRoutineGroup(),
5571
quit: make(chan struct{}),
56-
logger: newLogger(),
72+
logger: NewLogger(),
73+
timeout: 24 * 60 * time.Minute,
5774
}
5875

5976
// Loop through each option
@@ -106,7 +123,18 @@ func (q *Queue) Wait() {
106123

107124
// Queue to queue all job
108125
func (q *Queue) Queue(job QueuedMessage) error {
109-
return q.worker.Queue(job)
126+
return q.worker.Queue(Job{
127+
Timeout: q.timeout,
128+
Body: job.Bytes(),
129+
})
130+
}
131+
132+
// Queue to queue all job
133+
func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error {
134+
return q.worker.Queue(Job{
135+
Timeout: timeout,
136+
Body: job.Bytes(),
137+
})
110138
}
111139

112140
func (q *Queue) work() {
@@ -127,7 +155,7 @@ func (q *Queue) work() {
127155
}()
128156
q.logger.Infof("start the worker num: %d", num)
129157
if err := q.worker.Run(); err != nil {
130-
q.logger.Error(err)
158+
q.logger.Errorf("runtime error: %s", err.Error())
131159
}
132160
q.logger.Infof("stop the worker num: %d", num)
133161
})

queue_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ func TestWorkerStatus(t *testing.T) {
8888

8989
assert.NoError(t, q.Queue(m))
9090
assert.NoError(t, q.Queue(m))
91-
assert.NoError(t, q.Queue(m))
92-
assert.NoError(t, q.Queue(m))
91+
assert.NoError(t, q.QueueWithTimeout(10*time.Millisecond, m))
92+
assert.NoError(t, q.QueueWithTimeout(10*time.Millisecond, m))
9393
assert.Equal(t, 100, q.Capacity())
9494
assert.Equal(t, 4, q.Usage())
9595
q.Start()

simple/simple.go

+64-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package simple
22

33
import (
4+
"context"
45
"errors"
56
"sync"
67

@@ -18,10 +19,11 @@ var errMaxCapacity = errors.New("max capacity reached")
1819

1920
// Worker for simple queue using channel
2021
type Worker struct {
21-
queueNotification chan queue.QueuedMessage
22-
runFunc func(queue.QueuedMessage, <-chan struct{}) error
23-
stop chan struct{}
24-
stopOnce sync.Once
22+
taskQueue chan queue.QueuedMessage
23+
runFunc func(context.Context, queue.QueuedMessage) error
24+
stop chan struct{}
25+
logger queue.Logger
26+
stopOnce sync.Once
2527
}
2628

2729
// BeforeRun run script before start worker
@@ -36,36 +38,74 @@ func (s *Worker) AfterRun() error {
3638

3739
// Run start the worker
3840
func (s *Worker) Run() error {
39-
for notification := range s.queueNotification {
40-
// run custom process function
41-
_ = s.runFunc(notification, s.stop)
41+
// check queue status
42+
select {
43+
case <-s.stop:
44+
return queue.ErrQueueShutdown
45+
default:
46+
}
47+
48+
for task := range s.taskQueue {
49+
done := make(chan struct{})
50+
v, _ := task.(queue.Job)
51+
ctx, cancel := context.WithTimeout(context.Background(), v.Timeout)
52+
// vet doesn't complain if I do this
53+
_ = cancel
54+
55+
// run the job
56+
go func() {
57+
// run custom process function
58+
_ = s.runFunc(ctx, task)
59+
close(done)
60+
}()
61+
62+
select {
63+
case <-ctx.Done(): // timeout reached
64+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
65+
s.logger.Infof("job timeout: %s", v.Timeout.String())
66+
}
67+
// wait job
68+
<-done
69+
case <-s.stop: // shutdown service
70+
cancel()
71+
// wait job
72+
<-done
73+
case <-done: // job finish and continue to work
74+
}
75+
4276
}
4377
return nil
4478
}
4579

4680
// Shutdown worker
4781
func (s *Worker) Shutdown() error {
4882
s.stopOnce.Do(func() {
49-
close(s.queueNotification)
5083
close(s.stop)
84+
close(s.taskQueue)
5185
})
5286
return nil
5387
}
5488

5589
// Capacity for channel
5690
func (s *Worker) Capacity() int {
57-
return cap(s.queueNotification)
91+
return cap(s.taskQueue)
5892
}
5993

6094
// Usage for count of channel usage
6195
func (s *Worker) Usage() int {
62-
return len(s.queueNotification)
96+
return len(s.taskQueue)
6397
}
6498

6599
// Queue send notification to queue
66100
func (s *Worker) Queue(job queue.QueuedMessage) error {
67101
select {
68-
case s.queueNotification <- job:
102+
case <-s.stop:
103+
return queue.ErrQueueShutdown
104+
default:
105+
}
106+
107+
select {
108+
case s.taskQueue <- job:
69109
return nil
70110
default:
71111
return errMaxCapacity
@@ -75,23 +115,31 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
75115
// WithQueueNum setup the capcity of queue
76116
func WithQueueNum(num int) Option {
77117
return func(w *Worker) {
78-
w.queueNotification = make(chan queue.QueuedMessage, num)
118+
w.taskQueue = make(chan queue.QueuedMessage, num)
79119
}
80120
}
81121

82122
// WithRunFunc setup the run func of queue
83-
func WithRunFunc(fn func(queue.QueuedMessage, <-chan struct{}) error) Option {
123+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
84124
return func(w *Worker) {
85125
w.runFunc = fn
86126
}
87127
}
88128

129+
// WithLogger set custom logger
130+
func WithLogger(l queue.Logger) Option {
131+
return func(w *Worker) {
132+
w.logger = l
133+
}
134+
}
135+
89136
// NewWorker for struc
90137
func NewWorker(opts ...Option) *Worker {
91138
w := &Worker{
92-
queueNotification: make(chan queue.QueuedMessage, defaultQueueSize),
93-
stop: make(chan struct{}),
94-
runFunc: func(queue.QueuedMessage, <-chan struct{}) error {
139+
taskQueue: make(chan queue.QueuedMessage, defaultQueueSize),
140+
stop: make(chan struct{}),
141+
logger: queue.NewLogger(),
142+
runFunc: func(context.Context, queue.QueuedMessage) error {
95143
return nil
96144
},
97145
}

0 commit comments

Comments
 (0)