forked from easierway/service_decorators
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcircuit_break_decorator.go
153 lines (133 loc) · 4.56 KB
/
circuit_break_decorator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package service_decorators
import (
"errors"
"time"
)
// Sentinel errors reported by the functions produced by CircuitBreakDecorator.Decorate.
var (
	// ErrorCircuitBreakTimeout is returned when the decorated function does
	// not finish within the configured timeout.
	ErrorCircuitBreakTimeout = errors.New("the invoking is timeout")

	// ErrorCircuitBreakTooManyConcurrentRequests is returned when the number
	// of concurrent requests exceeds the configured limit.
	ErrorCircuitBreakTooManyConcurrentRequests = errors.New("the concurrency is beyond the limit")
)
// CircuitBreakDecoratorConfig holds the settings used to build a
// CircuitBreakDecorator. Values are set through the WithXX methods and
// turned into a decorator by Build.
type CircuitBreakDecoratorConfig struct {
	// timeout bounds how long a decorated call waits for the inner function.
	// CreateCircuitBreakDecorator initializes it to 1 second.
	timeout time.Duration

	// maxCurrentRequests is the maximum number of concurrent requests.
	// Zero disables the limit; Build rejects negative values.
	maxCurrentRequests int

	// timeoutFallbackFunction, when non-nil, is called with the request and
	// ErrorCircuitBreakTimeout instead of returning the timeout error directly.
	timeoutFallbackFunction ServiceFallbackFunc

	// beyondMaxConcurrencyFallbackFunction, when non-nil, is called with the
	// request and ErrorCircuitBreakTooManyConcurrentRequests instead of
	// returning the concurrency error directly.
	beyondMaxConcurrencyFallbackFunction ServiceFallbackFunc
}
// CircuitBreakDecorator wraps a service function with an execution timeout,
// an optional concurrency limit, and optional fallback functions for both
// failure modes.
type CircuitBreakDecorator struct {
	// Config is the configuration the decorator reads on every call.
	Config *CircuitBreakDecoratorConfig

	// tokenBuffer holds one token per permitted concurrent request.
	// Build leaves it nil when no concurrency limit is configured.
	tokenBuffer chan struct{}
}
// serviceFuncResponse bundles the two results of a service function so that
// they can be passed over a single channel.
type serviceFuncResponse struct {
	resp Response // response returned by the inner function
	err  error    // error returned by the inner function
}
// CreateCircuitBreakDecorator starts the construction of a
// CircuitBreakDecorator. It returns a configuration whose timeout is
// 1 second; the remaining settings can be chained with the WithXX methods
// and the decorator is finally produced by Build.
func CreateCircuitBreakDecorator() *CircuitBreakDecoratorConfig {
	cfg := new(CircuitBreakDecoratorConfig)
	cfg.timeout = time.Second
	return cfg
}
// WithTimeout sets how long a decorated call may wait for the inner function
// before the timeout handling takes over. It returns the same configuration
// so that calls can be chained.
func (config *CircuitBreakDecoratorConfig) WithTimeout(timeOut time.Duration) *CircuitBreakDecoratorConfig {
	config.timeout = timeOut

	return config
}
// WithMaxCurrentRequests sets the maximum number of concurrent requests.
// Zero means no limit; a negative value is rejected later by Build.
// It returns the same configuration so that calls can be chained.
func (config *CircuitBreakDecoratorConfig) WithMaxCurrentRequests(maxCurReq int) *CircuitBreakDecoratorConfig {
	config.maxCurrentRequests = maxCurReq

	return config
}
// WithTimeoutFallbackFunction registers the function that produces the result
// of a decorated call when the inner function times out.
// It returns the same configuration so that calls can be chained.
func (config *CircuitBreakDecoratorConfig) WithTimeoutFallbackFunction(fallbackFn ServiceFallbackFunc) *CircuitBreakDecoratorConfig {
	config.timeoutFallbackFunction = fallbackFn

	return config
}
// WithBeyondMaxConcurrencyFallbackFunction registers the function that
// produces the result of a decorated call when the concurrency limit has
// been exceeded.
// It returns the same configuration so that calls can be chained.
func (config *CircuitBreakDecoratorConfig) WithBeyondMaxConcurrencyFallbackFunction(fallbackFn ServiceFallbackFunc) *CircuitBreakDecoratorConfig {
	config.beyondMaxConcurrencyFallbackFunction = fallbackFn

	return config
}
// Build creates a CircuitBreakDecorator from the settings collected through
// the WithXX method chain.
//
// It returns an error when the configured maximum number of concurrent
// requests is negative. When the maximum is positive, the decorator's token
// channel is created with that capacity and filled completely, so every
// token starts out available. When the maximum is zero, the token channel
// stays nil.
func (config *CircuitBreakDecoratorConfig) Build() (*CircuitBreakDecorator, error) {
	limit := config.maxCurrentRequests
	if limit < 0 {
		return nil, errors.New("invalid max current requests setting")
	}

	decorator := &CircuitBreakDecorator{Config: config}
	if limit > 0 {
		decorator.tokenBuffer = make(chan struct{}, limit)
		// Pre-fill the channel: one token per permitted concurrent request.
		for remaining := limit; remaining > 0; remaining-- {
			decorator.tokenBuffer <- struct{}{}
		}
	}
	return decorator, nil
}
// getToken tries to take one token from the token channel without blocking.
// It reports whether a token was obtained.
func (dec *CircuitBreakDecorator) getToken() bool {
	acquired := false
	select {
	case <-dec.tokenBuffer:
		acquired = true
	default:
		// No token is available right now; do not wait for one.
	}
	return acquired
}
// releaseToken puts one token back into the token channel without blocking.
// It panics when the channel cannot accept the token, because that means
// more tokens were returned than were taken.
func (dec *CircuitBreakDecorator) releaseToken() {
	select {
	case dec.tokenBuffer <- struct{}{}:
		// Token handed back.
	default:
		panic("There's a fatal bug here. Unexpected token has been returned.")
	}
}
// Decorate wraps innerFn with the concurrency control and timeout logic.
//
// The returned function behaves as follows:
//   - When a concurrency limit is configured and no token is available,
//     innerFn is not called. The result is that of the configured
//     beyond-max-concurrency fallback function, or
//     (nil, ErrorCircuitBreakTooManyConcurrentRequests) when none is set.
//   - Otherwise innerFn runs in its own goroutine. If it finishes within the
//     configured timeout, its response and error are returned unchanged.
//   - If the timeout elapses first, the result is that of the configured
//     timeout fallback function, or (nil, ErrorCircuitBreakTimeout) when none
//     is set.
//
// A timeout does not stop innerFn: its goroutine keeps running until innerFn
// returns, and the token it holds is released only at that point.
func (dec *CircuitBreakDecorator) Decorate(innerFn ServiceFunc) ServiceFunc {
	return func(req Request) (Response, error) {
		ownToken := false
		if dec.Config.maxCurrentRequests > 0 {
			if !dec.getToken() {
				if dec.Config.beyondMaxConcurrencyFallbackFunction != nil {
					return dec.Config.
						beyondMaxConcurrencyFallbackFunction(req,
							ErrorCircuitBreakTooManyConcurrentRequests)
				}
				return nil, ErrorCircuitBreakTooManyConcurrentRequests
			}
			ownToken = true
		}

		// The buffer of one lets the goroutine deliver its result and exit
		// even when the caller has already given up because of the timeout.
		output := make(chan serviceFuncResponse, 1)
		go func(r Request, withToken bool) {
			if withToken {
				// Deferred so the token is handed back once innerFn is done,
				// regardless of whether the caller is still waiting.
				defer dec.releaseToken()
			}
			inResp, inErr := innerFn(r)
			output <- serviceFuncResponse{
				resp: inResp,
				err:  inErr,
			}
		}(req, ownToken)

		// Use an explicit timer instead of time.After so that it can be
		// stopped as soon as the call completes; otherwise every call that
		// finishes early would leave a pending timer behind until it fires.
		timer := time.NewTimer(dec.Config.timeout)
		defer timer.Stop()

		select {
		case inServResp := <-output:
			return inServResp.resp, inServResp.err
		case <-timer.C:
			if dec.Config.timeoutFallbackFunction != nil {
				return dec.Config.timeoutFallbackFunction(req, ErrorCircuitBreakTimeout)
			}
			return nil, ErrorCircuitBreakTimeout
		}
	}
}