@@ -11,41 +11,26 @@ import (
11
11
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
12
12
)
13
13
14
- func TableAndTopicWithinTransaction (ctx context.Context , db * ydb.Driver , writer * topicwriter.Writer , id int64 ) error {
15
- return db .Query ().DoTx (ctx , func (ctx context.Context , t query.TxActor ) error {
16
- row , err := t .QueryRow (ctx , "SELECT val FROM table WHERE id=$id" , query .WithParameters (
17
- ydb .ParamsBuilder ().
18
- Param ("$id" ).Int64 (id ).
19
- Build ()))
20
- if err != nil {
21
- return err
22
- }
23
-
24
- var val int64
25
- if err = row .Scan (& val ); err != nil {
26
- return err
27
- }
28
-
29
- err = writer .WriteWithTx (ctx , t , topicwriter.Message {
30
- Data : strings .NewReader (fmt .Sprintf ("val: %v processed" , val )),
31
- })
14
+ func CopyMessagesBetweenTopicsTxWriter (
15
+ ctx context.Context ,
16
+ db * ydb.Driver ,
17
+ reader * topicreader.Reader ,
18
+ topic string ,
19
+ ) error {
20
+ return db .Query ().DoTx (ctx , func (ctx context.Context , tx query.TxActor ) error {
21
+ writer , err := db .Topic ().StartTransactionalWriter (tx , topic )
32
22
if err != nil {
33
23
return err
34
24
}
35
- return nil
36
- })
37
- }
38
25
39
- func CopyMessagesBetweenTopics (ctx context.Context , db * ydb.Driver , reader * topicreader.Reader , writer * topicwriter.Writer ) error {
40
- return db .Query ().DoTx (ctx , func (ctx context.Context , tx query.TxActor ) error {
41
26
batch , err := reader .PopMessagesBatchTx (ctx , tx )
42
27
if err != nil {
43
28
return err
44
29
}
45
30
46
31
for _ , mess := range batch .Messages {
47
32
48
- if err = writer .WriteWithTx (ctx , tx , topicwriter.Message {Data : mess }); err != nil {
33
+ if err = writer .Write (ctx , topicwriter.Message {Data : mess }); err != nil {
49
34
return err
50
35
}
51
36
}
@@ -54,25 +39,41 @@ func CopyMessagesBetweenTopics(ctx context.Context, db *ydb.Driver, reader *topi
54
39
}, query .WithIdempotent ())
55
40
}
56
41
57
- func CopyMessagesBetweenTopicsTxWriter (ctx context.Context , db * ydb.Driver , reader * topicreader.Reader , topic string ) error {
58
- return db .Query ().DoTx (ctx , func (ctx context.Context , tx query.TxActor ) error {
59
- writer , err := db .Topic ().StartTransactionalWriter (ctx , tx , topic )
42
+ func TableAndTopicWithinTransaction (
43
+ ctx context.Context ,
44
+ db * ydb.Driver ,
45
+ topicPath string ,
46
+ id int64 ,
47
+ ) error {
48
+ return db .Query ().DoTx (ctx , func (ctx context.Context , t query.TxActor ) error {
49
+ row , err := t .QueryRow (ctx , "SELECT val FROM table WHERE id=$id" , query .WithParameters (
50
+ ydb .ParamsBuilder ().
51
+ Param ("$id" ).Int64 (id ).
52
+ Build ()))
60
53
if err != nil {
61
54
return err
62
55
}
63
56
64
- batch , err := reader .PopMessagesBatchTx (ctx , tx )
57
+ var val int64
58
+ if err = row .Scan (& val ); err != nil {
59
+ return err
60
+ }
61
+
62
+ // the writer is dedicated for the transaction, it can't be used outside the transaction
63
+ // it is no needs to close or flush the messages - it happened internally on transaction commit
64
+ writer , err := db .Topic ().StartTransactionalWriter (t , topicPath )
65
65
if err != nil {
66
66
return err
67
67
}
68
68
69
- for _ , mess := range batch .Messages {
69
+ err = writer .Write (ctx , topicwriter.Message {
70
+ Data : strings .NewReader (fmt .Sprintf ("val: %v processed" , val )),
71
+ })
70
72
71
- if err = writer .Write (ctx , topicwriter.Message {Data : mess }); err != nil {
72
- return err
73
- }
73
+ if err != nil {
74
+ return err
74
75
}
75
76
76
77
return nil
77
- }, query . WithIdempotent () )
78
+ })
78
79
}
0 commit comments