Skip to content

Commit c069bb3

Browse files
authored
Merge pull request #1373 from ydb-platform/create-session-error
added throwing create session error to result error from `internal/table.Client.internalPoolGet`
2 parents 69ba142 + 6d9e74c commit c069bb3

File tree

5 files changed

+102
-96
lines changed

5 files changed

+102
-96
lines changed

internal/table/client.go

+54-74
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type Client struct {
8080
clock clockwork.Clock
8181

8282
// read-write fields
83-
mu xsync.Mutex
83+
mu xsync.RWMutex
8484
index map[*session]sessionInfo
8585
createInProgress int // KIKIMR-9163: in-create-process counter
8686
limit int // Upper bound for Client size.
@@ -352,45 +352,53 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
352352
return s, nil
353353
}
354354

355-
type getOptions struct {
356-
t *trace.Table
355+
type getSettings struct {
356+
t *trace.Table
357+
maxAttempts int
357358
}
358359

359-
type getOption func(o *getOptions)
360+
type getOption func(o *getSettings)
360361

361362
func withTrace(t *trace.Table) getOption {
362-
return func(o *getOptions) {
363+
return func(o *getSettings) {
363364
o.t = o.t.Compose(t)
364365
}
365366
}
366367

367-
func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *session, err error) {
368+
const maxAttempts = 100
369+
370+
func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *session, finalErr error) { //nolint:funlen
368371
if c.isClosed() {
369372
return nil, xerrors.WithStackTrace(errClosedClient)
370373
}
371374

372-
const maxAttempts = 100
373-
374-
var (
375-
start = time.Now()
376-
i = 0
377-
o = getOptions{t: c.config.Trace()}
378-
)
375+
settings := getSettings{t: c.config.Trace(), maxAttempts: maxAttempts}
379376
for _, opt := range opts {
380377
if opt != nil {
381-
opt(&o)
378+
opt(&settings)
382379
}
383380
}
384381

385-
onDone := trace.TableOnPoolGet(o.t, &ctx,
382+
var (
383+
start = time.Now()
384+
i int
385+
lastErr error
386+
createSessionErr error
387+
waitFromChErr error
388+
)
389+
390+
onDone := trace.TableOnPoolGet(settings.t, &ctx,
386391
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/table.(*Client).internalPoolGet"),
387392
)
388393
defer func() {
389-
onDone(s, i, err)
394+
onDone(s, i, finalErr)
390395
}()
391396

392-
for s == nil && err == nil && i < maxAttempts && !c.isClosed() {
393-
i++
397+
for ; i < settings.maxAttempts; i++ {
398+
if c.isClosed() {
399+
return nil, xerrors.WithStackTrace(errClosedClient)
400+
}
401+
394402
s = tryGetIdleSession(c)
395403
if s != nil {
396404
if !s.isReady() {
@@ -403,18 +411,36 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses
403411
return s, nil
404412
}
405413

406-
s, err = tryCreateNewSession(ctx, c)
407-
if s != nil || !isCreateSessionErrorRetriable(err) {
408-
return s, xerrors.WithStackTrace(err)
414+
s, createSessionErr = tryCreateNewSession(ctx, c)
415+
if s != nil {
416+
return s, nil
409417
}
410418

411-
s, err = c.internalPoolWaitFromCh(ctx, o.t)
412-
if err != nil {
413-
err = xerrors.WithStackTrace(err)
419+
if !isCreateSessionErrorRetriable(createSessionErr) {
420+
return nil, xerrors.WithStackTrace(createSessionErr)
414421
}
422+
423+
s, waitFromChErr = c.internalPoolWaitFromCh(ctx, settings.t)
424+
if s != nil {
425+
return s, nil
426+
}
427+
428+
if waitFromChErr != nil && !isCreateSessionErrorRetriable(waitFromChErr) {
429+
return nil, xerrors.WithStackTrace(waitFromChErr)
430+
}
431+
432+
lastErr = xerrors.WithStackTrace(xerrors.Join(createSessionErr, waitFromChErr))
415433
}
416434

417-
return handleNoProgress(s, err, start, c, i)
435+
c.mu.RLock()
436+
defer c.mu.RUnlock()
437+
438+
return nil, xerrors.WithStackTrace(
439+
fmt.Errorf("failed to get session from pool ("+
440+
"attempts: %d, latency: %v, pool has %d sessions (%d busy, %d idle, %d create_in_progress): %w",
441+
i, time.Since(start), len(c.index), len(c.index)-c.idle.Len(), c.idle.Len(), c.createInProgress, lastErr,
442+
),
443+
)
418444
}
419445

420446
func tryGetIdleSession(c *Client) *session {
@@ -442,38 +468,6 @@ func tryCreateNewSession(ctx context.Context, c *Client) (*session, error) {
442468
return s, err
443469
}
444470

445-
func handleNoProgress(s *session, err error, start time.Time, c *Client, attempts int) (*session, error) {
446-
if s == nil && err == nil {
447-
if c.isClosed() {
448-
err = xerrors.WithStackTrace(errClosedClient)
449-
} else {
450-
err = xerrors.WithStackTrace(errNoProgress)
451-
}
452-
}
453-
454-
if err != nil {
455-
var (
456-
index int
457-
idle int
458-
createInProgress int
459-
)
460-
c.mu.WithLock(func() {
461-
index = len(c.index)
462-
idle = c.idle.Len()
463-
createInProgress = c.createInProgress
464-
})
465-
466-
err = xerrors.WithStackTrace(
467-
fmt.Errorf("failed to get session from pool ("+
468-
"attempts: %d, latency: %v, pool has %d sessions (%d busy, %d idle, %d create_in_progress): %w",
469-
attempts, time.Since(start), index, index-idle, idle, createInProgress, err,
470-
),
471-
)
472-
}
473-
474-
return s, err
475-
}
476-
477471
// Get returns first idle session from the Client and removes it from
478472
// there. If no items stored in Client it creates new one returns it.
479473
func (c *Client) Get(ctx context.Context) (s *session, err error) {
@@ -708,7 +702,9 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
708702
}
709703

710704
defer func() {
711-
err = handleTransactionError(ctx, tx, err)
705+
if err != nil {
706+
_ = tx.Rollback(ctx)
707+
}
712708
}()
713709

714710
if err = executeTxOperation(ctx, c, op, tx); err != nil {
@@ -724,22 +720,6 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
724720
}, config.RetryOptions...)
725721
}
726722

727-
func handleTransactionError(ctx context.Context, tx table.Transaction, err error) error {
728-
if err != nil {
729-
errRollback := tx.Rollback(ctx)
730-
if errRollback != nil {
731-
return xerrors.NewWithIssues("",
732-
xerrors.WithStackTrace(err),
733-
xerrors.WithStackTrace(errRollback),
734-
)
735-
}
736-
737-
return xerrors.WithStackTrace(err)
738-
}
739-
740-
return nil
741-
}
742-
743723
func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
744724
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
745725
defer func() {

internal/table/client_test.go

+7-10
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ func TestSessionPoolCloseWhenWaiting(t *testing.T) {
108108
wait = make(chan struct{})
109109
got = make(chan error)
110110
)
111-
p := newClientWithStubBuilder(
112-
t,
111+
p := newClientWithStubBuilder(t,
113112
testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{
114113
testutil.TableCreateSession: func(interface{}) (proto.Message, error) {
115114
return &Ydb_Table.CreateSessionResult{
@@ -119,15 +118,13 @@ func TestSessionPoolCloseWhenWaiting(t *testing.T) {
119118
})),
120119
1,
121120
config.WithSizeLimit(1),
122-
config.WithTrace(
123-
&trace.Table{
124-
OnPoolWait: func(trace.TablePoolWaitStartInfo) func(trace.TablePoolWaitDoneInfo) {
125-
wait <- struct{}{}
121+
config.WithTrace(&trace.Table{
122+
OnPoolWait: func(trace.TablePoolWaitStartInfo) func(trace.TablePoolWaitDoneInfo) {
123+
wait <- struct{}{}
126124

127-
return nil
128-
},
125+
return nil
129126
},
130-
),
127+
}),
131128
)
132129
defer func() {
133130
_ = p.Close(context.Background())
@@ -174,7 +171,7 @@ func TestSessionPoolCloseWhenWaiting(t *testing.T) {
174171
case err := <-got:
175172
if !xerrors.Is(err, errClosedClient) {
176173
t.Fatalf(
177-
"unexpected error: %v; want %v",
174+
"unexpected error: %q; want %q'",
178175
err, errClosedClient,
179176
)
180177
}

internal/table/errors.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ var (
2828
// requested session is closed early.
2929
errSessionClosed = xerrors.Wrap(errors.New("session closed early"))
3030

31-
// errNoProgress returned by a Client instance to indicate that
32-
// operation could not be completed.
33-
errNoProgress = xerrors.Wrap(errors.New("no progress"))
34-
3531
// errParamsRequired returned by a Client instance to indicate that required params is not defined
3632
errParamsRequired = xerrors.Wrap(errors.New("params required"))
3733
)
3834

3935
func isCreateSessionErrorRetriable(err error) bool {
36+
if err == nil {
37+
panic(err)
38+
}
39+
4040
switch {
4141
case
4242
xerrors.Is(err, errSessionPoolOverflow),

internal/table/retry_test.go

+36-6
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/table"
2021
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
2122
)
2223

23-
func TestRetryerBackoffRetryCancelation(t *testing.T) {
24+
func TestDoBackoffRetryCancelation(t *testing.T) {
2425
for _, testErr := range []error{
2526
// Errors leading to Wait repeat.
2627
xerrors.Transport(
@@ -78,7 +79,7 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) {
7879
}
7980
}
8081

81-
func TestRetryerBadSession(t *testing.T) {
82+
func TestDoBadSession(t *testing.T) {
8283
closed := make(map[table.Session]bool)
8384
p := SessionProviderFunc{
8485
OnGet: func(ctx context.Context) (*session, error) {
@@ -133,7 +134,36 @@ func TestRetryerBadSession(t *testing.T) {
133134
}
134135
}
135136

136-
func TestRetryerSessionClosing(t *testing.T) {
137+
func TestDoCreateSessionError(t *testing.T) {
138+
p := SessionProviderFunc{
139+
OnGet: func(ctx context.Context) (*session, error) {
140+
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE))
141+
},
142+
OnPut: func(ctx context.Context, s *session) error {
143+
if s.isClosing() {
144+
return s.Close(ctx)
145+
}
146+
147+
return nil
148+
},
149+
}
150+
ctx, cancel := xcontext.WithTimeout(xtest.Context(t), time.Second)
151+
defer cancel()
152+
err := do(ctx, p, config.New(),
153+
func(ctx context.Context, s table.Session) error {
154+
return nil
155+
},
156+
nil,
157+
)
158+
if !xerrors.Is(err, context.DeadlineExceeded) {
159+
t.Errorf("unexpected error: %v", err)
160+
}
161+
if !xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE) {
162+
t.Errorf("unexpected error: %v", err)
163+
}
164+
}
165+
166+
func TestDoSessionClosing(t *testing.T) {
137167
closed := make(map[table.Session]bool)
138168
p := SessionProviderFunc{
139169
OnGet: func(ctx context.Context) (*session, error) {
@@ -183,7 +213,7 @@ func TestRetryerSessionClosing(t *testing.T) {
183213
}
184214
}
185215

186-
func TestRetryerImmediateReturn(t *testing.T) {
216+
func TestDoImmediateReturn(t *testing.T) {
187217
for _, testErr := range []error{
188218
xerrors.Operation(
189219
xerrors.WithStatusCode(Ydb.StatusIds_GENERIC_ERROR),
@@ -236,7 +266,7 @@ func TestRetryerImmediateReturn(t *testing.T) {
236266

237267
// We are testing all suspentions of custom operation func against to all deadline
238268
// timeouts - all sub-tests must have latency less than timeouts (+tolerance)
239-
func TestRetryContextDeadline(t *testing.T) {
269+
func TestDoContextDeadline(t *testing.T) {
240270
timeouts := []time.Duration{
241271
50 * time.Millisecond,
242272
100 * time.Millisecond,
@@ -372,7 +402,7 @@ func (e *CustomError) Unwrap() error {
372402
return e.Err
373403
}
374404

375-
func TestRetryWithCustomErrors(t *testing.T) {
405+
func TestDoWithCustomErrors(t *testing.T) {
376406
var (
377407
limit = 10
378408
ctx = context.Background()

internal/table/session.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -1297,8 +1297,7 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value
12971297
return nil
12981298
}
12991299

1300-
// BeginTransaction begins new transaction within given session with given
1301-
// settings.
1300+
// BeginTransaction begins new transaction within given session with given settings.
13021301
func (s *session) BeginTransaction(
13031302
ctx context.Context,
13041303
txSettings *table.TransactionSettings,

0 commit comments

Comments
 (0)