Skip to content

Commit b518441

Browse files
authored
Merge pull request #1105 from ydb-platform/query-fix
some fixes of query service client
2 parents b226ff1 + f73d6e1 commit b518441

File tree

7 files changed

+88
-63
lines changed

7 files changed

+88
-63
lines changed

internal/pool/pool.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ type (
2222
Pool[T any] struct {
2323
clock clockwork.Clock
2424

25-
create func(ctx context.Context, onClose func(item *T)) (*T, error)
26-
close func(ctx context.Context, item *T) error
25+
createItem func(ctx context.Context, onClose func(item *T)) (*T, error)
26+
deleteItem func(ctx context.Context, item *T) error
27+
checkErr func(err error) bool
2728

2829
mu xsync.Mutex
2930
index map[*T]itemInfo
@@ -42,17 +43,19 @@ type (
4243
func New[T any](
4344
limit int,
4445
createItem func(ctx context.Context, onClose func(item *T)) (*T, error),
45-
closeItem func(ctx context.Context, item *T) error,
46+
deleteItem func(ctx context.Context, item *T) error,
47+
checkErr func(err error) bool,
4648
opts ...option[T],
4749
) *Pool[T] {
4850
p := &Pool[T]{
49-
clock: clockwork.NewRealClock(),
50-
create: createItem,
51-
close: closeItem,
52-
index: make(map[*T]itemInfo),
53-
idle: list.New(),
54-
waitQ: list.New(),
55-
limit: limit,
51+
clock: clockwork.NewRealClock(),
52+
createItem: createItem,
53+
deleteItem: deleteItem,
54+
checkErr: checkErr,
55+
index: make(map[*T]itemInfo),
56+
idle: list.New(),
57+
waitQ: list.New(),
58+
limit: limit,
5659
waitChPool: sync.Pool{
5760
New: func() interface{} {
5861
ch := make(chan *T)
@@ -70,30 +73,34 @@ func New[T any](
7073
}
7174

7275
func (p *Pool[T]) try(ctx context.Context, f func(ctx context.Context, item *T) error) error {
73-
t, err := p.get(ctx)
76+
item, err := p.get(ctx)
7477
if err != nil {
7578
return xerrors.WithStackTrace(err)
7679
}
7780

7881
defer func() {
7982
select {
8083
case <-p.done:
81-
_ = p.close(ctx, t)
84+
_ = p.deleteItem(ctx, item)
8285
default:
8386
p.mu.Lock()
8487
defer p.mu.Unlock()
8588

8689
if p.idle.Len() >= p.limit {
87-
_ = p.close(ctx, t)
90+
_ = p.deleteItem(ctx, item)
8891
}
8992

90-
if !p.notify(t) {
91-
p.pushIdle(t, p.clock.Now())
93+
if !p.notify(item) {
94+
p.pushIdle(item, p.clock.Now())
9295
}
9396
}
9497
}()
9598

96-
if err = f(ctx, t); err != nil {
99+
if err = f(ctx, item); err != nil {
100+
if p.checkErr(err) {
101+
_ = p.deleteItem(ctx, item)
102+
}
103+
97104
return xerrors.WithStackTrace(err)
98105
}
99106

@@ -116,7 +123,7 @@ func (p *Pool[T]) With(ctx context.Context, f func(ctx context.Context, item *T)
116123
return nil
117124
}
118125

119-
func (p *Pool[T]) createItem(ctx context.Context) (item *T, err error) {
126+
func (p *Pool[T]) newItem(ctx context.Context) (item *T, err error) {
120127
select {
121128
case <-p.done:
122129
return nil, xerrors.WithStackTrace(errClosedPool)
@@ -140,7 +147,7 @@ func (p *Pool[T]) createItem(ctx context.Context) (item *T, err error) {
140147
})
141148
}()
142149

143-
item, err = p.create(ctx, p.removeItem)
150+
item, err = p.createItem(ctx, p.removeItem)
144151
if err != nil {
145152
return nil, xerrors.WithStackTrace(err)
146153
}
@@ -153,7 +160,7 @@ func (p *Pool[T]) removeItem(item *T) {
153160
p.mu.WithLock(func() {
154161
info, has := p.index[item]
155162
if !has {
156-
panic("item not found in pool")
163+
return
157164
}
158165

159166
delete(p.index, item)
@@ -187,7 +194,7 @@ func (p *Pool[T]) get(ctx context.Context) (item *T, err error) {
187194
}
188195

189196
// Second, we try to create item.
190-
item, _ = p.createItem(ctx)
197+
item, _ = p.newItem(ctx)
191198
if item != nil {
192199
return item, nil
193200
}
@@ -276,7 +283,7 @@ func (p *Pool[T]) Close(ctx context.Context) (err error) {
276283
p.wg.Add(1)
277284
go func() {
278285
defer p.wg.Done()
279-
_ = p.close(ctx, item)
286+
_ = p.deleteItem(ctx, item)
280287
}()
281288
}
282289
}

internal/query/client.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,7 @@ func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
128128
}
129129

130130
type createSessionConfig struct {
131-
onDetach func(id string)
132-
onAttach func(id string)
131+
onAttach func(s *Session)
133132
onClose func(s *Session)
134133
}
135134

@@ -142,22 +141,26 @@ func createSession(
142141
xerrors.Transport(err),
143142
)
144143
}
144+
145145
if s.GetStatus() != Ydb.StatusIds_SUCCESS {
146146
return nil, xerrors.WithStackTrace(
147147
xerrors.FromOperation(s),
148148
)
149149
}
150+
150151
defer func() {
151152
if finalErr != nil {
152153
_ = deleteSession(ctx, client, s.GetSessionId())
153154
}
154155
}()
156+
155157
attachCtx, cancelAttach := xcontext.WithCancel(context.Background())
156158
defer func() {
157159
if finalErr != nil {
158160
cancelAttach()
159161
}
160162
}()
163+
161164
attach, err := client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
162165
SessionId: s.GetSessionId(),
163166
})
@@ -166,33 +169,42 @@ func createSession(
166169
xerrors.Transport(err),
167170
)
168171
}
172+
169173
defer func() {
170174
if finalErr != nil {
171175
_ = attach.CloseSend()
172176
}
173177
}()
178+
174179
state, err := attach.Recv()
175180
if err != nil {
176181
return nil, xerrors.WithStackTrace(xerrors.Transport(err))
177182
}
183+
178184
if state.GetStatus() != Ydb.StatusIds_SUCCESS {
179185
return nil, xerrors.WithStackTrace(xerrors.FromOperation(state))
180186
}
181-
if cfg.onAttach != nil {
182-
cfg.onAttach(s.GetSessionId())
183-
}
187+
184188
session := &Session{
185189
id: s.GetSessionId(),
186190
nodeID: s.GetNodeId(),
187191
queryClient: client,
188192
status: query.SessionStatusReady,
189193
}
194+
195+
if cfg.onAttach != nil {
196+
cfg.onAttach(session)
197+
}
198+
190199
session.close = sync.OnceFunc(func() {
191-
if cfg.onDetach != nil {
192-
cfg.onDetach(session.id)
200+
if cfg.onClose != nil {
201+
cfg.onClose(session)
193202
}
203+
194204
_ = attach.CloseSend()
205+
195206
cancelAttach()
207+
196208
atomic.StoreUint32(
197209
(*uint32)(&session.status),
198210
uint32(query.SessionStatusClosed),
@@ -204,15 +216,11 @@ func createSession(
204216
for {
205217
switch session.Status() {
206218
case query.SessionStatusReady, query.SessionStatusInUse:
207-
state, err := attach.Recv()
208-
if err != nil || state.GetStatus() != Ydb.StatusIds_SUCCESS {
219+
sessionState, recvErr := attach.Recv()
220+
if recvErr != nil || sessionState.GetStatus() != Ydb.StatusIds_SUCCESS {
209221
return
210222
}
211223
default:
212-
if cfg.onClose != nil {
213-
cfg.onClose(session)
214-
}
215-
216224
return
217225
}
218226
}
@@ -262,6 +270,7 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client
262270

263271
return nil
264272
},
273+
xerrors.MustDeleteSession,
265274
)
266275

267276
return client, ctx.Err()

internal/query/client_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ func TestCreateSession(t *testing.T) {
3939
t.Log("execute")
4040
attached := 0
4141
s, err := createSession(ctx, service, createSessionConfig{
42-
onAttach: func(id string) {
42+
onAttach: func(s *Session) {
4343
attached++
4444
},
45-
onDetach: func(id string) {
45+
onClose: func(s *Session) {
4646
attached--
4747
},
4848
})
@@ -169,6 +169,9 @@ func newTestPool(createSession func(ctx context.Context) (*Session, error)) *poo
169169
func(ctx context.Context, s *Session) error {
170170
return s.Close(ctx)
171171
},
172+
func(err error) bool {
173+
return true
174+
},
172175
)
173176
}
174177

internal/query/execute_query.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/ydb-platform/ydb-go-sdk/v3/query"
1515
)
1616

17-
type executeSettings interface {
17+
type executeConfig interface {
1818
ExecMode() query.ExecMode
1919
StatsMode() query.StatsMode
2020
TxControl() *query.TransactionControl
@@ -23,21 +23,21 @@ type executeSettings interface {
2323
CallOptions() []grpc.CallOption
2424
}
2525

26-
func executeQueryRequest(a *allocator.Allocator, sessionID, q string, settings executeSettings) (
26+
func executeQueryRequest(a *allocator.Allocator, sessionID, q string, cfg executeConfig) (
2727
*Ydb_Query.ExecuteQueryRequest,
2828
[]grpc.CallOption,
2929
) {
3030
request := a.QueryExecuteQueryRequest()
3131

3232
request.SessionId = sessionID
33-
request.ExecMode = Ydb_Query.ExecMode(settings.ExecMode())
34-
request.TxControl = settings.TxControl().ToYDB(a)
35-
request.Query = queryFromText(a, q, Ydb_Query.Syntax(settings.Syntax()))
36-
request.Parameters = settings.Params().ToYDB(a)
37-
request.StatsMode = Ydb_Query.StatsMode(settings.StatsMode())
33+
request.ExecMode = Ydb_Query.ExecMode(cfg.ExecMode())
34+
request.TxControl = cfg.TxControl().ToYDB(a)
35+
request.Query = queryFromText(a, q, Ydb_Query.Syntax(cfg.Syntax()))
36+
request.Parameters = cfg.Params().ToYDB(a)
37+
request.StatsMode = Ydb_Query.StatsMode(cfg.StatsMode())
3838
request.ConcurrentResultSets = false
3939

40-
return request, settings.CallOptions()
40+
return request, cfg.CallOptions()
4141
}
4242

4343
func queryFromText(
@@ -51,32 +51,33 @@ func queryFromText(
5151
return content
5252
}
5353

54-
func execute(
55-
ctx context.Context, session *Session, client Ydb_Query_V1.QueryServiceClient, q string, settings executeSettings,
56-
) (*transaction, *result, error) {
54+
func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient, q string, cfg executeConfig) (
55+
_ *transaction, _ *result, finalErr error,
56+
) {
5757
a := allocator.New()
58-
request, callOptions := executeQueryRequest(a, session.id, q, settings)
58+
defer a.Free()
59+
60+
request, callOptions := executeQueryRequest(a, s.id, q, cfg)
61+
62+
streamCtx, streamCancel := xcontext.WithCancel(context.Background())
5963
defer func() {
60-
a.Free()
64+
if finalErr != nil {
65+
streamCancel()
66+
}
6167
}()
6268

63-
ctx, cancel := xcontext.WithCancel(ctx)
64-
65-
stream, err := client.ExecuteQuery(ctx, request, callOptions...)
69+
stream, err := c.ExecuteQuery(streamCtx, request, callOptions...)
6670
if err != nil {
67-
cancel()
68-
6971
return nil, nil, xerrors.WithStackTrace(err)
7072
}
71-
r, txID, err := newResult(ctx, stream, cancel)
72-
if err != nil {
73-
cancel()
7473

74+
r, txID, err := newResult(ctx, stream, streamCancel)
75+
if err != nil {
7576
return nil, nil, xerrors.WithStackTrace(err)
7677
}
7778

7879
return &transaction{
7980
id: txID,
80-
s: session,
81+
s: s,
8182
}, r, nil
8283
}

internal/query/result.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type result struct {
3030
func newResult(
3131
ctx context.Context,
3232
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
33-
stop func(),
33+
streamCancel func(),
3434
) (_ *result, txID string, _ error) {
3535
interrupted := make(chan struct{})
3636
r := result{
@@ -40,7 +40,7 @@ func newResult(
4040
closed: make(chan struct{}),
4141
interrupt: sync.OnceFunc(func() {
4242
close(interrupted)
43-
stop()
43+
streamCancel()
4444
}),
4545
}
4646
select {

internal/query/session.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,10 @@ func (s *Session) Status() query.SessionStatus {
7777
func (s *Session) Execute(
7878
ctx context.Context, q string, opts ...query.ExecuteOption,
7979
) (query.Transaction, query.Result, error) {
80-
return execute(ctx, s, s.queryClient, q, query.ExecuteSettings(opts...))
80+
tx, r, err := execute(ctx, s, s.queryClient, q, query.ExecuteSettings(opts...))
81+
if err != nil {
82+
return nil, nil, xerrors.WithStackTrace(err)
83+
}
84+
85+
return tx, r, nil
8186
}

internal/query/transaction_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,14 @@ func (s testExecuteSettings) CallOptions() []grpc.CallOption {
135135
return s.callOptions
136136
}
137137

138-
var _ executeSettings = testExecuteSettings{}
138+
var _ executeConfig = testExecuteSettings{}
139139

140140
func TestTxExecuteSettings(t *testing.T) {
141141
for _, tt := range []struct {
142142
name string
143143
txID string
144144
txOpts []query.TxExecuteOption
145-
settings executeSettings
145+
settings executeConfig
146146
}{
147147
{
148148
name: "WithTxID",

0 commit comments

Comments
 (0)