Skip to content

Commit 7812ea6

Browse files
committed
logs and traces for write messages in transaction
1 parent 6a9f4b2 commit 7812ea6

File tree

20 files changed

+491
-187
lines changed

20 files changed

+491
-187
lines changed

examples/topic/topicwriter/topic_writer_transaction.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func CopyMessagesBetweenTopics(ctx context.Context, db *ydb.Driver, reader *topi
5656

5757
func CopyMessagesBetweenTopicsTxWriter(ctx context.Context, db *ydb.Driver, reader *topicreader.Reader, topic string) error {
5858
return db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
59-
writer, err := db.Topic().StartWransactionalWriter(ctx, tx, topic)
59+
writer, err := db.Topic().StartTransactionalWriter(ctx, tx, topic)
6060
if err != nil {
6161
return err
6262
}

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

-4
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,6 @@ func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_Wri
166166
},
167167
}
168168

169-
if r.Tx.ID != "" || r.Tx.Session != "" {
170-
res.WriteRequest.Tx = &Ydb_Topic.TransactionIdentity{}
171-
}
172-
173169
return res, nil
174170
}
175171

internal/topic/topicclientinternal/client.go

+35-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package topicclientinternal
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
78
"google.golang.org/grpc"
@@ -14,6 +15,8 @@ import (
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1720
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1821
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topiclistener"
1922
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
@@ -23,6 +26,10 @@ import (
2326
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2427
)
2528

29+
var (
30+
errUnsupportedTransactionType = xerrors.Wrap(errors.New("ydb: unsuppotred transaction type. Use transaction from Driver().Query().DoTx(...)"))
31+
)
32+
2633
type Client struct {
2734
cfg topic.Config
2835
cred credentials.Credentials
@@ -269,6 +276,32 @@ func (c *Client) StartReader(
269276

270277
// StartWriter create new topic writer wrapper
271278
func (c *Client) StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error) {
279+
cfg := c.createWriterConfig(topicPath, opts)
280+
writer, err := topicwriterinternal.NewWriterReconnector(cfg)
281+
if err != nil {
282+
return nil, err
283+
}
284+
285+
return topicwriter.NewWriter(writer), nil
286+
}
287+
288+
func (c *Client) StartTransactionalWriter(transaction tx.Identifier, topicpath string, opts ...topicoptions.WriterOption) (*topicwriter.TxWriter, error) {
289+
internalTx, ok := transaction.(tx.Transaction)
290+
if !ok {
291+
return nil, xerrors.WithStackTrace(errUnsupportedTransactionType)
292+
}
293+
294+
cfg := c.createWriterConfig(topicpath, opts)
295+
writer, err := topicwriterinternal.NewWriterReconnector(cfg)
296+
if err != nil {
297+
return nil, err
298+
}
299+
300+
txWriter := topicwriterinternal.NewTopicWriterTransaction(writer, internalTx, cfg.Tracer)
301+
return topicwriter.NewTxWriterInternal(txWriter), nil
302+
}
303+
304+
func (c *Client) createWriterConfig(topicPath string, opts []topicoptions.WriterOption) topicwriterinternal.WriterReconnectorConfig {
272305
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) (
273306
topicwriterinternal.RawTopicWriterStream,
274307
error,
@@ -281,14 +314,9 @@ func (c *Client) StartWriter(topicPath string, opts ...topicoptions.WriterOption
281314
topicwriterinternal.WithTopic(topicPath),
282315
topicwriterinternal.WithCommonConfig(c.cfg.Common),
283316
topicwriterinternal.WithTrace(c.cfg.Trace),
317+
topicwriterinternal.WithCredentials(c.cred),
284318
}
285319

286320
options = append(options, opts...)
287-
288-
writer, err := topicwriterinternal.NewWriter(c.cred, options)
289-
if err != nil {
290-
return nil, err
291-
}
292-
293-
return topicwriter.NewWriter(writer), nil
321+
return topicwriterinternal.NewWriterReconnectorConfig(options...)
294322
}

internal/topic/topicreaderinternal/stream_reader_impl.go

-2
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,6 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler(
297297
defer onDone()
298298

299299
ctx = traceCtx
300-
//nolint:godox
301-
// TODO: trace
302300
if transactionResult == nil {
303301
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd)
304302
} else {

internal/topic/topicwriterinternal/message.go

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1314
)
1415

@@ -20,6 +21,8 @@ type PublicMessage struct {
2021
Data io.Reader
2122
Metadata map[string][]byte
2223

24+
tx tx.Transaction
25+
2326
// partitioning at level message available by protocol, but doesn't available by current server implementation
2427
// the field hidden from public access for prevent runtime errors.
2528
// it will be published after implementation on server side.

internal/topic/topicwriterinternal/writer.go

-67
This file was deleted.

internal/topic/topicwriterinternal/writer_config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type WritersCommonConfig struct {
1818
defaultPartitioning rawtopicwriter.Partitioning
1919
compressorCount int
2020

21-
tracer *trace.Topic
21+
Tracer *trace.Topic
2222
cred credentials.Credentials
2323
credUpdateInterval time.Duration
2424
clock clockwork.Clock

internal/topic/topicwriterinternal/writer_options.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func WithWaitAckOnWrite(val bool) PublicWriterOption {
145145

146146
func WithTrace(tracer *trace.Topic) PublicWriterOption {
147147
return func(cfg *WriterReconnectorConfig) {
148-
cfg.tracer = cfg.tracer.Compose(tracer)
148+
cfg.Tracer = cfg.Tracer.Compose(tracer)
149149
}
150150
}
151151

internal/topic/topicwriterinternal/writer_reconnector.go

+33-10
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ var (
3838
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
3939
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit"))
4040
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
41+
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+"))
4142

4243
// errProducerIDNotEqualMessageGroupID is temporary
4344
// WithMessageGroupID is optional parameter because it allowed to be skipped by protocol.
@@ -73,14 +74,14 @@ func (cfg *WriterReconnectorConfig) validate() error {
7374
return nil
7475
}
7576

76-
func newWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnectorConfig {
77+
func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnectorConfig {
7778
cfg := WriterReconnectorConfig{
7879
WritersCommonConfig: WritersCommonConfig{
7980
cred: credentials.NewAnonymousCredentials(),
8081
credUpdateInterval: time.Hour,
8182
clock: clockwork.NewRealClock(),
8283
compressorCount: runtime.NumCPU(),
83-
tracer: &trace.Topic{},
84+
Tracer: &trace.Topic{},
8485
},
8586
AutoSetSeqNo: true,
8687
AutoSetCreatedTime: true,
@@ -119,25 +120,29 @@ type WriterReconnector struct {
119120
background background.Worker
120121
retrySettings topic.RetrySettings
121122
writerInstanceID string
122-
sessionID string
123123
semaphore *semaphore.Weighted
124124
firstInitResponseProcessedChan empty.Chan
125125
lastSeqNo int64
126126
encodersMap *EncoderMap
127127
initDoneCh empty.Chan
128128
initInfo InitialInfo
129129
m xsync.RWMutex
130+
sessionID string
130131
firstConnectionHandled atomic.Bool
131132
initDone bool
132133
}
133134

134-
func newWriterReconnector(
135-
cfg WriterReconnectorConfig, //nolint:gocritic
136-
) *WriterReconnector {
135+
func NewWriterReconnector(
136+
cfg WriterReconnectorConfig,
137+
) (*WriterReconnector, error) {
138+
if err := cfg.validate(); err != nil {
139+
return nil, err
140+
}
141+
137142
res := newWriterReconnectorStopped(cfg)
138143
res.start()
139144

140-
return res
145+
return res, nil
141146
}
142147

143148
func newWriterReconnectorStopped(
@@ -313,7 +318,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage)
313318
sessionID = w.sessionID
314319
})
315320
onCompressDone := trace.TopicOnWriterCompressMessages(
316-
w.cfg.tracer,
321+
w.cfg.Tracer,
317322
w.writerInstanceID,
318323
sessionID,
319324
w.cfg.forceCodec.ToInt32(),
@@ -354,7 +359,7 @@ func (w *WriterReconnector) Close(ctx context.Context) error {
354359
}
355360

356361
func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) {
357-
onDone := trace.TopicOnWriterClose(w.cfg.tracer, w.writerInstanceID, reason)
362+
onDone := trace.TopicOnWriterClose(w.cfg.Tracer, w.writerInstanceID, reason)
358363
defer func() {
359364
onDone(resErr)
360365
}()
@@ -461,7 +466,7 @@ func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context, att
461466
err error,
462467
) {
463468
traceOnDone := trace.TopicOnWriterReconnect(
464-
w.cfg.tracer,
469+
w.cfg.Tracer,
465470
w.writerInstanceID,
466471
w.cfg.topic,
467472
w.cfg.producerID,
@@ -625,6 +630,13 @@ func (w *WriterReconnector) createWriterStreamConfig(stream RawTopicWriterStream
625630
return cfg
626631
}
627632

633+
func (w *WriterReconnector) GetSessionID() (sessionID string) {
634+
w.m.WithLock(func() {
635+
sessionID = w.sessionID
636+
})
637+
return sessionID
638+
}
639+
628640
func sendMessagesToStream(
629641
stream RawTopicWriterStream,
630642
targetCodec rawtopiccommon.Codec,
@@ -684,6 +696,17 @@ func createWriteRequest(messages []messageWithDataContent, targetCodec rawtopicc
684696
res rawtopicwriter.WriteRequest,
685697
err error,
686698
) {
699+
for i := 1; i < len(messages); i++ {
700+
if messages[i-1].tx != messages[i].tx {
701+
return res, xerrors.WithStackTrace(errDiffetentTransactions)
702+
}
703+
}
704+
705+
if len(messages) > 0 {
706+
res.Tx.ID = messages[0].tx.ID()
707+
res.Tx.Session = messages[0].tx.SessionID()
708+
}
709+
687710
res.Codec = targetCodec
688711
res.Messages = make([]rawtopicwriter.MessageData, len(messages))
689712
for i := range messages {

internal/topic/topicwriterinternal/writer_reconnector_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ var testCommonEncoders = NewEncoderMap()
3131
func TestWriterImpl_AutoSeq(t *testing.T) {
3232
t.Run("OK", func(t *testing.T) {
3333
ctx := xtest.Context(t)
34-
w := newWriterReconnectorStopped(newWriterReconnectorConfig(
34+
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
3535
WithAutoSetSeqNo(true),
3636
WithAutosetCreatedTime(false),
3737
))
@@ -63,7 +63,7 @@ func TestWriterImpl_AutoSeq(t *testing.T) {
6363
t.Run("PredefinedSeqNo", func(t *testing.T) {
6464
ctx := xtest.Context(t)
6565

66-
w := newWriterReconnectorStopped(newWriterReconnectorConfig(WithAutoSetSeqNo(true)))
66+
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(WithAutoSetSeqNo(true)))
6767
w.firstConnectionHandled.Store(true)
6868
require.Error(t, w.Write(ctx, newTestMessages(1)))
6969
})
@@ -72,7 +72,7 @@ func TestWriterImpl_AutoSeq(t *testing.T) {
7272
func TestWriterImpl_CheckMessages(t *testing.T) {
7373
t.Run("MessageSize", func(t *testing.T) {
7474
ctx := xtest.Context(t)
75-
w := newWriterReconnectorStopped(newWriterReconnectorConfig())
75+
w := newWriterReconnectorStopped(NewWriterReconnectorConfig())
7676
w.firstConnectionHandled.Store(true)
7777

7878
maxSize := 5
@@ -282,7 +282,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
282282
func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
283283
xtest.TestManyTimes(t, func(t testing.TB) {
284284
ctx := xtest.Context(t)
285-
w := newWriterReconnectorStopped(newWriterReconnectorConfig(
285+
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
286286
WithAutoSetSeqNo(false),
287287
WithMaxQueueLen(2),
288288
))
@@ -994,7 +994,7 @@ func newTestMessagesWithContent(numbers ...int) []messageWithDataContent {
994994

995995
func newTestWriterStopped(opts ...PublicWriterOption) *WriterReconnector {
996996
cfgOptions := append(defaultTestWriterOptions(), opts...)
997-
cfg := newWriterReconnectorConfig(cfgOptions...)
997+
cfg := NewWriterReconnectorConfig(cfgOptions...)
998998
res := newWriterReconnectorStopped(cfg)
999999

10001000
if cfg.AdditionalEncoders == nil {
@@ -1074,7 +1074,7 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
10741074
}))
10751075
writerOptions = append(writerOptions, options.writerOptions...)
10761076

1077-
res.writer = newWriterReconnectorStopped(newWriterReconnectorConfig(writerOptions...))
1077+
res.writer = newWriterReconnectorStopped(NewWriterReconnectorConfig(writerOptions...))
10781078

10791079
res.stream.EXPECT().Recv().DoAndReturn(res.receiveMessageHandler).AnyTimes()
10801080

0 commit comments

Comments
 (0)