Skip to content

Commit 59b7046

Browse files
asmyasnikovCopilotkprokopenko
authored
replaced check IsContextError to check stream context error (#2102)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: asmyasnikov <14202262+asmyasnikov@users.noreply.github.com> Co-authored-by: Konstantin Prokopenko <kprokopenko@users.noreply.github.com>
1 parent e4a185f commit 59b7046

3 files changed

Lines changed: 123 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed gRPC stream operations (`CloseSend`, `SendMsg`, `RecvMsg`) to check the stream context directly instead of inspecting the error type, so errors from a cancelled stream are no longer misclassified as transport errors
2+
13
## v3.135.2
24
* Fixed closing idle sessions from the session pool when the `Close` context is already cancelled
35

internal/conn/grpc_client_stream.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package conn
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67

78
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
@@ -60,8 +61,8 @@ func (s *grpcClientStream) CloseSend() (err error) {
6061

6162
err = s.stream.CloseSend()
6263
if err != nil {
63-
if xerrors.IsContextError(err) {
64-
return xerrors.WithStackTrace(err)
64+
if ctxErr := s.streamCtx.Err(); ctxErr != nil {
65+
return xerrors.WithStackTrace(fmt.Errorf("stream context is done: %w", xerrors.Join(err, ctxErr)))
6566
}
6667

6768
if !s.wrapping {
@@ -95,8 +96,8 @@ func (s *grpcClientStream) SendMsg(m any) (err error) {
9596

9697
err = s.stream.SendMsg(m)
9798
if err != nil {
98-
if xerrors.IsContextError(err) {
99-
return xerrors.WithStackTrace(err)
99+
if ctxErr := s.streamCtx.Err(); ctxErr != nil {
100+
return xerrors.WithStackTrace(fmt.Errorf("stream context is done: %w", xerrors.Join(err, ctxErr)))
100101
}
101102

102103
if !s.wrapping {
@@ -150,8 +151,8 @@ func (s *grpcClientStream) RecvMsg(m any) (err error) {
150151
return io.EOF
151152
}
152153

153-
if xerrors.IsContextError(err) {
154-
return xerrors.WithStackTrace(err)
154+
if ctxErr := s.streamCtx.Err(); ctxErr != nil {
155+
return xerrors.WithStackTrace(fmt.Errorf("stream context is done: %w", xerrors.Join(err, ctxErr)))
155156
}
156157

157158
if !s.wrapping {

internal/conn/grpc_client_stream_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package conn
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"testing"
@@ -173,6 +174,42 @@ func TestGrpcClientStream_CloseSend(t *testing.T) {
173174
require.True(t, xerrors.IsContextError(err))
174175
})
175176

177+
t.Run("StreamContextDoneReturnsNonTransportError", func(t *testing.T) {
178+
ctrl := gomock.NewController(t)
179+
mockStream := mock.NewMockClientStream(ctrl)
180+
181+
// Use a non-gRPC-status error that gRPC may return on stream termination.
182+
// IsContextError returns false for such errors, so the old code fell through
183+
// to transport wrapping even when the stream context was already cancelled.
184+
streamErr := errors.New("stream transport: connection closed")
185+
mockStream.EXPECT().CloseSend().Return(streamErr)
186+
187+
config := &mockConfig{
188+
dialTimeout: 5 * time.Second,
189+
connectionTTL: 0,
190+
}
191+
e := endpoint.New("test-endpoint:2135")
192+
parentConn := newConn(e, config)
193+
194+
cancelledCtx, cancel := context.WithCancel(context.Background())
195+
cancel() // cancel the stream context before calling CloseSend
196+
197+
s := &grpcClientStream{
198+
parentConn: parentConn,
199+
stream: mockStream,
200+
streamCtx: cancelledCtx,
201+
wrapping: true,
202+
traceID: "test-trace-id",
203+
}
204+
205+
err := s.CloseSend()
206+
require.Error(t, err)
207+
// When the stream context is done, the error must NOT be wrapped as a
208+
// transport error regardless of what gRPC returned.
209+
require.False(t, xerrors.IsTransportError(err))
210+
require.ErrorIs(t, err, streamErr)
211+
})
212+
176213
t.Run("TransportErrorWithWrapping", func(t *testing.T) {
177214
ctrl := gomock.NewController(t)
178215
mockStream := mock.NewMockClientStream(ctrl)
@@ -280,6 +317,44 @@ func TestGrpcClientStream_SendMsg(t *testing.T) {
280317
require.True(t, xerrors.IsContextError(err))
281318
})
282319

320+
t.Run("StreamContextDoneReturnsNonTransportError", func(t *testing.T) {
321+
ctrl := gomock.NewController(t)
322+
mockStream := mock.NewMockClientStream(ctrl)
323+
324+
msg := &Ydb_Query.ExecuteQueryRequest{}
325+
// Use a non-gRPC-status error that gRPC may return on stream termination.
326+
// IsContextError returns false for such errors, so the old code fell through
327+
// to transport wrapping even when the stream context was already cancelled.
328+
streamErr := errors.New("stream transport: connection closed")
329+
mockStream.EXPECT().SendMsg(msg).Return(streamErr)
330+
331+
config := &mockConfig{
332+
dialTimeout: 5 * time.Second,
333+
connectionTTL: 0,
334+
}
335+
e := endpoint.New("test-endpoint:2135")
336+
parentConn := newConn(e, config)
337+
338+
cancelledCtx, cancel := context.WithCancel(context.Background())
339+
cancel() // cancel the stream context before calling SendMsg
340+
341+
s := &grpcClientStream{
342+
parentConn: parentConn,
343+
stream: mockStream,
344+
streamCtx: cancelledCtx,
345+
wrapping: true,
346+
traceID: "test-trace-id",
347+
sentMark: &modificationMark{},
348+
}
349+
350+
err := s.SendMsg(msg)
351+
require.Error(t, err)
352+
// When the stream context is done, the error must NOT be wrapped as a
353+
// transport error regardless of what gRPC returned.
354+
require.False(t, xerrors.IsTransportError(err))
355+
require.ErrorIs(t, err, streamErr)
356+
})
357+
283358
t.Run("TransportErrorRetryable", func(t *testing.T) {
284359
ctrl := gomock.NewController(t)
285360
mockStream := mock.NewMockClientStream(ctrl)
@@ -457,6 +532,45 @@ func TestGrpcClientStream_RecvMsg(t *testing.T) {
457532
require.True(t, xerrors.IsContextError(err))
458533
})
459534

535+
t.Run("StreamContextDoneReturnsNonTransportError", func(t *testing.T) {
536+
ctrl := gomock.NewController(t)
537+
mockStream := mock.NewMockClientStream(ctrl)
538+
539+
msg := &Ydb_Query.ExecuteQueryResponsePart{}
540+
// Use a non-gRPC-status error that gRPC may return on stream termination.
541+
// IsContextError returns false for such errors, so the old code fell through
542+
// to transport wrapping even when the stream context was already cancelled.
543+
streamErr := errors.New("stream transport: connection closed")
544+
mockStream.EXPECT().RecvMsg(msg).Return(streamErr)
545+
mockStream.EXPECT().Trailer().Return(metadata.MD{})
546+
547+
config := &mockConfig{
548+
dialTimeout: 5 * time.Second,
549+
connectionTTL: 0,
550+
}
551+
e := endpoint.New("test-endpoint:2135")
552+
parentConn := newConn(e, config)
553+
554+
cancelledCtx, cancel := context.WithCancel(context.Background())
555+
cancel() // cancel the stream context before calling RecvMsg
556+
557+
s := &grpcClientStream{
558+
parentConn: parentConn,
559+
stream: mockStream,
560+
streamCtx: cancelledCtx,
561+
wrapping: true,
562+
traceID: "test-trace-id",
563+
sentMark: &modificationMark{},
564+
}
565+
566+
err := s.RecvMsg(msg)
567+
require.Error(t, err)
568+
// When the stream context is done, the error must NOT be wrapped as a
569+
// transport error regardless of what gRPC returned.
570+
require.False(t, xerrors.IsTransportError(err))
571+
require.ErrorIs(t, err, streamErr)
572+
})
573+
460574
t.Run("TransportErrorRetryable", func(t *testing.T) {
461575
ctrl := gomock.NewController(t)
462576
mockStream := mock.NewMockClientStream(ctrl)

0 commit comments

Comments
 (0)