forked from kamilsk/semaphore
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsemaphore.go
109 lines (91 loc) · 2.89 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Package semaphore provides an implementation of Semaphore pattern
// with timeout of lock/unlock operations based on channels.
package semaphore
import "errors"
// HealthChecker defines helpful methods related with semaphore status.
type HealthChecker interface {
// Capacity returns a capacity of a semaphore.
// It must be safe to call Capacity concurrently on a single semaphore.
Capacity() int
// Occupied returns a current number of occupied slots.
// It must be safe to call Occupied concurrently on a single semaphore.
Occupied() int
}
// Releaser defines a method to release the previously occupied semaphore.
type Releaser interface {
// Release releases the previously occupied slot.
// If no places were occupied, then it returns an appropriate error.
// It must be safe to call Release concurrently on a single semaphore.
Release() error
}
// ReleaseFunc tells a semaphore to release the previously occupied slot
// and ignore an error if it occurs.
type ReleaseFunc func()
// Release calls f().
func (f ReleaseFunc) Release() error {
f()
return nil
}
// Semaphore provides the functionality of the same named pattern.
type Semaphore interface {
HealthChecker
Releaser
// Acquire tries to reduce the number of available slots for 1.
// The operation can be canceled using context. In this case,
// it returns an appropriate error.
// It must be safe to call Acquire concurrently on a single semaphore.
Acquire(deadline <-chan struct{}) (ReleaseFunc, error)
// Signal returns a channel to send to it release function
// only if Acquire is successful. In any case, the channel will be closed.
Signal(deadline <-chan struct{}) <-chan ReleaseFunc
}
// New constructs a new thread-safe Semaphore with the given capacity.
func New(capacity int) Semaphore {
return make(semaphore, capacity)
}
// IsEmpty checks if passed error is related to call Release on empty semaphore.
func IsEmpty(err error) bool {
return err == errEmpty
}
// IsTimeout checks if passed error is related to call Acquire on full semaphore.
func IsTimeout(err error) bool {
return err == errTimeout
}
var (
nothing ReleaseFunc = func() {}
errEmpty = errors.New("semaphore is empty")
errTimeout = errors.New("operation timeout")
)
type semaphore chan struct{}
func (sem semaphore) Acquire(deadline <-chan struct{}) (ReleaseFunc, error) {
select {
case sem <- struct{}{}:
return func() { _ = sem.Release() }, nil //nolint: gas
case <-deadline:
return nothing, errTimeout
}
}
func (sem semaphore) Capacity() int {
return cap(sem)
}
func (sem semaphore) Occupied() int {
return len(sem)
}
func (sem semaphore) Release() error {
select {
case <-sem:
return nil
default:
return errEmpty
}
}
func (sem semaphore) Signal(deadline <-chan struct{}) <-chan ReleaseFunc {
ch := make(chan ReleaseFunc, 1)
go func() {
if release, err := sem.Acquire(deadline); err == nil {
ch <- release
}
close(ch)
}()
return ch
}