Skip to content

Commit 0dfca91

Browse files
committed
chore: init
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 8cd2795 commit 0dfca91

File tree

6 files changed

+256
-0
lines changed

6 files changed

+256
-0
lines changed

Diff for: go.mod

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module github/appleboy/queue
2+
3+
go 1.16
4+
5+
require github.com/rs/zerolog v1.23.0

Diff for: go.sum

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
2+
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
3+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
4+
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
5+
github.com/rs/zerolog v1.23.0 h1:UskrK+saS9P9Y789yNNulYKdARjPZuS35B8gJF2x60g=
6+
github.com/rs/zerolog v1.23.0/go.mod h1:6c7hFfxPOy7TacJc4Fcdi24/J0NKYGzjG8FWRI916Qo=
7+
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
8+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
9+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
10+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
11+
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
12+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
13+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
14+
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
15+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
16+
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
17+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
18+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
19+
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
20+
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
21+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
22+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
23+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
24+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
25+
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
26+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
27+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
28+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

Diff for: logger.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package queue
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/rs/zerolog/log"
7+
)
8+
9+
// Logger interface is used throughout gorush
10+
type Logger interface {
11+
Infof(format string, args ...interface{})
12+
Errorf(format string, args ...interface{})
13+
Fatalf(format string, args ...interface{})
14+
Info(args ...interface{})
15+
Error(args ...interface{})
16+
Fatal(args ...interface{})
17+
}
18+
19+
type defaultLogger struct{}
20+
21+
func (l defaultLogger) Infof(format string, args ...interface{}) {
22+
log.Info().Msgf(format, args...)
23+
}
24+
25+
func (l defaultLogger) Errorf(format string, args ...interface{}) {
26+
log.Error().Msgf(format, args...)
27+
}
28+
29+
func (l defaultLogger) Fatalf(format string, args ...interface{}) {
30+
log.Fatal().Msgf(format, args...)
31+
}
32+
33+
func (l defaultLogger) Info(args ...interface{}) {
34+
log.Info().Msg(fmt.Sprint(args...))
35+
}
36+
37+
func (l defaultLogger) Error(args ...interface{}) {
38+
log.Error().Msg(fmt.Sprint(args...))
39+
}
40+
41+
func (l defaultLogger) Fatal(args ...interface{}) {
42+
log.Fatal().Msg(fmt.Sprint(args...))
43+
}

Diff for: queue.go

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package queue
2+
3+
import (
4+
"errors"
5+
"runtime"
6+
"sync"
7+
"sync/atomic"
8+
)
9+
10+
type (
11+
// A Queue is a message queue.
12+
Queue struct {
13+
logger Logger
14+
workerCount int
15+
routineGroup *routineGroup
16+
quit chan struct{}
17+
worker Worker
18+
stopOnce sync.Once
19+
runningWorkers int32
20+
}
21+
)
22+
23+
// Option for queue system
24+
type Option func(*Queue)
25+
26+
// ErrMissingWorker missing define worker
27+
var ErrMissingWorker = errors.New("missing worker module")
28+
29+
// WithWorkerCount set worker count
30+
func WithWorkerCount(num int) Option {
31+
return func(q *Queue) {
32+
q.workerCount = num
33+
}
34+
}
35+
36+
// WithLogger set custom logger
37+
func WithLogger(l Logger) Option {
38+
return func(q *Queue) {
39+
q.logger = l
40+
}
41+
}
42+
43+
// WithWorker set custom worker
44+
func WithWorker(w Worker) Option {
45+
return func(q *Queue) {
46+
q.worker = w
47+
}
48+
}
49+
50+
// NewQueue returns a Queue.
51+
func NewQueue(opts ...Option) (*Queue, error) {
52+
q := &Queue{
53+
workerCount: runtime.NumCPU(),
54+
routineGroup: newRoutineGroup(),
55+
quit: make(chan struct{}),
56+
logger: new(defaultLogger),
57+
}
58+
59+
// Loop through each option
60+
for _, opt := range opts {
61+
// Call the option giving the instantiated
62+
opt(q)
63+
}
64+
65+
if q.worker == nil {
66+
return nil, ErrMissingWorker
67+
}
68+
69+
return q, nil
70+
}
71+
72+
// Capacity for queue max size
73+
func (q *Queue) Capacity() int {
74+
return q.worker.Capacity()
75+
}
76+
77+
// Usage for count of queue usage
78+
func (q *Queue) Usage() int {
79+
return q.worker.Usage()
80+
}
81+
82+
// Start to enable all worker
83+
func (q *Queue) Start() {
84+
q.startWorker()
85+
}
86+
87+
// Shutdown stops all queues.
88+
func (q *Queue) Shutdown() {
89+
q.stopOnce.Do(func() {
90+
q.worker.Shutdown()
91+
close(q.quit)
92+
})
93+
}
94+
95+
// Workers returns the numbers of workers has been created.
96+
func (q *Queue) Workers() int {
97+
return int(atomic.LoadInt32(&q.runningWorkers))
98+
}
99+
100+
// Wait all process
101+
func (q *Queue) Wait() {
102+
q.routineGroup.Wait()
103+
}
104+
105+
// Queue to queue all job
106+
func (q *Queue) Queue(job QueuedMessage) error {
107+
return q.worker.Queue(job)
108+
}
109+
110+
func (q *Queue) work() {
111+
num := atomic.AddInt32(&q.runningWorkers, 1)
112+
if err := q.worker.BeforeRun(); err != nil {
113+
q.logger.Fatal(err)
114+
}
115+
q.routineGroup.Run(func() {
116+
// to handle panic cases from inside the worker
117+
// in such case, we start a new goroutine
118+
defer func() {
119+
atomic.AddInt32(&q.runningWorkers, -1)
120+
if err := recover(); err != nil {
121+
q.logger.Error(err)
122+
go q.work()
123+
}
124+
}()
125+
q.logger.Infof("start the worker num: %d", num)
126+
q.worker.Run(q.quit)
127+
q.logger.Infof("stop the worker num: %d", num)
128+
})
129+
if err := q.worker.AfterRun(); err != nil {
130+
q.logger.Fatal(err)
131+
}
132+
}
133+
134+
func (q *Queue) startWorker() {
135+
for i := 0; i < q.workerCount; i++ {
136+
go q.work()
137+
}
138+
}

Diff for: thread.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package queue
2+
3+
import "sync"
4+
5+
type routineGroup struct {
6+
waitGroup sync.WaitGroup
7+
}
8+
9+
func newRoutineGroup() *routineGroup {
10+
return new(routineGroup)
11+
}
12+
13+
func (g *routineGroup) Run(fn func()) {
14+
g.waitGroup.Add(1)
15+
16+
go func() {
17+
defer g.waitGroup.Done()
18+
fn()
19+
}()
20+
}
21+
22+
func (g *routineGroup) Wait() {
23+
g.waitGroup.Wait()
24+
}

Diff for: worker.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package queue
2+
3+
// Worker interface
4+
type Worker interface {
5+
BeforeRun() error
6+
Run(chan struct{}) error
7+
AfterRun() error
8+
9+
Shutdown() error
10+
Queue(job QueuedMessage) error
11+
Capacity() int
12+
Usage() int
13+
}
14+
15+
// QueuedMessage ...
16+
type QueuedMessage interface {
17+
Bytes() []byte
18+
}

0 commit comments

Comments
 (0)