batcher.go

/*
Package batcher is a library for batching requests.
This library encapsulates the process of grouping requests into batches
for batch processing in an asynchronous manner.
This implementation is adapted from the Google batch API:
https://github.com/terraform-providers/terraform-provider-google/blob/master/google/batcher.go
However, there are a number of notable differences:
- Usage assumes 1 batcher for 1 API (instead of 1 batcher for multiple APIs)
- Clients only receive their own response, and not the batch response
  their request is sent with
- Config conventions follow the framework as defined in
  https://github.com/tensorflow/serving/tree/master/tensorflow_serving/batching#batch-scheduling-parameters-and-tuning
*/
package batcher

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

/*
RequestBatcher handles receiving new requests and all the background
asynchronous tasks that batch and send them.
A new RequestBatcher should be created using the NewRequestBatcher function.
The expected usage pattern involves declaring a RequestBatcher at global
scope and calling SendRequestWithTimeout from multiple goroutines.
*/
type RequestBatcher struct {
	sync.Mutex
	*BatchingConfig
	running   bool
	parentCtx context.Context
	curBatch  *startedBatch
}
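
// Example usage (a minimal sketch; the echo SendF below is a hypothetical
// stand-in for a real batch API call):
//
//	var batcher = NewRequestBatcher(context.Background(), &BatchingConfig{
//		MaxBatchSize: 32,
//		BatchTimeout: 50 * time.Millisecond,
//		SendF: func(body *[]interface{}) (*[]interface{}, error) {
//			resp := make([]interface{}, len(*body))
//			copy(resp, *body) // a real SendF would call the batch API here
//			return &resp, nil
//		},
//	})
//
//	func handle(req interface{}) (interface{}, error) {
//		return batcher.SendRequestWithTimeout(&req, time.Second)
//	}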

/*
BatchingConfig determines how batching is done in a RequestBatcher.
*/
type BatchingConfig struct {
	// Maximum number of requests in each batch.
	MaxBatchSize int
	// Maximum wait time before a batch is sent.
	BatchTimeout time.Duration
	// User-defined SendF for sending a batch request.
	// See SendFunc for the type definition of this function.
	SendF SendFunc
}

/*
SendFunc is a function type for sending a batch of requests.
A batch of requests is a slice of inputs to SendRequestWithTimeout.
The returned slice must contain one response per request, in the same
order as the input slice, so that responses can be matched back to the
waiting callers.
*/
type SendFunc func(body *[]interface{}) (*[]interface{}, error)
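
// A minimal SendFunc sketch, purely illustrative: it doubles integer
// requests, returning one response per request in request order.
//
//	var doubleF SendFunc = func(body *[]interface{}) (*[]interface{}, error) {
//		resp := make([]interface{}, 0, len(*body))
//		for _, r := range *body {
//			n, ok := r.(int)
//			if !ok {
//				return nil, fmt.Errorf("expected int, got %T", r)
//			}
//			resp = append(resp, n*2)
//		}
//		return &resp, nil
//	}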

// startedBatch refers to a batch awaiting more requests to come in
// before SendFunc is applied to its contents
type startedBatch struct {
	// Combined batch request
	body []interface{}
	// subscribers is a registry of the requests (batchSubscriber)
	// combined to make this batch
	subscribers []batchSubscriber
	// timer for keeping track of BatchTimeout
	timer *time.Timer
}

// singleResponse represents a single response received from SendF
type singleResponse struct {
	body interface{}
	err  error
}

// batchSubscriber holds the response channel on which a waiting goroutine
// awaits its singleResponse
type batchSubscriber struct {
	// singleRequestBody is the original request this subscriber represents
	singleRequestBody interface{}
	// respCh is the channel created to communicate the result to a waiting
	// goroutine
	respCh chan *singleResponse
}

/*
NewRequestBatcher creates a new RequestBatcher
from a Context and a BatchingConfig.
In the typical usage pattern, a RequestBatcher should always be alive, so it
is safe and recommended to use the background context.
*/
func NewRequestBatcher(
	ctx context.Context,
	config *BatchingConfig,
) *RequestBatcher {
	batcher := &RequestBatcher{
		BatchingConfig: config,
		parentCtx:      ctx,
		running:        true,
	}
	if batcher.SendF == nil {
		log.Fatal("Expecting SendF")
	}
	go func(b *RequestBatcher) {
		<-b.parentCtx.Done()
		log.Printf("Parent context cancelled")
		b.stop()
	}(batcher)
	return batcher
}

// stop safely releases all batcher-allocated resources
func (b *RequestBatcher) stop() {
	b.Lock()
	defer b.Unlock()
	log.Println("Stopping batcher")
	b.running = false
	if b.curBatch != nil {
		b.curBatch.timer.Stop()
		for i := len(b.curBatch.subscribers) - 1; i >= 0; i-- {
			close(b.curBatch.subscribers[i].respCh)
		}
		// Drop the batch so a timer callback that already fired cannot
		// send on the closed channels.
		b.curBatch = nil
	}
	log.Println("Batcher stopped")
}

/*
SendRequestWithTimeout is a method to make a single request.
It manages registering the request into the batcher,
and waiting on the response.
Arguments:
	newRequestBody {*interface{}} -- A request body. SendF will expect
		a slice of objects like newRequestBody.
Returns:
	interface{} -- A response body. SendF's output is expected to be a slice
		of objects like this.
	error -- Error
*/
func (b *RequestBatcher) SendRequestWithTimeout(
	newRequestBody *interface{},
	timeout time.Duration,
) (interface{}, error) {
	// Check that the request is valid
	if newRequestBody == nil {
		return nil, fmt.Errorf("received nil request")
	}
	if timeout <= b.BatchTimeout {
		return nil, fmt.Errorf(
			"timeout period should be longer than batch timeout, %v",
			b.BatchTimeout,
		)
	}
	respCh, err := b.registerRequest(newRequestBody)
	if err != nil {
		log.Printf("[ERROR] Failed to register request: %v", err)
		return nil, fmt.Errorf("failed to register request")
	}
	ctx, cancel := context.WithTimeout(b.parentCtx, timeout)
	defer cancel()
	select {
	case resp, ok := <-respCh:
		if !ok {
			// respCh was closed without a response, e.g. because the
			// batcher was stopped.
			return nil, fmt.Errorf("batcher stopped before sending a response")
		}
		if resp.err != nil {
			log.Printf("[ERROR] Failed to process request: %v", resp.err)
			return nil, resp.err
		}
		return resp.body, nil
	case <-ctx.Done():
		return nil, fmt.Errorf("request timed out after %v", timeout)
	}
}
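
// Calling SendRequestWithTimeout from multiple goroutines (an illustrative
// sketch, assuming a package-level batcher as in the earlier example):
//
//	var wg sync.WaitGroup
//	for i := 0; i < 10; i++ {
//		wg.Add(1)
//		go func(i int) {
//			defer wg.Done()
//			var req interface{} = i
//			resp, err := batcher.SendRequestWithTimeout(&req, time.Second)
//			if err != nil {
//				log.Printf("request %d failed: %v", i, err)
//				return
//			}
//			log.Printf("request %d got %v", i, resp)
//		}(i)
//	}
//	wg.Wait()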

// registerRequest safely determines whether a new request should be
// appended to the existing batch or should start a new one
func (b *RequestBatcher) registerRequest(
	newRequestBody *interface{},
) (<-chan *singleResponse, error) {
	respCh := make(chan *singleResponse, 1)
	sub := batchSubscriber{
		singleRequestBody: *newRequestBody,
		respCh:            respCh,
	}
	b.Lock()
	defer b.Unlock()
	// Reject requests after the batcher has been stopped.
	if !b.running {
		return nil, fmt.Errorf("batcher is stopped")
	}
	if b.curBatch != nil {
		// Check if new request can be appended to curBatch
		if len(b.curBatch.body) < b.MaxBatchSize {
			// Append request to current batch
			b.curBatch.body = append(b.curBatch.body, *newRequestBody)
			b.curBatch.subscribers = append(b.curBatch.subscribers, sub)
			// Check if current batch is full
			if len(b.curBatch.body) >= b.MaxBatchSize {
				// Send current batch
				b.curBatch.timer.Stop()
				b.sendCurBatch()
			}
			return respCh, nil
		}
		// Send current batch
		b.curBatch.timer.Stop()
		b.sendCurBatch()
	}
	// Create new batch from request
	b.curBatch = &startedBatch{
		body:        []interface{}{*newRequestBody},
		subscribers: []batchSubscriber{sub},
	}
	// Start a timer to send the batch after BatchTimeout
	b.curBatch.timer = time.AfterFunc(b.BatchTimeout, b.sendCurBatchWithSafety)
	return respCh, nil
}

// sendCurBatch pops curBatch and sends it; the caller must hold the mutex
func (b *RequestBatcher) sendCurBatch() {
	// Acquire batch
	batch := b.curBatch
	b.curBatch = nil
	if batch != nil {
		go func() {
			b.send(batch)
		}()
	}
}

// sendCurBatchWithSafety pops curBatch and sends it, acquiring the mutex
// itself; used as the BatchTimeout timer callback
func (b *RequestBatcher) sendCurBatchWithSafety() {
	// Acquire batch
	b.Lock()
	batch := b.curBatch
	b.curBatch = nil
	b.Unlock()
	if batch != nil {
		go func() {
			b.send(batch)
		}()
	}
}

// send applies SendF to a startedBatch and distributes the responses
// (or the error) to all subscribers
func (b *RequestBatcher) send(batch *startedBatch) {
	// Attempt to apply SendF
	batchResp, err := b.SendF(&batch.body)
	if err != nil {
		for i := len(batch.subscribers) - 1; i >= 0; i-- {
			batch.subscribers[i].respCh <- &singleResponse{
				body: nil,
				err:  err,
			}
			close(batch.subscribers[i].respCh)
		}
		return
	}
	// Error out if the number of responses does not match the number of
	// requests, since responses can no longer be matched to subscribers
	if len(*batchResp) != len(batch.body) {
		log.Printf("[ERROR] SendF returned a different number of entries.")
		for i := len(batch.subscribers) - 1; i >= 0; i-- {
			batch.subscribers[i].respCh <- &singleResponse{
				body: nil,
				err: fmt.Errorf(
					"API returned %d responses for %d requests",
					len(*batchResp), len(batch.body),
				),
			}
			close(batch.subscribers[i].respCh)
		}
		return
	}
	// On success, place each response into its subscriber's response queue.
	for i := len(batch.subscribers) - 1; i >= 0; i-- {
		batch.subscribers[i].respCh <- &singleResponse{
			body: (*batchResp)[i],
			err:  nil,
		}
		close(batch.subscribers[i].respCh)
	}
}