Skip to content

Commit 1c9e3b5

Browse files
committed
fixes with linters
1 parent 40b55a6 commit 1c9e3b5

File tree

11 files changed

+44
-48
lines changed

11 files changed

+44
-48
lines changed

examples/topic/topicwriter/topic_writer_transaction.go

-3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ func CopyMessagesBetweenTopicsTxWriter(
2727
if err != nil {
2828
return err
2929
}
30-
3130
for _, mess := range batch.Messages {
32-
3331
if err = writer.Write(ctx, topicwriter.Message{Data: mess}); err != nil {
3432
return err
3533
}
@@ -69,7 +67,6 @@ func TableAndTopicWithinTransaction(
6967
err = writer.Write(ctx, topicwriter.Message{
7068
Data: strings.NewReader(fmt.Sprintf("val: %v processed", val)),
7169
})
72-
7370
if err != nil {
7471
return err
7572
}

internal/grpcwrapper/rawtopic/rawtopiccommon/transaction.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package rawtopiccommon
22

33
import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
44

5-
type TransactionIdentity struct { //nolint:revive,stylecheck
5+
type TransactionIdentity struct {
66
ID string
77
Session string
88
}

internal/query/transaction.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,9 @@ func (tx *Transaction) OnCompleted(f baseTx.OnTransactionCompletedFunc) {
395395
}
396396

397397
func (tx *Transaction) waitOnBeforeCommit(ctx context.Context) (resErr error) {
398-
399398
tx.onBeforeCommit.Range(func(f *baseTx.OnTransactionBeforeCommit) bool {
400399
resErr = (*f)(ctx)
400+
401401
return resErr == nil
402402
})
403403

internal/topic/topicclientinternal/client.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ import (
2626
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2727
)
2828

29-
var (
30-
errUnsupportedTransactionType = xerrors.Wrap(errors.New("ydb: unsuppotred transaction type. Use transaction from Driver().Query().DoTx(...)"))
31-
)
29+
var errUnsupportedTransactionType = xerrors.Wrap(errors.New("ydb: unsuppotred transaction type. Use transaction from Driver().Query().DoTx(...)")) //nolint:lll
3230

3331
type Client struct {
3432
cfg topic.Config
@@ -285,7 +283,11 @@ func (c *Client) StartWriter(topicPath string, opts ...topicoptions.WriterOption
285283
return topicwriter.NewWriter(writer), nil
286284
}
287285

288-
func (c *Client) StartTransactionalWriter(transaction tx.Identifier, topicpath string, opts ...topicoptions.WriterOption) (*topicwriter.TxWriter, error) {
286+
func (c *Client) StartTransactionalWriter(
287+
transaction tx.Identifier,
288+
topicpath string,
289+
opts ...topicoptions.WriterOption,
290+
) (*topicwriter.TxWriter, error) {
289291
internalTx, ok := transaction.(tx.Transaction)
290292
if !ok {
291293
return nil, xerrors.WithStackTrace(errUnsupportedTransactionType)
@@ -298,10 +300,14 @@ func (c *Client) StartTransactionalWriter(transaction tx.Identifier, topicpath s
298300
}
299301

300302
txWriter := topicwriterinternal.NewTopicWriterTransaction(writer, internalTx, cfg.Tracer)
303+
301304
return topicwriter.NewTxWriterInternal(txWriter), nil
302305
}
303306

304-
func (c *Client) createWriterConfig(topicPath string, opts []topicoptions.WriterOption) topicwriterinternal.WriterReconnectorConfig {
307+
func (c *Client) createWriterConfig(
308+
topicPath string,
309+
opts []topicoptions.WriterOption,
310+
) topicwriterinternal.WriterReconnectorConfig {
305311
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) (
306312
topicwriterinternal.RawTopicWriterStream,
307313
error,
@@ -318,5 +324,6 @@ func (c *Client) createWriterConfig(topicPath string, opts []topicoptions.Writer
318324
}
319325

320326
options = append(options, opts...)
327+
321328
return topicwriterinternal.NewWriterReconnectorConfig(options...)
322329
}

internal/topic/topicwriterinternal/writer_reconnector.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var (
3838
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
3939
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit"))
4040
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
41-
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+"))
41+
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll
4242

4343
// errProducerIDNotEqualMessageGroupID is temporary
4444
// WithMessageGroupID is optional parameter because it allowed to be skipped by protocol.
@@ -634,6 +634,7 @@ func (w *WriterReconnector) GetSessionID() (sessionID string) {
634634
w.m.WithLock(func() {
635635
sessionID = w.sessionID
636636
})
637+
637638
return sessionID
638639
}
639640

internal/tx/transaction.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@ type Transaction interface {
1313
SessionID() string
1414

1515
// OnBeforeCommit add callback, which will be called before commit transaction
16-
// the method will be not call the method if some error happen and transaction will not be commited
16+
// the method will be not call the method if some error happen and transaction will not be committed
1717
OnBeforeCommit(f OnTransactionBeforeCommit)
1818
OnCompleted(f OnTransactionCompletedFunc)
1919
Rollback(ctx context.Context) error
2020
}
2121

22-
type OnTransactionBeforeCommit func(ctx context.Context) error
23-
type OnTransactionCompletedFunc func(transactionResult error)
22+
type (
23+
OnTransactionBeforeCommit func(ctx context.Context) error
24+
OnTransactionCompletedFunc func(transactionResult error)
25+
)
2426

2527
func AsTransaction(id Identifier) (Transaction, error) {
2628
if t, ok := id.(Transaction); ok {

log/topic.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -709,12 +709,15 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
709709
}
710710
}
711711
}
712-
t.OnWriterBeforeCommitTransaction = func(info trace.TopicOnWriterBeforeCommitTransactionStartInfo) func(trace.TopicOnWriterBeforeCommitTransactionDoneInfo) {
712+
t.OnWriterBeforeCommitTransaction = func(
713+
info trace.TopicOnWriterBeforeCommitTransactionStartInfo,
714+
) func(trace.TopicOnWriterBeforeCommitTransactionDoneInfo) {
713715
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
714716
return nil
715717
}
716718

717719
start := time.Now()
720+
718721
return func(doneInfo trace.TopicOnWriterBeforeCommitTransactionDoneInfo) {
719722
ctx := with(*info.Ctx, TRACE, "ydb", "topic", "writer", "beforecommit")
720723
l.Log(ctx, "wait of flush messages before commit transaction",
@@ -726,8 +729,11 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
726729
)
727730
}
728731
}
729-
t.OnWriterAfterFinishTransaction = func(info trace.TopicOnWriterAfterFinishTransactionStartInfo) func(trace.TopicOnWriterAfterFinishTransactionDoneInfo) {
732+
t.OnWriterAfterFinishTransaction = func(
733+
info trace.TopicOnWriterAfterFinishTransactionStartInfo,
734+
) func(trace.TopicOnWriterAfterFinishTransactionDoneInfo) {
730735
start := time.Now()
736+
731737
return func(doneInfo trace.TopicOnWriterAfterFinishTransactionDoneInfo) {
732738
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "beforecommit")
733739
l.Log(ctx, "close writer after transaction finished",

topic/client.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,9 @@ type Client interface {
5252
// StartTransactionalWriter start writer for write messages within transaction
5353
//
5454
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
55-
StartTransactionalWriter(tx tx.Identifier, topicpath string, opts ...topicoptions.WriterOption) (*topicwriter.TxWriter, error)
55+
StartTransactionalWriter(
56+
tx tx.Identifier,
57+
topicpath string,
58+
opts ...topicoptions.WriterOption,
59+
) (*topicwriter.TxWriter, error)
5660
}

topic/topicwriter/topicwriter.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ package topicwriter
22

33
import (
44
"context"
5-
"errors"
6-
7-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
85

96
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"
107
)
@@ -13,10 +10,7 @@ type (
1310
Message = topicwriterinternal.PublicMessage
1411
)
1512

16-
var (
17-
ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
18-
errBadTxId = xerrors.Wrap(errors.New("ydb: bad transaction type, supported transactions from query service only"))
19-
)
13+
var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
2014

2115
// Writer represent write session to topic
2216
// It handles connection problems, reconnect to server when need and resend buffered messages

topic/topicwriter/write_options.go

+1-22
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,7 @@
11
package topicwriter
22

3-
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
4-
5-
type writeOptions struct {
6-
transaction tx.Identifier
7-
commit bool
8-
}
3+
type writeOptions struct{}
94

105
type WriteOption interface {
116
applyWriteOption(options *writeOptions)
127
}
13-
14-
type writeOptionWithTx struct {
15-
tx tx.Identifier
16-
}
17-
18-
func (o writeOptionWithTx) applyWriteOption(options *writeOptions) {
19-
options.transaction = o.tx
20-
}
21-
22-
type writeOptionCommitTx struct {
23-
commit bool
24-
}
25-
26-
func (o writeOptionCommitTx) applyWriteOption(options *writeOptions) {
27-
options.commit = o.commit
28-
}

trace/topic.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,16 @@ type (
113113
OnWriterInitStream func(TopicWriterInitStreamStartInfo) func(TopicWriterInitStreamDoneInfo)
114114
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
115115
OnWriterClose func(TopicWriterCloseStartInfo) func(TopicWriterCloseDoneInfo)
116+
116117
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
117-
OnWriterBeforeCommitTransaction func(TopicOnWriterBeforeCommitTransactionStartInfo) func(TopicOnWriterBeforeCommitTransactionDoneInfo)
118+
OnWriterBeforeCommitTransaction func(
119+
TopicOnWriterBeforeCommitTransactionStartInfo,
120+
) func(TopicOnWriterBeforeCommitTransactionDoneInfo)
121+
118122
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
119-
OnWriterAfterFinishTransaction func(TopicOnWriterAfterFinishTransactionStartInfo) func(TopicOnWriterAfterFinishTransactionDoneInfo)
123+
OnWriterAfterFinishTransaction func(
124+
TopicOnWriterAfterFinishTransactionStartInfo,
125+
) func(TopicOnWriterAfterFinishTransactionDoneInfo)
120126

121127
// TopicWriterStreamEvents
122128

0 commit comments

Comments
 (0)