Skip to content

Commit 40cbe4f

Browse files
committed
use buffered channel to manage items in query pool
1 parent 890206e commit 40cbe4f

File tree

3 files changed

+322
-92
lines changed

3 files changed

+322
-92
lines changed

internal/pool/defaults.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
11
package pool
22

3-
const DefaultLimit = 50
3+
import "time"
4+
5+
const (
6+
DefaultLimit = 50
7+
8+
// defaultCreateRetryDelay specifies the amount of time that the spawner will wait
9+
// before retrying an unsuccessful item creation.
10+
defaultCreateRetryDelay = 500 * time.Millisecond
11+
12+
// defaultSpawnGoroutinesNumber specifies the number of spawnItems goroutines.
13+
// Having more than one spawner can potentially decrease the warm-up time
14+
// and the connection re-establishment time after a connectivity failure.
15+
// Too high a value will result in frequent connection establishment
16+
// attempts (see defaultCreateRetryDelay) during connectivity
17+
// issues, which in some cases might not be desirable.
18+
defaultSpawnGoroutinesNumber = 2
19+
)
420

521
var defaultTrace = &Trace{
622
OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) {

internal/pool/pool.go

+135-91
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,26 @@ type (
3838
createTimeout time.Duration
3939
closeTimeout time.Duration
4040

41-
mu xsync.Mutex
42-
idle []PT
43-
index map[PT]struct{}
44-
done chan struct{}
41+
// queue is a buffered channel that holds ready-to-use items.
42+
// Newly created items are sent to this channel by spawner goroutine.
43+
// getItem reads from this channel to get items for usage.
44+
// putItem sends an item to this channel when it is no longer needed.
45+
// Len of the buffered channel should be equal to configured pool size
46+
// (MUST NOT be less).
47+
// If item is in this queue, then it's considered idle (not in use).
48+
queue chan PT
49+
50+
// itemTokens, similarly to 'queue', is a buffered channel, and it holds 'tokens'.
51+
// The presence of a token in this channel indicates that there is a request to create an item.
52+
// Every token will eventually result in the creation of a new item (spawnItems makes sure of that).
53+
//
54+
// itemTokens must have same size as queue.
55+
// The sum of all existing tokens plus the sum of all existing items MUST at any time be equal
56+
// to the pool size. A new token MUST be added by getItem/putItem if they discover that an item
57+
// in use is no good and discard it.
58+
itemTokens chan struct{}
59+
60+
done chan struct{}
4561

4662
stats *safeStats
4763
}
@@ -159,6 +175,15 @@ func New[PT Item[T], T any](
159175
}
160176
}
161177

178+
p.queue = make(chan PT, p.limit)
179+
p.itemTokens = make(chan struct{}, p.limit)
180+
go func() {
181+
// fill tokens
182+
for i := 0; i < p.limit; i++ {
183+
p.itemTokens <- struct{}{}
184+
}
185+
}()
186+
162187
onDone := p.trace.OnNew(&NewStartInfo{
163188
Context: &ctx,
164189
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/pool.New"),
@@ -172,16 +197,61 @@ func New[PT Item[T], T any](
172197

173198
p.createItem = createItemWithTimeoutHandling(p.createItem, p)
174199

175-
p.idle = make([]PT, 0, p.limit)
176-
p.index = make(map[PT]struct{}, p.limit)
177200
p.stats = &safeStats{
178201
v: stats.Stats{Limit: p.limit},
179202
onChange: p.trace.OnChange,
180203
}
181204

205+
for i := 0; i < defaultSpawnGoroutinesNumber; i++ {
206+
go p.spawnItems(ctx)
207+
}
208+
182209
return p
183210
}
184211

212+
// spawnItems creates one item for each available itemToken and sends the new item to the internal item queue.
213+
// It ensures that the pool always has a number of connections equal to the configured limit.
214+
// If item creation ends with an error, it will be retried indefinitely with the configured interval until it succeeds.
215+
func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
216+
spawnLoop:
217+
for {
218+
select {
219+
case <-ctx.Done():
220+
break spawnLoop
221+
case <-p.done:
222+
break spawnLoop
223+
case <-p.itemTokens:
224+
// got token, must create item
225+
for {
226+
item, err := p.createItem(ctx)
227+
if err != nil {
228+
select {
229+
case <-ctx.Done():
230+
break spawnLoop
231+
case <-p.done:
232+
break spawnLoop
233+
case <-time.After(defaultCreateRetryDelay):
234+
// try again.
235+
// token must always result in new item and not be lost.
236+
}
237+
} else {
238+
// item is created successfully, put it in queue
239+
select {
240+
case <-ctx.Done():
241+
break spawnLoop
242+
case <-p.done:
243+
break spawnLoop
244+
case p.queue <- item:
245+
p.stats.Idle().Inc()
246+
}
247+
248+
continue spawnLoop
249+
}
250+
}
251+
}
252+
}
253+
}
254+
185255
// defaultCreateItem returns a new item
186256
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
187257
var item T
@@ -247,31 +317,12 @@ func createItemWithContext[PT Item[T], T any](
247317
return xerrors.WithStackTrace(err)
248318
}
249319

250-
needCloseItem := true
251-
defer func() {
252-
if needCloseItem {
253-
_ = p.closeItem(ctx, newItem)
254-
}
255-
}()
256-
257320
select {
258321
case <-p.done:
259322
return xerrors.WithStackTrace(errClosedPool)
260323
case <-ctx.Done():
261-
p.mu.Lock()
262-
defer p.mu.Unlock()
263-
264-
if len(p.index) < p.limit {
265-
p.idle = append(p.idle, newItem)
266-
p.index[newItem] = struct{}{}
267-
p.stats.Index().Inc()
268-
needCloseItem = false
269-
}
270-
271324
return xerrors.WithStackTrace(ctx.Err())
272325
case ch <- newItem:
273-
needCloseItem = false
274-
275326
return nil
276327
}
277328
}
@@ -280,6 +331,10 @@ func (p *Pool[PT, T]) Stats() stats.Stats {
280331
return p.stats.Get()
281332
}
282333

334+
// getItem retrieves an item from the queue.
335+
// If the retrieved item happens to be not alive, it is destroyed
336+
// and one token is added to the tokens queue so that a new item can be created by a spawner goroutine.
337+
// Afterwards, the process is repeated until an alive item is retrieved.
283338
func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
284339
onDone := p.trace.OnGet(&GetStartInfo{
285340
Context: &ctx,
@@ -295,48 +350,32 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
295350
return nil, xerrors.WithStackTrace(err)
296351
}
297352

298-
select {
299-
case <-p.done:
300-
return nil, xerrors.WithStackTrace(errClosedPool)
301-
case <-ctx.Done():
302-
return nil, xerrors.WithStackTrace(ctx.Err())
303-
default:
304-
var item PT
305-
p.mu.WithLock(func() {
306-
if len(p.idle) > 0 {
307-
item, p.idle = p.idle[0], p.idle[1:]
308-
p.stats.Idle().Dec()
309-
}
310-
})
311-
312-
if item != nil {
313-
if item.IsAlive() {
314-
return item, nil
353+
// get item and ensure it's alive.
354+
// The infinite loop here guarantees that we either return an alive item
355+
// or block indefinitely until we have one.
356+
// It is assumed that the calling code will use a context if it wishes to time out the call.
357+
for {
358+
select {
359+
case <-p.done:
360+
return nil, xerrors.WithStackTrace(errClosedPool)
361+
case <-ctx.Done():
362+
return nil, xerrors.WithStackTrace(ctx.Err())
363+
case item := <-p.queue: // get or wait for item
364+
p.stats.Idle().Dec()
365+
if item != nil {
366+
if item.IsAlive() {
367+
// item is alive, return it
368+
p.stats.InUse().Inc()
369+
370+
return item, nil
371+
}
372+
// item is not alive
373+
_ = p.closeItem(ctx, item) // clean up dead item
315374
}
316-
_ = p.closeItem(ctx, item)
317-
p.mu.WithLock(func() {
318-
delete(p.index, item)
319-
})
320-
p.stats.Index().Dec()
321-
}
322-
323-
item, err := p.createItem(ctx)
324-
if err != nil {
325-
return nil, xerrors.WithStackTrace(err)
326-
}
375+
p.itemTokens <- struct{}{} // signal spawn goroutine to create a new item
327376

328-
addedToIndex := false
329-
p.mu.WithLock(func() {
330-
if len(p.index) < p.limit {
331-
p.index[item] = struct{}{}
332-
addedToIndex = true
333-
}
334-
})
335-
if addedToIndex {
336-
p.stats.Index().Inc()
377+
// and try again
337378
}
338-
339-
return item, nil
340379
}
341380
}
342381

@@ -358,25 +397,29 @@ func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
358397
select {
359398
case <-p.done:
360399
return xerrors.WithStackTrace(errClosedPool)
400+
case <-ctx.Done():
401+
return xerrors.WithStackTrace(ctx.Err())
361402
default:
362-
if !item.IsAlive() {
403+
p.stats.InUse().Dec()
404+
if item.IsAlive() {
405+
// put back in the queue
406+
select {
407+
case <-p.done:
408+
return xerrors.WithStackTrace(errClosedPool)
409+
case <-ctx.Done():
410+
return xerrors.WithStackTrace(ctx.Err())
411+
case p.queue <- item:
412+
p.stats.Idle().Inc()
413+
}
414+
} else {
415+
// item is not alive
416+
// add token and close
417+
p.itemTokens <- struct{}{}
363418
_ = p.closeItem(ctx, item)
364-
365-
p.mu.WithLock(func() {
366-
delete(p.index, item)
367-
})
368-
p.stats.Index().Dec()
369-
370-
return xerrors.WithStackTrace(errItemIsNotAlive)
371419
}
372-
373-
p.mu.WithLock(func() {
374-
p.idle = append(p.idle, item)
375-
})
376-
p.stats.Idle().Inc()
377-
378-
return nil
379420
}
421+
422+
return nil
380423
}
381424

382425
func (p *Pool[PT, T]) closeItem(ctx context.Context, item PT) error {
@@ -417,9 +460,6 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
417460
_ = p.putItem(ctx, item)
418461
}()
419462

420-
p.stats.InUse().Inc()
421-
defer p.stats.InUse().Dec()
422-
423463
err = f(ctx, item)
424464
if err != nil {
425465
return xerrors.WithStackTrace(err)
@@ -479,17 +519,21 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
479519
})
480520
}()
481521

522+
// Only the done channel is closed.
523+
// Because there are multiple senders, the queue is not closed here;
524+
// we just make sure to drain it fully so that every existing item is closed.
482525
close(p.done)
483-
484-
p.mu.Lock()
485-
defer p.mu.Unlock()
486-
487526
var g errgroup.Group
488-
for item := range p.index {
489-
item := item
490-
g.Go(func() error {
491-
return item.Close(ctx)
492-
})
527+
shutdownLoop:
528+
for {
529+
select {
530+
case item := <-p.queue:
531+
g.Go(func() error {
532+
return item.Close(ctx)
533+
})
534+
default:
535+
break shutdownLoop
536+
}
493537
}
494538
if err := g.Wait(); err != nil {
495539
return xerrors.WithStackTrace(err)

0 commit comments

Comments
 (0)