Skip to content

Commit 61cafd1

Browse files
authored
Merge pull request #1307 from adwski/internal-pool-with-buffered-channel
use buffered channel to manage items in query pool
2 parents c2b8607 + 5ca728f commit 61cafd1

File tree

4 files changed

+329
-94
lines changed

4 files changed

+329
-94
lines changed

internal/pool/defaults.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pool
22

3-
const DefaultLimit = 50
3+
const (
4+
DefaultLimit = 50
5+
)
46

57
var defaultTrace = &Trace{
68
OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) {

internal/pool/errors.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,4 @@ import (
44
"errors"
55
)
66

7-
var (
8-
errClosedPool = errors.New("closed pool")
9-
errItemIsNotAlive = errors.New("item is not alive")
10-
)
7+
var errClosedPool = errors.New("closed pool")

internal/pool/pool.go

+155-89
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"golang.org/x/sync/errgroup"
@@ -38,12 +39,32 @@ type (
3839
createTimeout time.Duration
3940
closeTimeout time.Duration
4041

41-
mu xsync.Mutex
42-
idle []PT
43-
index map[PT]struct{}
44-
done chan struct{}
42+
// queue is a buffered channel that holds ready-to-use items.
43+
// Newly created items are sent to this channel by spawner goroutine.
44+
// getItem reads from this channel to get items for usage.
45+
// putItems sends item to this channel when it's no longer needed.
46+
// Len of the buffered channel should be equal to configured pool size
47+
// (MUST NOT be less).
48+
// If item is in this queue, then it's considered idle (not in use).
49+
queue chan PT
50+
51+
// itemTokens similarly to 'queue' is a buffered channel, and it holds 'tokens'.
52+
// Presence of token in this channel indicates that there's requests to create item.
53+
// Every token will eventually result in creation of new item (spawnItems makes sure of that).
54+
//
55+
// itemTokens must have same size as queue.
56+
// Sum of every existing token plus sum of every existing item in any time MUST be equal
57+
// to pool size. New token MUST be added by getItem/putItem if they discovered item in use to be
58+
// no good and discarded it.
59+
itemTokens chan struct{}
60+
61+
done chan struct{}
4562

4663
stats *safeStats
64+
65+
spawnCancel context.CancelFunc
66+
67+
wg *sync.WaitGroup
4768
}
4869
option[PT Item[T], T any] func(p *Pool[PT, T])
4970
)
@@ -159,6 +180,15 @@ func New[PT Item[T], T any](
159180
}
160181
}
161182

183+
p.queue = make(chan PT, p.limit)
184+
p.itemTokens = make(chan struct{}, p.limit)
185+
go func() {
186+
// fill tokens
187+
for i := 0; i < p.limit; i++ {
188+
p.itemTokens <- struct{}{}
189+
}
190+
}()
191+
162192
onDone := p.trace.OnNew(&NewStartInfo{
163193
Context: &ctx,
164194
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/pool.New"),
@@ -172,16 +202,73 @@ func New[PT Item[T], T any](
172202

173203
p.createItem = createItemWithTimeoutHandling(p.createItem, p)
174204

175-
p.idle = make([]PT, 0, p.limit)
176-
p.index = make(map[PT]struct{}, p.limit)
177205
p.stats = &safeStats{
178206
v: stats.Stats{Limit: p.limit},
179207
onChange: p.trace.OnChange,
180208
}
181209

210+
var spawnCtx context.Context
211+
p.wg = &sync.WaitGroup{}
212+
spawnCtx, p.spawnCancel = xcontext.WithCancel(xcontext.ValueOnly(ctx))
213+
p.wg.Add(1)
214+
go p.spawnItems(spawnCtx)
215+
182216
return p
183217
}
184218

219+
// spawnItems creates one item per each available itemToken and sends new item to internal item queue.
220+
// It ensures that pool would always have amount of connections equal to configured limit.
221+
// If item creation ended with error it will be retried infinity with configured interval until success.
222+
func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
223+
defer p.wg.Done()
224+
for {
225+
select {
226+
case <-ctx.Done():
227+
return
228+
case <-p.done:
229+
return
230+
case <-p.itemTokens:
231+
// got token, must create item
232+
createLoop:
233+
for {
234+
select {
235+
case <-ctx.Done():
236+
return
237+
case <-p.done:
238+
return
239+
default:
240+
p.wg.Add(1)
241+
err := p.trySpawn(ctx)
242+
if err == nil {
243+
break createLoop
244+
}
245+
}
246+
// spawn was unsuccessful, need to try again.
247+
// token must always result in new item and not be lost.
248+
}
249+
}
250+
}
251+
}
252+
253+
func (p *Pool[PT, T]) trySpawn(ctx context.Context) error {
254+
defer p.wg.Done()
255+
item, err := p.createItem(ctx)
256+
if err != nil {
257+
return err
258+
}
259+
// item was created successfully, put it in queue
260+
select {
261+
case <-ctx.Done():
262+
return nil
263+
case <-p.done:
264+
return nil
265+
case p.queue <- item:
266+
p.stats.Idle().Inc()
267+
}
268+
269+
return nil
270+
}
271+
185272
// defaultCreateItem returns a new item
186273
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
187274
var item T
@@ -247,31 +334,12 @@ func createItemWithContext[PT Item[T], T any](
247334
return xerrors.WithStackTrace(err)
248335
}
249336

250-
needCloseItem := true
251-
defer func() {
252-
if needCloseItem {
253-
_ = p.closeItem(ctx, newItem)
254-
}
255-
}()
256-
257337
select {
258338
case <-p.done:
259339
return xerrors.WithStackTrace(errClosedPool)
260340
case <-ctx.Done():
261-
p.mu.Lock()
262-
defer p.mu.Unlock()
263-
264-
if len(p.index) < p.limit {
265-
p.idle = append(p.idle, newItem)
266-
p.index[newItem] = struct{}{}
267-
p.stats.Index().Inc()
268-
needCloseItem = false
269-
}
270-
271341
return xerrors.WithStackTrace(ctx.Err())
272342
case ch <- newItem:
273-
needCloseItem = false
274-
275343
return nil
276344
}
277345
}
@@ -280,6 +348,10 @@ func (p *Pool[PT, T]) Stats() stats.Stats {
280348
return p.stats.Get()
281349
}
282350

351+
// getItem retrieves item from the queue.
352+
// If retrieved item happens to be not alive, then it's destroyed
353+
// and tokens queue is filled to +1 so new item can be created by spawner goroutine.
354+
// After, the process will be repeated until alive item is retrieved.
283355
func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
284356
onDone := p.trace.OnGet(&GetStartInfo{
285357
Context: &ctx,
@@ -295,48 +367,30 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
295367
return nil, xerrors.WithStackTrace(err)
296368
}
297369

298-
select {
299-
case <-p.done:
300-
return nil, xerrors.WithStackTrace(errClosedPool)
301-
case <-ctx.Done():
302-
return nil, xerrors.WithStackTrace(ctx.Err())
303-
default:
304-
var item PT
305-
p.mu.WithLock(func() {
306-
if len(p.idle) > 0 {
307-
item, p.idle = p.idle[0], p.idle[1:]
308-
p.stats.Idle().Dec()
309-
}
310-
})
311-
312-
if item != nil {
313-
if item.IsAlive() {
314-
return item, nil
315-
}
316-
_ = p.closeItem(ctx, item)
317-
p.mu.WithLock(func() {
318-
delete(p.index, item)
319-
})
320-
p.stats.Index().Dec()
321-
}
322-
323-
item, err := p.createItem(ctx)
324-
if err != nil {
325-
return nil, xerrors.WithStackTrace(err)
326-
}
370+
// get item and ensure it's alive.
371+
// Infinite loop here guarantees that we either return alive item
372+
// or block infinitely until we have one.
373+
// It is assumed that calling code should use context if it wishes to time out the call.
374+
for {
375+
select {
376+
case <-p.done:
377+
return nil, xerrors.WithStackTrace(errClosedPool)
378+
case <-ctx.Done():
379+
return nil, xerrors.WithStackTrace(ctx.Err())
380+
case item := <-p.queue: // get or wait for item
381+
p.stats.Idle().Dec()
382+
if item != nil {
383+
if item.IsAlive() {
384+
// item is alive, return it
327385

328-
addedToIndex := false
329-
p.mu.WithLock(func() {
330-
if len(p.index) < p.limit {
331-
p.index[item] = struct{}{}
332-
addedToIndex = true
386+
return item, nil
387+
}
388+
// item is not alive
389+
_ = p.closeItem(ctx, item) // clean up dead item
333390
}
334-
})
335-
if addedToIndex {
336-
p.stats.Index().Inc()
391+
p.itemTokens <- struct{}{} // signal spawn goroutine to create a new item
392+
// and try again
337393
}
338-
339-
return item, nil
340394
}
341395
}
342396

@@ -358,25 +412,28 @@ func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
358412
select {
359413
case <-p.done:
360414
return xerrors.WithStackTrace(errClosedPool)
415+
case <-ctx.Done():
416+
return xerrors.WithStackTrace(ctx.Err())
361417
default:
362-
if !item.IsAlive() {
418+
if item.IsAlive() {
419+
// put back in the queue
420+
select {
421+
case <-p.done:
422+
return xerrors.WithStackTrace(errClosedPool)
423+
case <-ctx.Done():
424+
return xerrors.WithStackTrace(ctx.Err())
425+
case p.queue <- item:
426+
p.stats.Idle().Inc()
427+
}
428+
} else {
429+
// item is not alive
430+
// add token and close
431+
p.itemTokens <- struct{}{}
363432
_ = p.closeItem(ctx, item)
364-
365-
p.mu.WithLock(func() {
366-
delete(p.index, item)
367-
})
368-
p.stats.Index().Dec()
369-
370-
return xerrors.WithStackTrace(errItemIsNotAlive)
371433
}
372-
373-
p.mu.WithLock(func() {
374-
p.idle = append(p.idle, item)
375-
})
376-
p.stats.Idle().Inc()
377-
378-
return nil
379434
}
435+
436+
return nil
380437
}
381438

382439
func (p *Pool[PT, T]) closeItem(ctx context.Context, item PT) error {
@@ -412,14 +469,13 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
412469

413470
return xerrors.WithStackTrace(err)
414471
}
472+
p.stats.InUse().Inc()
415473

416474
defer func() {
417475
_ = p.putItem(ctx, item)
476+
p.stats.InUse().Dec()
418477
}()
419478

420-
p.stats.InUse().Inc()
421-
defer p.stats.InUse().Dec()
422-
423479
err = f(ctx, item)
424480
if err != nil {
425481
return xerrors.WithStackTrace(err)
@@ -479,17 +535,27 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
479535
})
480536
}()
481537

538+
// canceling spawner (and any underlying createItem calls)
539+
p.spawnCancel()
540+
541+
// Only closing done channel.
542+
// Due to multiple senders queue is not closed here,
543+
// we're just making sure to drain it fully to close any existing item.
482544
close(p.done)
483545

484-
p.mu.Lock()
485-
defer p.mu.Unlock()
546+
p.wg.Wait()
486547

487548
var g errgroup.Group
488-
for item := range p.index {
489-
item := item
490-
g.Go(func() error {
491-
return item.Close(ctx)
492-
})
549+
shutdownLoop:
550+
for {
551+
select {
552+
case item := <-p.queue:
553+
g.Go(func() error {
554+
return item.Close(ctx)
555+
})
556+
default:
557+
break shutdownLoop
558+
}
493559
}
494560
if err := g.Wait(); err != nil {
495561
return xerrors.WithStackTrace(err)

0 commit comments

Comments
 (0)