-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.go
67 lines (58 loc) · 1.09 KB
/
threadpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package threadpool
import (
"context"
"errors"
)
var (
// ErrPoolSizeMustPresent is returned by NewPool when the requested size is
// zero or negative.
ErrPoolSizeMustPresent = errors.New("pool size is illegal")
// ErrPoolIsFull is returned by Go when the function cannot be queued
// without blocking.
ErrPoolIsFull = errors.New("pool is full")
)
// ThreadPool accepts functions and hands them to a bounded set of workers
// for execution.
type ThreadPool interface {
// Go submits the function without blocking. It returns an error (see
// ErrPoolIsFull) when the pool's queue has no free slot; the function is
// not scheduled in that case.
Go(func()) error
// Run submits the function, blocking the caller while the pool's queue is
// full. In the implementation in this file it returns once the function is
// queued and does not wait for the function to finish.
Run(func())
}
// threadPool is the channel-backed ThreadPool implementation built by NewPool.
type threadPool struct {
// wc is the task queue: Go and Run send functions into it and the worker
// goroutines receive from it.
wc chan func()
// ctx controls worker lifetime; a worker returns when it observes ctx.Done().
ctx context.Context
}
// NewPool creates a pool with size worker goroutines that share one task
// queue able to buffer up to size pending functions.
//
// The workers exit when ctx is done. A nil ctx is treated as
// context.Background(), in which case the workers are never told to stop.
//
// It returns ErrPoolSizeMustPresent when size is zero or negative.
func NewPool(ctx context.Context, size int) (ThreadPool, error) {
	if size <= 0 {
		return nil, ErrPoolSizeMustPresent
	}
	if ctx == nil {
		// Workers call ctx.Done() on their own goroutines; with a nil ctx that
		// call panics where the caller cannot recover it, taking the whole
		// process down. Fall back to a context that is never cancelled.
		ctx = context.Background()
	}

	p := &threadPool{
		wc:  make(chan func(), size),
		ctx: ctx,
	}
	for i := 0; i < size; i++ {
		go p.watch()
	}
	return p, nil
}
// Go tries to queue f without blocking. It returns nil when f was placed on
// the task queue and ErrPoolIsFull when the queue had no free slot.
// It does not consult the pool's context.
func (t *threadPool) Go(f func()) error {
	var err error
	select {
	case t.wc <- f:
		// Queued; a worker will pick it up.
	default:
		// The send would block, so report the pool as full instead of waiting.
		err = ErrPoolIsFull
	}
	return err
}
// Run queues f, blocking the caller while the task queue is full. It returns
// as soon as f has been queued; it does not wait for f to execute.
//
// Run also returns once the pool's context is done. In that case f may not
// have been queued, and a queued f is not guaranteed to run.
func (t *threadPool) Run(f func()) {
	select {
	case t.wc <- f:
	case <-t.ctx.Done():
		// Workers exit when the context is done, so a full queue would never
		// drain and a plain send would block this caller forever.
	}
}
// watch is the worker loop. It repeatedly takes a function from the task queue
// and calls it on this goroutine, and returns once the pool's context is done.
func (t *threadPool) watch() {
	// Context.Done returns the same channel on every call, so fetch it once.
	done := t.ctx.Done()
	for {
		select {
		case <-done:
			return
		case task := <-t.wc:
			task()
		}
	}
}