Skip to content

Commit d9bb901

Browse files
committed
refactoring of attach stream goroutine label
1 parent 7ab31e5 commit d9bb901

File tree

1 file changed

+28
-31
lines changed

1 file changed

+28
-31
lines changed

internal/query/session_core.go

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
29-
var setGoroutineLabelForAttachStream = os.Getenv("YDB_QUERY_SESSION_ATTACH_STREAM_GOROUTINE_LABEL") == "1"
29+
var markGoroutineWithLabelNodeIDForAttachStream = os.Getenv("YDB_QUERY_SESSION_ATTACH_STREAM_GOROUTINE_LABEL") == "1"
3030

3131
type (
3232
Core interface {
@@ -159,20 +159,7 @@ func Open(
159159
core.id = response.GetSessionId()
160160
core.nodeID = uint32(response.GetNodeId())
161161

162-
if setGoroutineLabelForAttachStream {
163-
attachRes := make(chan error)
164-
pprof.Do(ctx, pprof.Labels(
165-
"node_id", strconv.Itoa(int(core.NodeID())),
166-
), func(ctx context.Context) {
167-
go func() {
168-
attachRes <- core.attach(ctx)
169-
}()
170-
})
171-
err = <-attachRes
172-
} else {
173-
err = core.attach(ctx)
174-
}
175-
if err != nil {
162+
if err = core.attach(ctx); err != nil {
176163
_ = core.deleteSession(ctx)
177164

178165
return nil, xerrors.WithStackTrace(err)
@@ -199,14 +186,14 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
199186
}
200187
}()
201188

202-
attach, err := core.Client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
189+
attachStream, err := core.Client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
203190
SessionId: core.id,
204191
})
205192
if err != nil {
206193
return xerrors.WithStackTrace(err)
207194
}
208195

209-
_, err = attach.Recv()
196+
_, err = attachStream.Recv()
210197
if err != nil {
211198
return xerrors.WithStackTrace(err)
212199
}
@@ -224,24 +211,34 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
224211
return nil
225212
})
226213

227-
go func() {
228-
defer func() {
229-
select {
230-
case <-core.done:
231-
return
232-
default:
233-
close(core.done)
234-
}
235-
}()
214+
if markGoroutineWithLabelNodeIDForAttachStream {
215+
pprof.Do(ctx, pprof.Labels(
216+
"node_id", strconv.Itoa(int(core.NodeID())),
217+
), func(context.Context) {
218+
go core.listenAttachStream(attachStream)
219+
})
220+
} else {
221+
go core.listenAttachStream(attachStream)
222+
}
236223

237-
for core.IsAlive() {
238-
if _, recvErr := attach.Recv(); recvErr != nil {
239-
return
240-
}
224+
return nil
225+
}
226+
227+
func (core *sessionCore) listenAttachStream(attachStream Ydb_Query_V1.QueryService_AttachSessionClient) {
228+
defer func() {
229+
select {
230+
case <-core.done:
231+
return
232+
default:
233+
close(core.done)
241234
}
242235
}()
243236

244-
return nil
237+
for core.IsAlive() {
238+
if _, recvErr := attachStream.Recv(); recvErr != nil {
239+
return
240+
}
241+
}
245242
}
246243

247244
func (core *sessionCore) deleteSession(ctx context.Context) (finalErr error) {

0 commit comments

Comments
 (0)