Skip to content

Commit 5034fda

Browse files
committed
example of transactional writer
1 parent 0dd1682 commit 5034fda

File tree

3 files changed

+34
-0
lines changed

3 files changed

+34
-0
lines changed

examples/topic/topicwriter/topic_writer_transaction.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,26 @@ func CopyMessagesBetweenTopics(ctx context.Context, db *ydb.Driver, reader *topi
5353
return nil
5454
}, query.WithIdempotent())
5555
}
56+
57+
func CopyMessagesBetweenTopicsTxWriter(ctx context.Context, db *ydb.Driver, reader *topicreader.Reader, topic string) error {
58+
return db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
59+
writer, err := db.Topic().StartWransactionalWriter(ctx, tx, topic)
60+
if err != nil {
61+
return err
62+
}
63+
64+
batch, err := reader.PopMessagesBatchTx(ctx, tx)
65+
if err != nil {
66+
return err
67+
}
68+
69+
for _, mess := range batch.Messages {
70+
71+
if err = writer.Write(ctx, topicwriter.Message{Data: mess}); err != nil {
72+
return err
73+
}
74+
}
75+
76+
return nil
77+
}, query.WithIdempotent())
78+
}

topic/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topic
33
import (
44
"context"
55

6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
67
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topiclistener"
78
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
89
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
@@ -47,4 +48,7 @@ type Client interface {
4748
// StartWriter start write session to topic
4849
// it is fast non block call, connection starts in background
4950
StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error)
51+
52+
// StartTransactionalWriter
53+
StartWransactionalWriter(ctx context.Context, tx tx.Identifier, topicpath string, opts ...topicoptions.WriterOption) (*topicwriter.TxWriter, error)
5054
}

topic/topicwriter/topicwriter.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,10 @@ func (w *Writer) Close(ctx context.Context) error {
9292
func (w *Writer) Flush(ctx context.Context) error {
9393
return w.inner.Flush(ctx)
9494
}
95+
96+
type TxWriter struct{}
97+
98+
func (w *TxWriter) Write(ctx context.Context, messages ...Message) error {
99+
//TODO implement me
100+
panic("implement me")
101+
}

0 commit comments

Comments
 (0)