Skip to content

Commit eaa2e0c

Browse files
committed
support write status written in tx
1 parent e2b4472 commit eaa2e0c

File tree

4 files changed

+19
-8
lines changed

4 files changed

+19
-8
lines changed

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

+16-7
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,13 @@ func (r *WriteResult) GetAcks() (res traceAck) {
233233
}
234234
for i := range r.Acks {
235235
ack := &r.Acks[i]
236-
if ack.MessageWriteStatus.Type == WriteStatusTypeWritten {
236+
switch ack.MessageWriteStatus.Type {
237+
case WriteStatusTypeWritten:
237238
res.WrittenCount++
238-
}
239-
if ack.MessageWriteStatus.Type == WriteStatusTypeSkipped {
239+
case WriteStatusTypeSkipped:
240240
res.SkipCount++
241+
case WriteStatusTypeWrittenInTx:
242+
res.WrittenInTxCount++
241243
}
242244

243245
if ack.SeqNo < res.SeqNoMin {
@@ -263,6 +265,7 @@ type traceAck = struct {
263265
WrittenOffsetMin int64
264266
WrittenOffsetMax int64
265267
WrittenCount int
268+
WrittenInTxCount int
266269
SkipCount int
267270
}
268271

@@ -301,6 +304,12 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
301304
s.SkippedReason = WriteStatusSkipReason(v.Skipped.GetReason())
302305

303306
return nil
307+
308+
case *Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_WrittenInTx_:
309+
s.Type = WriteStatusTypeWrittenInTx
310+
311+
return nil
312+
304313
default:
305314
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected write status type: %v", reflect.TypeOf(v))))
306315
}
@@ -309,19 +318,19 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
309318
type WriteStatusType int
310319

311320
const (
312-
WriteStatusTypeUnknown WriteStatusType = iota
313-
WriteStatusTypeWritten
321+
WriteStatusTypeWritten WriteStatusType = iota + 1
314322
WriteStatusTypeSkipped
323+
WriteStatusTypeWrittenInTx
315324
)
316325

317326
func (t WriteStatusType) String() string {
318327
switch t {
319-
case WriteStatusTypeUnknown:
320-
return "Unknown"
321328
case WriteStatusTypeSkipped:
322329
return "Skipped"
323330
case WriteStatusTypeWritten:
324331
return "Written"
332+
case WriteStatusTypeWrittenInTx:
333+
return "WrittenInTx"
325334
default:
326335
return strconv.Itoa(int(t))
327336
}

internal/topic/topicwriterinternal/writer_reconnector.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ func createWriteRequest(messages []messageWithDataContent, targetCodec rawtopicc
702702
}
703703
}
704704

705-
if len(messages) > 0 {
705+
if len(messages) > 0 && messages[0].tx != nil {
706706
res.Tx.ID = messages[0].tx.ID()
707707
res.Tx.Session = messages[0].tx.SessionID()
708708
}

log/topic.go

+1
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
863863
Int64("written_offset_min", acks.WrittenOffsetMin),
864864
Int64("written_offset_max", acks.WrittenOffsetMax),
865865
Int("written_offset_count", acks.WrittenCount),
866+
Int("written_in_tx_count", acks.WrittenInTxCount),
866867
Int("skip_count", acks.SkipCount),
867868
versionField(),
868869
)

trace/topic.go

+1
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ type (
493493
WrittenOffsetMin int64
494494
WrittenOffsetMax int64
495495
WrittenCount int
496+
WrittenInTxCount int
496497
SkipCount int
497498
}
498499
}

0 commit comments

Comments
 (0)