@@ -406,39 +406,34 @@ func checkWorkspaceEntryType(
406
406
return (entry .typ == DELETE ) && (entry .fileName == "" )
407
407
}
408
408
409
- func checkTxnOffsetZero (ls * LocalDisttaeDataSource , writes []Entry ) {
410
- if len (writes ) > 200 && ls .txnOffset == 0 && ls .table .accountId == 0 && ls .table .tableName == "mo_increment_columns" {
411
- logutil .Info ("yyyyyy zero txnOffset" ,
412
- zap .String ("txn" , hex .EncodeToString (ls .table .db .op .Txn ().ID )),
413
- zap .Bool ("isSnapOp" , ls .table .db .op .IsSnapOp ()),
414
- zap .String ("entries" , stringifySlice (writes [len (writes )- 2 :], func (a any ) string {
415
- e := a .(Entry )
416
- batstr := "nil"
417
- if e .bat != nil {
418
- batstr = common .MoBatchToString (e .bat , 3 )
419
- }
420
- return e .String () + " " + batstr
421
- })))
422
- }
423
- }
424
-
425
409
func checkTxnLastInsertRow (ls * LocalDisttaeDataSource , writes []Entry , cursor int , outBatch * batch.Batch ) {
426
- if len (writes ) > 400 && ls .table .accountId == 0 && ls .table .tableName == "mo_increment_columns" && writes [len (writes )- 1 ].typ == INSERT && writes [len (writes )- 1 ].tableId == ls .table .tableId {
410
+ injected , writesT := objectio .Debug19357Injected ()
411
+ if injected && int64 (len (writes )) > writesT && len (outBatch .Vecs ) == 3 && ls .table .accountId == 0 && ls .table .tableName == "mo_increment_columns" && writes [len (writes )- 1 ].typ == INSERT && writes [len (writes )- 1 ].tableId == ls .table .tableId {
412
+ outLen := outBatch .Vecs [0 ].Length ()
413
+ var slim * batch.Batch
414
+ if outLen > 0 {
415
+ start := outLen - 3
416
+ if start < 0 {
417
+ start = 0
418
+ }
419
+ slim , _ = outBatch .Window (start , outLen )
420
+ }
421
+
427
422
logutil .Info ("yyyyyy checkTxnLastInsertRow" ,
428
423
zap .String ("txn" , hex .EncodeToString (ls .table .db .op .Txn ().ID )),
429
424
zap .Int ("txnOffset" , ls .txnOffset ),
430
425
zap .Int ("cursor" , cursor ),
431
426
zap .Int ("writes" , len (writes )),
432
427
zap .Bool ("isSnapOp" , ls .table .db .op .IsSnapOp ()),
433
- zap .String ("entries" , stringifySlice (writes [len (writes )- 2 :], func (a any ) string {
428
+ zap .String ("entries" , stringifySlice (writes [len (writes )- 1 :], func (a any ) string {
434
429
e := a .(Entry )
435
430
batstr := "nil"
436
431
if e .bat != nil {
437
432
batstr = common .MoBatchToString (e .bat , 3 )
438
433
}
439
434
return e .String () + " " + batstr
440
435
})),
441
- zap .String ("outBatch" , common .MoBatchToString (outBatch , 3 )),
436
+ zap .String ("outBatch" , common .MoBatchToString (slim , 3 )),
442
437
)
443
438
}
444
439
}
@@ -488,13 +483,20 @@ func (ls *LocalDisttaeDataSource) filterInMemUnCommittedInserts(
488
483
retainedRowIds = vector.MustFixedColWithTypeCheck [objectio.Rowid ](entry .bat .Vecs [0 ])
489
484
offsets := engine_util .RowIdsToOffset (retainedRowIds , int64 (0 )).([]int64 )
490
485
486
+ offsetLen := len (offsets )
487
+ badOffsetStart := offsetLen > 0 && offsets [0 ] > 0
488
+
491
489
b := retainedRowIds [0 ].BorrowBlockID ()
492
490
sels , err := ls .ApplyTombstones (
493
491
ls .ctx , b , offsets , engine .Policy_CheckUnCommittedOnly )
494
492
if err != nil {
495
493
return err
496
494
}
497
495
496
+ if (len (sels ) < offsetLen || badOffsetStart ) && ls .table .accountId == 0 && ls .table .tableName == "mo_increment_columns" {
497
+ logutil .Info ("Shrink retainedRowIds" , zap .Any ("sels" , sels ), zap .Any ("offsetsLen" , offsetLen ), zap .Bool ("badOffsetStart" , badOffsetStart ), zap .Int ("wsCursor" , ls .wsCursor ), zap .Int ("txnOffset" , ls .txnOffset ))
498
+ }
499
+
498
500
if len (sels ) == 0 {
499
501
continue
500
502
}
0 commit comments