Skip to content

Commit e74f4b9

Browse files
add support rollback running txn to 1.2 (#18775)
add support rollback running txn Approved by: @reusee, @sukki37
1 parent 76c9cf7 commit e74f4b9

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

pkg/incrservice/column_cache.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -495,9 +495,7 @@ func (col *columnCache) waitPrevAllocatingLocked(ctx context.Context) error {
495495
}
496496

497497
func (col *columnCache) close() error {
498-
col.Lock()
499-
defer col.Unlock()
500-
return col.waitPrevAllocatingLocked(context.Background())
498+
return nil
501499
}
502500

503501
func insertAutoValues[T constraints.Integer](

pkg/txn/client/operator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ type txnOperator struct {
241241
fprints footPrints
242242

243243
waitActiveCost time.Duration
244+
runningSQL atomic.Bool
244245
}
245246

246247
func newTxnOperator(
@@ -517,6 +518,10 @@ func (tc *txnOperator) WriteAndCommit(ctx context.Context, requests []txn.TxnReq
517518
}
518519

519520
func (tc *txnOperator) Commit(ctx context.Context) (err error) {
521+
if tc.runningSQL.Load() {
522+
util.GetLogger().Fatal("commit on running txn")
523+
}
524+
520525
tc.commitCounter.addEnter()
521526
defer tc.commitCounter.addExit()
522527
txn := tc.getTxnMeta(false)
@@ -552,6 +557,10 @@ func (tc *txnOperator) Commit(ctx context.Context) (err error) {
552557
}
553558

554559
func (tc *txnOperator) Rollback(ctx context.Context) (err error) {
560+
if tc.runningSQL.Load() {
561+
util.GetLogger().Fatal("commit on running txn")
562+
}
563+
555564
tc.rollbackCounter.addEnter()
556565
defer tc.rollbackCounter.addExit()
557566
v2.TxnRollbackCounter.Inc()
@@ -1262,10 +1271,12 @@ func (tc *txnOperator) doCostAction(
12621271
}
12631272

12641273
func (tc *txnOperator) EnterRunSql() {
1274+
tc.runningSQL.Store(true)
12651275
tc.runSqlCounter.addEnter()
12661276
}
12671277

12681278
func (tc *txnOperator) ExitRunSql() {
1279+
tc.runningSQL.Store(false)
12691280
tc.runSqlCounter.addExit()
12701281
}
12711282

pkg/txn/client/operator_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,109 @@ func TestWaitCommittedLogAppliedInRCMode(t *testing.T) {
545545
nil)
546546
}
547547

548+
func TestCannotCommitRunningSQLTxn(t *testing.T) {
549+
runOperatorTests(
550+
t,
551+
func(
552+
ctx context.Context,
553+
tc *txnOperator,
554+
_ *testTxnSender,
555+
) {
556+
defer func() {
557+
if err := recover(); err != nil {
558+
require.NotNil(t, err)
559+
}
560+
}()
561+
562+
tc.EnterRunSql()
563+
_ = tc.Commit(ctx)
564+
},
565+
)
566+
}
567+
568+
func TestCannotRollbackRunningSQLTxn(t *testing.T) {
569+
runOperatorTests(
570+
t,
571+
func(
572+
ctx context.Context,
573+
tc *txnOperator,
574+
_ *testTxnSender,
575+
) {
576+
defer func() {
577+
if err := recover(); err != nil {
578+
require.NotNil(t, err)
579+
}
580+
}()
581+
582+
tc.EnterRunSql()
583+
_ = tc.Rollback(ctx)
584+
},
585+
)
586+
}
587+
588+
func TestEmptyLockSkipped(t *testing.T) {
589+
runOperatorTests(
590+
t,
591+
func(
592+
ctx context.Context,
593+
tc *txnOperator,
594+
_ *testTxnSender,
595+
) {
596+
require.False(t, tc.LockSkipped(1, lock.LockMode_Exclusive))
597+
},
598+
)
599+
}
600+
601+
func TestLockSkipped(t *testing.T) {
602+
runOperatorTests(
603+
t,
604+
func(
605+
ctx context.Context,
606+
tc *txnOperator,
607+
_ *testTxnSender,
608+
) {
609+
require.True(t, tc.LockSkipped(1, lock.LockMode_Exclusive))
610+
require.False(t, tc.LockSkipped(1, lock.LockMode_Shared))
611+
require.False(t, tc.LockSkipped(2, lock.LockMode_Exclusive))
612+
},
613+
WithTxnSkipLock(
614+
[]uint64{1},
615+
[]lock.LockMode{lock.LockMode_Exclusive},
616+
),
617+
)
618+
}
619+
620+
func TestHasLockTable(t *testing.T) {
621+
runOperatorTests(
622+
t,
623+
func(
624+
ctx context.Context,
625+
tc *txnOperator,
626+
_ *testTxnSender,
627+
) {
628+
require.NoError(t, tc.AddLockTable(lock.LockTable{Table: 1}))
629+
require.True(t, tc.HasLockTable(1))
630+
require.False(t, tc.HasLockTable(2))
631+
},
632+
)
633+
}
634+
635+
func TestBase(t *testing.T) {
636+
runOperatorTests(
637+
t,
638+
func(
639+
ctx context.Context,
640+
tc *txnOperator,
641+
_ *testTxnSender,
642+
) {
643+
require.NotNil(t, tc.TxnRef())
644+
require.Equal(t, tc.Txn().SnapshotTS, tc.SnapshotTS())
645+
require.NotEqual(t, timestamp.Timestamp{}, tc.CreateTS())
646+
require.Equal(t, txn.TxnStatus_Active, tc.Status())
647+
},
648+
)
649+
}
650+
548651
func runOperatorTests(
549652
t *testing.T,
550653
tc func(context.Context, *txnOperator, *testTxnSender),

0 commit comments

Comments
 (0)