Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 70 additions & 10 deletions clientpool/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,33 @@ import (
)

type channelPool struct {
pool chan Client
opener ClientOpener
numActive atomic.Int32
maxClients int
pool chan Client
opener ClientOpener
numActive atomic.Int32
numTotal atomic.Int32
backgroundTaskInterval time.Duration
minClients int
maxClients int
isClosed atomic.Bool
}

const DefaultBackgroundTaskInterval = 5 * time.Second

// Make sure channelPool implements Pool interface.
var _ Pool = (*channelPool)(nil)

// NewChannelPool creates a new client pool implemented via channel.
func NewChannelPool(ctx context.Context, requiredInitialClients, bestEffortInitialClients, maxClients int, opener ClientOpener) (_ Pool, err error) {
if !(requiredInitialClients <= bestEffortInitialClients && bestEffortInitialClients <= maxClients) {
return NewChannelPoolWithMinClients(ctx, requiredInitialClients, bestEffortInitialClients, -1, maxClients, opener, DefaultBackgroundTaskInterval)
}

// NewChannelPoolWithMinClients creates a new client pool implemented via channel.
func NewChannelPoolWithMinClients(ctx context.Context, requiredInitialClients, bestEffortInitialClients, minClients, maxClients int, opener ClientOpener, backgroundTaskInterval time.Duration) (_ Pool, err error) {
if !(requiredInitialClients <= bestEffortInitialClients && bestEffortInitialClients <= maxClients && minClients <= maxClients) {
return nil, &ConfigError{
BestEffortInitialClients: bestEffortInitialClients,
RequiredInitialClients: requiredInitialClients,
MinClients: minClients,
MaxClients: maxClients,
}
}
Expand All @@ -44,6 +56,8 @@ func NewChannelPool(ctx context.Context, requiredInitialClients, bestEffortIniti
}
}()

var numTotal int32

for i := 0; i < requiredInitialClients; {
if ctxErr := ctx.Err(); ctxErr != nil {
if lastAttemptErr == nil {
Expand All @@ -59,6 +73,7 @@ func NewChannelPool(ctx context.Context, requiredInitialClients, bestEffortIniti
if err == nil {
pool <- c
i++
numTotal++
} else {
lastAttemptErr = err
if chatty.Allow() {
Expand All @@ -79,13 +94,25 @@ func NewChannelPool(ctx context.Context, requiredInitialClients, bestEffortIniti
break
}
pool <- c
numTotal++
}

return &channelPool{
pool: pool,
opener: opener,
maxClients: maxClients,
}, nil
if backgroundTaskInterval == 0 {
backgroundTaskInterval = DefaultBackgroundTaskInterval
}
cp := &channelPool{
pool: pool,
opener: opener,
maxClients: maxClients,
minClients: minClients,
backgroundTaskInterval: backgroundTaskInterval,
}
cp.numTotal.Store(numTotal)

if cp.minClients > 0 {
go cp.ensureMinClients()
}
return cp, nil
}

// Get returns a client from the pool.
Expand All @@ -112,6 +139,14 @@ func (cp *channelPool) Get() (client Client, err error) {
default:
}

// Instead of decrementing and re-incrementing numTotal, just decrement if we
// failed to open a new connection to replace the closed one.
defer func() {
if err != nil {
cp.numTotal.Add(-1)
}
}()

if cp.IsExhausted() {
err = ErrExhausted
return
Expand Down Expand Up @@ -141,6 +176,7 @@ func (cp *channelPool) Release(c Client) error {

newC, err := cp.opener()
if err != nil {
cp.numTotal.Add(-1)
return err
}
c = newC
Expand All @@ -151,6 +187,7 @@ func (cp *channelPool) Release(c Client) error {
return nil
default:
// Pool is full, just close it instead.
cp.numTotal.Add(-1)
return c.Close()
}
}
Expand All @@ -163,7 +200,9 @@ func (cp *channelPool) Close() error {
if err := c.Close(); err != nil {
lastErr = err
}
cp.numTotal.Add(-1)
}
cp.isClosed.Store(true)
return lastErr
}

Expand All @@ -181,3 +220,24 @@ func (cp *channelPool) NumAllocated() int32 {
func (cp *channelPool) IsExhausted() bool {
return cp.NumActiveClients() >= int32(cp.maxClients)
}

func (cp *channelPool) ensureMinClients() {
for !cp.isClosed.Load() {
time.Sleep(cp.backgroundTaskInterval)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a time.Ticker to loop through instead of using sleep, you can use closed signal as the condition of termination of the loop.


for cp.numTotal.Load() < int32(cp.minClients) && !cp.isClosed.Load() {
c, err := cp.opener()
if err != nil {
log.Warnf("clientpool: error creating background client (will retry): %v", err)
break
}
select {
case cp.pool <- c:
cp.numTotal.Add(1)
default:
// Pool is full
c.Close()
}
}
}
}
60 changes: 52 additions & 8 deletions clientpool/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,32 @@ import (
"errors"
"sync/atomic"
"testing"
"time"

"github.com/reddit/baseplate.go/clientpool"
)

func TestChannelPoolInvalidConfig(t *testing.T) {
const min, init, max = 5, 1, 1
_, err := clientpool.NewChannelPool(context.Background(), min, init, max, nil)
if err == nil {
t.Errorf(
"NewChannelPool with min %d and max %d expected an error, got nil.",
min,
max,
)
testCases := []struct {
Name string
Req, Init, Min, Max int
}{
{"badReq", 5, 1, 1, 1},
{"badInit", 1, 5, 1, 1},
{"badMin", 1, 1, 5, 1},
}

nilOpener := func() (clientpool.Client, error) {
return &testClient{}, nil
}

for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
_, err := clientpool.NewChannelPoolWithMinClients(context.Background(), tc.Req, tc.Init, tc.Min, tc.Max, nilOpener, clientpool.DefaultBackgroundTaskInterval)
if err == nil {
t.Errorf("NewChannelPoolWithMinClients(req=%d, init=%d, min=%d, max=%d) expected an error, got nil", tc.Req, tc.Init, tc.Min, tc.Max)
}
})
}
}

Expand Down Expand Up @@ -113,3 +126,34 @@ func TestChannelPoolWithOpenerFailure(t *testing.T) {
},
)
}

func TestChannelPoolMinClients(t *testing.T) {
opener := func(called *atomic.Int32) clientpool.ClientOpener {
return func() (clientpool.Client, error) {
if called != nil {
called.Add(1)
}
return &testClient{}, nil
}
}

const req, init, min, max = 1, 2, 5, 10
const backgroundTaskInterval = 10 * time.Millisecond
var openerCalled atomic.Int32
pool, err := clientpool.NewChannelPoolWithMinClients(context.Background(), req, init, min, max, opener(&openerCalled), backgroundTaskInterval)
if err != nil {
t.Fatal(err)
}
t.Logf("req: %d, init: %d, min: %d, max: %d", req, init, min, max)

time.Sleep(2 * backgroundTaskInterval)

t.Run("background-min-enforced", func(t *testing.T) {
got := pool.NumAllocated()
if got != min {
t.Errorf("pool should have created clients in the background, expected %d, got %d", min, got)
}
})

testPool(t, pool, &openerCalled, min, max)
}
5 changes: 4 additions & 1 deletion clientpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@ func (exhaustedError) Retryable() int {
type ConfigError struct {
BestEffortInitialClients int
RequiredInitialClients int
MinClients int
MaxClients int
}

var _ error = (*ConfigError)(nil)

func (e *ConfigError) Error() string {
return fmt.Sprintf(
"clientpool: need requiredClients (%d) <= initialClients (%d) <= maxClients (%d)",
"clientpool: need requiredClients (%d) <= initialClients (%d) <= maxClients (%d), and minClients (%d) <= maxClients (%d)",
e.RequiredInitialClients,
e.BestEffortInitialClients,
e.MaxClients,
e.MinClients,
e.MaxClients,
)
}
23 changes: 22 additions & 1 deletion thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ type ClientPoolConfig struct {
// pool can maintain.
MaxConnections int `yaml:"maxConnections"`

// MinConnections is the minimum number of thrift connections (idle+active)
// that the client pool will try to maintain via a background worker.
//
// If this value is 0 or negative, the background worker will not be started.
MinConnections int `yaml:"MinConnections"`

// BackgroundTaskInterval is the interval that the connection pool will check
// and try to ensure that there are MinConnections in the pool.
//
// If this is not set, the default duration is 5 seconds.
BackgroundTaskInterval time.Duration `yaml:"BackgroundTaskInterval"`

// MaxConnectionAge is the maximum duration that a pooled connection will be
// kept before closing in favor of a new one.
//
Expand Down Expand Up @@ -221,6 +233,9 @@ func (c ClientPoolConfig) Validate() error {
if c.InitialConnections > c.MaxConnections {
return ErrConfigInvalidConnections
}
if c.MinConnections > c.MaxConnections {
return ErrConfigInvalidMinConnections
}
return nil
}

Expand Down Expand Up @@ -258,6 +273,9 @@ func (c BaseplateClientPoolConfig) Validate() error {
if c.InitialConnections > c.MaxConnections {
errs = append(errs, ErrConfigInvalidConnections)
}
if c.MinConnections > c.MaxConnections {
errs = append(errs, ErrConfigInvalidMinConnections)
}
return errors.Join(errs...)
}

Expand Down Expand Up @@ -467,12 +485,15 @@ func newClientPool(
proto,
)
}
pool, err := clientpool.NewChannelPool(

pool, err := clientpool.NewChannelPoolWithMinClients(
ctx,
cfg.RequiredInitialConnections,
cfg.InitialConnections,
cfg.MinConnections,
cfg.MaxConnections,
opener,
cfg.BackgroundTaskInterval,
)
if err != nil {
return nil, fmt.Errorf(
Expand Down
7 changes: 4 additions & 3 deletions thriftbp/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ var (

// ClientPoolConfig errors are returned if the configuration validation fails.
var (
ErrConfigMissingServiceSlug = errors.New("`ServiceSlug` cannot be empty")
ErrConfigMissingAddr = errors.New("`Addr` cannot be empty")
ErrConfigInvalidConnections = errors.New("`InitialConnections` cannot be bigger than `MaxConnections`")
ErrConfigMissingServiceSlug = errors.New("`ServiceSlug` cannot be empty")
ErrConfigMissingAddr = errors.New("`Addr` cannot be empty")
ErrConfigInvalidConnections = errors.New("`InitialConnections` cannot be bigger than `MaxConnections`")
ErrConfigInvalidMinConnections = errors.New("`MinConnections` cannot be bigger than `MaxConnections`")
)

// WithDefaultRetryableCodes returns a list including the given error codes and
Expand Down