Skip to content

Commit 0dd1682

Browse files
committed
initial write in tx support at transaction level and in raw messages
1 parent 2f1e034 commit 0dd1682

File tree

14 files changed

+147
-21
lines changed

14 files changed

+147
-21
lines changed

examples/topic/table_topic_transactions/table_topic_transactions.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
78
"github.com/ydb-platform/ydb-go-sdk/v3"
89
"github.com/ydb-platform/ydb-go-sdk/v3/query"
910
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
@@ -19,7 +20,7 @@ func pumpFromTopicToTable(ctx context.Context, db *ydb.Driver, topic, consumer s
1920
}
2021

2122
return db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
22-
batch, err := reader.PopBatchTx(ctx, tx)
23+
batch, err := reader.PopMessagesBatchTx(ctx, tx)
2324
if err != nil {
2425
return err
2526
}

examples/topic/topicwriter/topic_writer_transaction.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ func CopyMessagesBetweenTopics(ctx context.Context, db *ydb.Driver, reader *topi
4343
return err
4444
}
4545

46-
sendMessages := make([]topicwriter.Message, len(batch.Messages))
47-
for i, mess := range batch.Messages {
48-
sendMessages[i] = topicwriter.Message{Data: mess}
46+
for _, mess := range batch.Messages {
47+
48+
if err = writer.WriteWithTx(ctx, tx, topicwriter.Message{Data: mess}); err != nil {
49+
return err
50+
}
4951
}
5052

51-
return writer.WriteWithTx(ctx, tx, sendMessages...)
53+
return nil
5254
}, query.WithIdempotent())
5355
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package rawtopiccommon
2+
3+
import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
4+
5+
type TransactionIdentity struct { //nolint:revive,stylecheck
6+
ID string
7+
Session string
8+
}
9+
10+
func (t TransactionIdentity) ToProto() *Ydb_Topic.TransactionIdentity {
11+
if t.ID == "" && t.Session == "" {
12+
return nil
13+
}
14+
15+
return &Ydb_Topic.TransactionIdentity{
16+
Id: t.ID,
17+
Session: t.Session,
18+
}
19+
}

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ type WriteRequest struct {
145145

146146
Messages []MessageData
147147
Codec rawtopiccommon.Codec
148+
Tx rawtopiccommon.TransactionIdentity
148149
}
149150

150151
func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest, err error) {
@@ -161,9 +162,14 @@ func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_Wri
161162
WriteRequest: &Ydb_Topic.StreamWriteMessage_WriteRequest{
162163
Messages: messages,
163164
Codec: int32(r.Codec.ToProto()),
165+
Tx: r.Tx.ToProto(),
164166
},
165167
}
166168

169+
if r.Tx.ID != "" || r.Tx.Session != "" {
170+
res.WriteRequest.Tx = &Ydb_Topic.TransactionIdentity{}
171+
}
172+
167173
return res, nil
168174
}
169175

internal/grpcwrapper/rawtopic/update_offset_in_transaction.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,16 @@ import (
99

1010
type UpdateOffsetsInTransactionRequest struct {
1111
OperationParams rawydb.OperationParams
12-
Tx UpdateOffsetsInTransactionRequest_TransactionIdentity
12+
Tx rawtopiccommon.TransactionIdentity
1313
Topics []UpdateOffsetsInTransactionRequest_TopicOffsets
1414
Consumer string
1515
}
1616

1717
func (r *UpdateOffsetsInTransactionRequest) ToProto() *Ydb_Topic.UpdateOffsetsInTransactionRequest {
1818
req := &Ydb_Topic.UpdateOffsetsInTransactionRequest{
1919
OperationParams: r.OperationParams.ToProto(),
20-
Tx: &Ydb_Topic.TransactionIdentity{
21-
Id: r.Tx.ID,
22-
Session: r.Tx.Session,
23-
},
24-
Consumer: r.Consumer,
20+
Tx: r.Tx.ToProto(),
21+
Consumer: r.Consumer,
2522
}
2623

2724
req.Topics = make([]*Ydb_Topic.UpdateOffsetsInTransactionRequest_TopicOffsets, len(r.Topics))
@@ -56,11 +53,6 @@ func (r *UpdateOffsetsInTransactionRequest) ToProto() *Ydb_Topic.UpdateOffsetsIn
5653
return req
5754
}
5855

59-
type UpdateOffsetsInTransactionRequest_TransactionIdentity struct { //nolint:revive,stylecheck
60-
ID string
61-
Session string
62-
}
63-
6456
type UpdateOffsetsInTransactionRequest_TopicOffsets struct { //nolint:revive,stylecheck
6557
Path string // Topic path
6658
Partitions []UpdateOffsetsInTransactionRequest_PartitionOffsets

internal/grpcwrapper/rawtopic/update_offset_in_transaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestUpdateOffsetsInTransactionRequestToProto(t *testing.T) {
2727
HasValue: true,
2828
},
2929
},
30-
Tx: UpdateOffsetsInTransactionRequest_TransactionIdentity{
30+
Tx: rawtopiccommon.TransactionIdentity{
3131
ID: "tx-id",
3232
Session: "session-id",
3333
},

internal/query/execute_query.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type executeSettings interface {
3030
Params() *params.Parameters
3131
CallOptions() []grpc.CallOption
3232
RetryOpts() []retry.Option
33+
AllowLazyTx() bool
3334
}
3435

3536
type executeScriptConfig interface {

internal/query/options/execute.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type (
3535
callOptions []grpc.CallOption
3636
txControl *tx.Control
3737
retryOptions []retry.Option
38+
allowLazyTx bool
3839
}
3940

4041
// Execute is an interface for execute method options
@@ -57,8 +58,15 @@ type (
5758
callback func(stats.QueryStats)
5859
}
5960
execModeOption = ExecMode
61+
lazyTxOption struct {
62+
allow bool
63+
}
6064
)
6165

66+
func (s *executeSettings) AllowLazyTx() bool {
67+
return s.allowLazyTx
68+
}
69+
6270
func (s *executeSettings) RetryOpts() []retry.Option {
6371
return s.retryOptions
6472
}
@@ -81,6 +89,10 @@ func (syntax Syntax) applyExecuteOption(s *executeSettings) {
8189
s.syntax = syntax
8290
}
8391

92+
func (lazyTx lazyTxOption) applyExecuteOption(s *executeSettings) {
93+
s.allowLazyTx = lazyTx.allow
94+
}
95+
8496
const (
8597
SyntaxYQL = Syntax(Ydb_Query.Syntax_SYNTAX_YQL_V1)
8698
SyntaxPostgreSQL = Syntax(Ydb_Query.Syntax_SYNTAX_PG)
@@ -176,6 +188,7 @@ var (
176188
_ Execute = StatsMode(0)
177189
_ Execute = txCommitOption{}
178190
_ Execute = (*txControlOption)(nil)
191+
_ Execute = lazyTxOption{}
179192
)
180193

181194
func WithCommit() txCommitOption {
@@ -190,6 +203,10 @@ func WithSyntax(syntax Syntax) syntaxOption {
190203
return syntax
191204
}
192205

206+
func WithLazyTx(allowLazy bool) lazyTxOption {
207+
return lazyTxOption{allow: allowLazy}
208+
}
209+
193210
func (opt statsModeOption) applyExecuteOption(s *executeSettings) {
194211
s.statsMode = opt.mode
195212
s.statsCallback = opt.callback

internal/query/transaction.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type (
3535

3636
completed bool
3737

38-
onCompleted xsync.Set[*baseTx.OnTransactionCompletedFunc]
38+
onBeforeCommit xsync.Set[*baseTx.OnTransactionBeforeCommit]
39+
onCompleted xsync.Set[*baseTx.OnTransactionCompletedFunc]
3940
}
4041
)
4142

@@ -88,6 +89,14 @@ func (tx *Transaction) QueryResultSet(
8889
return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx)
8990
}
9091

92+
if !options.ExecuteSettings(opts...).AllowLazyTx() {
93+
// It needs to execute before create settings because if tx must be not lazy, and it is first query
94+
// change txcontrol during unlazy
95+
if err := tx.UnLazy(ctx); err != nil {
96+
return nil, err
97+
}
98+
}
99+
91100
settings, err := tx.executeSettings(opts...)
92101
if err != nil {
93102
return nil, xerrors.WithStackTrace(err)
@@ -100,6 +109,11 @@ func (tx *Transaction) QueryResultSet(
100109
}),
101110
}
102111
if settings.TxControl().Commit {
112+
err = tx.waitOnBeforeCommit(ctx)
113+
if err != nil {
114+
return nil, err
115+
}
116+
103117
// notification about complete transaction must be sended for any error or for successfully read all result if
104118
// it was execution with commit flag
105119
resultOpts = append(resultOpts,
@@ -130,6 +144,14 @@ func (tx *Transaction) QueryRow(
130144
onDone(finalErr)
131145
}()
132146

147+
if !options.ExecuteSettings(opts...).AllowLazyTx() {
148+
// It needs to execute before create settings because if tx must be not lazy, and it is first query
149+
// change txcontrol during unlazy
150+
if err := tx.UnLazy(ctx); err != nil {
151+
return nil, err
152+
}
153+
}
154+
133155
settings := options.ExecuteSettings(
134156
append(
135157
[]options.Execute{options.WithTxControl(tx.txControl())},
@@ -144,6 +166,11 @@ func (tx *Transaction) QueryRow(
144166
}),
145167
}
146168
if settings.TxControl().Commit {
169+
err := tx.waitOnBeforeCommit(ctx)
170+
if err != nil {
171+
return nil, err
172+
}
173+
147174
// notification about complete transaction must be sended for any error or for successfully read all result if
148175
// it was execution with commit flag
149176
resultOpts = append(resultOpts,
@@ -192,6 +219,14 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
192219
return xerrors.WithStackTrace(errExecuteOnCompletedTx)
193220
}
194221

222+
if !options.ExecuteSettings(opts...).AllowLazyTx() {
223+
// It needs to execute before create settings because if tx must be not lazy, and it is first query
224+
// change txcontrol during unlazy
225+
if err := tx.UnLazy(ctx); err != nil {
226+
return err
227+
}
228+
}
229+
195230
settings, err := tx.executeSettings(opts...)
196231
if err != nil {
197232
return xerrors.WithStackTrace(err)
@@ -204,6 +239,11 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
204239
}),
205240
}
206241
if settings.TxControl().Commit {
242+
err = tx.waitOnBeforeCommit(ctx)
243+
if err != nil {
244+
return err
245+
}
246+
207247
// notification about complete transaction must be sended for any error or for successfully read all result if
208248
// it was execution with commit flag
209249
resultOpts = append(resultOpts,
@@ -256,6 +296,14 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
256296
return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx)
257297
}
258298

299+
if !options.ExecuteSettings(opts...).AllowLazyTx() {
300+
// It needs to execute before create settings because if tx must be not lazy, and it is first query
301+
// change txcontrol during unlazy
302+
if err := tx.UnLazy(ctx); err != nil {
303+
return nil, err
304+
}
305+
}
306+
259307
settings, err := tx.executeSettings(opts...)
260308
if err != nil {
261309
return nil, xerrors.WithStackTrace(err)
@@ -268,6 +316,11 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
268316
}),
269317
}
270318
if settings.TxControl().Commit {
319+
err = tx.waitOnBeforeCommit(ctx)
320+
if err != nil {
321+
return nil, err
322+
}
323+
271324
// notification about complete transaction must be sended for any error or for successfully read all result if
272325
// it was execution with commit flag
273326
resultOpts = append(resultOpts,
@@ -310,7 +363,12 @@ func (tx *Transaction) CommitTx(ctx context.Context) (finalErr error) {
310363
tx.completed = true
311364
}()
312365

313-
err := commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID())
366+
err := tx.waitOnBeforeCommit(ctx)
367+
if err != nil {
368+
return err
369+
}
370+
371+
err = commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID())
314372
if err != nil {
315373
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
316374
tx.s.SetStatus(session.StatusClosed)
@@ -360,10 +418,24 @@ func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) {
360418
return nil
361419
}
362420

421+
func (tx *Transaction) OnBeforeCommit(f baseTx.OnTransactionBeforeCommit) {
422+
tx.onBeforeCommit.Add(&f)
423+
}
424+
363425
func (tx *Transaction) OnCompleted(f baseTx.OnTransactionCompletedFunc) {
364426
tx.onCompleted.Add(&f)
365427
}
366428

429+
func (tx *Transaction) waitOnBeforeCommit(ctx context.Context) (resErr error) {
430+
431+
tx.onBeforeCommit.Range(func(f *baseTx.OnTransactionBeforeCommit) bool {
432+
resErr = (*f)(ctx)
433+
return resErr == nil
434+
})
435+
436+
return resErr
437+
}
438+
367439
func (tx *Transaction) notifyOnCompleted(err error) {
368440
tx.completed = true
369441

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func (r *topicStreamReaderImpl) createUpdateOffsetRequest(
318318

319319
return &rawtopic.UpdateOffsetsInTransactionRequest{
320320
OperationParams: rawydb.NewRawOperationParamsFromProto(operation.Params(ctx, 0, 0, operation.ModeSync)),
321-
Tx: rawtopic.UpdateOffsetsInTransactionRequest_TransactionIdentity{
321+
Tx: rawtopiccommon.TransactionIdentity{
322322
ID: tx.ID(),
323323
Session: tx.SessionID(),
324324
},

0 commit comments

Comments
 (0)